diff --git a/proxy_manager/proxy_manager.py b/proxy_manager/proxy_manager.py
index 0ab6d56..aa06b2d 100644
--- a/proxy_manager/proxy_manager.py
+++ b/proxy_manager/proxy_manager.py
@@ -27,3 +27,6 @@ class ProxyManager:
         else:
             return [FR_PROXY_RES_OXY, FR_PROXY_ASOCK_RES_2, FR_ASOCKS_MOBILE_PROXY]
             # return [FR_PROXY_RES_OXY]
+
+    def get_result_link_proxy(self):  # proxy pool used by workers.result_link_checker when re-checking result links
+        return [FR_PROXY_ASOCK_RES_2, FR_ASOCKS_MOBILE_PROXY]
diff --git a/workers/result_link_checker.py b/workers/result_link_checker.py
new file mode 100644
index 0000000..9bff482
--- /dev/null
+++ b/workers/result_link_checker.py
@@ -0,0 +1,171 @@
+import json
+import random
+import threading
+import time
+from http.cookies import SimpleCookie
+
+import pika
+import requests
+
+from db.mongo_manager import MONGO_STORE_MANAGER
+from models.result_pojo import RequestResult
+from proxy_manager.proxy_manager import ProxyManager
+from queue_message.CookiesPublisher import CookiesPublisher, TEST_QUEUE, MORNING_DATA_CACHE
+from queue_message.appointmentrequestsender import QUEUE_HOST, REQUEST_DATA_QUEUE, credentials
+from workers.proxies_constants import PROXY_LIST_FR
+
+DOUBLE_MESSAGE = "Une demande de rendez-vous a déjà été enregistrée avec ces coordonnées"
+INVALID = "Depuis plus de 130 ans,"  # NOTE(review): neither DOUBLE_MESSAGE nor INVALID is referenced in this module
+
+
+class ResultLinkChecker(threading.Thread):  # consumes cookie messages from a queue and re-fetches stored result links
+
+    def __init__(self, cookiesPublisher: CookiesPublisher, all_links: list, proxy_manager: ProxyManager,
+                 queue_to_listen=REQUEST_DATA_QUEUE,
+                 ip_country="FR", segment_position=1,
+                 limit=40):
+        super().__init__()
+        self.link_to_validate_list = all_links
+        self.cookie = SimpleCookie()
+        self.cookiesPublisher = cookiesPublisher
+        self.segment_position = segment_position
+        self.queue_to_listen = queue_to_listen
+        self.ip_country = ip_country
+        self.proxy_manager = proxy_manager
+        self.limit = limit  # minimum message count in the cookie queue before links are processed (see on_message)
+
+    def set_up_connection(self):  # open the RabbitMQ connection/channel; must run before listen_to_queue
+        self.connection = pika.BlockingConnection(
+            pika.ConnectionParameters(host=QUEUE_HOST, port=5672, credentials=credentials))
+        self.channel = self.connection.channel()
+
+    def listen_to_queue(self, callback):  # blocking consume loop; prefetch 1 so a message is held until (n)acked
+        self.channel.basic_qos(prefetch_count=1)
+        self.channel.basic_consume(queue=self.queue_to_listen, auto_ack=False, on_message_callback=callback)
+        self.channel.start_consuming()
+
+    def send_request(self, url: str, _received_dict=None) -> RequestResult:  # GET url via a random proxy; on 200, harvest set-cookie and republish
+        _ua = 'Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Mobile Safari/537.36'
+        if _received_dict is not None:
+            _ua = _received_dict['ua']
+        self.cookie.load(self.cookie_str)  # self.cookie_str is set by on_message before this is called, not in __init__
+        headers = {
+            'User-Agent': _ua,
+            'Accept': '*/*',
+            'Accept-Encoding': 'gzip, deflate, br',
+            'Cache-Control': 'max-age=0',
+            'Referer': url,
+            'Cookie': self.cookie_str,
+            'Sec-Fetch-Mode': 'navigate',
+            'Host': 'rendezvousparis.hermes.com',
+            'Sec-Fetch-Site': 'same-origin',
+            'Sec-Fetch-Dest': 'document',
+            'Accept-Language': 'fr-FR,fr;q=0.6'}
+        _proxy_to_use = random.choice(self.proxy_manager.get_result_link_proxy())
+        print(_proxy_to_use)
+        print("received cookie is " + str(self.cookie_str))
+        print("send request for link: " + url)
+        try:
+            print("will send request with ua {}".format(_ua))
+            print("will send request with cookie {}".format(self.cookie_str))
+            response = requests.get(url=url, headers=headers, verify=False, proxies=_proxy_to_use,  # NOTE(review): verify=False disables TLS cert checks
+                                    timeout=60)
+            print(response.status_code)
+            if response.status_code == 200:
+                _content = response.text  # unused
+                print(response.text)
+                print(response.url)
+                file1 = open("{}.html".format(url.replace('/', '_')), "w")  # dump the page body for offline inspection
+                file1.write(response.text)
+                file1.close()
+                # set new cookies
+                _cookies_to_set = response.headers['set-cookie']  # raises KeyError if absent; caught below as PROXY_ERROR
+                self.cookie.load(_cookies_to_set)
+                new_cookies = {k: v.value for k, v in self.cookie.items()}
+                new_coolies_str = ""  # sic: "coolies" typo kept consistently throughout
+                for key in new_cookies:
+                    new_coolies_str = new_coolies_str + key + "=" + new_cookies[key] + ";"
+                print("will publish to queue {}".format(new_coolies_str))
+                # upload the cookie to queue
+                if _received_dict is not None:
+                    _received_dict['cookiesStr'] = new_coolies_str  # refresh the cookie in the original message and republish as JSON
+                    print("body in json:{}".format(json.dumps(_received_dict)))
+                    self.cookiesPublisher.publish_body(json.dumps(_received_dict))
+                else:
+                    self.cookiesPublisher.publish_body(new_coolies_str)
+                self.cookie_str = new_coolies_str
+                return RequestResult.SUCCESS
+            elif response.status_code == 502:
+                return RequestResult.BAD_GATEWAY
+            else:
+                return RequestResult.BLOCKED
+        except Exception as error:  # NOTE(review): broad catch — network, KeyError, and file errors all map to PROXY_ERROR
+            print(error)
+            return RequestResult.PROXY_ERROR
+
+    def on_message(self, ch, method, properties, body):  # pika consumer callback: take a cookie message, walk the link list, then ack/requeue
+        print(f" [x] Received {body}")
+        _message_in_queue_count = self.cookiesPublisher.message_count()
+        print("message count in queue is {}".format(_message_in_queue_count))
+        _received_object = body.decode("UTF-8")
+        js_data = None  # unused
+        _received_dict = None
+        if "glrd" in _received_object:  # messages containing "glrd" are JSON payloads; anything else is a raw cookie string
+            _received_dict = json.loads(_received_object)
+            _received_cookies = _received_dict["cookiesStr"]
+        else:
+            _received_cookies = _received_object
+        self.cookie_str = _received_cookies
+        random.shuffle(self.link_to_validate_list)
+        if len(self.link_to_validate_list) > 0 and _message_in_queue_count >= self.limit:  # only work when enough cookie messages are banked
+            print("links number is {}".format(len(self.link_to_validate_list)))
+            can_continue = None  # last RequestResult seen; stays None if the loop body never runs
+            for link in self.link_to_validate_list:
+                print(link)
+                can_continue = self.send_request(link, _received_dict)
+                if can_continue == RequestResult.BLOCKED:
+                    print("cannot continue, blocked, then skip")
+                    break
+                else:
+                    if can_continue == RequestResult.BAD_GATEWAY:
+                        time.sleep(30)
+                        break
+                    time.sleep(random.randint(2, 5))  # jitter between requests
+                    print("can continue, continue")
+                    if can_continue == RequestResult.SUCCESS:
+                        self.link_to_validate_list.remove(link)  # NOTE(review): mutates the list being iterated; loop continues afterwards
+
+            if can_continue == RequestResult.BAD_GATEWAY or can_continue == RequestResult.PROXY_ERROR or can_continue == RequestResult.SUCCESS:
+                print("will requeue the message")
+                ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)  # cookie message is reusable — put it back
+            else:
+                print("will ack")
+                ch.basic_ack(delivery_tag=method.delivery_tag)  # BLOCKED (or empty loop): consume the message
+        else:
+            print("empty list, no need to ack")
+            time.sleep(60)
+            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
+
+
+def check_all_links(segment_position=1):  # entry point: load validated URLs for the day from Mongo and start consuming
+    _queue_name = MORNING_DATA_CACHE
+    _all_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day()
+    _link_list = []
+    for _item in _all_items:
+        if _item.url_validated is not None and _item.url_validated is True:
+            _link_list.append(_item.url)
+    print(_link_list)
+    cookiesPublisher = CookiesPublisher(queue_name=MORNING_DATA_CACHE)
+    cookiesPublisher.set_up_connection()
+    print("filter links with ip_country")
+    _proxy_manager = ProxyManager()
+    receiver = ResultLinkChecker(cookiesPublisher=cookiesPublisher, proxy_manager=_proxy_manager, all_links=_link_list,
+                                 queue_to_listen=_queue_name, ip_country="FR", segment_position=segment_position,
+                                 limit=0)  # limit=0: process links regardless of queue depth
+    print("will connect to queue")
+    receiver.set_up_connection()
+    receiver.listen_to_queue(receiver.on_message)
+
+
+if __name__ == '__main__':
+    check_all_links()