diff --git a/commandor_page.py b/commandor_page.py index b765f4b..e4eb93e 100644 --- a/commandor_page.py +++ b/commandor_page.py @@ -1,5 +1,7 @@ +import datetime import logging import random +import re import string import threading import time @@ -11,7 +13,9 @@ import params from params import PROXY_SERVER, PROXY_PASSWORD from pojo.ReserveResultPojo import ReserveResultPojo, PublishType from pojo.contact_pojo import ContactPojo +from pojo.serial_modem import SerialModem from utils.excel_reader import ExcelHelper +from utils.operator import Operator, check_operator RDV_URL = "https://rendezvousparis.hermes.com/client/register" # @@ -26,70 +30,151 @@ MESSAGE_FIELD_CLASS = ".message" CONFIRMED_MESSAGE = "Your request for a Leather Goods appointment has been registered" TIME_OUT = 400000 +OTP_TIMEOUT = 240 + def get_random_wait_time() -> float: wait_time = random.randint(0, 10) / 10.0 * 5 return wait_time +class Tls(threading.local): + def __init__(self) -> None: + self.playwright = sync_playwright().start() + print("Create playwright instance in Thread", threading.current_thread().name) + + class CommandorPage: - def __init__(self): + tls = Tls() + + def __init__(self, serial_modem: SerialModem, slot_position, sim_position): self.otp_value = None self.logger = logging.getLogger("CommandorPage") + self.is_finished = False + self.current_gsm_modem = serial_modem + self.slot_position = slot_position + self.sim_position = sim_position + self.contact = serial_modem.contact - def _run(self, e: threading.Event, proxy, contact: ContactPojo, on_ready_for_otp, on_success): - self.contact = contact + def on_success(self, result: ReserveResultPojo): + self.logger.info("on_success called.") + result.sim_position = self.sim_position + result.slot_position = self.slot_position + self.logger.info(result) + params.firebase_store_manager.save(result) + params.oracle_log_sender.send_appoint_result(result) + self.is_finished = True + + def handle_sms(self, sms): + self.logger.info( + u'== SMS message received ==\nFrom: {0}\nTime: {1}\nMessage:\n{2}\n'.format(sms.number, sms.time, sms.text)) + # extract the otp number + date = str(sms.time)[0:10] + params.oracle_log_sender.send_sms_reception_log(sms.number, sms.text) + if date == str(datetime.date.today()): + self.logger.info("this sms is for today") + if "rendez-vous" in sms.text or "appointment" in sms.text: + self.logger.info("try to extract the otp") + pattern = r'\d{6,8}' + # if re.match(pattern, sms.text): + match = re.search(pattern, sms.text) + otp = match.group(0) + self.logger.info("otp is " + otp) + self.otp_value = otp + self.logger.info("will set thread event") + self.thread_event.set() + # wait for the sms for 20 seconds + while not self.is_finished: + time.sleep(2) + self.is_finished = True + if self.current_gsm_modem: + self.logger.info("will close used modem") + self.current_gsm_modem.modem.close() + else: + self.logger.info("The sms is not for RDV") + else: + self.logger.info("The sms is not for today") + + def set_up_sms_listener(self): + if check_operator(self.current_gsm_modem.ccid) == Operator.LYCAMOBILE: + # lycamobile + self.current_gsm_modem.modem.deleteMultipleStoredSms(memory="SM") + self.current_gsm_modem.modem.smsReceivedCallback = self.handle_sms + self.is_finished = False + self.current_gsm_modem.smsTextMode = False + self.logger.info('Waiting for SMS message, for phone number ' + str(self.current_gsm_modem.phone_number)) + listen_at = time.time() + while not self.is_finished: + time.sleep(2) + # check whether timeout + now = time.time() + if (listen_at + OTP_TIMEOUT) < now: + self.logger.info("time out for {}, switch to next contact".format(self.current_gsm_modem.phone_number)) + # save the contact in timeout + self.timeout_occurred() + self.current_gsm_modem.modem.close() + return + return + + def timeout_occurred(self): + params.firebase_store_manager.save_timeout_contact(self.current_gsm_modem.contact) + params.oracle_log_sender.send_timeout_log(self.current_gsm_modem) + self.logger.info("will close timeout modem") + self.thread_event.set() + self.current_gsm_modem.modem.close() + self.reset_air_plan_mode() + + def _run(self, e: threading.Event, proxy): self.on_success_listener = on_success # reset otp_value to None self.otp_value = None - with sync_playwright() as pwright: - devices = random.choice(params.DEVICES) - first_page = None - while first_page is None: - first_page = self.start_brower(proxy, pwright, devices) - proxy_username = "panleicim-res-fr-" + get_random_id_number_for_proxy() - self.logger.info("proxy_username is " + proxy_username) - proxy = { - "server": params.PROXY_SERVER, - "username": proxy_username, - "password": params.PROXY_PASSWORD - } - self._setName(contact.last_name, contact.first_name) - self._setPhoneCountryAndStore() - # self.page.mouse.wheel(0, random.randint(100, 200)) - self._setPhoneNumber(contact.phone) - self._set_email(contact.mail) - self.setIdNumber(contact.passport) - # - self._checkCgu() - # wait for sms_code field - # self.clickOnValidBtn() - on_ready_for_otp(e, self) - otp_input = self.page.locator(OTP_FIELD_ID) - otp_input.wait_for(state='visible', timeout=TIME_OUT) - event_is_set = e.wait() - logging.info('event set: %s', event_is_set) - if self.otp_value: - self.fill_otp(self.otp_value) - time.sleep(get_random_wait_time()) - self.clickOnValidBtn() - otp_sent = self.page.locator(MESSAGE_FIELD_CLASS) - otp_sent.wait_for(state='visible', timeout=TIME_OUT) - message = self.page.content() - # print("message is:" + message) + devices = random.choice(params.DEVICES) + first_page = None + while first_page is None: + first_page = self.start_browser(proxy, self.tls.playwright, devices) + proxy_username = "panleicim-res-fr-" + get_random_id_number_for_proxy() + self.logger.info("proxy_username is " + proxy_username) + proxy = { + "server": params.PROXY_SERVER, + "username": proxy_username, + "password": params.PROXY_PASSWORD + } + self._setName(self.contact.last_name, self.contact.first_name) + self._setPhoneCountryAndStore() + # self.page.mouse.wheel(0, random.randint(100, 200)) + self._setPhoneNumber(self.contact.phone) + self._set_email(self.contact.mail) + self.setIdNumber(self.contact.passport) + # + self._checkCgu() + # wait for sms_code field + # self.clickOnValidBtn() + self.thread_event = e + otp_input = self.page.locator(OTP_FIELD_ID) + otp_input.wait_for(state='visible', timeout=TIME_OUT) + event_is_set = e.wait() + logging.info('event set: %s', event_is_set) + if self.otp_value: + self.fill_otp(self.otp_value) time.sleep(get_random_wait_time()) - if CONFIRMED_MESSAGE in message: - # publish the successful message - self.logger.info("url is " + self.page.url) - self.publish_message_to_queue(contact, PublishType.SUCCESS.value, self.page.url) - else: - self.logger.info("timeout") - self.reset_air_plan_mode() + self.clickOnValidBtn() + otp_sent = self.page.locator(MESSAGE_FIELD_CLASS) + otp_sent.wait_for(state='visible', timeout=TIME_OUT) + message = self.page.content() + # print("message is:" + message) + time.sleep(get_random_wait_time()) + if CONFIRMED_MESSAGE in message: + # publish the successful message + self.logger.info("url is " + self.page.url) + self.publish_message_to_queue(self.contact, PublishType.SUCCESS.value, self.page.url) + else: + self.logger.info("timeout") + self.reset_air_plan_mode() - def start_brower(self, proxy, pwright, device) -> Union[str, None]: + def start_browser(self, proxy, pwright, device) -> Union[str, None]: try: self.browser = pwright.webkit.launch(headless=False, timeout=90000, proxy=proxy) - self.logger.info("user_agent is " + device) + self.logger.info("device is " + device) pixel_2 = pwright.devices[device] context = self.browser.new_context(**pixel_2, locale='en-GB') self.page = context.new_page() @@ -102,7 +187,6 @@ class CommandorPage: }}); } """) - # self.page.add_init_script("""""") self.page.on("load", self._on_page_loaded) self.page.goto(RDV_URL, timeout=90000) return self.page.content() @@ -112,9 +196,10 @@ class CommandorPage: self.browser.close() return None - def start_page(self, proxy, contact: ContactPojo, on_ready_for_otp, on_sucess) -> threading.Event: + def start_page(self, proxy) -> threading.Event: + self.set_up_sms_listener() e = threading.Event() - t = threading.Thread(target=self._run, args=(e, proxy, contact, on_ready_for_otp, on_sucess)) + t = threading.Thread(target=self._run, args=(e, proxy)) t.start() return e @@ -198,7 +283,9 @@ class CommandorPage: firstName=contact.first_name, lastName=contact.last_name, email=contact.mail, passport=contact.passport, ccid=contact.ccid) result.id = id - self.on_success_listener(result) + self.on_success(result) + time.sleep(2) + self.browser.close() def get_random_id_number_for_proxy() -> str: diff --git a/main.py b/main.py index 7f80cd2..44c25bc 100644 --- a/main.py +++ b/main.py @@ -4,6 +4,7 @@ import random import re import sys import time +from concurrent.futures import ThreadPoolExecutor from threading import Event from typing import Union @@ -21,9 +22,6 @@ from utils.excel_reader import ExcelHelper from utils.operator import check_operator, Operator OTP_TIMEOUT = 240 -is_finished = False -commandor = CommandorPage() -thread_event = None current_gsm_modem = None card_pool = CardPool(CARD_POOL_PORT) # used to save the current slot position @@ -58,77 +56,6 @@ def create_modem_for_port(port: str) -> Union[SerialModem, None]: return serial_modem -def timeout_occurred(serial_modem: SerialModem): - firebase_store_manager.save_timeout_contact(serial_modem.contact) - oracle_log_sender.send_timeout_log(serial_modem) - logger.info("will close timeout modem") - global thread_event - thread_event.set() - serial_modem.modem.close() - commandor.reset_air_plan_mode() - - -def start_to_handle_sms(serial_modem: SerialModem): - global current_gsm_modem - current_gsm_modem = serial_modem.modem - if check_operator(serial_modem.ccid) == Operator.LYCAMOBILE: - # lycamobile - current_gsm_modem.deleteMultipleStoredSms(memory="SM") - serial_modem.modem.smsReceivedCallback = handle_sms - global is_finished - is_finished = False - serial_modem.modem.smsTextMode = False - logger.info('Waiting for SMS message, for phone number ' + str(serial_modem.phone_number)) - listen_at = time.time() - while not is_finished: - time.sleep(2) - # check whether timeout - now = time.time() - if (listen_at + OTP_TIMEOUT) < now: - logger.info("time out for {}, switch to next contact".format(serial_modem.phone_number)) - # save the contact in timeout - timeout_occurred(serial_modem) - current_gsm_modem.close() - return - return - - -def handle_sms(sms): - logger.info( - u'== SMS message received ==\nFrom: {0}\nTime: {1}\nMessage:\n{2}\n'.format(sms.number, sms.time, sms.text)) - # extract the otp number - date = str(sms.time)[0:10] - params.oracle_log_sender.send_sms_reception_log(sms.number, sms.text) - if date == str(datetime.date.today()): - logger.info("this sms is for today") - if "rendez-vous" in sms.text or "appointment" in sms.text: - logger.info("try to extract the otp") - pattern = r'\d{6,8}' - # if re.match(pattern, sms.text): - match = re.search(pattern, sms.text) - otp = match.group(0) - logger.info("otp is " + otp) - global thread_event - global commandor - commandor.otp_value = otp - logger.info("will set thread event") - thread_event.set() - # commandor.send_otp(otp) - # wait for the sms for 20 seconds - global is_finished - while not is_finished: - time.sleep(2) - is_finished = True - global current_gsm_modem - if current_gsm_modem: - logger.info("will close used modem") - current_gsm_modem.close() - else: - logger.info("The sms is not for RDV") - else: - logger.info("The sms is not for today") - - def init_modems() -> list: modems = [] for port in params.MODEM_POOL_PORTS: @@ -138,39 +65,6 @@ def init_modems() -> list: return modems -def on_message_received(ch, method, properties, body): - print(str(body)) - print(" [x] Received {} {}".format(body, datetime.datetime.now())) - # parse the received message - result = ReserveResultPojo.from_json(body) - result.sim_position = current_sim_position - result.slot_position = current_card_pool_slot - logger.info(result) - firebase_store_manager.save(result) - oracle_log_sender.send_appoint_result(result) - # set the flag to True - global is_finished - is_finished = True - - -def on_success(result: ReserveResultPojo): - logger.info("on_success called.") - result.sim_position = current_sim_position - result.slot_position = current_card_pool_slot - logger.info(result) - firebase_store_manager.save(result) - oracle_log_sender.send_appoint_result(result) - # set the flag to True - global is_finished - is_finished = True - - -def on_ready_for_otp(e: Event, commandor: CommandorPage): - logger.info("on_ready_for_otp() called.") - global thread_event - thread_event = e - - def start_book(): start_slot_number = 1 end_slot_number = 21 @@ -191,33 +85,35 @@ def start_book(): contacts = excel_reader.read_contacts() global current_sim_position current_sim_position = 0 - for modem in modem_list: - current_sim_position = current_sim_position + 1 - try: - modem.get_ccid() - # find the contact with ccid - contact = [contact for contact in contacts if - contact.ccid.replace("F", "") == modem.ccid.replace("F", "")] - if len(contact) > 0: - modem.phone_number = contact[0].phone - modem.contact = contact[0] - else: - logger.info("contact not found for this ccid:{}".format(modem.ccid)) - error_msg = "slot({}):sim({}):ccid({})".format(i, current_sim_position, modem.ccid) - oracle_log_sender.send_contact_not_found(error_msg) - modem.modem.close() + with ThreadPoolExecutor(max_workers=2) as executor: + for modem in modem_list: + current_sim_position = current_sim_position + 1 + try: + modem.get_ccid() + # find the contact with ccid + contact = [contact for contact in contacts if + contact.ccid.replace("F", "") == modem.ccid.replace("F", "")] + if len(contact) > 0: + modem.phone_number = contact[0].phone + modem.contact = contact[0] + else: + logger.info("contact not found for this ccid:{}".format(modem.ccid)) + error_msg = "slot({}):sim({}):ccid({})".format(i, current_sim_position, modem.ccid) + oracle_log_sender.send_contact_not_found(error_msg) + modem.modem.close() + continue + if modem.contact: + logger.info("contact found for this ccid") + signal = modem.modem.signalStrength + logger.info("信号强度: " + str(signal)) + proxy = get_proxy(modem.phone_number) + commandor = CommandorPage(modem, sim_position=current_sim_position, + slot_position=current_card_pool_slot) + # start the task in thread + executor.submit(commandor.start_page, proxy) + except Exception as error: + print(error) continue - if modem.contact: - logger.info("contact found for this ccid") - signal = modem.modem.signalStrength - logger.info("信号强度: " + str(signal)) - proxy = get_proxy(modem.phone_number) - commandor.start_page(proxy=proxy, contact=modem.contact, - on_ready_for_otp=on_ready_for_otp, on_sucess=on_success) - start_to_handle_sms(modem) - except Exception as error: - print(error) - continue def get_proxy(phone_number):