support file list

This commit is contained in:
2024-05-16 23:28:24 +02:00
parent 86cef6d85a
commit 65014b5c71
3 changed files with 29 additions and 19 deletions
+3 -2
View File
@@ -35,8 +35,9 @@ FR_DATA_IMPULSE_RES = {
} }
# 八分之一用data_impulse # 八分之一用data_impulse
MOBILE_PROXY_LIST = [FR_PROXY_MOB_OXY_STICKY, FR_PROXY_MOB_OXY_STICKY, FR_PROXY_MOB_OXY_STICKY, FR_PROXY_MOB_OXY_STICKY, MOBILE_PROXY_LIST = [FR_PROXY_MOB_OXY_STICKY, FR_PROXY_MOB_OXY_STICKY, FR_PROXY_MOB_OXY_STICKY, FR_PROXY_MOB_OXY_STICKY,
FR_PROXY_MOB_OXY_STICKY, FR_PROXY_MOB_OXY_STICKY, FR_PROXY_MOB_OXY_STICKY, FR_PROXY_MOB_OXY_STICKY, FR_PROXY_MOB_OXY_STICKY, FR_PROXY_MOB_OXY_STICKY
FR_PROXY_DATA_IMPULSE_STICKY] # FR_PROXY_DATA_IMPULSE_STICKY
]
class ProxyManager: class ProxyManager:
+6 -6
View File
@@ -64,18 +64,16 @@ def is_open():
class AppointmentRequestSender(threading.Thread): class AppointmentRequestSender(threading.Thread):
def __init__(self, sub_contact_list: list, logger, cookiesPublisher: CookiesPublisher, def __init__(self, sub_contact_list: list, logger, cookiesPublisher: CookiesPublisher,
just_send=False,
queue_name=REQUEST_DATA_QUEUE): queue_name=REQUEST_DATA_QUEUE):
super().__init__() super().__init__()
self.connection = None self.connection = None
self.just_send = just_send
self.logger = logger self.logger = logger
self.already_tried_contact_list = [] self.already_tried_contact_list = []
self.cookiesPublisher = cookiesPublisher self.cookiesPublisher = cookiesPublisher
self.channel = None self.channel = None
self.valid_csrf = None self.valid_csrf = None
self.list_to_retrieve_mails = sub_contact_list self.list_to_retrieve_mails = sub_contact_list
self.initial_contact_list = sub_contact_list self.initial_contact_list = sub_contact_list.copy()
self.contact_list = sub_contact_list self.contact_list = sub_contact_list
self.queue_name = queue_name self.queue_name = queue_name
self.proxy_manager = ProxyManager(logger) self.proxy_manager = ProxyManager(logger)
@@ -112,10 +110,7 @@ class AppointmentRequestSender(threading.Thread):
rs_w=_received_dict['rs_w'], rs_cd=_received_dict['rs_cd']) rs_w=_received_dict['rs_w'], rs_cd=_received_dict['rs_cd'])
_received_cookies = _received_dict["cookiesStr"] _received_cookies = _received_dict["cookiesStr"]
# remove already sent contacts # remove already sent contacts
if self.just_send:
self.contact_list = filter_contacts(self.contact_list, self.already_tried_contact_list) self.contact_list = filter_contacts(self.contact_list, self.already_tried_contact_list)
else:
self.contact_list = filter_contacts(self.contact_list)
# remove already booked contacts # remove already booked contacts
random.shuffle(self.contact_list) random.shuffle(self.contact_list)
if len(self.contact_list) > 0 and is_open(): if len(self.contact_list) > 0 and is_open():
@@ -222,6 +217,7 @@ class AppointmentRequestSender(threading.Thread):
self.channel.start_consuming() self.channel.start_consuming()
def retrieve_invalidate_urls(self): def retrieve_invalidate_urls(self):
# 如果没有已读邮件,而且需要读邮件的联系人表不为空,就读取未读邮件
if not self.already_read_emails and len(self.list_to_retrieve_mails) > 0: if not self.already_read_emails and len(self.list_to_retrieve_mails) > 0:
self.logger.info("will retrieve validate urls") self.logger.info("will retrieve validate urls")
time.sleep(30) time.sleep(30)
@@ -238,9 +234,13 @@ class AppointmentRequestSender(threading.Thread):
else: else:
self.logger.info("already read emails, is there any contacts to use") self.logger.info("already read emails, is there any contacts to use")
self.logger.info("reset already_tried_contact_list") self.logger.info("reset already_tried_contact_list")
# 重置已尝试的联系人
self.already_tried_contact_list = [] self.already_tried_contact_list = []
self.contact_list = filter_contacts(self.initial_contact_list, self.already_tried_contact_list) self.contact_list = filter_contacts(self.initial_contact_list, self.already_tried_contact_list)
self.logger.info("contact_list size is " + str(len(self.contact_list))) self.logger.info("contact_list size is " + str(len(self.contact_list)))
if len(self.contact_list) > 0: if len(self.contact_list) > 0:
self.logger.info("set already_read_emails to False") self.logger.info("set already_read_emails to False")
self.already_read_emails = False self.already_read_emails = False
else:
self.logger.info("already read emails, no contact to use -> stop")
self.channel.stop_consuming()
+18 -9
View File
@@ -61,30 +61,39 @@ def send_appointment_request(message_queue_name, _contact_list):
_cookiesPublisher = CookiesPublisher(queue_name=message_queue_name) _cookiesPublisher = CookiesPublisher(queue_name=message_queue_name)
_cookiesPublisher.set_up_connection() _cookiesPublisher.set_up_connection()
receiver = AppointmentRequestSender(sub_contact_list=_contact_list, receiver = AppointmentRequestSender(sub_contact_list=_contact_list,
queue_name=message_queue_name, just_send=True, queue_name=message_queue_name,
cookiesPublisher=_cookiesPublisher, logger=logger) cookiesPublisher=_cookiesPublisher, logger=logger)
print("count is " + str(count)) print("count is " + str(count))
receiver.run() receiver.run()
def start_send_requests(thread_number, file_path, data_queue_name=MORNING_DATA_CACHE): def start_send_requests(thread_number, contact_list, data_queue_name=MORNING_DATA_CACHE):
print("start send requests") print("start send requests")
contacts_file_path = file_path _contact_list_to_book = filter_contacts(contact_list)
_contact_list = read_contacts(contacts_file_path)
_contact_list_to_book = filter_contacts(_contact_list)
_segment_number = thread_number _segment_number = thread_number
logger.info("{} contacts to book".format(len(_contact_list_to_book))) logger.info("{} contacts to book".format(len(_contact_list_to_book)))
last_thread = None # last_thread = None
thread_list = []
for i in range(0, _segment_number): for i in range(0, _segment_number):
logger.info("segment is {}".format(i)) logger.info("segment is {}".format(i))
_step = int(len(_contact_list_to_book) / _segment_number) _step = int(len(_contact_list_to_book) / _segment_number)
_sublist = _contact_list_to_book[i * _step:_step * (i + 1)] _sublist = _contact_list_to_book[i * _step:_step * (i + 1)]
_thread1 = Thread(target=send_appointment_request, args=(data_queue_name, _sublist)) _thread1 = Thread(target=send_appointment_request, args=(data_queue_name, _sublist))
last_thread = _thread1 thread_list.append(_thread1)
_thread1.start() _thread1.start()
last_thread.join() for _thread in thread_list:
_thread.join()
def send_request_for_file_list(_file_list: list[str], thread_number: int = 20, data_queue_name=MORNING_DATA_CACHE):
for _file_path in file_list:
logger.info("send request for file: " + _file_path)
_contact_list = read_contacts(_file_path)
start_send_requests(thread_number=thread_number, contact_list=_contact_list,
data_queue_name=data_queue_name)
if __name__ == '__main__': if __name__ == '__main__':
start_send_requests(thread_number=30, file_path='~/Desktop/contact_list_2024-05-14.xlsx', file_list = ['~/Desktop/16_05_to_test.xlsx']
send_request_for_file_list(_file_list=file_list, thread_number=20,
data_queue_name=MORNING_DATA_CACHE_2) data_queue_name=MORNING_DATA_CACHE_2)