diff --git a/db/mongo_manager.py b/db/mongo_manager.py index c2739af..0c73fb1 100755 --- a/db/mongo_manager.py +++ b/db/mongo_manager.py @@ -20,6 +20,7 @@ EMAIL_LIST = "EMAIL_LIST" DESTINATION_EMAIL_LIST = "DESTINATION_EMAIL_LIST" LINKS_TO_VALIDATE = "LINKS_TO_VALIDATE" INVALID_EMAIL_LIST = "INVALID_EMAIL_LIST" +CONTACT_LIST_SERIAL_MAP = "CONTACT_LIST_SERIAL_MAP" class MongoDbManager: @@ -75,6 +76,13 @@ class MongoDbManager: result_list.append(ContactPojo.from_firestore_dict(document)) return result_list + def get_all_contact_serial_list(self) -> list: + result_list = [] + cursor = self.db[CONTACT_LIST_SERIAL_MAP] + for document in cursor.find(): + result_list.append(ContactPojo.from_firestore_dict(document)) + return result_list + def save_links_to_validate(self, link: str, mail_address: str, _all_contact_list: list): collection_to_use = self.db[LINKS_TO_VALIDATE] updated_at = time.strftime("%H:%M:%S", time.localtime()) @@ -174,7 +182,10 @@ class MongoDbManager: validated_by = "Double" collection.find_one_and_update({'_id': _id}, { "$set": {"url_validated": state, "validated_at": validated_at, "id": _id, "email": linkPojo.email, - "url": link, "validated_by_ua": ua, + "url": link, + "source_from": linkPojo.model, + "serial": linkPojo.serial, + "validated_by_ua": ua, "validated_by": validated_by}}, upsert=True) # remove the link from db diff --git a/link_validator_executor.py b/link_validator_executor.py index 3dd0b71..bcc25dc 100644 --- a/link_validator_executor.py +++ b/link_validator_executor.py @@ -10,6 +10,6 @@ if __name__ == '__main__': # generate test data while True: print("call validate_all_links()") - validate_all_links() + validate_all_links([]) print("wait for 30 seconds") time.sleep(30) diff --git a/models/LinkPojo.py b/models/LinkPojo.py index c58b67e..6a0b235 100644 --- a/models/LinkPojo.py +++ b/models/LinkPojo.py @@ -1,7 +1,9 @@ class LinkPojo(): - def __init__(self, url, email, updated_at, ip_country): + def __init__(self, url, email, model, serial, updated_at, ip_country): self.url = url self.email = email + self.model = model + self.serial = serial self.updated_at = updated_at self.ip_country = ip_country @@ -9,9 +11,12 @@ class LinkPojo(): def from_firestore_dict(source): updated_at = source['updated_at'] email = source['email'] + model = source['model'] + serial = source['serial'] url = source['url'] ip_country = "FR" if source.get('ip_country'): ip_country = source['ip_country'] - result = LinkPojo(email=email, url=url, updated_at=updated_at, ip_country=ip_country) + result = LinkPojo(email=email, url=url, model=model, serial=serial, updated_at=updated_at, + ip_country=ip_country) return result diff --git a/models/contact_pojo.py b/models/contact_pojo.py index fe6a77f..59fbd16 100755 --- a/models/contact_pojo.py +++ b/models/contact_pojo.py @@ -13,6 +13,7 @@ class ContactPojo: store: str note: str ua: str + serial: str def __init__(self, phone_number: str, passport_number: str, last_name: str, first_name: str, mail: str, ccid: str = "", @@ -38,6 +39,7 @@ class ContactPojo: u'store': self.store, u'ccid': self.ccid, u'position': self.position, + u'serial': self.serial, u'current_ip': self.current_ip, u'ua': self.ua } @@ -52,9 +54,13 @@ class ContactPojo: last_name = source['last_name'] first_name = source['first_name'] ip_country = "FR" + if source.get('ip_country'): ip_country = source['ip_country'] result = ContactPojo(phone_number=phone, passport_number=passport, mail=email, last_name=last_name, first_name=first_name) result.ip_country = ip_country + if source.get('serial'): + serial = source['serial'] + result.serial = serial return result diff --git a/proxy_manager/proxy_manager.py b/proxy_manager/proxy_manager.py index 42df19d..0de2f53 100644 --- a/proxy_manager/proxy_manager.py +++ b/proxy_manager/proxy_manager.py @@ -9,6 +9,11 @@ FR_PROXY_RES_OXY = { 'http': 'http://customer-rendezvous-cc-FR:Rdv202220212023@pr.oxylabs.io:7777', 'https': 'http://customer-rendezvous-cc-FR:Rdv202220212023@pr.oxylabs.io:7777' } +FR_PROXY_RES_PARIS_OXY = { + # customer-rendezvous-cc-fr-city-paris:PASSWORD@pr.oxylabs.io:7777 + 'http': 'http://customer-rendezvous-cc-fr-city-paris:Rdv202220212023@pr.oxylabs.io:7777', + 'https': 'http://customer-rendezvous-cc-fr-city-paris:Rdv202220212023@pr.oxylabs.io:7777' +} FR_PROXY_MOB_OXY = { 'http': 'http://customer-rendezvousmob-cc-FR:Rdv202220212023@pr.oxylabs.io:7777', @@ -62,7 +67,7 @@ class ProxyManager: self.logger = logger def get_link_validate_proxy(self, links_to_validate: list) -> list: - return [FR_PROXY_RES_OXY, FR_PROXY_MOB_OXY] + return [FR_PROXY_RES_PARIS_OXY] # if len(links_to_validate) > 15: # return [FR_PROXY_RES_OXY, FR_PROXY_MOB_OXY, FR_PROXY_ASOCK_RES_2, FR_DATA_IMPULSE_RES] # # return [FR_PROXY_RES_OXY, FR_PROXY_ASOCK_RES_2, FR_DATA_IMPULSE_RES, FR_ASOCKS_MOBILE_PROXY] diff --git a/utils/user_agent_helper.py b/utils/user_agent_helper.py new file mode 100644 index 0000000..dc9b88e --- /dev/null +++ b/utils/user_agent_helper.py @@ -0,0 +1,56 @@ +import re +from http.cookies import SimpleCookie +from pprint import pprint + + +def get_chrome_version_from_ua(ua: str) -> str: + _regex = "Chrome\/([0-9]+)" + _match = re.search(_regex, ua) + if _match: + return _match.group(1) + + +def generate_headers_from_request_message(_received_dict, cookie_str): + _ua = 'Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Mobile Safari/537.36' + cookie = SimpleCookie() + _chrome_version = '125' + if _received_dict is not None: + _ua = _received_dict['ua'] + _chrome_version = get_chrome_version_from_ua(_ua) + + _model = _received_dict['model'] + # _ua = "Mozilla/5.0 (Linux; Android 9; {}) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.58 Mobile Safari/537.36".format( + # _model) + print("model:" + _model) + cookie.load(cookie_str) + headers = { + 'User-Agent': _ua, + 'Accept': '*/*', + 'Accept-Encoding': 'gzip, deflate, br', + 'Cache-Control': 'max-age=0', + 'Referer': "linkPojo.url", + 'Cookie': cookie_str, + 'Sec-Fetch-Mode': 'navigate', + 'Host': 'rendezvousparis.hermes.com', + 'Sec-Fetch-Site': 'same-origin', + 'sec-ch-ua': '"Google Chrome";v="{}", "Chromium";v="{}", "Not.A/Brand";v="24"'.format(_chrome_version, + _chrome_version), + 'sec-ch-ua-platform': '"Android"', + 'sec-ch-ua-model': '"{}"'.format(_model), + 'Sec-Fetch-Dest': 'document', + 'Accept-Language': 'fr-FR,fr;q=0.6'} + return headers + + +if __name__ == '__main__': + # _ua_to_test = "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Mobile Safari/537.36" + # _ua_to_test = "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Mobile Safari/537.36" + # print(get_chrome_version_from_ua(_ua_to_test)) # Output: 129.0.0.0 + _received_dict = {"glvd": "Google Inc. (ARM)", "glrd": "ANGLE (ARM, Mali-G57 MC2, OpenGL ES 3.2)", "hc": 8, + "br_oh": 745, "br_ow": 393, "br_h": 745, "br_w": 393, "rs_h": 876, "rs_w": 393, "rs_cd": 24, + "ars_h": 876, "ars_w": 393, "plg": 0, "eva": 33, "vnd": "Google Inc.", "plu": [], + "ua": "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Mobile Safari/537.36", + "dvm": 4, "acw": "probably", "pr": 2.75, "ts_mtp": 5, "tz": -60, "emd": "k:ai,vi,ao", + "cookiesStr": "datadome=cVhteVJiGoU3N1T61hX4dF5vNG4~p5JpkbsKLH674kzQiuT1G5sDohLD33nFx2QFqlPiJfY~yfsRNIpqM8xZ1JfXTgT7p8v5QfkmmOl~BIhLOVCBHe_6Al6CM2QsSq7g;__cf_bm=J9g80A6wlNO03BLqyhiuf5ZRkx_8Ig7QIOLhHoEh1i8-1734010016-1.0.1.1-fMIRmq.K8K093kcSzJdFlcIEoCH9XwWhDlsXFoDmvNnerYvpyG0eC9vIdqj5xACF28YYAYGXwuag5f33JoDiBg;", + "model": "22041219PG", "serial": "fmiz5pa6rsx4u4ts"} + pprint(generate_headers_from_request_message(_received_dict, _received_dict["cookiesStr"])) diff --git a/workers/link_validator_with_provided_list.py b/workers/link_validator_with_provided_list.py index 1006c4d..153d68e 100644 --- a/workers/link_validator_with_provided_list.py +++ b/workers/link_validator_with_provided_list.py @@ -13,11 +13,10 @@ from db.mongo_manager import MONGO_STORE_MANAGER from models.LinkPojo import LinkPojo from models.result_pojo import RequestResult from proxy_manager.proxy_manager import ProxyManager -from queue_message.CookiesPublisher import CookiesPublisher, REQUEST_DATA_QUEUE_TEST, TEST_QUEUE, SHARED_OBJECT, \ - MORNING_DATA_CACHE_BAK, MORNING_DATA_CACHE, MORNING_DATA_CACHE_2 +from queue_message.CookiesPublisher import CookiesPublisher, MORNING_DATA_CACHE from queue_message.appointmentrequestsender import QUEUE_HOST, REQUEST_DATA_QUEUE, credentials from utils.AppLogging import init_logger -from workers.proxies_constants import PROXY_LIST_FR +from utils.user_agent_helper import generate_headers_from_request_message DOUBLE_MESSAGE = "Une demande de rendez-vous a déjà été enregistrée avec ces coordonnées" INVALID = "Depuis plus de 130 ans," @@ -26,13 +25,21 @@ init_logger() logger = logging.getLogger() +def filter_link_pojo_list_with_serial(_received_dict, link_to_validate_list): + _serial = _received_dict["serial"] + _model = _received_dict["model"] + _to_return = filter(lambda link_pojo: link_pojo.serial == _serial, link_to_validate_list) + return list(_to_return) + + class LinkValidatorWithProvidedList(threading.Thread): - def __init__(self, cookiesPublisher: CookiesPublisher, link_list: list, + def __init__(self, cookiesPublisher: CookiesPublisher, link_list: list, _contact_serial_list, queue_to_listen=REQUEST_DATA_QUEUE, ip_country="FR", limit=40): super().__init__() + self.contact_serial_list = _contact_serial_list self.link_to_validate_list = link_list self.cookie = SimpleCookie() self.cookiesPublisher = cookiesPublisher @@ -57,25 +64,9 @@ class LinkValidatorWithProvidedList(threading.Thread): if _received_dict is not None: _ua = _received_dict['ua'] _model = _received_dict['model'] - _ua = "Mozilla/5.0 (Linux; Android 9; {}) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.58 Mobile Safari/537.36".format( - _model) logger.info("model: %s", _model) self.cookie.load(self.cookie_str) - headers = { - 'User-Agent': _ua, - 'Accept': '*/*', - 'Accept-Encoding': 'gzip, deflate, br', - 'Cache-Control': 'max-age=0', - 'Referer': linkPojo.url, - 'Cookie': self.cookie_str, - 'Sec-Fetch-Mode': 'navigate', - 'Host': 'rendezvousparis.hermes.com', - 'Sec-Fetch-Site': 'same-origin', - 'sec-ch-ua': '"Brave";v="125", "Chromium";v="125", "Not.A/Brand";v="24"', - 'sec-ch-ua-platform': '"Android"', - 'sec-ch-ua-model': '""', - 'Sec-Fetch-Dest': 'document', - 'Accept-Language': 'fr-FR,fr;q=0.6'} + headers = generate_headers_from_request_message(_received_dict, self.cookie_str) print("received cookie is " + str(self.cookie_str)) print("send request for link: " + linkPojo.url) try: @@ -110,7 +101,6 @@ class LinkValidatorWithProvidedList(threading.Thread): self.cookie_str = new_coolies_str return RequestResult.SUCCESS elif INVALID in _content: - MONGO_STORE_MANAGER.link_validated_for_result(response.url, linkPojo, is_invalid=True, ua=_ua) # set new cookies _cookies_to_set = response.headers['set-cookie'] @@ -158,14 +148,15 @@ class LinkValidatorWithProvidedList(threading.Thread): _received_dict = json.loads(_received_object) _received_cookies = _received_dict["cookiesStr"] self.cookie_str = _received_cookies - random.shuffle(self.link_to_validate_list) - if len(self.link_to_validate_list) > 0 and _message_in_queue_count >= self.limit: - print("{}:links number is {}".format(threading.currentThread().name, len(self.link_to_validate_list))) + _links_to_validate = filter_link_pojo_list_with_serial(_received_dict, self.link_to_validate_list) + random.shuffle(_links_to_validate) + if len(_links_to_validate) > 0 and _message_in_queue_count >= self.limit: + print("{}:links number is {}".format(threading.currentThread().name, len(_links_to_validate))) can_continue = None - for link_to_validate in self.link_to_validate_list: + for link_to_validate in _links_to_validate: print(link_to_validate) self.proxy_to_use = random.choice( - self.proxy_manager.get_link_validate_proxy(self.link_to_validate_list)) + self.proxy_manager.get_link_validate_proxy(_links_to_validate)) print("proxy to use is {}".format(self.proxy_to_use)) can_continue = self.send_request(link_to_validate, _received_dict) # remove the tested link from link list @@ -186,9 +177,11 @@ class LinkValidatorWithProvidedList(threading.Thread): print("will ack") ch.basic_ack(delivery_tag=method.delivery_tag) else: - print("empty list, no need to ack") - time.sleep(5) - ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True) + print("empty list, will republish message") + time.sleep(0) + print("body in json:{}".format(json.dumps(_received_dict))) + self.cookiesPublisher.publish_body(json.dumps(_received_dict)) + ch.basic_ack(delivery_tag=method.delivery_tag) self.channel.stop_consuming() def filter_with_ip_country(self): @@ -199,9 +192,9 @@ class LinkValidatorWithProvidedList(threading.Thread): self.link_to_validate_list = _link_list_to_click -def validate_links(cookiesPublisher, queue_name: str, link_list: list): +def validate_links(cookiesPublisher, queue_name: str, link_list: list, _contact_serial_list): receiver = LinkValidatorWithProvidedList(cookiesPublisher=cookiesPublisher, link_list=link_list, - queue_to_listen=queue_name, ip_country="FR", limit=0) + queue_to_listen=queue_name, _contact_serial_list= _contact_serial_list, ip_country="FR", limit=0) print("{} set_up_connection".format(threading.currentThread().name)) receiver.set_up_connection() receiver.listen_to_queue(receiver.on_message) @@ -209,26 +202,28 @@ def validate_links(cookiesPublisher, queue_name: str, link_list: list): # default_segment_number 并发数,决定速度 # divided = 4,越小,一次处理得越多 -def validate_all_links(): +def validate_all_links(_contact_serial_list): print("will get all links") all_link_list = MONGO_STORE_MANAGER.get_links_to_validate() # get the first 50 links if len(all_link_list) == 0: return - divided = 3 - default_segment_number = 20 - _first_25_percent_links = all_link_list[0:(int(len(all_link_list) / divided))] + # divided = 1 + # default_segment_number = 20 + # _first_25_percent_links = all_link_list[0:(int(len(all_link_list) / divided))] + _first_25_percent_links = all_link_list _queue_name = MORNING_DATA_CACHE - # _queue_name = MORNING_DATA_CACHE_BAK - if len(all_link_list) > divided * default_segment_number: - _segment_number = default_segment_number - else: - _first_25_percent_links = all_link_list - if len(_first_25_percent_links) > divided: - _segment_number = int(len(_first_25_percent_links) / divided) - else: - _segment_number = 1 + # # _queue_name = MORNING_DATA_CACHE_BAK + # if len(all_link_list) > divided * default_segment_number: + # _segment_number = default_segment_number + # else: + # _first_25_percent_links = all_link_list + # if len(_first_25_percent_links) > divided: + # _segment_number = int(len(_first_25_percent_links) / divided) + # else: + # _segment_number = 1 _thread_list = [] + _segment_number =1 for i in range(0, _segment_number): logger.info("{}:{} links to validate".format(threading.currentThread().name, len(_first_25_percent_links))) logger.info("segment is {}".format(i)) @@ -236,7 +231,8 @@ def validate_all_links(): _cookiesPublisher.set_up_connection() _step = int(len(_first_25_percent_links) / _segment_number) _sublist = _first_25_percent_links[i * _step:_step * (i + 1)] - _thread1 = threading.Thread(target=validate_links, args=(_cookiesPublisher, _queue_name, _sublist)) + _thread1 = threading.Thread(target=validate_links, + args=(_cookiesPublisher, _queue_name, _sublist, _contact_serial_list)) _thread_list.append(_thread1) _thread1.start() for _thread in _thread_list: @@ -245,10 +241,12 @@ def validate_all_links(): if __name__ == '__main__': # generate test data + # contact_serial_map_list = MONGO_STORE_MANAGER.get_all_contact_serial_list() while True: print("call validate_all_links()") - validate_all_links() - delay = random.randint(10, 30) + validate_all_links([]) + # delay = random.randint(10, 30) + delay = random.randint(1, 10) current_time = datetime.now() current_hour = current_time.hour print("Current hour ", current_time.hour)