try to use executor
This commit is contained in:
+104
-17
@@ -1,5 +1,7 @@
|
||||
import datetime
|
||||
import logging
|
||||
import random
|
||||
import re
|
||||
import string
|
||||
import threading
|
||||
import time
|
||||
@@ -11,7 +13,9 @@ import params
|
||||
from params import PROXY_SERVER, PROXY_PASSWORD
|
||||
from pojo.ReserveResultPojo import ReserveResultPojo, PublishType
|
||||
from pojo.contact_pojo import ContactPojo
|
||||
from pojo.serial_modem import SerialModem
|
||||
from utils.excel_reader import ExcelHelper
|
||||
from utils.operator import Operator, check_operator
|
||||
|
||||
RDV_URL = "https://rendezvousparis.hermes.com/client/register"
|
||||
#
|
||||
@@ -26,27 +30,108 @@ MESSAGE_FIELD_CLASS = ".message"
|
||||
|
||||
CONFIRMED_MESSAGE = "Your request for a Leather Goods appointment has been registered"
|
||||
TIME_OUT = 400000
|
||||
OTP_TIMEOUT = 240
|
||||
|
||||
|
||||
def get_random_wait_time() -> float:
|
||||
wait_time = random.randint(0, 10) / 10.0 * 5
|
||||
return wait_time
|
||||
|
||||
|
||||
class Tls(threading.local):
|
||||
def __init__(self) -> None:
|
||||
self.playwright = sync_playwright().start()
|
||||
print("Create playwright instance in Thread", threading.current_thread().name)
|
||||
|
||||
|
||||
class CommandorPage:
|
||||
def __init__(self):
|
||||
tls = Tls()
|
||||
|
||||
def __init__(self, serial_modem: SerialModem, slot_position, sim_position):
|
||||
self.otp_value = None
|
||||
self.logger = logging.getLogger("CommandorPage")
|
||||
self.is_finished = False
|
||||
self.current_gsm_modem = serial_modem
|
||||
self.slot_position = slot_position
|
||||
self.sim_position = sim_position
|
||||
self.contact = serial_modem.contact
|
||||
|
||||
def _run(self, e: threading.Event, proxy, contact: ContactPojo, on_ready_for_otp, on_success):
|
||||
self.contact = contact
|
||||
def on_success(self, result: ReserveResultPojo):
|
||||
self.logger.info("on_success called.")
|
||||
result.sim_position = self.sim_position
|
||||
result.slot_position = self.slot_position
|
||||
self.logger.info(result)
|
||||
params.firebase_store_manager.save(result)
|
||||
params.oracle_log_sender.send_appoint_result(result)
|
||||
self.is_finished = True
|
||||
|
||||
def handle_sms(self, sms):
|
||||
self.logger.info(
|
||||
u'== SMS message received ==\nFrom: {0}\nTime: {1}\nMessage:\n{2}\n'.format(sms.number, sms.time, sms.text))
|
||||
# extract the otp number
|
||||
date = str(sms.time)[0:10]
|
||||
params.oracle_log_sender.send_sms_reception_log(sms.number, sms.text)
|
||||
if date == str(datetime.date.today()):
|
||||
self.logger.info("this sms is for today")
|
||||
if "rendez-vous" in sms.text or "appointment" in sms.text:
|
||||
self.logger.info("try to extract the otp")
|
||||
pattern = r'\d{6,8}'
|
||||
# if re.match(pattern, sms.text):
|
||||
match = re.search(pattern, sms.text)
|
||||
otp = match.group(0)
|
||||
self.logger.info("otp is " + otp)
|
||||
self.otp_value = otp
|
||||
self.logger.info("will set thread event")
|
||||
self.thread_event.set()
|
||||
# wait for the sms for 20 seconds
|
||||
while not self.is_finished:
|
||||
time.sleep(2)
|
||||
self.is_finished = True
|
||||
if self.current_gsm_modem:
|
||||
self.logger.info("will close used modem")
|
||||
self.current_gsm_modem.modem.close()
|
||||
else:
|
||||
self.logger.info("The sms is not for RDV")
|
||||
else:
|
||||
self.logger.info("The sms is not for today")
|
||||
|
||||
def set_up_sms_listener(self):
|
||||
if check_operator(self.current_gsm_modem.ccid) == Operator.LYCAMOBILE:
|
||||
# lycamobile
|
||||
self.current_gsm_modem.modem.deleteMultipleStoredSms(memory="SM")
|
||||
self.current_gsm_modem.modem.smsReceivedCallback = self.handle_sms
|
||||
self.is_finished = False
|
||||
self.current_gsm_modem.smsTextMode = False
|
||||
self.logger.info('Waiting for SMS message, for phone number ' + str(self.current_gsm_modem.phone_number))
|
||||
listen_at = time.time()
|
||||
while not self.is_finished:
|
||||
time.sleep(2)
|
||||
# check whether timeout
|
||||
now = time.time()
|
||||
if (listen_at + OTP_TIMEOUT) < now:
|
||||
self.logger.info("time out for {}, switch to next contact".format(self.current_gsm_modem.phone_number))
|
||||
# save the contact in timeout
|
||||
self.timeout_occurred()
|
||||
self.current_gsm_modem.modem.close()
|
||||
return
|
||||
return
|
||||
|
||||
def timeout_occurred(self):
|
||||
params.firebase_store_manager.save_timeout_contact(self.current_gsm_modem.contact)
|
||||
params.oracle_log_sender.send_timeout_log(self.current_gsm_modem)
|
||||
self.logger.info("will close timeout modem")
|
||||
self.thread_event.set()
|
||||
self.current_gsm_modem.modem.close()
|
||||
self.reset_air_plan_mode()
|
||||
|
||||
def _run(self, e: threading.Event, proxy):
|
||||
self.on_success_listener = on_success
|
||||
# reset otp_value to None
|
||||
self.otp_value = None
|
||||
with sync_playwright() as pwright:
|
||||
devices = random.choice(params.DEVICES)
|
||||
first_page = None
|
||||
while first_page is None:
|
||||
first_page = self.start_brower(proxy, pwright, devices)
|
||||
first_page = self.start_browser(proxy, self.tls.playwright, devices)
|
||||
proxy_username = "panleicim-res-fr-" + get_random_id_number_for_proxy()
|
||||
self.logger.info("proxy_username is " + proxy_username)
|
||||
proxy = {
|
||||
@@ -54,17 +139,17 @@ class CommandorPage:
|
||||
"username": proxy_username,
|
||||
"password": params.PROXY_PASSWORD
|
||||
}
|
||||
self._setName(contact.last_name, contact.first_name)
|
||||
self._setName(self.contact.last_name, self.contact.first_name)
|
||||
self._setPhoneCountryAndStore()
|
||||
# self.page.mouse.wheel(0, random.randint(100, 200))
|
||||
self._setPhoneNumber(contact.phone)
|
||||
self._set_email(contact.mail)
|
||||
self.setIdNumber(contact.passport)
|
||||
self._setPhoneNumber(self.contact.phone)
|
||||
self._set_email(self.contact.mail)
|
||||
self.setIdNumber(self.contact.passport)
|
||||
#
|
||||
self._checkCgu()
|
||||
# wait for sms_code field
|
||||
# self.clickOnValidBtn()
|
||||
on_ready_for_otp(e, self)
|
||||
self.thread_event = e
|
||||
otp_input = self.page.locator(OTP_FIELD_ID)
|
||||
otp_input.wait_for(state='visible', timeout=TIME_OUT)
|
||||
event_is_set = e.wait()
|
||||
@@ -81,15 +166,15 @@ class CommandorPage:
|
||||
if CONFIRMED_MESSAGE in message:
|
||||
# publish the successful message
|
||||
self.logger.info("url is " + self.page.url)
|
||||
self.publish_message_to_queue(contact, PublishType.SUCCESS.value, self.page.url)
|
||||
self.publish_message_to_queue(self.contact, PublishType.SUCCESS.value, self.page.url)
|
||||
else:
|
||||
self.logger.info("timeout")
|
||||
self.reset_air_plan_mode()
|
||||
|
||||
def start_brower(self, proxy, pwright, device) -> Union[str, None]:
|
||||
def start_browser(self, proxy, pwright, device) -> Union[str, None]:
|
||||
try:
|
||||
self.browser = pwright.webkit.launch(headless=False, timeout=90000, proxy=proxy)
|
||||
self.logger.info("user_agent is " + device)
|
||||
self.logger.info("device is " + device)
|
||||
pixel_2 = pwright.devices[device]
|
||||
context = self.browser.new_context(**pixel_2, locale='en-GB')
|
||||
self.page = context.new_page()
|
||||
@@ -102,7 +187,6 @@ class CommandorPage:
|
||||
}});
|
||||
}
|
||||
""")
|
||||
# self.page.add_init_script("""""")
|
||||
self.page.on("load", self._on_page_loaded)
|
||||
self.page.goto(RDV_URL, timeout=90000)
|
||||
return self.page.content()
|
||||
@@ -112,9 +196,10 @@ class CommandorPage:
|
||||
self.browser.close()
|
||||
return None
|
||||
|
||||
def start_page(self, proxy, contact: ContactPojo, on_ready_for_otp, on_sucess) -> threading.Event:
|
||||
def start_page(self, proxy) -> threading.Event:
|
||||
self.set_up_sms_listener()
|
||||
e = threading.Event()
|
||||
t = threading.Thread(target=self._run, args=(e, proxy, contact, on_ready_for_otp, on_sucess))
|
||||
t = threading.Thread(target=self._run, args=(e, proxy))
|
||||
t.start()
|
||||
return e
|
||||
|
||||
@@ -198,7 +283,9 @@ class CommandorPage:
|
||||
firstName=contact.first_name, lastName=contact.last_name, email=contact.mail,
|
||||
passport=contact.passport, ccid=contact.ccid)
|
||||
result.id = id
|
||||
self.on_success_listener(result)
|
||||
self.on_success(result)
|
||||
time.sleep(2)
|
||||
self.browser.close()
|
||||
|
||||
|
||||
def get_random_id_number_for_proxy() -> str:
|
||||
|
||||
@@ -4,6 +4,7 @@ import random
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from threading import Event
|
||||
from typing import Union
|
||||
|
||||
@@ -21,9 +22,6 @@ from utils.excel_reader import ExcelHelper
|
||||
from utils.operator import check_operator, Operator
|
||||
|
||||
OTP_TIMEOUT = 240
|
||||
is_finished = False
|
||||
commandor = CommandorPage()
|
||||
thread_event = None
|
||||
current_gsm_modem = None
|
||||
card_pool = CardPool(CARD_POOL_PORT)
|
||||
# used to save the current slot position
|
||||
@@ -58,77 +56,6 @@ def create_modem_for_port(port: str) -> Union[SerialModem, None]:
|
||||
return serial_modem
|
||||
|
||||
|
||||
def timeout_occurred(serial_modem: SerialModem):
|
||||
firebase_store_manager.save_timeout_contact(serial_modem.contact)
|
||||
oracle_log_sender.send_timeout_log(serial_modem)
|
||||
logger.info("will close timeout modem")
|
||||
global thread_event
|
||||
thread_event.set()
|
||||
serial_modem.modem.close()
|
||||
commandor.reset_air_plan_mode()
|
||||
|
||||
|
||||
def start_to_handle_sms(serial_modem: SerialModem):
|
||||
global current_gsm_modem
|
||||
current_gsm_modem = serial_modem.modem
|
||||
if check_operator(serial_modem.ccid) == Operator.LYCAMOBILE:
|
||||
# lycamobile
|
||||
current_gsm_modem.deleteMultipleStoredSms(memory="SM")
|
||||
serial_modem.modem.smsReceivedCallback = handle_sms
|
||||
global is_finished
|
||||
is_finished = False
|
||||
serial_modem.modem.smsTextMode = False
|
||||
logger.info('Waiting for SMS message, for phone number ' + str(serial_modem.phone_number))
|
||||
listen_at = time.time()
|
||||
while not is_finished:
|
||||
time.sleep(2)
|
||||
# check whether timeout
|
||||
now = time.time()
|
||||
if (listen_at + OTP_TIMEOUT) < now:
|
||||
logger.info("time out for {}, switch to next contact".format(serial_modem.phone_number))
|
||||
# save the contact in timeout
|
||||
timeout_occurred(serial_modem)
|
||||
current_gsm_modem.close()
|
||||
return
|
||||
return
|
||||
|
||||
|
||||
def handle_sms(sms):
|
||||
logger.info(
|
||||
u'== SMS message received ==\nFrom: {0}\nTime: {1}\nMessage:\n{2}\n'.format(sms.number, sms.time, sms.text))
|
||||
# extract the otp number
|
||||
date = str(sms.time)[0:10]
|
||||
params.oracle_log_sender.send_sms_reception_log(sms.number, sms.text)
|
||||
if date == str(datetime.date.today()):
|
||||
logger.info("this sms is for today")
|
||||
if "rendez-vous" in sms.text or "appointment" in sms.text:
|
||||
logger.info("try to extract the otp")
|
||||
pattern = r'\d{6,8}'
|
||||
# if re.match(pattern, sms.text):
|
||||
match = re.search(pattern, sms.text)
|
||||
otp = match.group(0)
|
||||
logger.info("otp is " + otp)
|
||||
global thread_event
|
||||
global commandor
|
||||
commandor.otp_value = otp
|
||||
logger.info("will set thread event")
|
||||
thread_event.set()
|
||||
# commandor.send_otp(otp)
|
||||
# wait for the sms for 20 seconds
|
||||
global is_finished
|
||||
while not is_finished:
|
||||
time.sleep(2)
|
||||
is_finished = True
|
||||
global current_gsm_modem
|
||||
if current_gsm_modem:
|
||||
logger.info("will close used modem")
|
||||
current_gsm_modem.close()
|
||||
else:
|
||||
logger.info("The sms is not for RDV")
|
||||
else:
|
||||
logger.info("The sms is not for today")
|
||||
|
||||
|
||||
def init_modems() -> list:
|
||||
modems = []
|
||||
for port in params.MODEM_POOL_PORTS:
|
||||
@@ -138,39 +65,6 @@ def init_modems() -> list:
|
||||
return modems
|
||||
|
||||
|
||||
def on_message_received(ch, method, properties, body):
|
||||
print(str(body))
|
||||
print(" [x] Received {} {}".format(body, datetime.datetime.now()))
|
||||
# parse the received message
|
||||
result = ReserveResultPojo.from_json(body)
|
||||
result.sim_position = current_sim_position
|
||||
result.slot_position = current_card_pool_slot
|
||||
logger.info(result)
|
||||
firebase_store_manager.save(result)
|
||||
oracle_log_sender.send_appoint_result(result)
|
||||
# set the flag to True
|
||||
global is_finished
|
||||
is_finished = True
|
||||
|
||||
|
||||
def on_success(result: ReserveResultPojo):
|
||||
logger.info("on_success called.")
|
||||
result.sim_position = current_sim_position
|
||||
result.slot_position = current_card_pool_slot
|
||||
logger.info(result)
|
||||
firebase_store_manager.save(result)
|
||||
oracle_log_sender.send_appoint_result(result)
|
||||
# set the flag to True
|
||||
global is_finished
|
||||
is_finished = True
|
||||
|
||||
|
||||
def on_ready_for_otp(e: Event, commandor: CommandorPage):
|
||||
logger.info("on_ready_for_otp() called.")
|
||||
global thread_event
|
||||
thread_event = e
|
||||
|
||||
|
||||
def start_book():
|
||||
start_slot_number = 1
|
||||
end_slot_number = 21
|
||||
@@ -191,6 +85,7 @@ def start_book():
|
||||
contacts = excel_reader.read_contacts()
|
||||
global current_sim_position
|
||||
current_sim_position = 0
|
||||
with ThreadPoolExecutor(max_workers=2) as executor:
|
||||
for modem in modem_list:
|
||||
current_sim_position = current_sim_position + 1
|
||||
try:
|
||||
@@ -212,9 +107,10 @@ def start_book():
|
||||
signal = modem.modem.signalStrength
|
||||
logger.info("信号强度: " + str(signal))
|
||||
proxy = get_proxy(modem.phone_number)
|
||||
commandor.start_page(proxy=proxy, contact=modem.contact,
|
||||
on_ready_for_otp=on_ready_for_otp, on_sucess=on_success)
|
||||
start_to_handle_sms(modem)
|
||||
commandor = CommandorPage(modem, sim_position=current_sim_position,
|
||||
slot_position=current_card_pool_slot)
|
||||
# start the task in thread
|
||||
executor.submit(commandor.start_page, proxy)
|
||||
except Exception as error:
|
||||
print(error)
|
||||
continue
|
||||
|
||||
Reference in New Issue
Block a user