import logging
import random
import sys
from threading import Lock, Thread

from db.mongo_manager import MONGO_STORE_MANAGER
from excel_reader import read_contacts
from models.contact_pojo import ContactPojo
from queue_message.CookiesPublisher import CookiesPublisher, SHARED_OBJECT, TEST_QUEUE, MORNING_DATA_CACHE, \
    MORNING_DATA_CACHE_2, MORNING_DATA_CACHE_BAK
from queue_message.appointmentrequestsender import AppointmentRequestSender
from utils.AppLogging import init_logger

# Logging setup: init_logger() configures handlers/formatters, then an extra
# stdout handler is attached to the root logger so messages also reach the
# console. NOTE(review): this may duplicate console output if init_logger()
# already adds a stream handler — confirm against utils.AppLogging.
init_logger()
logger = logging.getLogger()
logger.addHandler(logging.StreamHandler(stream=sys.stdout))

# Number of worker invocations of send_appointment_request so far.
# BUGFIX: this counter is updated from several threads at once; the original
# unsynchronized `count = count + 1` is a read-modify-write race, so updates
# are now serialized with a lock.
count = 0
_count_lock = Lock()


def filter_contacts(_contact_list: list) -> list:
    """Return the subset of *_contact_list* that still needs a booking request.

    A contact is skipped when its email address appears either in today's
    successfully-sent items or in the pending link-validation list stored
    in Mongo.
    """
    already_sent_contacts = MONGO_STORE_MANAGER.get_all_successful_items_for_day()
    _link_to_validate_list = MONGO_STORE_MANAGER.get_links_to_validate()
    # Sets give O(1) membership tests instead of rescanning lists per contact.
    sent_emails = {booked.email for booked in already_sent_contacts}
    validate_emails = {link.email for link in _link_to_validate_list}
    _contact_list_to_book = []
    for contact in _contact_list:
        if contact.mail in sent_emails:
            continue
        # If the link has already been received, do not request again.
        if contact.mail in validate_emails:
            logger.info("{}: link already received".format(contact.mail))
            continue
        _contact_list_to_book.append(contact)
    return _contact_list_to_book


def send_appointment_request(message_queue_name, _contact_list, stop_at_hour=11, stop_at_mins=30):
    """Thread worker: send booking requests for one slice of the contact list.

    Sets up a primary publisher on *message_queue_name* plus a backup
    publisher on MORNING_DATA_CACHE_BAK, then hands the slice to
    AppointmentRequestSender.run(), which stops at stop_at_hour:stop_at_mins.
    """
    global count
    # Serialize the shared-counter update; keep a snapshot for printing so the
    # value logged below is the one this thread actually produced.
    with _count_lock:
        count = count + 1
        current_count = count
    for _contact in _contact_list:
        logger.info(_contact)
    _cookiesPublisher = CookiesPublisher(queue_name=message_queue_name)
    _cookiesPublisher.set_up_connection()
    _backUp_cookiesPublisher = CookiesPublisher(queue_name=MORNING_DATA_CACHE_BAK)
    _backUp_cookiesPublisher.set_up_connection()
    receiver = AppointmentRequestSender(sub_contact_list=_contact_list,
                                        queue_name=message_queue_name,
                                        cookiesPublisher=_cookiesPublisher,
                                        bakeUpCookiesPublisher=_backUp_cookiesPublisher,
                                        logger=logger,
                                        stop_at_hour=stop_at_hour,
                                        stop_at_mins=stop_at_mins)
    print("count is " + str(current_count))
    receiver.run()


def start_send_requests(thread_number, contact_list, data_queue_name=MORNING_DATA_CACHE,
                        stop_at_hour=14, stop_at_mins=56):
    """Filter *contact_list*, shard it across *thread_number* worker threads,
    and block until every worker has finished."""
    print("start send requests")
    _contact_list_to_book = filter_contacts(contact_list)
    _segment_number = thread_number
    total_contacts = len(_contact_list_to_book)
    logger.info("{} contacts to book".format(total_contacts))
    if total_contacts == 0:
        return
    thread_list = []
    # Even distribution: the first `remainder` threads take one extra contact.
    chunk_size = total_contacts // _segment_number
    remainder = total_contacts % _segment_number
    start_index = 0
    for i in range(_segment_number):
        # More threads than contacts: remaining threads get nothing, stop early.
        if start_index >= total_contacts:
            break
        logger.info("segment is {}".format(i))
        current_chunk_size = chunk_size + (1 if i < remainder else 0)
        end_index = start_index + current_chunk_size
        _sublist = _contact_list_to_book[start_index:end_index]
        start_index = end_index
        if _sublist:
            _thread1 = Thread(target=send_appointment_request,
                              args=(data_queue_name, _sublist, stop_at_hour, stop_at_mins))
            thread_list.append(_thread1)
            _thread1.start()
    # Join all workers so files are processed one at a time by the caller.
    for _thread in thread_list:
        _thread.join()


def send_request_for_file_list(file_list: list, thread_number: int = 20, data_queue_name=MORNING_DATA_CACHE,
                               stop_at_hour=11, stop_at_mins=30):
    """Read each Excel contact file in *file_list*, shuffle its contacts, and
    push them through start_send_requests with the given cutoff time."""
    logger.info("stop_at_hour is " + str(stop_at_hour) + " stop_at_mins is " + str(stop_at_mins))
    for _file_path in file_list:
        logger.info("send request for file: " + _file_path)
        _contact_list = read_contacts(_file_path)
        # Shuffle so retries across runs do not always hit the same ordering.
        random.shuffle(_contact_list)
        start_send_requests(thread_number=thread_number,
                            contact_list=_contact_list,
                            data_queue_name=data_queue_name,
                            stop_at_hour=stop_at_hour,
                            stop_at_mins=stop_at_mins)


if __name__ == '__main__':
    file_list = ['~/Desktop/contact_list_2025-11-28.xlsx']
    send_request_for_file_list(file_list=file_list,
                               thread_number=10,
                               data_queue_name=MORNING_DATA_CACHE_2,
                               stop_at_hour=19,
                               stop_at_mins=50)