"""Send appointment requests for contacts read from Excel files.

Contacts are filtered against the records returned by ``MONGO_STORE_MANAGER``,
split into contiguous segments, and each segment is handed to an
``AppointmentRequestSender`` running in its own thread.
"""
import datetime
import logging
import sys
from threading import Lock, Thread

from db.mongo_manager import MONGO_STORE_MANAGER
from excel_reader import read_contacts
from models.contact_pojo import ContactPojo
from queue_message.CookiesPublisher import (
    CookiesPublisher,
    SHARED_OBJECT,
    TEST_QUEUE,
    MORNING_DATA_CACHE,
    MORNING_DATA_CACHE_2,
    MORNING_DATA_CACHE_BAK,
)
from queue_message.appointmentrequestsender import AppointmentRequestSender
from utiles import is_time_between
from utils.AppLogging import init_logger
from workers.proxies_constants import MOBILE_PROXY_LIST_FR

# Number of times send_appointment_request has been entered in this process.
count = 0
# send_appointment_request runs as a Thread target, so the increment of
# ``count`` must be serialised.
_count_lock = Lock()

init_logger()
logger = logging.getLogger()
logger.addHandler(logging.StreamHandler(stream=sys.stdout))


def is_already_sent(contact: ContactPojo) -> bool:
    """Return True if ``contact.mail`` matches a successful item for the day.

    The comparison is made against the ``email`` attribute of every item
    returned by ``MONGO_STORE_MANAGER.get_all_successful_items_for_day()``.
    """
    already_sent_contacts = MONGO_STORE_MANAGER.get_all_successful_items_for_day()
    return any(contact.mail == sent.email for sent in already_sent_contacts)


def filter_contacts(_contact_list: list) -> list:
    """Return the contacts that still need an appointment request.

    A contact is dropped when its ``mail`` equals the ``email`` of either:

    * an item from ``get_all_successful_items_for_day()``, or
    * an item from ``get_links_to_validate()`` -- if a link has already been
      received there is no point requesting again (this case is logged).

    The relative order of the remaining contacts is preserved.
    """
    # Build the lookup sets once instead of rescanning both lists per contact.
    # NOTE(review): assumes the mail/email values are hashable (e.g. str).
    booked_emails = {
        booked.email
        for booked in MONGO_STORE_MANAGER.get_all_successful_items_for_day()
    }
    pending_link_emails = {
        link.email for link in MONGO_STORE_MANAGER.get_links_to_validate()
    }

    contacts_to_book = []
    for contact in _contact_list:
        has_pending_link = contact.mail in pending_link_emails
        if has_pending_link:
            # Logged even when the contact is also already booked, as before.
            logger.info("{}: link already received".format(contact.mail))
        if has_pending_link or contact.mail in booked_emails:
            continue
        contacts_to_book.append(contact)
    return contacts_to_book


def send_appointment_request(message_queue_name, _contact_list, stop_at_hour=11, stop_at_mins=30):
    """Run one ``AppointmentRequestSender`` over ``_contact_list``.

    Two ``CookiesPublisher`` connections are set up -- one on
    ``message_queue_name`` and one on ``MORNING_DATA_CACHE_BAK`` -- and passed
    to the sender together with the module logger. The call returns when
    ``receiver.run()`` returns.

    :param message_queue_name: queue name for the primary publisher and sender.
    :param _contact_list: contacts handled by this sender.
    :param stop_at_hour: forwarded unchanged to ``AppointmentRequestSender``.
    :param stop_at_mins: forwarded unchanged to ``AppointmentRequestSender``.
    """
    global count
    # Read-modify-write on a module global from several threads: take the lock
    # and keep a local copy so the printed value belongs to this call.
    with _count_lock:
        count = count + 1
        call_number = count

    for _contact in _contact_list:
        logger.info(_contact)

    _cookiesPublisher = CookiesPublisher(queue_name=message_queue_name)
    _cookiesPublisher.set_up_connection()
    _backUp_cookiesPublisher = CookiesPublisher(queue_name=MORNING_DATA_CACHE_BAK)
    _backUp_cookiesPublisher.set_up_connection()

    receiver = AppointmentRequestSender(
        sub_contact_list=_contact_list,
        queue_name=message_queue_name,
        cookiesPublisher=_cookiesPublisher,
        bakeUpCookiesPublisher=_backUp_cookiesPublisher,
        logger=logger,
        stop_at_hour=stop_at_hour,
        stop_at_mins=stop_at_mins,
    )
    print("count is " + str(call_number))
    receiver.run()


def _split_evenly(items: list, segment_count: int) -> list:
    """Split ``items`` into contiguous, non-empty, near-equal segments.

    At most ``segment_count`` segments are produced (fewer when there are
    fewer items than segments). Every item lands in exactly one segment; the
    first ``len(items) % n`` segments hold one extra item. Returns an empty
    list when ``items`` is empty or ``segment_count`` is less than 1.
    """
    if segment_count < 1 or not items:
        return []
    segment_count = min(segment_count, len(items))
    base_size, extra = divmod(len(items), segment_count)
    segments = []
    start = 0
    for index in range(segment_count):
        end = start + base_size + (1 if index < extra else 0)
        segments.append(items[start:end])
        start = end
    return segments


def start_send_requests(thread_number, contact_list, data_queue_name=MORNING_DATA_CACHE, stop_at_hour=14,
                        stop_at_mins=56):
    """Filter ``contact_list`` and send the remainder using worker threads.

    The filtered contacts are split into at most ``thread_number`` segments
    and one thread running ``send_appointment_request`` is started per
    segment; the call blocks until all threads have finished.

    Earlier versions sliced with ``int(len / thread_number)`` as the step,
    which silently dropped the trailing ``len % thread_number`` contacts and
    produced only empty segments when there were fewer contacts than threads.
    Every contact is now assigned to exactly one thread and no thread is
    started for an empty segment.

    :param thread_number: maximum number of worker threads.
    :param contact_list: contacts to filter and send.
    :param data_queue_name: queue name passed to each worker.
    :param stop_at_hour: passed through to each worker.
    :param stop_at_mins: passed through to each worker.
    """
    print("start send requests")
    contacts_to_book = filter_contacts(contact_list)
    logger.info("{} contacts to book".format(len(contacts_to_book)))

    segments = _split_evenly(contacts_to_book, thread_number)
    if not segments:
        logger.info("nothing to send: no contacts to book or no worker threads requested")
        return

    thread_list = []
    for index, segment in enumerate(segments):
        logger.info("segment is {}".format(index))
        worker = Thread(
            target=send_appointment_request,
            args=(data_queue_name, segment, stop_at_hour, stop_at_mins),
        )
        thread_list.append(worker)
        worker.start()

    for worker in thread_list:
        worker.join()


def send_request_for_file_list(file_list: list, thread_number: int = 20, data_queue_name=MORNING_DATA_CACHE,
                               stop_at_hour=11, stop_at_mins=30):
    """Process each contact file in ``file_list`` in order.

    For every path the contacts are loaded with ``read_contacts`` and handed
    to ``start_send_requests``; the next file is started only after all
    worker threads for the previous file have finished.

    :param file_list: paths of the contact files to read.
    :param thread_number: maximum number of worker threads per file.
    :param data_queue_name: queue name passed to the workers.
    :param stop_at_hour: passed through to the workers.
    :param stop_at_mins: passed through to the workers.
    """
    for _file_path in file_list:
        logger.info("send request for file: " + _file_path)
        _contact_list = read_contacts(_file_path)
        start_send_requests(
            thread_number=thread_number,
            contact_list=_contact_list,
            data_queue_name=data_queue_name,
            stop_at_hour=stop_at_hour,
            stop_at_mins=stop_at_mins,
        )


if __name__ == '__main__':
    # Alternative inputs used previously:
    # file_list = ['~/Desktop/contact_list_2024-05-23.xlsx',
    #              '~/Desktop/contact_list_2024-05-22.xlsx',
    #              '~/Desktop/contact_list_2024-05-21.xlsx',
    #              '~/Desktop/15_05_to_test.xlsx']
    # file_list = ['~/Desktop/15_05_to_test.xlsx', '~/Desktop/16_05_to_test.xlsx']
    file_list = ['~/Desktop/contact_list_all.xlsx']
    send_request_for_file_list(
        file_list=file_list,
        thread_number=2,
        data_queue_name=MORNING_DATA_CACHE,
        stop_at_hour=16,
        stop_at_mins=30,
    )