diff --git a/db/mongo_manager.py b/db/mongo_manager.py index 03fc204..229d407 100755 --- a/db/mongo_manager.py +++ b/db/mongo_manager.py @@ -137,7 +137,7 @@ class MongoDbManager: collection = self.db[collection_name] validated_at = time.strftime("%H:%M:%S", time.localtime()) - validated_by = "requests" + str(segement_position) + validated_by = "requests:" + str(segement_position) if is_invalid: validated_by = "Invalid" if is_duplicated: diff --git a/queue_message/CookiesPublisher.py b/queue_message/CookiesPublisher.py index 5561f7d..99295bb 100644 --- a/queue_message/CookiesPublisher.py +++ b/queue_message/CookiesPublisher.py @@ -37,4 +37,3 @@ class CookiesPublisher: def message_count(self): return self.channel.queue_declare(queue=self.to_queue, durable=True).method.message_count - # return self.queue_method.method.message_count diff --git a/scheduler_test.py b/scheduler_test.py index c2c3ede..2ae5022 100644 --- a/scheduler_test.py +++ b/scheduler_test.py @@ -5,14 +5,14 @@ from request_sender_test import start_send_requests def start_book_appointment(): - start_send_requests(thread_number=25, file_path='~/Desktop/contact_list_2024-04-25_2.xlsx') + start_send_requests(thread_number=25, file_path='~/Desktop/contact_list_2024-05-04.xlsx') def start_check_results_job(sched): sched.add_job(start_book_appointment, 'cron', day_of_week='mon-sat', hour='10', minute='30', misfire_grace_time=10, - second='0', timezone='Europe/Paris', max_instances=1, args=[]) + second='20', timezone='Europe/Paris', max_instances=1, args=[]) def config_and_start_jobs(): diff --git a/workers/link_validator_with_provided_list.py b/workers/link_validator_with_provided_list.py new file mode 100644 index 0000000..63cbbb6 --- /dev/null +++ b/workers/link_validator_with_provided_list.py @@ -0,0 +1,224 @@ +import json +import logging +import random +import threading +import time +from http.cookies import SimpleCookie + +import pika +import requests + +from db.mongo_manager import MONGO_STORE_MANAGER +from models.LinkPojo import LinkPojo +from models.result_pojo import RequestResult +from proxy_manager.proxy_manager import ProxyManager +from queue_message.CookiesPublisher import CookiesPublisher, REQUEST_DATA_QUEUE_TEST, TEST_QUEUE, SHARED_OBJECT +from queue_message.appointmentrequestsender import QUEUE_HOST, REQUEST_DATA_QUEUE, credentials +from utils.AppLogging import init_logger +from workers.proxies_constants import PROXY_LIST_FR + +DOUBLE_MESSAGE = "Une demande de rendez-vous a déjà été enregistrée avec ces coordonnées" +INVALID = "Depuis plus de 130 ans," + +init_logger() +logger = logging.getLogger() + + +class LinkValidatorWithProvidedList(threading.Thread): + + def __init__(self, cookiesPublisher: CookiesPublisher, link_list: list, + queue_to_listen=REQUEST_DATA_QUEUE, + ip_country="FR", + limit=40): + super().__init__() + self.link_to_validate_list = link_list + self.cookie = SimpleCookie() + self.cookiesPublisher = cookiesPublisher + self.queue_to_listen = queue_to_listen + self.ip_country = ip_country + self.filter_with_ip_country() + self.proxy_manager = ProxyManager() + self.limit = limit + + def set_up_connection(self): + self.connection = pika.BlockingConnection( + pika.ConnectionParameters(host=QUEUE_HOST, port=5672, credentials=credentials)) + self.channel = self.connection.channel() + + def listen_to_queue(self, callback): + self.channel.basic_qos(prefetch_count=1) + self.channel.basic_consume(queue=self.queue_to_listen, auto_ack=False, on_message_callback=callback) + self.channel.start_consuming() + + def send_request(self, linkPojo: LinkPojo, _received_dict=None) -> RequestResult: + _ua = 'Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Mobile Safari/537.36' + if _received_dict is not None: + _ua = _received_dict['ua'] + self.cookie.load(self.cookie_str) + headers = { + 'User-Agent': _ua, + 'Accept': '*/*', + 'Accept-Encoding': 'gzip, deflate, br', + 'Cache-Control': 'max-age=0', + 'Referer': linkPojo.url, + 'Cookie': self.cookie_str, + 'Sec-Fetch-Mode': 'navigate', + 'Host': 'rendezvousparis.hermes.com', + 'Sec-Fetch-Site': 'same-origin', + 'Sec-Fetch-Dest': 'document', + 'Accept-Language': 'fr-FR,fr;q=0.6'} + _proxy_to_use = random.choice(self.proxy_manager.get_link_validate_proxy(self.link_to_validate_list)) + print(_proxy_to_use) + print("received cookie is " + str(self.cookie_str)) + print("send request for link: " + linkPojo.url) + try: + print("will send request with ua {}".format(_ua)) + print("will send request with cookie {}".format(self.cookie_str)) + response = requests.get(url=linkPojo.url, headers=headers, verify=False, proxies=self.proxy_to_use, + timeout=60) + print(response.status_code) + if response.status_code == 200: + _content = response.text + print(response.text) + if "Votre demande de rendez-vous Maroquinerie a bien été enregistrée" in _content: + print(response.url) + MONGO_STORE_MANAGER.link_validated_for_result(response.url, linkPojo, + segement_position=threading.currentThread().name) + # set new cookies + _cookies_to_set = response.headers['set-cookie'] + self.cookie.load(_cookies_to_set) + new_cookies = {k: v.value for k, v in self.cookie.items()} + new_coolies_str = "" + for key in new_cookies: + new_coolies_str = new_coolies_str + key + "=" + new_cookies[key] + ";" + print("will publish to queue {}".format(new_coolies_str)) + # upload the cookie to queue + if _received_dict is not None: + _received_dict['cookiesStr'] = new_coolies_str + print("body in json:{}".format(json.dumps(_received_dict))) + self.cookiesPublisher.publish_body(json.dumps(_received_dict)) + else: + self.cookiesPublisher.publish_body(new_coolies_str) + self.cookie_str = new_coolies_str + return RequestResult.SUCCESS + elif INVALID in _content: + + MONGO_STORE_MANAGER.link_validated_for_result(response.url, linkPojo, is_invalid=True) + # set new cookies + _cookies_to_set = response.headers['set-cookie'] + self.cookie.load(_cookies_to_set) + new_cookies = {k: v.value for k, v in self.cookie.items()} + new_coolies_str = "" + for key in new_cookies: + new_coolies_str = new_coolies_str + key + "=" + new_cookies[key] + ";" + print("will publish to queue {}".format(new_coolies_str)) + # upload the cookie to queue + self.cookiesPublisher.publish_body(new_coolies_str) + self.cookie_str = new_coolies_str + return RequestResult.SUCCESS + + elif DOUBLE_MESSAGE in _content: + print(response.url) + MONGO_STORE_MANAGER.link_validated_for_result(response.url, linkPojo, is_duplicated=True) + # set new cookies + _cookies_to_set = response.headers['set-cookie'] + self.cookie.load(_cookies_to_set) + new_cookies = {k: v.value for k, v in self.cookie.items()} + new_coolies_str = "" + for key in new_cookies: + new_coolies_str = new_coolies_str + key + "=" + new_cookies[key] + ";" + print("will publish to queue {}".format(new_coolies_str)) + # upload the cookie to queue + self.cookiesPublisher.publish_body(new_coolies_str) + self.cookie_str = new_coolies_str + return RequestResult.SUCCESS + else: + return RequestResult.UNKNOWN + elif response.status_code == 502: + return RequestResult.BAD_GATEWAY + else: + return RequestResult.BLOCKED + except Exception as error: + print(error) + return RequestResult.PROXY_ERROR + + def on_message(self, ch, method, properties, body): + print(f" [x] Received {body}") + _message_in_queue_count = self.cookiesPublisher.message_count() + print("message count in queue is {}".format(_message_in_queue_count)) + _received_object = body.decode("UTF-8") + _received_dict = None + if "glrd" in _received_object: + _received_dict = json.loads(_received_object) + _received_cookies = _received_dict["cookiesStr"] + else: + _received_cookies = _received_object + self.cookie_str = _received_cookies + random.shuffle(self.link_to_validate_list) + if len(self.link_to_validate_list) > 0 and _message_in_queue_count >= self.limit: + print("{}:links number is {}".format(threading.currentThread().name, len(self.link_to_validate_list))) + can_continue = None + for link_to_validate in self.link_to_validate_list: + print(link_to_validate) + self.proxy_to_use = random.choice(PROXY_LIST_FR) + can_continue = self.send_request(link_to_validate, _received_dict) + # remove the tested link from link list + self.link_to_validate_list.remove(link_to_validate) + if can_continue == RequestResult.BLOCKED: + print("cannot continue, blocked, then skip") + break + else: + if can_continue == RequestResult.BAD_GATEWAY: + time.sleep(30) + break + time.sleep(random.randint(2, 5)) + print("can continue, continue") + if can_continue == RequestResult.BAD_GATEWAY or can_continue == RequestResult.PROXY_ERROR or can_continue == RequestResult.SUCCESS: + print("will requeue the message") + ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True) + else: + print("will ack") + ch.basic_ack(delivery_tag=method.delivery_tag) + else: + print("empty list, no need to ack") + time.sleep(5) + ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True) + self.channel.stop_consuming() + + def filter_with_ip_country(self): + _link_list_to_click = [] + for _link in self.link_to_validate_list: + if _link.ip_country == self.ip_country: + _link_list_to_click.append(_link) + self.link_to_validate_list = _link_list_to_click + + +def validate_links(cookiesPublisher, queue_name: str, link_list: list): + receiver = LinkValidatorWithProvidedList(cookiesPublisher=cookiesPublisher, link_list=link_list, + queue_to_listen=queue_name, ip_country="FR", limit=0) + print("{} set_up_connection".format(threading.currentThread().name)) + receiver.set_up_connection() + receiver.listen_to_queue(receiver.on_message) + + +def validate_all_links(): + all_link_list = MONGO_STORE_MANAGER.get_links_to_validate() + _queue_name = TEST_QUEUE + _segment_number = 4 + last_thread = None + for i in range(0, _segment_number): + logger.info("{}:{} links to validate".format(threading.currentThread().name, len(all_link_list))) + logger.info("segment is {}".format(i)) + _cookiesPublisher = CookiesPublisher(queue_name=TEST_QUEUE) + _cookiesPublisher.set_up_connection() + _step = int(len(all_link_list) / _segment_number) + _sublist = all_link_list[i * _step:_step * (i + 1)] + _thread1 = threading.Thread(target=validate_links, args=(_cookiesPublisher, SHARED_OBJECT, _sublist)) + last_thread = _thread1 + _thread1.start() + last_thread.join() + + +if __name__ == '__main__': + # generate test data + validate_all_links()