"""Queue worker that validates appointment links using cookies received from RabbitMQ."""
import random
import threading
import time
from http.cookies import SimpleCookie

import pika
import requests

from db.mongo_manager import MONGO_STORE_MANAGER
from models.LinkPojo import LinkPojo
from models.result_pojo import RequestResult
from proxy_manager.proxy_manager import ProxyManager
from queue_message.CookiesPublisher import CookiesPublisher, REQUEST_DATA_QUEUE_TEST
from queue_message.appointmentrequestsender import QUEUE_HOST, REQUEST_DATA_QUEUE, credentials, REQUEST_DATA_DE
from workers.proxies_constants import PROXY_LIST_FR

# Page fragments used to classify the body of a 200 response.
SUCCESS_MESSAGE = "Votre demande de rendez-vous Maroquinerie a bien été enregistrée"
DOUBLE_MESSAGE = "Une demande de rendez-vous a déjà été enregistrée avec ces coordonnées"
INVALID = "Depuis plus de 130 ans,"


class LinkValidator(threading.Thread):
    """Consume cookie strings from a queue and use them to request pending links.

    For every message received, the list of links to validate is reloaded,
    filtered by ``ip_country``, shuffled, and each link is requested through a
    proxy.  When a response is recognised, the outcome is stored through
    ``MONGO_STORE_MANAGER`` and the refreshed cookies are published back via
    ``cookiesPublisher``.

    NOTE(review): the class derives from ``threading.Thread`` but no ``run()``
    override is visible in this file; it is driven through
    ``set_up_connection()`` / ``listen_to_queue()`` instead.
    """

    def __init__(self, cookiesPublisher: CookiesPublisher, proxy_manager: ProxyManager,
                 queue_to_listen=REQUEST_DATA_QUEUE, ip_country="FR", segment_position=1, limit=40):
        """Load the initial link list and store the collaborators.

        Args:
            cookiesPublisher: publisher used to push refreshed cookies and to
                read the queue's message count.
            proxy_manager: provides the proxies used for the HTTP requests.
            queue_to_listen: name of the queue consumed by ``listen_to_queue``.
            ip_country: only links whose ``ip_country`` equals this value are kept.
            segment_position: ``1`` selects the first half of the link list,
                any other value selects the second half.
            limit: links are only processed when the queue's message count is
                strictly greater than this value.
        """
        super().__init__()
        self.link_to_validate_list = []
        self.cookie = SimpleCookie()
        # Start with an empty cookie string so send_request() cannot fail with
        # AttributeError when it is called before the first queue message.
        self.cookie_str = ""
        self.connection = None
        self.channel = None
        self.cookiesPublisher = cookiesPublisher
        self.segment_position = segment_position
        self.update_validate_list()
        self.queue_to_listen = queue_to_listen
        self.ip_country = ip_country
        self.filter_with_ip_country()
        self.proxy_manager = proxy_manager
        self.limit = limit

    def set_up_connection(self):
        """Open a blocking connection to ``QUEUE_HOST`` on port 5672 and create a channel."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=QUEUE_HOST, port=5672, credentials=credentials))
        self.channel = self.connection.channel()

    def listen_to_queue(self, callback):
        """Consume ``queue_to_listen`` with manual acks, one unacked message at a time.

        Blocks in ``start_consuming()``; ``set_up_connection()`` must have been
        called first.
        """
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(queue=self.queue_to_listen, auto_ack=False, on_message_callback=callback)
        self.channel.start_consuming()

    def send_request(self, linkPojo: LinkPojo) -> RequestResult:
        """Request ``linkPojo.url`` with the current cookies and classify the response.

        Returns:
            ``SUCCESS`` when the page matched one of the known messages (the
            outcome is stored and refreshed cookies are published),
            ``UNKNOWN`` for an unrecognised 200 page, ``BAD_GATEWAY`` for a
            502, ``BLOCKED`` for any other status code, and ``PROXY_ERROR``
            when anything inside the request/handling step raised.
        """
        self.cookie.load(self.cookie_str)
        headers = {
            'User-Agent': 'Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Mobile Safari/537.36',
            'Accept': '*/*',
            'Accept-Encoding': 'gzip, deflate, br',
            'Cache-Control': 'max-age=0',
            'Referer': linkPojo.url,
            'Cookie': self.cookie_str,
            'Sec-Fetch-Mode': 'navigate',
            'Host': 'rendezvousparis.hermes.com',
            'Sec-Fetch-Site': 'same-origin',
            'Sec-Fetch-Dest': 'document',
            'Accept-Language': 'fr-FR,fr;q=0.6'}
        _proxy_to_use = random.choice(self.proxy_manager.get_link_validate_proxy(self.link_to_validate_list))
        print(_proxy_to_use)
        print("received cookie is " + str(self.cookie_str))
        print("send request for link: " + linkPojo.url)
        try:
            # NOTE(review): verify=False disables TLS certificate verification;
            # kept as in the original, confirm it is really required by the proxies.
            response = requests.get(url=linkPojo.url, headers=headers, verify=False, proxies=_proxy_to_use,
                                    timeout=30)
            print(response.status_code)
            if response.status_code == 502:
                return RequestResult.BAD_GATEWAY
            if response.status_code != 200:
                return RequestResult.BLOCKED

            _content = response.text
            print(response.text)
            if SUCCESS_MESSAGE in _content:
                print(response.url)
                MONGO_STORE_MANAGER.link_validated_for_result(response.url, linkPojo)
            elif INVALID in _content:
                MONGO_STORE_MANAGER.link_validated_for_result(response.url, linkPojo, is_invalid=True)
            elif DOUBLE_MESSAGE in _content:
                print(response.url)
                MONGO_STORE_MANAGER.link_validated_for_result(response.url, linkPojo, is_duplicated=True)
            else:
                return RequestResult.UNKNOWN

            self._refresh_and_publish_cookies(response)
            return RequestResult.SUCCESS
        except Exception as error:
            # Deliberate best-effort: any failure in this step is reported as
            # PROXY_ERROR so the caller can decide whether to requeue.
            print(error)
            return RequestResult.PROXY_ERROR

    def _refresh_and_publish_cookies(self, response) -> None:
        """Merge the response's Set-Cookie header into the jar and publish the result.

        A response without a Set-Cookie header no longer raises ``KeyError``
        (which used to surface as ``PROXY_ERROR`` after the link had already
        been recorded); the cookies currently in the jar are published instead.
        """
        _cookies_to_set = response.headers.get('set-cookie')
        if _cookies_to_set:
            self.cookie.load(_cookies_to_set)
        new_cookies_str = "".join(f"{key}={morsel.value};" for key, morsel in self.cookie.items())
        print("will publish to queue {}".format(new_cookies_str))
        # upload the cookie to queue
        self.cookiesPublisher.publish_body(new_cookies_str)
        self.cookie_str = new_cookies_str

    def update_validate_list(self):
        """Reload the links to validate and keep this worker's half of them.

        For the moment the maximum number of segments is 2: segment 1 takes
        the first half, any other segment the second half.  With zero or one
        link, segment 1 takes everything and the other segment takes nothing.
        """
        _all_list = MONGO_STORE_MANAGER.get_links_to_validate()
        list_length = len(_all_list)
        middle = list_length // 2
        if self.segment_position == 1:
            self.link_to_validate_list = _all_list[0:middle] if list_length > 1 else _all_list
        else:
            self.link_to_validate_list = _all_list[middle:] if list_length > 1 else []

    def on_message(self, ch, method, properties, body):
        """Handle one queue message whose body is a UTF-8 encoded cookie string.

        The link list is refreshed, filtered and shuffled.  When there are
        links and the queue holds more than ``limit`` messages, each link is
        requested in turn; the loop stops on ``BLOCKED`` or ``BAD_GATEWAY``.
        The message is requeued when the last result was ``BAD_GATEWAY`` or
        ``PROXY_ERROR`` and acked otherwise.  When nothing is processed, the
        message is requeued after a 60 second pause.
        """
        print(f" [x] Received {body}")
        _message_in_queue_count = self.cookiesPublisher.message_count()
        print("message count in queue is {}".format(_message_in_queue_count))
        self.update_validate_list()
        self.filter_with_ip_country()
        self.cookie_str = body.decode("UTF-8")
        random.shuffle(self.link_to_validate_list)

        if not (self.link_to_validate_list and _message_in_queue_count > self.limit):
            print("empty list, no need to ack")
            time.sleep(60)
            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
            return

        print("links number is {}".format(len(self.link_to_validate_list)))
        can_continue = None
        for con in self.link_to_validate_list:
            print(con.email)
            self.proxy_to_use = random.choice(PROXY_LIST_FR)
            can_continue = self.send_request(con)
            if can_continue == RequestResult.BLOCKED:
                print("cannot continue, blocked, then skip")
                break
            if can_continue == RequestResult.BAD_GATEWAY:
                # Give the upstream time to recover before the message is requeued.
                time.sleep(30)
                break
            # Random pause between two consecutive requests.
            time.sleep(random.randint(2, 5))
            print("can continue, continue")

        if can_continue in (RequestResult.BAD_GATEWAY, RequestResult.PROXY_ERROR):
            print("will requeue the message")
            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
        else:
            print("will ack")
            ch.basic_ack(delivery_tag=method.delivery_tag)

    def filter_with_ip_country(self):
        """Keep only the links whose ``ip_country`` equals ``self.ip_country``."""
        self.link_to_validate_list = [
            _link for _link in self.link_to_validate_list if _link.ip_country == self.ip_country
        ]


def validate_with_FR_ip(segment_position=1):
    """Start a validator for links with ``ip_country == "FR"`` and block consuming the queue.

    NOTE(review): the queue used is ``REQUEST_DATA_DE`` even though the
    function name refers to FR; kept as in the original, confirm it is intended.

    Args:
        segment_position: which half of the link list this worker handles.
    """
    _queue_name = REQUEST_DATA_DE
    cookiesPublisher = CookiesPublisher(queue_name=_queue_name)
    cookiesPublisher.set_up_connection()
    print("filter links with ip_country")
    _proxy_manager = ProxyManager()
    receiver = LinkValidator(cookiesPublisher=cookiesPublisher, proxy_manager=_proxy_manager,
                             queue_to_listen=_queue_name, ip_country="FR", segment_position=segment_position,
                             limit=0)
    print("will connect to queue")
    receiver.set_up_connection()
    receiver.listen_to_queue(receiver.on_message)


if __name__ == '__main__':
    validate_with_FR_ip()