Files
appointment_tool/main.py
T

66 lines
2.8 KiB
Python

import logging
import sys
from concurrent.futures import ThreadPoolExecutor
from src import params, definitions
from src.db.mongo_manager import MONGO_STORE_MANAGER
from src.logs.AppLogging import init_logger
from src.pojo.ModeEnum import ModeEnum
from src.pojo.contact_pojo import ContactPojo
from src.proxy.proxy_type import ProxyType
from src.utils.black_list_checker import can_send_request
from src.utils.excel_reader import ExcelHelper
from src.workers.commandor_page import CommandorPage
# used to save the current slot position
init_logger()
logger = logging.getLogger()
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
def start_book(start_number, end_number, store_choose_state=0, max_workers=10, proxy_type=ProxyType.BRIGHT_DATA,
mode: ModeEnum = ModeEnum.MANUAL, headless=False):
# read the contact, and contact the 2 objects together
excel_reader = ExcelHelper()
all_contacts = excel_reader.read_contacts()
if len(all_contacts) <= end_number:
end_number = len(all_contacts)
contacts = all_contacts[start_number - 1: end_number]
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for contact in contacts:
proxy = get_proxy(proxy_type)
# start the task in thread
if can_send_request(contact):
executor.submit(
CommandorPage(contact, store_type=store_choose_state, proxy_type=proxy_type, mode=mode,
headless=headless).start_page,
proxy)
else:
logger.info("do not send request --> skip")
def recheck_the_captcha_error_contacts(on_no_contact_found, store_type=0, mode: ModeEnum = ModeEnum.MANUAL,
max_workers=10):
# get all the contacts in captcha error
contact_list = MONGO_STORE_MANAGER.get_captcha_error_contacts_for_current_day()
if len(contact_list) == 0:
on_no_contact_found()
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for errorContact in contact_list:
contact = ContactPojo.get_contact_from_error_contact(errorContact)
proxy = get_proxy()
# start the task in thread
executor.submit(
CommandorPage(contact, store_type=store_type, mode=mode).start_page,
proxy)
def get_proxy(proxy_type=ProxyType.BRIGHT_DATA):
return params.get_proxy(proxy_type)
if __name__ == '__main__':
# 修改联系人行,结束联系人行 第三个参数store等于0的时候是随机,传入1的时候是总店
start_book(101, 200, store_choose_state=1, mode=ModeEnum.AUTOMATIC, headless=True)
# recheck_the_captcha_error_contacts(store_type=0, mode=ModeEnum.AUTOMATIC, on_no_contact_found=lambda: None)