From 385bda45575a599b9bfcb424b0a1d3ea47395820 Mon Sep 17 00:00:00 2001 From: PAN Lei Date: Tue, 17 Feb 2026 08:56:57 +0100 Subject: [PATCH] request_sender.py optimization --- request_sender.py | 125 +++++++++++++++++++++++++---------------- request_sender_test.py | 107 ----------------------------------- 2 files changed, 77 insertions(+), 155 deletions(-) delete mode 100644 request_sender_test.py diff --git a/request_sender.py b/request_sender.py index d053351..f6e5859 100644 --- a/request_sender.py +++ b/request_sender.py @@ -1,54 +1,40 @@ -import datetime import logging +import random import sys from threading import Thread from db.mongo_manager import MONGO_STORE_MANAGER from excel_reader import read_contacts from models.contact_pojo import ContactPojo -from queue_message.CookiesPublisher import CookiesPublisher, SHARED_OBJECT, TEST_QUEUE, MORNING_DATA_CACHE_2, \ - MORNING_DATA_CACHE +from queue_message.CookiesPublisher import CookiesPublisher, SHARED_OBJECT, TEST_QUEUE, MORNING_DATA_CACHE, \ + MORNING_DATA_CACHE_2, MORNING_DATA_CACHE_BAK from queue_message.appointmentrequestsender import AppointmentRequestSender -from utiles import is_time_between from utils.AppLogging import init_logger -from workers.proxies_constants import MOBILE_PROXY_LIST_FR - -IPFIY = 'http://api.ipify.org' -NGROK_TEST = "https://bcc6-193-164-156-53.ngrok-free.app" - - -def is_already_sent(contact: ContactPojo) -> bool: - already_sent_contacts = MONGO_STORE_MANAGER.get_all_successful_items_for_day() - for required_contact in already_sent_contacts: - if contact.mail == required_contact.email: - return True - return False def filter_contacts(_contact_list: list) -> list: already_sent_contacts = MONGO_STORE_MANAGER.get_all_successful_items_for_day() _link_to_validate_list = MONGO_STORE_MANAGER.get_links_to_validate() + + # Optimization: Use sets for O(1) lookup complexity + sent_emails = {booked.email for booked in already_sent_contacts} + validate_emails = {link.email for link in _link_to_validate_list} + _contact_list_to_book = [] for contact in _contact_list: - _to_add = True - for booked in already_sent_contacts: - if contact.mail == booked.email: - _to_add = False + if contact.mail in sent_emails: + continue + # 如果已经收到链接了,就不要再请求 - for link_to_validate in _link_to_validate_list: - if contact.mail == link_to_validate.email: - logger.info("{}: link already received".format(contact.mail)) - _to_add = False - if _to_add: - _contact_list_to_book.append(contact) + if contact.mail in validate_emails: + logger.info("{}: link already received".format(contact.mail)) + continue + + _contact_list_to_book.append(contact) return _contact_list_to_book -def is_open(): - return is_time_between(datetime.time(10, 30), datetime.time(19, 00)) - - count = 0 init_logger() logger = logging.getLogger() @@ -56,36 +42,79 @@ logger = logging.getLogger() logger.addHandler(logging.StreamHandler(stream=sys.stdout)) -def send_appointment_request(message_queue_name, _contact_list): +def send_appointment_request(message_queue_name, _contact_list, stop_at_hour=11, stop_at_mins=30): global count count = count + 1 for _contact in _contact_list: logger.info(_contact) _cookiesPublisher = CookiesPublisher(queue_name=message_queue_name) _cookiesPublisher.set_up_connection() - receiver = AppointmentRequestSender(sub_contact_list=_contact_list, queue_name=message_queue_name, - cookiesPublisher=_cookiesPublisher, logger=logger) + _backUp_cookiesPublisher = CookiesPublisher(queue_name=MORNING_DATA_CACHE_BAK) + _backUp_cookiesPublisher.set_up_connection() + receiver = AppointmentRequestSender(sub_contact_list=_contact_list, + queue_name=message_queue_name, + cookiesPublisher=_cookiesPublisher, + bakeUpCookiesPublisher=_backUp_cookiesPublisher, logger=logger, + stop_at_hour=stop_at_hour, stop_at_mins=stop_at_mins) print("count is " + str(count)) receiver.run() -def start_send_requests(): +def start_send_requests(thread_number, contact_list, data_queue_name=MORNING_DATA_CACHE, stop_at_hour=14, + stop_at_mins=56): print("start send requests") - contacts_file_path = '~/Desktop/06_05_to_test.xlsx' - _contact_list = read_contacts(contacts_file_path)[:1] - _contact_list_to_book = filter_contacts(_contact_list) - _segment_number = 1 - logger.info("{} contacts to book".format(len(_contact_list_to_book))) - last_thread = None - for i in range(0, _segment_number): + _contact_list_to_book = filter_contacts(contact_list) + _segment_number = thread_number + total_contacts = len(_contact_list_to_book) + logger.info("{} contacts to book".format(total_contacts)) + + if total_contacts == 0: + return + + # Optimization: Better distribution of contacts among threads + thread_list = [] + chunk_size = total_contacts // _segment_number + remainder = total_contacts % _segment_number + + start_index = 0 + for i in range(_segment_number): + # If we have more threads than contacts, some threads will get empty lists, which is fine + if start_index >= total_contacts: + break + logger.info("segment is {}".format(i)) - _step = int(len(_contact_list_to_book) / _segment_number) - _sublist = _contact_list_to_book[i * _step:_step * (i + 1)] - _thread1 = Thread(target=send_appointment_request, args=(MORNING_DATA_CACHE, _sublist)) - last_thread = _thread1 - _thread1.start() - last_thread.join() + + # Distribute remainder to the first few threads + current_chunk_size = chunk_size + (1 if i < remainder else 0) + end_index = start_index + current_chunk_size + + _sublist = _contact_list_to_book[start_index:end_index] + start_index = end_index + + if _sublist: + _thread1 = Thread(target=send_appointment_request, args=(data_queue_name, _sublist, stop_at_hour, stop_at_mins)) + thread_list.append(_thread1) + _thread1.start() + + for _thread in thread_list: + _thread.join() + + +def send_request_for_file_list(file_list: list, thread_number: int = 20, data_queue_name=MORNING_DATA_CACHE, + stop_at_hour=11, stop_at_mins=30): + logger.info("stop_at_hour is " + str(stop_at_hour) + " stop_at_mins is " + str(stop_at_mins)) + for _file_path in file_list: + logger.info("send request for file: " + _file_path) + _contact_list = read_contacts(_file_path) + random.shuffle(_contact_list) + start_send_requests(thread_number=thread_number, contact_list=_contact_list, + data_queue_name=data_queue_name, stop_at_hour=stop_at_hour, stop_at_mins=stop_at_mins) if __name__ == '__main__': - start_send_requests() + # file_list = ['~/Desktop/contact_list_2024-05-23.xlsx', + # '~/Desktop/contact_list_2024-05-21.xlsx', + # file_list = ['~/Desktop/contact_list_2025-10-30.xlsx'] + file_list = ['~/Desktop/contact_list_2025-11-28.xlsx'] + send_request_for_file_list(file_list=file_list, thread_number=10, + data_queue_name=MORNING_DATA_CACHE_2, stop_at_hour=19, stop_at_mins=50) \ No newline at end of file diff --git a/request_sender_test.py b/request_sender_test.py deleted file mode 100644 index 42a7690..0000000 --- a/request_sender_test.py +++ /dev/null @@ -1,107 +0,0 @@ -import datetime -import logging -import random -import sys -from threading import Thread - -from db.mongo_manager import MONGO_STORE_MANAGER -from excel_reader import read_contacts -from models.contact_pojo import ContactPojo -from queue_message.CookiesPublisher import CookiesPublisher, SHARED_OBJECT, TEST_QUEUE, MORNING_DATA_CACHE, \ - MORNING_DATA_CACHE_2, MORNING_DATA_CACHE_BAK -from queue_message.appointmentrequestsender import AppointmentRequestSender -from utils.AppLogging import init_logger - - -def is_already_sent(contact: ContactPojo) -> bool: - already_sent_contacts = MONGO_STORE_MANAGER.get_all_successful_items_for_day() - for required_contact in already_sent_contacts: - if contact.mail == required_contact.email: - return True - return False - - -def filter_contacts(_contact_list: list) -> list: - already_sent_contacts = MONGO_STORE_MANAGER.get_all_successful_items_for_day() - _link_to_validate_list = MONGO_STORE_MANAGER.get_links_to_validate() - _contact_list_to_book = [] - for contact in _contact_list: - _to_add = True - for booked in already_sent_contacts: - if contact.mail == booked.email: - _to_add = False - # 如果已经收到链接了,就不要再请求 - for link_to_validate in _link_to_validate_list: - if contact.mail == link_to_validate.email: - logger.info("{}: link already received".format(contact.mail)) - _to_add = False - if _to_add: - _contact_list_to_book.append(contact) - - return _contact_list_to_book - - -count = 0 -init_logger() -logger = logging.getLogger() - -logger.addHandler(logging.StreamHandler(stream=sys.stdout)) - - -def send_appointment_request(message_queue_name, _contact_list, stop_at_hour=11, stop_at_mins=30): - global count - count = count + 1 - for _contact in _contact_list: - logger.info(_contact) - _cookiesPublisher = CookiesPublisher(queue_name=message_queue_name) - _cookiesPublisher.set_up_connection() - _backUp_cookiesPublisher = CookiesPublisher(queue_name=MORNING_DATA_CACHE_BAK) - _backUp_cookiesPublisher.set_up_connection() - receiver = AppointmentRequestSender(sub_contact_list=_contact_list, - queue_name=message_queue_name, - cookiesPublisher=_cookiesPublisher, - bakeUpCookiesPublisher=_backUp_cookiesPublisher, logger=logger, - stop_at_hour=stop_at_hour, stop_at_mins=stop_at_mins) - print("count is " + str(count)) - receiver.run() - - -def start_send_requests(thread_number, contact_list, data_queue_name=MORNING_DATA_CACHE, stop_at_hour=14, - stop_at_mins=56): - print("start send requests") - _contact_list_to_book = filter_contacts(contact_list) - _segment_number = thread_number - logger.info("{} contacts to book".format(len(_contact_list_to_book))) - # last_thread = None - thread_list = [] - for i in range(0, _segment_number): - logger.info("segment is {}".format(i)) - _step = int(len(_contact_list_to_book) / _segment_number) - _sublist = _contact_list_to_book[i * _step:_step * (i + 1)] - _thread1 = Thread(target=send_appointment_request, args=(data_queue_name, _sublist, stop_at_hour, stop_at_mins)) - thread_list.append(_thread1) - _thread1.start() - for _thread in thread_list: - _thread.join() - - -def send_request_for_file_list(file_list: list, thread_number: int = 20, data_queue_name=MORNING_DATA_CACHE, - stop_at_hour=11, stop_at_mins=30): - logger.info("stop_at_hour is " + str(stop_at_hour) + " stop_at_mins is " + str(stop_at_mins)) - for _file_path in file_list: - logger.info("send request for file: " + _file_path) - _contact_list = read_contacts(_file_path) - random.shuffle(_contact_list) - start_send_requests(thread_number=thread_number, contact_list=_contact_list, - data_queue_name=data_queue_name, stop_at_hour=stop_at_hour, stop_at_mins=stop_at_mins) - - -if __name__ == '__main__': - # file_list = ['~/Desktop/contact_list_2024-05-23.xlsx', - # '~/Desktop/contact_list_2024-05-21.xlsx', - # file_list = ['~/Desktop/contact_list_2025-10-30.xlsx'] - file_list = ['~/Desktop/contact_list_2025-11-28.xlsx'] - # file_list = ['~/Desktop/contact_list_2025-11-06.xlsx'] - # file_list = ['~/Desktop/contact_list_all.xlsx'] - send_request_for_file_list(file_list=file_list, thread_number=20, - data_queue_name=MORNING_DATA_CACHE_2, stop_at_hour=19, stop_at_mins=50)