"""Entry script: load a range of contacts from the Excel sheet for booking.

The modem/threading workflow that used to consume those contacts is currently
disabled (kept below as a commented-out block inside ``start_book``).
"""
import logging
import random
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Union

from gsmmodem import GsmModem

import params
from workers.commandor_page import CommandorPage
from logs.AppLogging import init_logger
from params import oracle_log_sender
from pojo.serial_modem import SerialModem
from utils.excel_reader import ExcelHelper

OTP_TIMEOUT = 240
current_gsm_modem = None  # used to save the current slot position

# Module-level logging setup: project logger configuration first, then mirror
# everything emitted on the root logger to stdout.
init_logger()
logger = logging.getLogger()
logger.addHandler(logging.StreamHandler(stream=sys.stdout))


def send_command(cmd: str, ser, wait_time_in_s: int = 0) -> bytes:
    """Write ``cmd`` to ``ser`` and poll for a response containing ``OK``.

    Args:
        cmd: Command text; it is encoded with ``str.encode()`` before writing.
        ser: Object exposing ``write(data)`` and ``read(size)``
            (presumably a pyserial ``Serial`` instance -- confirm with callers).
        wait_time_in_s: Maximum number of extra one-second polls performed
            while the response does not contain ``OK``. With the default of
            ``0`` only the initial read is done.

    Returns:
        The result of the last ``ser.read(100)`` call; earlier reads are
        discarded, not accumulated.
    """
    ser.write(cmd.encode())
    msg = ser.read(100)
    count = 0
    # str(msg) is used so the check works on the textual form of the response.
    while 'OK' not in str(msg) and count < wait_time_in_s:
        time.sleep(1)
        count = count + 1
        msg = ser.read(100)
    # NOTE(review): print placed after the loop (final response only) --
    # confirm this matches the intended behaviour.
    print(msg)
    return msg


def create_modem_for_port(port: str) -> Union[SerialModem, None]:
    """Build a ``SerialModem`` wrapping a ``GsmModem`` for ``port``.

    Args:
        port: Serial port identifier passed straight to ``GsmModem``.

    Returns:
        The new ``SerialModem``, or ``None`` when construction raised. The
        failure is logged together with its traceback instead of propagating,
        so callers must handle the ``None`` case.
    """
    logger.info('Initializing modem... for ' + port)
    try:
        modem = GsmModem(port)
        return SerialModem(modem=modem)
    except Exception as ext:
        # Best-effort by design: one bad port must not abort the caller.
        # logger.exception keeps the traceback that logger.error would drop.
        logger.exception(ext)
        return None


def start_book(start_number: int, end_number: int, store: int = 0) -> None:
    """Read the contacts in the given range and print them.

    The contacts returned by ``ExcelHelper().read_contacts()`` are sliced with
    ``[start_number - 1: end_number - 1]``, i.e. the slice end is exclusive.

    Args:
        start_number: 1-based position of the first contact to include.
        end_number: Position at which the slice stops; the element at index
            ``end_number - 1`` is not included.
            NOTE(review): confirm the exclusive end is intended.
        store: Only referenced by the disabled workflow below (passed as
            ``store_type``); unused by the active code.
    """
    # read the contact, and contact the 2 objects together
    excel_reader = ExcelHelper()
    contacts = excel_reader.read_contacts()[start_number - 1: end_number - 1]
    print(contacts)

    # Disabled modem workflow, kept for reference. Indentation below was
    # reconstructed; it references names (contacts_range, modem_list,
    # current_sim_position, current_card_pool_slot) not defined in this file.
    #
    # for i in contacts_range:
    #     logger.info("will switch to contact {}".format(i))
    #     with ThreadPoolExecutor(max_workers=10) as executor:
    #         for modem in modem_list:
    #             current_sim_position = current_sim_position + 1
    #             try:
    #                 modem.get_ccid()
    #                 # find the contact with ccid
    #                 contact = [contact for contact in contacts if
    #                            contact.ccid.replace("F", "") == modem.ccid.replace("F", "")]
    #                 if len(contact) > 0:
    #                     modem.phone_number = contact[0].phone
    #                     modem.contact = contact[0]
    #                 else:
    #                     logger.info("contact not found for this ccid:{}".format(modem.ccid))
    #                     error_msg = "slot({}):sim({}):ccid({})".format(i, current_sim_position, modem.ccid)
    #                     oracle_log_sender.send_contact_not_found(error_msg)
    #                     modem.modem.close()
    #                     continue
    #                 if modem.contact:
    #                     logger.info("contact found for this ccid")
    #                     signal = modem.modem.signalStrength
    #                     logger.info("signal strength: " + str(signal))
    #                     proxy = get_proxy(modem.phone_number)
    #                     commandor = CommandorPage(modem, sim_position=current_sim_position,
    #                                               slot_position=current_card_pool_slot, store_type=store)
    #                     # start the task in thread
    #                     executor.submit(commandor.start_page, proxy)
    #             except Exception as error:
    #                 print(error)
    #                 continue


def get_proxy(phone_number) -> dict:
    """Build the proxy settings dict for ``phone_number``.

    The proxy username is ``"panleicim-res-fr-"`` followed by the string form
    of ``phone_number`` with its first character removed.

    Args:
        phone_number: Phone number; converted with ``str()`` before use.

    Returns:
        A dict with the keys ``"server"``, ``"username"`` and ``"password"``;
        server and password come from ``params``.
    """
    # Drop the first character of the number (presumably a leading "0" or
    # "+" -- confirm against the contact data).
    number_without_first_char = str(phone_number)[1:]
    proxy_username = "panleicim-res-fr-" + number_without_first_char
    logger.info("proxy_username is " + proxy_username)
    return {
        "server": params.PROXY_SERVER,
        "username": proxy_username,
        "password": params.PROXY_PASSWORD
    }


if __name__ == '__main__':
    # Adjust the start row and end row here. Third argument `store`:
    # 0 means a random store, 1 means the main store.
    start_book(2, 18, store=0)