diff --git a/proxy_manager/proxy_manager.py b/proxy_manager/proxy_manager.py index e46cbec..6c2473c 100644 --- a/proxy_manager/proxy_manager.py +++ b/proxy_manager/proxy_manager.py @@ -35,8 +35,9 @@ FR_DATA_IMPULSE_RES = { } # 八分之一用data_impulse MOBILE_PROXY_LIST = [FR_PROXY_MOB_OXY_STICKY, FR_PROXY_MOB_OXY_STICKY, FR_PROXY_MOB_OXY_STICKY, FR_PROXY_MOB_OXY_STICKY, - FR_PROXY_MOB_OXY_STICKY, FR_PROXY_MOB_OXY_STICKY, FR_PROXY_MOB_OXY_STICKY, - FR_PROXY_DATA_IMPULSE_STICKY] + FR_PROXY_MOB_OXY_STICKY, FR_PROXY_MOB_OXY_STICKY, FR_PROXY_MOB_OXY_STICKY + # FR_PROXY_DATA_IMPULSE_STICKY + ] class ProxyManager: diff --git a/queue_message/appointmentrequestsender.py b/queue_message/appointmentrequestsender.py index 1d962d6..baf1972 100644 --- a/queue_message/appointmentrequestsender.py +++ b/queue_message/appointmentrequestsender.py @@ -64,18 +64,16 @@ def is_open(): class AppointmentRequestSender(threading.Thread): def __init__(self, sub_contact_list: list, logger, cookiesPublisher: CookiesPublisher, - just_send=False, queue_name=REQUEST_DATA_QUEUE): super().__init__() self.connection = None - self.just_send = just_send self.logger = logger self.already_tried_contact_list = [] self.cookiesPublisher = cookiesPublisher self.channel = None self.valid_csrf = None self.list_to_retrieve_mails = sub_contact_list - self.initial_contact_list = sub_contact_list + self.initial_contact_list = sub_contact_list.copy() self.contact_list = sub_contact_list self.queue_name = queue_name self.proxy_manager = ProxyManager(logger) @@ -112,10 +110,7 @@ class AppointmentRequestSender(threading.Thread): rs_w=_received_dict['rs_w'], rs_cd=_received_dict['rs_cd']) _received_cookies = _received_dict["cookiesStr"] # remove already sent contacts - if self.just_send: - self.contact_list = filter_contacts(self.contact_list, self.already_tried_contact_list) - else: - self.contact_list = filter_contacts(self.contact_list) + self.contact_list = filter_contacts(self.contact_list, self.already_tried_contact_list) # remove already booked contacts random.shuffle(self.contact_list) if len(self.contact_list) > 0 and is_open(): @@ -222,6 +217,7 @@ class AppointmentRequestSender(threading.Thread): self.channel.start_consuming() def retrieve_invalidate_urls(self): + # 如果没有已读邮件,而且需要读邮件的联系人表不为空,就读取未读邮件 if not self.already_read_emails and len(self.list_to_retrieve_mails) > 0: self.logger.info("will retrieve validate urls") time.sleep(30) @@ -238,9 +234,13 @@ class AppointmentRequestSender(threading.Thread): else: self.logger.info("already read emails, is there any contacts to use") self.logger.info("reset already_tried_contact_list") + # 重置已尝试的联系人 self.already_tried_contact_list = [] self.contact_list = filter_contacts(self.initial_contact_list, self.already_tried_contact_list) self.logger.info("contact_list size is " + str(len(self.contact_list))) if len(self.contact_list) > 0: self.logger.info("set already_read_emails to False") self.already_read_emails = False + else: + self.logger.info("already read emails, no contact to use -> stop") + self.channel.stop_consuming() diff --git a/request_sender_test.py b/request_sender_test.py index dc61d44..dae1e23 100644 --- a/request_sender_test.py +++ b/request_sender_test.py @@ -61,30 +61,39 @@ def send_appointment_request(message_queue_name, _contact_list): _cookiesPublisher = CookiesPublisher(queue_name=message_queue_name) _cookiesPublisher.set_up_connection() receiver = AppointmentRequestSender(sub_contact_list=_contact_list, - queue_name=message_queue_name, just_send=True, + queue_name=message_queue_name, cookiesPublisher=_cookiesPublisher, logger=logger) print("count is " + str(count)) receiver.run() -def start_send_requests(thread_number, file_path, data_queue_name=MORNING_DATA_CACHE): +def start_send_requests(thread_number, contact_list, data_queue_name=MORNING_DATA_CACHE): print("start send requests") - contacts_file_path = file_path - _contact_list = read_contacts(contacts_file_path) - _contact_list_to_book = filter_contacts(_contact_list) + _contact_list_to_book = filter_contacts(contact_list) _segment_number = thread_number logger.info("{} contacts to book".format(len(_contact_list_to_book))) - last_thread = None + # last_thread = None + thread_list = [] for i in range(0, _segment_number): logger.info("segment is {}".format(i)) _step = int(len(_contact_list_to_book) / _segment_number) _sublist = _contact_list_to_book[i * _step:_step * (i + 1)] _thread1 = Thread(target=send_appointment_request, args=(data_queue_name, _sublist)) - last_thread = _thread1 + thread_list.append(_thread1) _thread1.start() - last_thread.join() + for _thread in thread_list: + _thread.join() + + +def send_request_for_file_list(_file_list: list[str], thread_number: int = 20, data_queue_name=MORNING_DATA_CACHE): + for _file_path in file_list: + logger.info("send request for file: " + _file_path) + _contact_list = read_contacts(_file_path) + start_send_requests(thread_number=thread_number, contact_list=_contact_list, + data_queue_name=data_queue_name) if __name__ == '__main__': - start_send_requests(thread_number=30, file_path='~/Desktop/contact_list_2024-05-14.xlsx', - data_queue_name=MORNING_DATA_CACHE_2) + file_list = ['~/Desktop/16_05_to_test.xlsx'] + send_request_for_file_list(_file_list=file_list, thread_number=20, + data_queue_name=MORNING_DATA_CACHE_2)