Files
appointment_tool/workers/commandor_page.py
T
2022-06-06 23:38:03 +02:00

412 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import random
import re
import string
import threading
import time
from typing import Union
from playwright.sync_api import sync_playwright
import params
from check_results import BLANK_URL
from params import PROXY_SERVER, PROXY_PASSWORD
from pojo.ModeEnum import ModeEnum
from pojo.ReserveResultPojo import ReserveResultPojo, PublishType
from pojo.contact_pojo import ContactPojo
from workers.SolveCaptch import SolveCaptcha
RDV_URL = "https://rendezvousparis.hermes.com/client/register"
# RDV_URL = "file:///Users/lpan/Downloads/test_appointment.html"
# RDV_URL = "https://api.ipify.org"
# RDV_URL ="https://bot.sannysoft.com/"
REGEX_RDV_URL = "https:\/\/rendezvousparis\.hermes\.com\/client\/register\/[A-Z0-9]+"
otp_value = None
OTP_FIELD_ID = "#sms_code"
MESSAGE_FIELD_CLASS = ".message"
CONFIRMED_MESSAGE = "Your request for a Leather Goods appointment has been registered"
CONFIRMED_MESSAGE_FR = "Votre demande de rendez-vous Maroquinerie a bien été enregistrée et nous vous en remercions."
DOUBLE_REQUEST_ERROR_MESSAGE = "A request with the same data has already been validated today."
DOUBLE_REQUEST_ERROR_MESSAGE_FR = "Une demande avec les données saisies a déjà été validée aujourdhui."
TOO_MANY_REQUEST_ERROR_MESSAGE = "Due to a large number of requests"
TOO_MANY_REQUEST_ERROR_MESSAGE_FR = "Suite à un trop grand nombre de demandes aujourdhui,"
CAPTCHA_ERROR_MESSAGE = "Error verifying captcha, please try again"
CAPTCHA_ERROR_MESSAGE_FR = "La vérification du captcha a échoué"
TIME_OUT = 400000
OTP_TIMEOUT = 240
PAGE_TIMEOUT = 40000
def get_random_wait_time() -> float:
wait_time = random.randint(0, 10) / 10.0 * 1
return wait_time
class Tls(threading.local):
def __init__(self) -> None:
self.playwright = sync_playwright().start()
class CommandorPage:
tls = Tls()
def __init__(self, contact: ContactPojo, store_type=0, proxy_type=0, mode: ModeEnum = ModeEnum.MANUAL):
self.otp_value = None
self.logger = logging.getLogger("约会页面:" + str(contact.phone))
self.is_finished = False
self.contact = contact
self.proxy_type = proxy_type
self.is_event_sent = False
self.is_captcha_in_error = False
self.is_filling_fields = False
self.appointment_mode = mode
# 0: random
# 1: faubourg
# 2: George
# 3: Sèvres
self.store_map = {
1: "faubourg",
2: "georgev",
3: "sevres"
}
self.store_type = store_type
def on_success(self, result: ReserveResultPojo):
self.logger.info("on_success called.")
self.is_finished = True
if not self.is_event_sent:
self.logger.info("will send successful event")
self.logger.info(result)
params.oracle_log_sender.send_appoint_result(result)
self.is_event_sent = True
def timeout_occurred(self):
params.oracle_log_sender.send_timeout_log(self.contact)
self.logger.info("will close timeout modem")
self.thread_event.set()
self.termine()
def _run(self, e: threading.Event, proxy):
self.logger.info("will start browser")
self.on_success_listener = on_success
# reset otp_value to None
self.otp_value = None
devices = random.choice(params.DEVICES)
first_page = None
while first_page is None:
first_page = self.start_browser(proxy, self.tls.playwright, devices)
proxy_username = params.get_proxy_name_prefix(self.proxy_type) + params.get_random_id_number_for_proxy()
self.logger.info("proxy_username is " + proxy_username)
proxy = {
"server": params.PROXY_SERVER,
"username": proxy_username,
"password": params.PROXY_PASSWORD
}
# wait for sms_code field
# self.clickOnValidBtn()
self.thread_event = e
otp_input = self.page.locator(OTP_FIELD_ID)
otp_input.wait_for(state='visible', timeout=TIME_OUT)
event_is_set = e.wait()
logging.info('event set: %s', event_is_set)
if self.otp_value:
self.fill_otp(self.otp_value)
time.sleep(get_random_wait_time())
self.clickOnValidBtn()
otp_sent = self.page.locator(MESSAGE_FIELD_CLASS)
otp_sent.wait_for(state='visible', timeout=TIME_OUT)
time.sleep(get_random_wait_time())
message = self.page.content()
if CONFIRMED_MESSAGE in message or CONFIRMED_MESSAGE_FR in message:
# publish the successful message
self.logger.info("url is " + self.page.url)
self.publish_message_to_queue(self.contact, PublishType.SUCCESS, self.page.url)
else:
self.logger.info("timeout")
self.termine()
def fill_fields(self):
if not self.is_filling_fields:
self.is_filling_fields = True
self.logger.info("填充信息: " + str(self.contact.phone))
self._set_name(self.contact.last_name, self.contact.first_name)
self._setPhoneCountryAndStore()
self._setPhoneNumber(self.contact.phone)
self._set_email(self.contact.mail)
self.setIdNumber(self.contact.passport)
self._checkCgu()
if self.appointment_mode == ModeEnum.AUTOMATIC:
self.resolve_captcha()
self.is_filling_fields = False
def start_browser(self, proxy, pwright, device) -> Union[str, None]:
try:
self.browser = pwright.webkit.launch(headless=False, timeout=PAGE_TIMEOUT, proxy=proxy)
self.logger.info("模拟设备: " + device)
pixel_2 = pwright.devices[device]
context = self.browser.new_context(**pixel_2, locale='fr-FR')
self.page = context.new_page()
# hide webdriver information
self.page.add_init_script("""() => {
Object.defineProperty(navigator,'webdriver',{get: () => undefined});
Object.defineProperty(navigator, 'platform', {
get: () => {
return "iPhone";
}});
}
""")
self.page.on("load", self._on_page_loaded)
self.page.on("response", self.handle_response)
self.page.goto(RDV_URL, timeout=PAGE_TIMEOUT)
return self.page.content()
except Exception as error:
params.oracle_log_sender.send_error(str(error))
self.logger.exception(error)
self.logger.info("will close browser")
self.browser.close()
return None
def handle_response(self, response):
pattern = re.compile(REGEX_RDV_URL)
if pattern.match(response.url):
self.logger.info("result url found: " + response.url)
# self.publish_message_to_queue(self.contact, PublishType.PENDING, response.url)
def start_page(self, proxy):
e = threading.Event()
self._run(e, proxy)
def _on_page_loaded(self):
self.logger.info("page loaded")
# self.logger.info("content is " + self.page.content())
self.logger.info("url is " + self.page.url)
if self.page.url == RDV_URL:
self.fill_fields()
try:
message = self.page.content()
if CONFIRMED_MESSAGE in message or CONFIRMED_MESSAGE_FR in message:
# publish the successful message
self.publish_message_to_queue(self.contact, PublishType.SUCCESS, self.page.url)
self.get_errors()
except Exception as error:
self.logger.error(error)
def on_document_loaded(self):
self.logger.info("on_document_loaded called")
def _setPhoneCountryAndStore(self):
try:
if self.store_type == 0:
self.page.evaluate("""()=>{
//document.getElementById("phone_country").focus();
document.getElementById("phone_country").value = \"FR\" }""")
else:
store_to_choose = self.store_map[self.store_type]
self.page.evaluate("""(store_to_choose)=>{
document.getElementById("prefer").value = store_to_choose;
//document.getElementById("phone_country").focus();
document.getElementById("phone_country").value = \"FR\" }""", store_to_choose)
except Exception as error:
self.logger.error(error)
def _setPhoneNumber(self, phoneNumber):
time.sleep(get_random_wait_time())
try:
self.page.evaluate("""(phoneNumber)=>document.getElementById("phone_number").value =phoneNumber""",
phoneNumber)
except Exception as error:
self.logger.error(error)
def _set_name(self, lastName, firstName):
time.sleep(get_random_wait_time())
try:
self.page.evaluate("""(name)=> {
let surname = document.getElementById("surname");
if(surname.value.length == 0){
// surname.focus();
surname.value = name.lastName;
document.getElementById("name").focus();
document.getElementById("name").value = name.firstName
}}
""", {'lastName': lastName, 'firstName': firstName})
except Exception as error:
self.logger.error(error)
def get_errors(self):
# send error result
if self.page.url != BLANK_URL:
# no need to push blank url to db
self.publish_message_to_queue(self.contact, PublishType.ERROR, self.page.url)
try:
items = self.page.query_selector("div.alert")
if items:
erro_content = items.inner_html()
self.logger.info("错误:" + erro_content)
self._handle_errors(erro_content)
except Exception as ext:
self.logger.error(ext)
def _handle_errors(self, erro_content: str):
if DOUBLE_REQUEST_ERROR_MESSAGE in erro_content or DOUBLE_REQUEST_ERROR_MESSAGE_FR in erro_content:
# this email has been already used
if not self.is_finished:
params.oracle_log_sender.send_double_data_error(self.contact)
self.is_finished = True
self.termine()
elif TOO_MANY_REQUEST_ERROR_MESSAGE in erro_content or TOO_MANY_REQUEST_ERROR_MESSAGE_FR in erro_content:
# this email has been already used
if not self.is_finished:
params.oracle_log_sender.send_too_many_error(self.contact)
self.is_finished = True
self.termine()
elif CAPTCHA_ERROR_MESSAGE in erro_content or CAPTCHA_ERROR_MESSAGE_FR in erro_content:
# this email has been already used
self.is_captcha_in_error = True
if not self.is_finished:
params.oracle_log_sender.send_captcha_error(self.contact)
self.is_finished = True
# no need to retry captcha, if retry ,will generate DOUBLE_REQUEST_ERROR_MESSAGE
self.termine()
# self.resolve_captcha()
def _set_email(self, email):
time.sleep(get_random_wait_time())
try:
self.page.evaluate("""(email)=>{
let emailElement = document.getElementById("email")
if(emailElement.value.length == 0){
emailElement.focus();
document.getElementById("email").value = email;}}""", email)
except Exception as error:
self.logger.error(error)
def setIdNumber(self, id):
time.sleep(get_random_wait_time())
try:
self.page.evaluate(""" (id) =>{
document.getElementById("passport_id").focus();
document.getElementById("passport_id").value = id}""", id)
except Exception as error:
self.logger.error(error)
def _checkCgu(self):
try:
self.page.evaluate("""
document.getElementById("cgu").focus();
document.getElementById("cgu").checked = true;
document.getElementById("processing").focus();
document.getElementById("processing").checked = true""")
except Exception as error:
self.logger.error(error)
def clickOnValidBtn(self):
time.sleep(get_random_wait_time())
try:
self.page.evaluate("""document.getElementsByClassName("btn")[0].focus();""")
time.sleep(get_random_wait_time())
self.page.evaluate("""
document.getElementsByClassName("btn")[0].click();""")
except Exception as error:
self.logger.error(error)
def clear_app_data(self):
pass
def fill_otp(self, otp: str):
self.page.focus(OTP_FIELD_ID)
time.sleep(get_random_wait_time())
self.page.fill(OTP_FIELD_ID, otp)
def termine(self):
self.logger.info("will close browser")
time.sleep(1)
self.browser.close()
def publish_message_to_queue(self, contact: ContactPojo, status: PublishType, url: str):
# create the message
id = url.split("/")[-1]
result = ReserveResultPojo(type=status, phone=contact.phone, message=status.value, url=url,
firstName=contact.first_name, lastName=contact.last_name, email=contact.mail,
passport=contact.passport, ccid=contact.ccid)
result.id = id
result.store_type = self.store_type
params.firebase_store_manager.save(result)
if status is PublishType.SUCCESS:
self.on_success(result)
time.sleep(2)
self.browser.close()
def resolve_captcha(self):
self.captcha_solver = SolveCaptcha(self.page)
self.captcha_solver.start(self.fill_captcha_solution)
def fill_captcha_solution(self, solution):
self.logger.info("will input solution: " + solution)
try:
self.page.evaluate("""(solution)=>{
document.getElementById("g-recaptcha-response").innerHTML=solution;}""", solution)
self.logger.info("will click on valid btn")
self.clickOnValidBtn()
# wait for 20s
time.sleep(20)
if not self.is_finished:
if not self.is_captcha_in_error:
self.clickOnValidBtn()
else:
self.is_captcha_in_error = False
except Exception as error:
self.logger.error(error)
self.page.reload(timeout=PAGE_TIMEOUT)
def get_random_id_number() -> str:
S = 8 # number of characters in the string.
ran = ''.join(random.choices(string.digits, k=S))
id_number = "57" + str(ran)
print("The randomly generated string is : 94" + str(ran)) # print the random data
return id_number
def on_success(result: ReserveResultPojo):
pass
def launch_page():
PROXY_USERNAME = "panleicim-res-fr-" + params.get_random_id_number_for_proxy()
print("proxy_username is " + PROXY_USERNAME)
proxy = {
"server": PROXY_SERVER,
"username": PROXY_USERNAME,
"password": PROXY_PASSWORD
}
passport_number = get_random_id_number()
print("passport_number is " + passport_number)
contact = ContactPojo(phone_number="+33758912245", passport_number=passport_number, last_name="XU",
first_name="xingzhen",
mail="ColbyPatel653@gmail.com", ccid="", position=0)
page = CommandorPage(contact, store_type=1)
return page.start_page(None)
def wait_for_otp(event: threading.Event, commandor: CommandorPage):
sec = input("Press Enter otp to continue...\n")
print("input otp is: " + sec)
commandor.otp_value = sec
event.set()
if __name__ == '__main__':
launch_page()
# time = get_random_wait_time()
# print(time)
# import urllib.request
#
# proxy = urllib.request.ProxyHandler({'https': 'http://panleicim-res-fr-121:94sY7zwBG13i@gw.ntnt.io:5959'})
# opener = urllib.request.build_opener(proxy)
# urllib.request.install_opener(opener)
# content = urllib.request.urlopen('https://api.ipify.org').read()
# print(content)