From 4c05d9c87eb2750bcca6e1a080cd9684fcc85d3b Mon Sep 17 00:00:00 2001 From: panlei Date: Sat, 29 Jun 2024 11:47:28 +0200 Subject: [PATCH 1/3] re-queue the safe blocked cookies --- models/result_pojo.py | 1 + queue_message/appointmentrequestsender.py | 9 ++++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/models/result_pojo.py b/models/result_pojo.py index cc97187..c3b6949 100644 --- a/models/result_pojo.py +++ b/models/result_pojo.py @@ -3,6 +3,7 @@ from enum import Enum class RequestResult(Enum): BLOCKED = "BLOCKED" + CSRF_BLOCKED = "CSRF_BLOCKED" PROXY_ERROR = "PROXY_ERROR" SUCCESS = "SUCCESS" COOKIES_ERROR = "COOKIES_ERROR" diff --git a/queue_message/appointmentrequestsender.py b/queue_message/appointmentrequestsender.py index 16cb6a5..768850b 100644 --- a/queue_message/appointmentrequestsender.py +++ b/queue_message/appointmentrequestsender.py @@ -117,6 +117,7 @@ class AppointmentRequestSender(threading.Thread): captchaResultGetter = CaptchaResultGetter() self.logger.info("contact number is {}".format(len(self.contact_list))) # self.contact_list = filter_contacts(self.contact_list) + can_continue = None for con in self.contact_list: # _proxy_to_use = self.proxy_manager.get_proxy_for_appointment_request() # print(_proxy_to_use) @@ -133,6 +134,7 @@ class AppointmentRequestSender(threading.Thread): else: self.logger.info("csrf is {}".format(csrf_result)) if csrf_result == RequestResult.BLOCKED: + can_continue = RequestResult.CSRF_BLOCKED break _new_cookies = captchaResultGetter.get_valid_ch_cookie(sender.proxy_to_use, js_data, old_valid_cookie=_received_cookies) @@ -200,7 +202,12 @@ class AppointmentRequestSender(threading.Thread): self.valid_csrf = None time.sleep(random.randint(1, 2)) self.logger.info("will ack method.delivery_tag: " + str(method.delivery_tag)) - ch.basic_ack(delivery_tag=method.delivery_tag) + if can_continue is not None and can_continue == RequestResult.CSRF_BLOCKED: + self.logger.info("csrf blocked, will republish cookie") + self.cookiesPublisher.publish_body(_received_object) + ch.basic_ack(delivery_tag=method.delivery_tag) + else: + ch.basic_ack(delivery_tag=method.delivery_tag) else: self.retrieve_invalidate_urls() self.logger.info("empty list") From 801df9b06adc8c92e4c54f180dcac11964db3b0e Mon Sep 17 00:00:00 2001 From: Lei PAN Date: Wed, 3 Jul 2024 16:38:03 +0200 Subject: [PATCH 2/3] share cookies with click link programme when blocked. --- queue_message/appointmentrequestsender.py | 9 +++++++++ request_sender_test.py | 11 +++++++---- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/queue_message/appointmentrequestsender.py b/queue_message/appointmentrequestsender.py index 768850b..ce931ad 100644 --- a/queue_message/appointmentrequestsender.py +++ b/queue_message/appointmentrequestsender.py @@ -64,12 +64,14 @@ def is_open(): class AppointmentRequestSender(threading.Thread): def __init__(self, sub_contact_list: list, logger, cookiesPublisher: CookiesPublisher, + bakeUpCookiesPublisher: CookiesPublisher, queue_name=REQUEST_DATA_QUEUE): super().__init__() self.connection = None self.logger = logger self.already_tried_contact_list = [] self.cookiesPublisher = cookiesPublisher + self.bakeUpCookiesPublisher = bakeUpCookiesPublisher self.channel = None self.valid_csrf = None self.list_to_retrieve_mails = sub_contact_list @@ -203,8 +205,15 @@ class AppointmentRequestSender(threading.Thread): time.sleep(random.randint(1, 2)) self.logger.info("will ack method.delivery_tag: " + str(method.delivery_tag)) if can_continue is not None and can_continue == RequestResult.CSRF_BLOCKED: + # 如果在发送请求时出现csrf被拦截的情况,那么就需要重新发布cookie以目前的队列中,因为这个cookie可能重新利用 self.logger.info("csrf blocked, will republish cookie") self.cookiesPublisher.publish_body(_received_object) + self.logger.info("csrf blocked, will wait 60 seconds") + time.sleep(60) + ch.basic_ack(delivery_tag=method.delivery_tag) + elif can_continue is not None and can_continue == RequestResult.BLOCKED: + self.logger.info("这个cookies可以给点链接用") + self.bakeUpCookiesPublisher.publish_body(_received_object) ch.basic_ack(delivery_tag=method.delivery_tag) else: ch.basic_ack(delivery_tag=method.delivery_tag) diff --git a/request_sender_test.py b/request_sender_test.py index e3e72a5..2fe77a2 100644 --- a/request_sender_test.py +++ b/request_sender_test.py @@ -7,7 +7,7 @@ from db.mongo_manager import MONGO_STORE_MANAGER from excel_reader import read_contacts from models.contact_pojo import ContactPojo from queue_message.CookiesPublisher import CookiesPublisher, SHARED_OBJECT, TEST_QUEUE, MORNING_DATA_CACHE, \ - MORNING_DATA_CACHE_2 + MORNING_DATA_CACHE_2, MORNING_DATA_CACHE_BAK from queue_message.appointmentrequestsender import AppointmentRequestSender from utiles import is_time_between from utils.AppLogging import init_logger @@ -60,9 +60,12 @@ def send_appointment_request(message_queue_name, _contact_list): logger.info(_contact) _cookiesPublisher = CookiesPublisher(queue_name=message_queue_name) _cookiesPublisher.set_up_connection() + _backUp_cookiesPublisher = CookiesPublisher(queue_name=MORNING_DATA_CACHE_BAK) + _backUp_cookiesPublisher.set_up_connection() receiver = AppointmentRequestSender(sub_contact_list=_contact_list, queue_name=message_queue_name, - cookiesPublisher=_cookiesPublisher, logger=logger) + cookiesPublisher=_cookiesPublisher, + bakeUpCookiesPublisher=_backUp_cookiesPublisher, logger=logger) print("count is " + str(count)) receiver.run() @@ -99,6 +102,6 @@ if __name__ == '__main__': # '~/Desktop/contact_list_2024-05-21.xlsx', # '~/Desktop/15_05_to_test.xlsx'] # file_list = ['~/Desktop/15_05_to_test.xlsx', '~/Desktop/16_05_to_test.xlsx'] - file_list = ['~/Desktop/contact_list_2024-06-06.xlsx'] + file_list = ['~/Desktop/contact_list_2024-06-28.xlsx'] send_request_for_file_list(file_list=file_list, thread_number=10, - data_queue_name=MORNING_DATA_CACHE_2) + data_queue_name=MORNING_DATA_CACHE) From 901c5da84d244e92062596b87e2cb764cb961df9 Mon Sep 17 00:00:00 2001 From: Lei PAN Date: Wed, 3 Jul 2024 18:23:40 +0200 Subject: [PATCH 3/3] reduce the wait time to 10s --- queue_message/appointmentrequestsender.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue_message/appointmentrequestsender.py b/queue_message/appointmentrequestsender.py index ce931ad..df45dfc 100644 --- a/queue_message/appointmentrequestsender.py +++ b/queue_message/appointmentrequestsender.py @@ -220,7 +220,7 @@ class AppointmentRequestSender(threading.Thread): else: self.retrieve_invalidate_urls() self.logger.info("empty list") - time.sleep(30) + time.sleep(10) self.logger.info("will basic_reject method.delivery_tag: " + str(method.delivery_tag)) ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True) else: