"""Re-fetch validated result links through random proxies, recycling session
cookies through a RabbitMQ queue.

Each consumed queue message carries a cookie string (either raw, or inside a
JSON object that also holds a user agent). The links are requested with those
cookies. On success the refreshed cookies are published back via
``CookiesPublisher``.
"""
import json
import random
import threading
import time
from http.cookies import SimpleCookie

import pika
import requests

from db.mongo_manager import MONGO_STORE_MANAGER
from models.result_pojo import RequestResult
from proxy_manager.proxy_manager import ProxyManager
from queue_message.CookiesPublisher import CookiesPublisher, TEST_QUEUE, MORNING_DATA_CACHE
from queue_message.appointmentrequestsender import QUEUE_HOST, REQUEST_DATA_QUEUE, credentials
from workers.proxies_constants import PROXY_LIST_FR

# French page text: "An appointment request has already been registered with
# these details". Not referenced in this file; presumably used elsewhere — verify.
DOUBLE_MESSAGE = "Une demande de rendez-vous a déjà été enregistrée avec ces coordonnées"
# French page text: "For more than 130 years,". Not referenced in this file;
# presumably a marker for an invalid/landing page — TODO confirm.
INVALID = "Depuis plus de 130 ans,"

# User agent used when the queue message does not supply one.
DEFAULT_USER_AGENT = ('Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 '
                      '(KHTML, like Gecko) Chrome/115.0.0.0 Mobile Safari/537.36')


class ResultLinkChecker(threading.Thread):
    """RabbitMQ consumer that re-requests a list of links using queued cookies.

    NOTE(review): this subclasses ``threading.Thread`` but defines no ``run()``;
    in this file it is driven synchronously via ``set_up_connection()`` and
    ``listen_to_queue()`` rather than ``start()``.
    """

    def __init__(self, cookiesPublisher: CookiesPublisher, all_links: list, proxy_manager: ProxyManager,
                 queue_to_listen=REQUEST_DATA_QUEUE, ip_country="FR", segment_position=1, limit=40):
        """Store configuration; no network connection is opened here.

        Args:
            cookiesPublisher: publisher used to push refreshed cookies back to a queue.
            all_links: links still to be checked; successful ones are removed in place.
            proxy_manager: source of the proxy mappings passed to ``requests``.
            queue_to_listen: queue consumed by ``listen_to_queue``.
            ip_country: stored only; not read elsewhere in this file.
            segment_position: stored only; not read elsewhere in this file.
            limit: minimum message count (as reported by ``cookiesPublisher``)
                required before links are processed in ``on_message``.
        """
        super().__init__()
        self.link_to_validate_list = all_links
        self.cookie = SimpleCookie()
        self.cookiesPublisher = cookiesPublisher
        self.segment_position = segment_position
        self.queue_to_listen = queue_to_listen
        self.ip_country = ip_country
        self.proxy_manager = proxy_manager
        self.limit = limit
        # Cookie string of the message currently being processed; set by on_message.
        self.cookie_str = ""
        # Populated by set_up_connection().
        self.connection = None
        self.channel = None

    def set_up_connection(self):
        """Open a blocking RabbitMQ connection and channel on ``QUEUE_HOST:5672``."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=QUEUE_HOST, port=5672, credentials=credentials))
        self.channel = self.connection.channel()

    def listen_to_queue(self, callback):
        """Consume ``self.queue_to_listen`` one message at a time with manual acks.

        Blocks in ``start_consuming()`` until consumption stops.
        """
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(queue=self.queue_to_listen, auto_ack=False, on_message_callback=callback)
        self.channel.start_consuming()

    def _cookie_jar_to_str(self) -> str:
        """Serialise the cookie jar as ``name=value;`` pairs (trailing ``;`` included)."""
        return "".join(f"{name}={morsel.value};" for name, morsel in self.cookie.items())

    def send_request(self, url: str, _received_dict=None) -> RequestResult:
        """GET ``url`` through a random proxy and republish refreshed cookies on success.

        Args:
            url: link to fetch.
            _received_dict: decoded JSON message, or None when the message was a
                raw cookie string. When given, its ``'ua'`` value is used as the
                User-Agent, and its ``'cookiesStr'`` is overwritten with the new
                cookies before the whole dict is republished.

        Returns:
            ``RequestResult.SUCCESS`` on HTTP 200, ``BAD_GATEWAY`` on 502,
            ``BLOCKED`` on any other status, and ``PROXY_ERROR`` if an exception
            is raised while sending or handling the response.

        Side effects:
            On 200 the body is written to ``<url with '/' -> '_'>.html`` in the
            working directory, and ``self.cookie_str`` is replaced.
        """
        _ua = DEFAULT_USER_AGENT
        if _received_dict is not None:
            _ua = _received_dict['ua']
        self.cookie.load(self.cookie_str)
        headers = {
            'User-Agent': _ua,
            'Accept': '*/*',
            # NOTE(review): requests only decodes 'br' if a brotli package is
            # installed; otherwise response.text may be undecoded bytes.
            'Accept-Encoding': 'gzip, deflate, br',
            'Cache-Control': 'max-age=0',
            'Referer': url,
            'Cookie': self.cookie_str,
            'Sec-Fetch-Mode': 'navigate',
            'Host': 'rendezvousparis.hermes.com',
            'Sec-Fetch-Site': 'same-origin',
            'Sec-Fetch-Dest': 'document',
            'Accept-Language': 'fr-FR,fr;q=0.6'}
        _proxy_to_use = random.choice(self.proxy_manager.get_result_link_proxy())
        print(_proxy_to_use)
        print("received cookie is " + str(self.cookie_str))
        print("send request for link: " + url)
        try:
            print("will send request with ua {}".format(_ua))
            print("will send request with cookie {}".format(self.cookie_str))
            # verify=False disables TLS certificate verification (deliberate here).
            response = requests.get(url=url, headers=headers, verify=False, proxies=_proxy_to_use, timeout=60)
            print(response.status_code)
            if response.status_code == 200:
                print(response.text)
                print(response.url)
                # Explicit UTF-8 so French page text cannot fail under a non-UTF-8
                # locale; `with` closes the file even if the write raises.
                with open("{}.html".format(url.replace('/', '_')), "w", encoding="utf-8") as page_file:
                    page_file.write(response.text)
                # A 200 without Set-Cookie previously raised KeyError and was
                # misreported as PROXY_ERROR; keep the current jar instead.
                _cookies_to_set = response.headers.get('set-cookie')
                if _cookies_to_set:
                    self.cookie.load(_cookies_to_set)
                new_cookies_str = self._cookie_jar_to_str()
                print("will publish to queue {}".format(new_cookies_str))
                # upload the cookie to queue
                if _received_dict is not None:
                    _received_dict['cookiesStr'] = new_cookies_str
                    print("body in json:{}".format(json.dumps(_received_dict)))
                    self.cookiesPublisher.publish_body(json.dumps(_received_dict))
                else:
                    self.cookiesPublisher.publish_body(new_cookies_str)
                self.cookie_str = new_cookies_str
                return RequestResult.SUCCESS
            elif response.status_code == 502:
                return RequestResult.BAD_GATEWAY
            else:
                return RequestResult.BLOCKED
        except Exception as error:
            # Best-effort boundary: any failure while requesting/handling is
            # reported as PROXY_ERROR so the caller requeues the message.
            print(error)
            return RequestResult.PROXY_ERROR

    def on_message(self, ch, method, properties, body):
        """pika callback: check pending links with the cookies carried by ``body``.

        Messages containing ``"glrd"`` are parsed as JSON, and the cookies are
        read from ``"cookiesStr"``. Any other message is used as the raw cookie
        string. The message is requeued when the last request ended in
        SUCCESS, BAD_GATEWAY or PROXY_ERROR. It is acked (dropped) otherwise,
        i.e. after BLOCKED.

        NOTE(review): time.sleep() here blocks the pika I/O loop, so long
        sleeps or many 60 s request timeouts may miss broker heartbeats.
        """
        print(f" [x] Received {body}")
        _message_in_queue_count = self.cookiesPublisher.message_count()
        print("message count in queue is {}".format(_message_in_queue_count))
        _received_object = body.decode("UTF-8")
        _received_dict = None
        if "glrd" in _received_object:
            _received_dict = json.loads(_received_object)
            _received_cookies = _received_dict["cookiesStr"]
        else:
            _received_cookies = _received_object
        self.cookie_str = _received_cookies
        random.shuffle(self.link_to_validate_list)
        if len(self.link_to_validate_list) > 0 and _message_in_queue_count >= self.limit:
            print("links number is {}".format(len(self.link_to_validate_list)))
            can_continue = None
            # Iterate over a snapshot: successful links are removed from the live
            # list below, and removing from the list being iterated would
            # silently skip the link that follows each success.
            for link in list(self.link_to_validate_list):
                print(link)
                can_continue = self.send_request(link, _received_dict)
                if can_continue == RequestResult.BLOCKED:
                    print("cannot continue, blocked, then skip")
                    break
                if can_continue == RequestResult.BAD_GATEWAY:
                    time.sleep(30)
                    break
                time.sleep(random.randint(2, 5))
                print("can continue, continue")
                if can_continue == RequestResult.SUCCESS:
                    self.link_to_validate_list.remove(link)
            if can_continue in (RequestResult.BAD_GATEWAY, RequestResult.PROXY_ERROR, RequestResult.SUCCESS):
                print("will requeue the message")
                ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
            else:
                print("will ack")
                ch.basic_ack(delivery_tag=method.delivery_tag)
        else:
            print("empty list, no need to ack")
            time.sleep(60)
            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)


def check_all_links(segment_position=1):
    """Load today's validated links from Mongo and consume cookies until stopped.

    Both the consumed queue and the cookie publisher use ``MORNING_DATA_CACHE``,
    so refreshed cookies are fed back into the queue being consumed. ``limit=0``
    makes the message-count gate in ``on_message`` always pass.
    """
    _queue_name = MORNING_DATA_CACHE
    _all_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day()
    _link_list = [_item.url for _item in _all_items if _item.url_validated is True]
    print(_link_list)
    cookiesPublisher = CookiesPublisher(queue_name=_queue_name)
    cookiesPublisher.set_up_connection()
    print("filter links with ip_country")
    _proxy_manager = ProxyManager()
    receiver = ResultLinkChecker(cookiesPublisher=cookiesPublisher, proxy_manager=_proxy_manager,
                                 all_links=_link_list, queue_to_listen=_queue_name, ip_country="FR",
                                 segment_position=segment_position, limit=0)
    print("will connect to queue")
    receiver.set_up_connection()
    receiver.listen_to_queue(receiver.on_message)
# Script entry point: start consuming cookie messages and checking links.
if __name__ == "__main__":
    check_all_links()