Files
appointment_tool/src/workers/commandor_page.py
T
2022-07-09 11:07:47 +02:00

384 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import datetime
import logging
import random
import re
import sys
import threading
import time
import traceback
from typing import Union
from playwright.sync_api import sync_playwright
from src import params, definitions
from src.db.mongo_manager import MONGO_STORE_MANAGER
from src.pojo.ModeEnum import ModeEnum
from src.pojo.ReserveResultPojo import ReserveResultPojo, PublishType
from src.pojo.contact_pojo import ContactPojo
from src.proxy.proxy_type import ProxyType
from src.workers.SolveCaptch import SolveCaptcha
# Registration form that the automation opens and fills in.
RDV_URL = "https://rendezvousparis.hermes.com/client/register"
# Alternative targets kept for local debugging:
# RDV_URL = "file:///Users/lpan/Downloads/test_appointment.html"
# RDV_URL = "https://api.ipify.org"
# RDV_URL ="https://bot.sannysoft.com/"

# Pattern for URLs of the form <RDV_URL>/<UPPERCASE-ALPHANUMERIC ID>.
# Raw string: the previous non-raw literal relied on the invalid escape
# sequences "\/" and "\." (a DeprecationWarning / SyntaxWarning). The runtime
# value of the pattern is unchanged.
REGEX_RDV_URL = r"https:\/\/rendezvousparis\.hermes\.com\/client\/register\/[A-Z0-9]+"

# NOTE(review): not read anywhere in the visible part of this module.
otp_value = None

# Selectors and sentinel URL used while driving the page.
OTP_FIELD_ID = "#sms_code"
MESSAGE_FIELD_CLASS = ".message"
BLANK_URL = "about:blank"

# Substrings searched for in the page HTML / alert box to classify the outcome.
CONFIRMED_MESSAGE = "Your request for a Leather Goods appointment has been registered"
CONFIRMED_MESSAGE_FR = "Votre demande de rendez-vous Maroquinerie a bien été enregistrée et nous vous en remercions."
DOUBLE_REQUEST_ERROR_MESSAGE = "A request with the same data has already been validated today."
# NOTE(review): "aujourdhui" has no apostrophe - confirm this matches the text the site really serves.
DOUBLE_REQUEST_ERROR_MESSAGE_FR = "Une demande avec les données saisies a déjà été validée aujourdhui."
TOO_MANY_REQUEST_ERROR_MESSAGE = "Due to a large number of requests"
TOO_MANY_REQUEST_ERROR_MESSAGE_FR = "Suite à un trop grand nombre de demandes"
CAPTCHA_ERROR_MESSAGE = "Error verifying captcha, please try again"
CAPTCHA_ERROR_MESSAGE_FR = "La vérification du captcha a échoué"

TIME_OUT = 10 * 60 * 1000  # 10 mins, in milliseconds (passed to Playwright as a timeout)
OTP_TIMEOUT = 240  # NOTE(review): unit not visible here, presumably seconds - confirm
PAGE_TIMEOUT = 40000  # milliseconds; used for browser launch, navigation and reload
def get_random_wait_time() -> float:
    """Return a random delay between 0.0 and 1.0 inclusive, in steps of 0.1.

    Callers in this module pass the value to ``time.sleep`` to space out
    form interactions.
    """
    tenths = random.randint(0, 10)
    return tenths / 10.0
class Tls(threading.local):
    """Thread-local holder for a started Playwright driver.

    Because this derives from ``threading.local``, ``__init__`` runs again in
    each thread that uses an instance, so every thread gets its own
    ``playwright`` attribute.
    """

    def __init__(self) -> None:
        driver = sync_playwright().start()
        self.playwright = driver
class CommandorPage:
    """Drive one browser session that fills the registration form for a contact.

    The session opens ``RDV_URL`` in a Playwright WebKit browser emulating a
    mobile device, fills the form from ``contact``, optionally solves the
    captcha (automatic mode), and reports outcomes through
    ``params.oracle_log_sender``, ``definitions.firebase_store_manager`` and
    ``MONGO_STORE_MANAGER``.
    """

    # Thread-local Playwright driver holder, created when the class body runs.
    tls = Tls()

    def __init__(self, contact: ContactPojo, store_type=0, proxy_type=ProxyType.BRIGHT_DATA,
                 mode: ModeEnum = ModeEnum.MANUAL, headless=False):
        """Store the session configuration; no browser is started here.

        Args:
            contact: Person whose data is typed into the form.
            store_type: Key into ``store_map`` (1: faubourg, 2: georgev,
                3: sevres). ``0`` leaves the store preference field untouched
                (described as "random" in the original comment).
            proxy_type: Proxy family used when a replacement proxy is needed.
            mode: ``ModeEnum.AUTOMATIC`` triggers captcha solving after the
                form is filled.
            headless: Passed to the browser launch call.
        """
        self.otp_value = None
        self.logger = logging.getLogger("约会页面:" + str(contact.phone))
        self.is_finished = False
        self.contact = contact
        self.proxy_type = proxy_type
        self.is_event_sent = False
        self.is_captcha_in_error = False
        self.is_filling_fields = False
        self.headless = headless
        self.appointment_mode = mode
        # 0: random
        # 1: faubourg
        # 2: George
        # 3: Sèvres
        self.store_map = {
            1: "faubourg",
            2: "georgev",
            3: "sevres"
        }
        self.store_type = store_type
        # Populated later; defined up front so cleanup code can always
        # inspect them, even when the browser launch itself fails.
        self.browser = None
        self.page = None
        self.captcha_solver = None
        self.on_success_listener = None

    def on_success(self, result: ReserveResultPojo):
        """Mark the session finished and send the success event at most once."""
        self.logger.info("on_success called.")
        self.is_finished = True
        if not self.is_event_sent:
            self.logger.info("will send successful event")
            self.logger.info(result)
            params.oracle_log_sender.send_appoint_result(result)
            self.is_event_sent = True

    def timeout_occurred(self):
        """Report a timeout for this contact and close the browser."""
        params.oracle_log_sender.send_timeout_log(self.contact)
        self.logger.info("will close timeout modem")
        self.termine()

    def _run(self, e: threading.Event, proxy):
        """Start the browser (retrying with new proxies) and wait for the OTP field.

        Args:
            e: Currently unused.
            proxy: Proxy settings for the first launch attempt.

        Raises:
            Exception: Whatever ``Locator.wait_for`` raises (for example when
                the OTP field does not appear within ``TIME_OUT``). The
                browser is closed before the exception propagates.
        """
        self.logger.info("will start browser")
        # Refers to the module-level ``on_success`` function, not the method.
        self.on_success_listener = on_success
        # reset otp_value to None
        self.otp_value = None
        device = random.choice(params.DEVICES)
        # Retry until a page loads; a new proxy is requested only after a
        # failed attempt. NOTE(review): the number of retries is unbounded.
        while self.start_browser(proxy, self.tls.playwright, device) is None:
            proxy = params.get_proxy(self.proxy_type)
        otp_input = self.page.locator(OTP_FIELD_ID)
        try:
            otp_input.wait_for(state='visible', timeout=TIME_OUT)
            self.logger.info("otp field is visible")
        except Exception as error:
            # Raised on timeout, and also when the browser was already closed
            # (e.g. after a published success).
            self.logger.info("stopped waiting for the otp field: " + str(error))
            raise
        finally:
            # Always release the browser, including on timeout.
            self.termine()

    def fill_fields(self):
        """Fill every form field once; re-entrant calls while filling are ignored."""
        if self.is_filling_fields:
            return
        self.is_filling_fields = True
        try:
            self.logger.info("填充信息: " + str(self.contact.phone))
            self._set_name(self.contact.last_name, self.contact.first_name)
            self._setPhoneCountryAndStore()
            self._set_phone_number("0" + str(self.contact.phone))
            self._set_email(self.contact.mail)
            self._set_id_number(self.contact.passport)
            self._checkCgu()
            if self.appointment_mode == ModeEnum.AUTOMATIC:
                self.resolve_captcha()
        finally:
            # Reset the guard even if a step raised, so later page loads can
            # fill the form again.
            self.is_filling_fields = False

    def start_browser(self, proxy, pwright, device) -> Union[str, None]:
        """Launch WebKit, emulate ``device`` and navigate to ``RDV_URL``.

        Args:
            proxy: Proxy settings passed to the launch call.
            pwright: Started Playwright driver.
            device: Key into ``pwright.devices`` selecting the emulated device.

        Returns:
            The HTML of the loaded page, or ``None`` when anything failed (the
            error is reported and the browser, if any, is closed).
        """
        # Drop any reference to a browser from a previous failed attempt.
        self.browser = None
        try:
            self.browser = pwright.webkit.launch(headless=self.headless, timeout=PAGE_TIMEOUT, proxy=proxy)
            self.logger.info("模拟设备: " + device)
            simulated_mobile = pwright.devices[device]
            context = self.browser.new_context(**simulated_mobile, locale='fr-FR')
            self.page = context.new_page()
            # hide webdriver information
            # The init script is evaluated as-is, so it must be plain
            # statements: a bare arrow-function expression would never be
            # invoked.
            self.page.add_init_script("""
                Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
                Object.defineProperty(navigator, 'platform', {
                    get: () => {
                        return "iPhone";
                    }});
            """)
            self.page.on("load", self._on_page_loaded)
            self.page.on("response", self.handle_response)
            self.page.goto(RDV_URL, timeout=PAGE_TIMEOUT)
            return self.page.content()
        except Exception as error:
            params.oracle_log_sender.send_error(str(error))
            # print_exc() reads the active exception itself; it must not be
            # given the sys.exc_info() triple as positional arguments.
            traceback.print_exc()
            self.logger.exception(error)
            self.logger.info("will close browser")
            self._close_browser()
            return None

    def handle_response(self, response):
        """Log responses whose URL matches ``REGEX_RDV_URL``."""
        if re.match(REGEX_RDV_URL, response.url):
            self.logger.info("result url found: " + response.url)

    def start_page(self, proxy):
        """Run the whole session with ``proxy`` as the first proxy to try."""
        e = threading.Event()
        self._run(e, proxy)

    def _on_page_loaded(self):
        """Page ``load`` handler: fill the form or classify the resulting page."""
        self.logger.info("页面加载完毕")
        self.logger.info("url is " + self.page.url)
        if self.page.url == RDV_URL:
            self.fill_fields()
        try:
            message = self.page.content()
            if CONFIRMED_MESSAGE in message or CONFIRMED_MESSAGE_FR in message:
                # publish the successful message
                self.publish_message_to_queue(self.contact, PublishType.SUCCESS, self.page.url)
            else:
                # Only look for errors when the page is not a confirmation:
                # after a success the browser is closed, and get_errors()
                # would publish an ERROR result for the same URL.
                self.get_errors()
        except Exception as error:
            self.logger.error(error)

    def on_document_loaded(self):
        """Log that the document-loaded callback fired."""
        self.logger.info("on_document_loaded called")

    def _setPhoneCountryAndStore(self):
        """Set the phone country to FR and, if a store was chosen, the preferred store."""
        try:
            if self.store_type == 0:
                self.page.evaluate("""()=>{
                    document.getElementById("phone_country").value = "FR" }""")
            else:
                store_to_choose = self.store_map[self.store_type]
                self.page.evaluate("""(store_to_choose)=>{
                    document.getElementById("prefer").value = store_to_choose;
                    document.getElementById("phone_country").value = "FR" }""", store_to_choose)
        except Exception as error:
            # Best effort: an unknown store type or a missing element is logged.
            self.logger.error(error)

    def _set_phone_number(self, phoneNumber):
        """Write ``phoneNumber`` into the ``phone_number`` input after a random pause."""
        time.sleep(get_random_wait_time())
        try:
            self.page.evaluate("""(phoneNumber)=>document.getElementById("phone_number").value =phoneNumber""",
                               phoneNumber)
        except Exception as error:
            self.logger.error(error)

    def _set_name(self, lastName, firstName):
        """Fill the surname and name inputs, but only if the surname is still empty."""
        time.sleep(get_random_wait_time())
        try:
            self.page.evaluate("""(name)=> {
                let surname = document.getElementById("surname");
                if(surname.value.length == 0){
                    surname.value = name.lastName;
                    document.getElementById("name").focus();
                    document.getElementById("name").value = name.firstName
                }}
            """, {'lastName': lastName, 'firstName': firstName})
        except Exception as error:
            self.logger.error(error)

    def get_errors(self):
        """Publish an ERROR result for result pages and process any alert box."""
        current_url = self.page.url
        if current_url == BLANK_URL:
            # no need to push blank url to db
            return
        if current_url != RDV_URL:
            # no need to push RDV url to db
            self.publish_message_to_queue(self.contact, PublishType.ERROR, current_url)
        try:
            alert = self.page.query_selector("div.alert")
            if alert:
                erro_content = alert.inner_html()
                self.logger.info("错误：" + erro_content)
                self._handle_errors(erro_content)
        except Exception as ext:
            self.logger.error(ext)

    def _handle_errors(self, erro_content: str):
        """Classify the alert HTML and react to the first matching known error.

        Each recognised error is reported once (guarded by ``is_finished``)
        and then ends the session by closing the browser.
        """
        if DOUBLE_REQUEST_ERROR_MESSAGE in erro_content or DOUBLE_REQUEST_ERROR_MESSAGE_FR in erro_content:
            # the same data has already been validated today
            if not self.is_finished:
                params.oracle_log_sender.send_double_data_error(self.contact)
                self.is_finished = True
                self.termine()
        elif TOO_MANY_REQUEST_ERROR_MESSAGE in erro_content or TOO_MANY_REQUEST_ERROR_MESSAGE_FR in erro_content:
            # too many requests: record the contact in the blacklist
            if not self.is_finished:
                params.oracle_log_sender.send_too_many_error(self.contact)
                MONGO_STORE_MANAGER.insert_blacklist_contact(self.contact)
                self.is_finished = True
                self.termine()
        elif CAPTCHA_ERROR_MESSAGE in erro_content or CAPTCHA_ERROR_MESSAGE_FR in erro_content:
            # captcha verification failed
            self.is_captcha_in_error = True
            if not self.is_finished:
                # save the error to database with contact info
                self.handle_captcha_error()
                self.is_finished = True
                # no need to retry captcha, if retry ,will generate DOUBLE_REQUEST_ERROR_MESSAGE
                self.termine()

    def _set_email(self, email):
        """Fill the email input if it is still empty, after a random pause."""
        time.sleep(get_random_wait_time())
        try:
            self.page.evaluate("""(email)=>{
                let emailElement = document.getElementById("email")
                if(emailElement.value.length == 0){
                    emailElement.focus();
                    document.getElementById("email").value = email;}}""", email)
        except Exception as error:
            self.logger.error(error)

    def _set_id_number(self, id):
        """Write ``id`` into the ``passport_id`` input after a random pause."""
        time.sleep(get_random_wait_time())
        try:
            self.page.evaluate(""" (id) =>{
                document.getElementById("passport_id").focus();
                document.getElementById("passport_id").value = id}""", id)
        except Exception as error:
            self.logger.error(error)

    def _checkCgu(self):
        """Tick the ``cgu`` and ``processing`` checkboxes."""
        try:
            self.page.evaluate("""
                document.getElementById("cgu").focus();
                document.getElementById("cgu").checked = true;
                document.getElementById("processing").focus();
                document.getElementById("processing").checked = true""")
        except Exception as error:
            self.logger.error(error)

    def clickOnValidBtn(self):
        """Focus, then click, the first element with class ``btn``, with random pauses."""
        time.sleep(get_random_wait_time())
        try:
            self.page.evaluate("""document.getElementsByClassName("btn")[0].focus();""")
            time.sleep(get_random_wait_time())
            self.page.evaluate("""
                document.getElementsByClassName("btn")[0].click();""")
        except Exception as error:
            self.logger.error(error)

    def fill_otp(self, otp: str):
        """Focus the OTP field and type ``otp`` into it."""
        self.page.focus(OTP_FIELD_ID)
        time.sleep(get_random_wait_time())
        self.page.fill(OTP_FIELD_ID, otp)

    def _close_browser(self):
        """Close the browser if one exists; failures are logged, not raised."""
        browser = getattr(self, "browser", None)
        if browser is None:
            return
        try:
            browser.close()
        except Exception as error:
            # Several code paths may close the same browser; a failing
            # repeated close must not mask the original outcome.
            self.logger.error(error)

    def termine(self):
        """Wait one second, then close the browser."""
        self.logger.info("will close browser")
        time.sleep(1)
        self._close_browser()

    def publish_message_to_queue(self, contact: ContactPojo, status: PublishType, url: str):
        """Persist a result record for ``contact`` and finish the session on success.

        The record id is the last path segment of ``url``. The record is saved
        to Firebase and inserted into the Mongo collection named after
        today's date; the contact is then removed from the captcha-error list
        for the current day and from the blacklist. For
        ``PublishType.SUCCESS`` the success event is sent and the browser is
        closed after a two second pause.
        """
        # create the message
        result_id = url.split("/")[-1]
        result = ReserveResultPojo(type=status, phone=contact.phone, message=status.value, url=url,
                                   firstName=contact.first_name, lastName=contact.last_name, email=contact.mail,
                                   passport=contact.passport, ccid=contact.ccid)
        result.id = result_id
        result.store_type = self.store_type
        definitions.firebase_store_manager.save(result)
        collection_name = str(datetime.date.today())
        MONGO_STORE_MANAGER.insert_reserve_result(collection_name=collection_name, reserve=result)
        MONGO_STORE_MANAGER.delete_captcha_error_contact_for_current_day(self.contact)
        MONGO_STORE_MANAGER.remove_contact_from_black_list(self.contact)
        if status is PublishType.SUCCESS:
            self.on_success(result)
            time.sleep(2)
            self._close_browser()

    def resolve_captcha(self):
        """Start the captcha solver; its solution is passed to ``fill_captcha_solution``."""
        self.captcha_solver = SolveCaptcha(self.page)
        self.captcha_solver.start(self.fill_captcha_solution)

    def fill_captcha_solution(self, solution):
        """Inject the captcha ``solution`` and submit the form.

        After the first click the method waits 20 seconds; if the session is
        not finished and no captcha error was flagged, it clicks again. A
        flagged captcha error is cleared instead. On any exception the page
        is reloaded.
        """
        self.logger.info("will input solution: " + solution)
        try:
            self.page.evaluate("""(solution)=>{
                document.getElementById("g-recaptcha-response").innerHTML=solution;}""", solution)
            self.logger.info("will click on valid btn")
            self.clickOnValidBtn()
            # wait for 20s
            time.sleep(20)
            if not self.is_finished:
                if not self.is_captcha_in_error:
                    self.clickOnValidBtn()
                else:
                    self.is_captcha_in_error = False
        except Exception as error:
            self.logger.error(error)
            self.page.reload(timeout=PAGE_TIMEOUT)

    def handle_captcha_error(self):
        """Record the contact as a captcha-error case and report it."""
        MONGO_STORE_MANAGER.insert_captcha_error_contact(self.contact)
        params.oracle_log_sender.send_captcha_error(self.contact)
def on_success(result: ReserveResultPojo):
    """Do nothing; placeholder success callback that ignores ``result``."""
    return None
def launch_page():
    """Run a session for a hard-coded contact against store type 1.

    Builds the contact, wraps it in a ``CommandorPage`` and starts the page
    with a ``ProxyType.BRIGHT_DATA`` proxy. Returns whatever ``start_page``
    returns.
    """
    sample_contact = ContactPojo(
        phone_number="+33758912245",
        passport_number="82546975",
        last_name="XU",
        first_name="xingzhen",
        mail="ColbyPatel653@gmail.com",
        ccid="",
        position=0,
    )
    commandor = CommandorPage(sample_contact, store_type=1)
    first_proxy = params.get_proxy(ProxyType.BRIGHT_DATA)
    return commandor.start_page(first_proxy)
def wait_for_otp(event: threading.Event, commandor: CommandorPage):
    """Read an OTP from standard input, store it on ``commandor`` and set ``event``."""
    typed_otp = input("Press Enter otp to continue...\n")
    print(f"input otp is: {typed_otp}")
    commandor.otp_value = typed_otp
    event.set()
# Script entry point: run one session for the hard-coded contact.
if __name__ == '__main__':
    launch_page()