From 62a7cc020db52adce42c8b27e6d9ead53e94a7c6 Mon Sep 17 00:00:00 2001 From: Lei PAN Date: Mon, 30 Jun 2025 17:49:52 +0200 Subject: [PATCH 1/6] pb with data in the queue --- workers/link_validator_with_provided_list.py | 72 +++++++++++--------- 1 file changed, 38 insertions(+), 34 deletions(-) diff --git a/workers/link_validator_with_provided_list.py b/workers/link_validator_with_provided_list.py index 684bb18..e11b03b 100644 --- a/workers/link_validator_with_provided_list.py +++ b/workers/link_validator_with_provided_list.py @@ -149,44 +149,48 @@ class LinkValidatorWithProvidedList(threading.Thread): _message_in_queue_count = self.cookiesPublisher.message_count() print("message count in queue is {}".format(_message_in_queue_count)) _received_object = body.decode("UTF-8") - _received_dict = json.loads(_received_object) - _received_cookies = _received_dict["cookiesStr"] - self.cookie_str = _received_cookies - _links_to_validate = filter_link_pojo_list_with_serial(_received_dict, self.link_to_validate_list) - random.shuffle(_links_to_validate) - if len(_links_to_validate) > 0 and _message_in_queue_count >= self.limit: - print("{}:links number is {}".format(threading.currentThread().name, len(_links_to_validate))) - can_continue = None - for link_to_validate in _links_to_validate: - print(link_to_validate) - self.proxy_to_use = random.choice( - self.proxy_manager.get_link_validate_proxy(_links_to_validate)) - print("proxy to use is {}".format(self.proxy_to_use)) - can_continue = self.send_request(link_to_validate, _received_dict) - # remove the tested link from link list - self.link_to_validate_list.remove(link_to_validate) - if can_continue == RequestResult.BLOCKED: - print("cannot continue, blocked, then skip") - break - else: - if can_continue == RequestResult.BAD_GATEWAY: - time.sleep(30) + try: + _received_dict = json.loads(_received_object) + _received_cookies = _received_dict["cookiesStr"] + self.cookie_str = _received_cookies + _links_to_validate = 
filter_link_pojo_list_with_serial(_received_dict, self.link_to_validate_list) + random.shuffle(_links_to_validate) + if len(_links_to_validate) > 0 and _message_in_queue_count >= self.limit: + print("{}:links number is {}".format(threading.currentThread().name, len(_links_to_validate))) + can_continue = None + for link_to_validate in _links_to_validate: + print(link_to_validate) + self.proxy_to_use = random.choice( + self.proxy_manager.get_link_validate_proxy(_links_to_validate)) + print("proxy to use is {}".format(self.proxy_to_use)) + can_continue = self.send_request(link_to_validate, _received_dict) + # remove the tested link from link list + self.link_to_validate_list.remove(link_to_validate) + if can_continue == RequestResult.BLOCKED: + print("cannot continue, blocked, then skip") break - time.sleep(random.randint(2, 5)) - print("can continue, continue") - if can_continue == RequestResult.BAD_GATEWAY or can_continue == RequestResult.PROXY_ERROR or can_continue == RequestResult.SUCCESS: - print("will requeue the message") - ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True) + else: + if can_continue == RequestResult.BAD_GATEWAY: + time.sleep(30) + break + time.sleep(random.randint(2, 5)) + print("can continue, continue") + if can_continue == RequestResult.BAD_GATEWAY or can_continue == RequestResult.PROXY_ERROR or can_continue == RequestResult.SUCCESS: + print("will requeue the message") + ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True) + else: + print("will ack") + ch.basic_ack(delivery_tag=method.delivery_tag) else: - print("will ack") + print("empty list, will republish message") + time.sleep(0) + print("body in json:{}".format(json.dumps(_received_dict))) + self.cookiesPublisher.publish_body(json.dumps(_received_dict)) ch.basic_ack(delivery_tag=method.delivery_tag) - else: - print("empty list, will republish message") - time.sleep(0) - print("body in json:{}".format(json.dumps(_received_dict))) - 
self.cookiesPublisher.publish_body(json.dumps(_received_dict)) + self.channel.stop_consuming() + except: + print("not json format will ack") ch.basic_ack(delivery_tag=method.delivery_tag) - self.channel.stop_consuming() def filter_with_ip_country(self): _link_list_to_click = [] From d35c667b5fcb9e37183291c62c5f79dbe40a96f0 Mon Sep 17 00:00:00 2001 From: Lei PAN Date: Tue, 1 Jul 2025 00:28:21 +0200 Subject: [PATCH 2/6] update oxy sticky credentiels --- proxy_manager/proxy_manager.py | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/proxy_manager/proxy_manager.py b/proxy_manager/proxy_manager.py index 3391f75..cf4058b 100644 --- a/proxy_manager/proxy_manager.py +++ b/proxy_manager/proxy_manager.py @@ -1,5 +1,8 @@ +import logging import random +import requests + FR_ASOCKS_MOBILE_PROXY = { 'http': 'http://11797317-mobile-country-FR:nv958134x@190.2.151.110:14046', 'https': 'http://11797317-mobile-country-FR:nv958134x@190.2.151.110:14046', @@ -25,8 +28,8 @@ FR_PROXY_ASOCK_RES_2 = { 'https': 'http://zd6fbrujot-res-country-FR-hold-query:8k8avNlnLHQaMsWg@217.23.6.161:9999' } FR_PROXY_MOB_OXY_STICKY = { - 'http': 'http://customer-rendezvousmob-cc-FR:Rdv202220212023@fr-pr.oxylabs.io:{}', - 'https': 'http://customer-rendezvousmob-cc-FR:Rdv202220212023@fr-pr.oxylabs.io:{}' + 'http': 'http://customer-rendezvousmob-cc-fr-sessid-0{}-sesstime-2:Rdv+202220212023@pr.oxylabs.io:7777', + 'https': 'http://customer-rendezvousmob-cc-fr-sessid-0{}-sesstime-2:Rdv+202220212023@pr.oxylabs.io:7777' } FR_PROXY_DATA_IMPULSE_STICKY = { @@ -59,12 +62,11 @@ FR_MOBILE_ANY_IP_ROTATING = { # FR_PROXY_DATA_IMPULSE_STICKY] +# MOBILE_PROXY_LIST = [FR_MOBILE_ANY_IP_STICKY, FR_PROXY_MOB_OXY_STICKY] +# MOBILE_PROXY_LIST = [FR_PROXY_MOB_OXY_STICKY] MOBILE_PROXY_LIST = [FR_MOBILE_ANY_IP_STICKY] -# MOBILE_PROXY_LIST = [FR_MOBILE_ANY_IP_STICKY] - - class ProxyManager: def __init__(self, logger): @@ -72,7 +74,8 @@ class ProxyManager: def get_link_validate_proxy(self, 
links_to_validate: list) -> list: # return [FR_PROXY_RES_PARIS_OXY] - return [FR_MOBILE_ANY_IP_ROTATING] + # return [FR_MOBILE_ANY_IP_ROTATING] + return [FR_PROXY_RES_OXY] # if len(links_to_validate) > 15: # return [FR_PROXY_RES_OXY, FR_PROXY_MOB_OXY, FR_PROXY_ASOCK_RES_2, FR_DATA_IMPULSE_RES] # # return [FR_PROXY_RES_OXY, FR_PROXY_ASOCK_RES_2, FR_DATA_IMPULSE_RES, FR_ASOCKS_MOBILE_PROXY] @@ -88,7 +91,7 @@ class ProxyManager: _chosen_proxy = random.choice(MOBILE_PROXY_LIST) if "oxylabs" in _chosen_proxy["http"]: self.logger.info("use oxylabs proxy") - _port = random.randint(40001, 49999) + _port = random.randint(900000000, 995869818) elif "anyip" in _chosen_proxy["http"]: self.logger.info("use anyip proxy") _port = random.randint(40001, 49999) @@ -100,3 +103,14 @@ class ProxyManager: _proxy_to_use["http"] = _chosen_proxy["http"].format(_port) _proxy_to_use["https"] = _chosen_proxy["https"].format(_port) return _proxy_to_use + + +if __name__ == '__main__': + _logger = logging.getLogger() + _proxy = ProxyManager(logger=_logger) + _proxise = _proxy.get_proxy_for_appointment_request() + print(_proxise) + response = requests.get( + "https://ip.oxylabs.io/location", + proxies=_proxise) + print(response.content) From 1a9a88c49a9b2bd3d65c4d32f465c28118142356 Mon Sep 17 00:00:00 2001 From: Lei PAN Date: Tue, 1 Jul 2025 15:01:46 +0200 Subject: [PATCH 3/6] save phone model --- models/ReserveResultPojo.py | 4 ++-- workers/sender.py | 10 ++++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/models/ReserveResultPojo.py b/models/ReserveResultPojo.py index fdb0361..63e5588 100755 --- a/models/ReserveResultPojo.py +++ b/models/ReserveResultPojo.py @@ -29,6 +29,7 @@ class ReserveResultPojo: slot_position = None sim_position = None ccid: str = "" + model: str = "" source_from: str = socket.gethostname() store_type = 0 url_validated = None @@ -122,8 +123,7 @@ class ReserveResultPojo: u'email': self.email, u'passport': self.passport, u'url': self.url, - # 
u'sim_position': self.sim_position, - # u'slot_position': self.slot_position, + u'model': self.model, u'source_from': self.source_from, u'hostName': self.source_from, u'created_at': self.created_at, diff --git a/workers/sender.py b/workers/sender.py index f5b0236..3fbb433 100644 --- a/workers/sender.py +++ b/workers/sender.py @@ -38,7 +38,8 @@ class Sender: self.proxy_to_use = proxy_to_use self.cookie.load(self.cookie_str) - def publish_message_to_queue(self, contact: ContactPojo, status: PublishType, url: str, store_type: str): + def publish_message_to_queue(self, contact: ContactPojo, status: PublishType, url: str, store_type: str, + model: str = ""): # create the message if url == "https://rendezvousparis.hermes.com/client/welcome": return @@ -59,11 +60,12 @@ class Sender: result.proxy = "data_impulse" result.id = id result.store_type = store_type + result.model = model result.created_at = time.strftime("%H:%M:%S", time.localtime()) collection_name = str(datetime.date.today()) MONGO_STORE_MANAGER.insert_reserve_result(collection_name=collection_name, reserve=result) - def apply_redirect(self, response, old_headers, contact, js_data, selected_store): + def apply_redirect(self, response, old_headers, contact, js_data, selected_store, model=""): # /client/register/5XD2E2 _res_headers = response.headers _location = _res_headers['location'] @@ -87,7 +89,7 @@ class Sender: contact.current_ip = get_address_ip(proxy_to_use=self.proxy_to_use) _appointment_url = _redirect_url self.publish_message_to_queue(contact, status=PublishType.SUCCESS, url=_appointment_url, - store_type=selected_store) + store_type=selected_store, model=model) self.cookie.load(_cookies_to_set) new_cookies = {k: v.value for k, v in self.cookie.items()} new_coolies_str = "" @@ -175,7 +177,7 @@ class Sender: # add to mongodb self.logger.info(response.text) self.apply_redirect(response=response, old_headers=headers, contact=contact, js_data=js_data, - selected_store=_selected_store) + 
selected_store=_selected_store, model=model) # self.logger.info("{}:{}".format(contact.mail, response.url)) # contact.ua = js_data.ua # contact.current_ip = get_address_ip(proxy_to_use=proxy_to_use) From 5cda2f5acb5e576c00c1c75cc6264cbbc9bf6ba4 Mon Sep 17 00:00:00 2001 From: Lei PAN Date: Wed, 2 Jul 2025 15:46:29 +0200 Subject: [PATCH 4/6] wait 20s when cookies is blocked while validating links --- workers/link_validator_with_provided_list.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/workers/link_validator_with_provided_list.py b/workers/link_validator_with_provided_list.py index e11b03b..e4efdb6 100644 --- a/workers/link_validator_with_provided_list.py +++ b/workers/link_validator_with_provided_list.py @@ -180,6 +180,8 @@ class LinkValidatorWithProvidedList(threading.Thread): ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True) else: print("will ack") + print("will wait for 40s") + time.sleep(20) ch.basic_ack(delivery_tag=method.delivery_tag) else: print("empty list, will republish message") @@ -217,7 +219,7 @@ def validate_all_links(_contact_serial_list): link_to_validated = [] for _link in all_link_list: # print("serial is "+_link.serial) - print("email is "+_link.email) + print("email is " + _link.email) if _link.serial == "requests": link_to_validated.append(_link) # get the first 50 links @@ -227,8 +229,8 @@ def validate_all_links(_contact_serial_list): # default_segment_number = 20 _first_25_percent_links = link_to_validated[0:(int(len(all_link_list) / divided))] _first_25_percent_links = all_link_list - # _queue_name = MORNING_DATA_CACHE - _queue_name = MORNING_DATA_CACHE_BAK + _queue_name = MORNING_DATA_CACHE + # _queue_name = MORNING_DATA_CACHE_BAK # if len(all_link_list) > divided * default_segment_number: # _segment_number = default_segment_number # else: @@ -238,8 +240,10 @@ def validate_all_links(_contact_serial_list): # else: # _segment_number = 1 _thread_list = [] - if len(_first_25_percent_links) 
>=10: + if len(_first_25_percent_links) >= 10: _segment_number = 10 + elif len(_first_25_percent_links) >= 5: + _segment_number = 5 else: _segment_number = 1 for i in range(0, _segment_number): From 991602afd7b9dec35eb2b80a2482bde029a16058 Mon Sep 17 00:00:00 2001 From: Lei PAN Date: Wed, 2 Jul 2025 17:26:34 +0200 Subject: [PATCH 5/6] use link count as segment number when fewer than 10 links --- workers/link_validator_with_provided_list.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/workers/link_validator_with_provided_list.py b/workers/link_validator_with_provided_list.py index e4efdb6..e47a275 100644 --- a/workers/link_validator_with_provided_list.py +++ b/workers/link_validator_with_provided_list.py @@ -242,10 +242,8 @@ def validate_all_links(_contact_serial_list): _thread_list = [] if len(_first_25_percent_links) >= 10: _segment_number = 10 - elif len(_first_25_percent_links) >= 5: - _segment_number = 5 else: - _segment_number = 1 + _segment_number = len(_first_25_percent_links) for i in range(0, _segment_number): logger.info("{}:{} links to validate".format(threading.currentThread().name, len(_first_25_percent_links))) logger.info("segment is {}".format(i)) From 2dfb483161a141d997e5e77ff9b95efbf5ec0b86 Mon Sep 17 00:00:00 2001 From: PAN Lei Date: Fri, 4 Jul 2025 11:30:15 +0200 Subject: [PATCH 6/6] handle invalid JSON exception --- workers/link_validator_with_provided_list.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/workers/link_validator_with_provided_list.py b/workers/link_validator_with_provided_list.py index e11b03b..1943889 100644 --- a/workers/link_validator_with_provided_list.py +++ b/workers/link_validator_with_provided_list.py @@ -188,7 +188,8 @@ class LinkValidatorWithProvidedList(threading.Thread): self.cookiesPublisher.publish_body(json.dumps(_received_dict)) ch.basic_ack(delivery_tag=method.delivery_tag) self.channel.stop_consuming() - except: + except Exception as error: + print(error) 
print("not json format will ack") ch.basic_ack(delivery_tag=method.delivery_tag) @@ -217,7 +218,7 @@ def validate_all_links(_contact_serial_list): link_to_validated = [] for _link in all_link_list: # print("serial is "+_link.serial) - print("email is "+_link.email) + print("email is " + _link.email) if _link.serial == "requests": link_to_validated.append(_link) # get the first 50 links @@ -227,8 +228,8 @@ def validate_all_links(_contact_serial_list): # default_segment_number = 20 _first_25_percent_links = link_to_validated[0:(int(len(all_link_list) / divided))] _first_25_percent_links = all_link_list - # _queue_name = MORNING_DATA_CACHE - _queue_name = MORNING_DATA_CACHE_BAK + _queue_name = MORNING_DATA_CACHE + # _queue_name = MORNING_DATA_CACHE_BAK # if len(all_link_list) > divided * default_segment_number: # _segment_number = default_segment_number # else: @@ -238,7 +239,7 @@ def validate_all_links(_contact_serial_list): # else: # _segment_number = 1 _thread_list = [] - if len(_first_25_percent_links) >=10: + if len(_first_25_percent_links) >= 10: _segment_number = 10 else: _segment_number = 1