import re
import time

import serial

from definitions import BAUDRATE
from error.SIMError import SIMError
from logs.LogSender import LOG_APPOINTMENT_SUCCESS, SUBJECT_SIM_INFO
from params import firebase_store_manager, oracle_log_sender
from pojo.SimInfoPojo import SimInfoPojo
from utils.excel_reader import ExcelHelper
from utils.operator import check_operator, Operator


class ModemPool:
    """Talk to a set of modems over serial ports and record each SIM's number.

    One ``serial.Serial`` connection is opened per entry of ``port_list``.
    The pool can be used as a context manager (or ``close()`` can be called
    explicitly) so the ports are released when the work is done.
    """

    # Phonebook index reserved for the SIM's own number; only referenced by
    # the (currently disabled) AT+CPBW write-back described in
    # get_raw_phone_number.
    phone_number_position = 10
    TAG = "ModemPool"

    def __init__(self, port_list: list):
        """Open every port in *port_list* at ``BAUDRATE`` with a 1 s timeout.

        If any port fails to open, the ports opened so far are closed again
        and the original exception is re-raised.
        """
        self._port_list = port_list
        self._serial_list = []
        self._excel_helper = ExcelHelper()
        self._log_sender = oracle_log_sender
        self._db_manager = firebase_store_manager
        try:
            for port in self._port_list:
                ser = serial.Serial(port, BAUDRATE, timeout=1)
                self._serial_list.append(ser)
        except Exception:
            # Do not leak the ports that were opened before the failure.
            self.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close every serial port held by the pool (best effort)."""
        for ser in self._serial_list:
            try:
                ser.close()
            except Exception as exc:
                # Keep closing the remaining ports even if one fails.
                print(exc)
        self._serial_list.clear()

    def reset_all_modems(self):
        """Send ``AT+CFUN=1,1`` to every modem, then wait for them to come back."""
        print("will reset modem pool")
        for ser in self._serial_list:
            # may encounter an exception here: multi-access to the serial port
            time.sleep(2)
            self._send_command("AT+CFUN=1,1\r", ser)
        # wait for 20 seconds so that the modems can init all the SIMs
        time.sleep(20)

    def _generate_error_msg(self, slot_position, index, error: SIMError):
        """Send an error log for the SIM at *index* (0-based) and return its text."""
        msg = "slot({}) SIM({}), error:{}".format(slot_position, index + 1, error.value)
        self._log_sender.send_log(msg, source=self.TAG, subject=SUBJECT_SIM_INFO, type=error.value)
        return msg

    def get_raw_phone_number(self, slot_position):
        """Determine and store the phone number of every SIM in the pool.

        For each serial port the SIM is unlocked with PIN ``0000``, its CCID
        is read and the operator derived from it.  SFR and China Telecom
        numbers are looked up by CCID in ``/contact_all.xlsx``; for any other
        operator the number is requested with the USSD code ``*132#``.
        Results are saved through the DB manager; SIMs that fail are reported
        and skipped.

        :param slot_position: slot number, used to compute the global SIM
            position (presumably 1-based, given the ``slot_position - 1``
            arithmetic -- confirm with callers).
        """
        contacts = None  # spreadsheet is loaded lazily, at most once per call
        for index, ser in enumerate(self._serial_list):
            sim_position = index + 1
            position = (slot_position - 1) * len(self._port_list) + sim_position

            # unlock sim -- the carriage return must follow the closing quote,
            # otherwise the PIN argument itself contains the line terminator
            unlock_cmd = 'AT+CPIN="{0}"\r'.format("0000")
            self._send_command(unlock_cmd, ser, 10)

            response = str(self._send_command("AT+CCID\r", ser))
            ccid_match = re.search("[0-9F]+", response)
            if ccid_match is None:
                # Nothing that looks like a CCID came back; skip this SIM
                # instead of crashing the whole scan.
                print("slot({}) SIM({}): no CCID found in response {}".format(
                    slot_position, sim_position, response))
                continue
            ccid = ccid_match.group(0)
            operator = check_operator(ccid)

            if operator in (Operator.SFR, Operator.CHINA_TELECOM):
                if contacts is None:
                    contacts = list(self._excel_helper.read_contacts("/contact_all.xlsx"))
                # CCIDs are compared with the 'F' characters removed on both sides.
                wanted = ccid.replace("F", "")
                matches = [c for c in contacts if c.ccid.replace("F", "") == wanted]
                if matches:
                    phone_number = matches[0].phone
                    self._db_manager.save_sim_info(
                        SimInfoPojo(phone=str(phone_number), ccid=ccid, position=position,
                                    sim_position=sim_position, slot_position=slot_position,
                                    operator=operator.value))
                else:
                    error_msg = "slot({}),sim({})".format(slot_position, sim_position)
                    self._log_sender.send_contact_not_found(error_msg)
            else:
                print("will get phone number for slot({}) SIM({}), port:{}".format(
                    slot_position, index + 1, ser.port))
                if not self._select_sim_storage(ser):
                    print(self._generate_error_msg(slot_position, index, SIMError.STORAGE_ERROR))
                    continue

                msg = self._execut_USSD_cmd("AT+CUSD=1, *132#\r", ser)
                text = str(msg)
                if "Unfortunately" in text:
                    print(self._generate_error_msg(slot_position, index, SIMError.SIM_DISABLED))
                    continue
                elif "CME ERROR" in text:
                    print(self._generate_error_msg(slot_position, index, SIMError.CME_ERROR))
                    continue
                elif not msg:
                    # empty (or missing) reply: the modem never answered
                    print(self._generate_error_msg(slot_position, index, SIMError.TIMEOUT))
                    continue

                # find phone number
                number_match = re.search(r'33\d{9}', text)
                if number_match is None:
                    print("slot({}) SIM({}): no phone number found in USSD response".format(
                        slot_position, sim_position))
                    continue
                phone_number = number_match.group(0)
                print("phone is " + phone_number)
                self._db_manager.save_sim_info(
                    SimInfoPojo(phone=phone_number, ccid=ccid, position=position,
                                slot_position=slot_position, sim_position=sim_position,
                                operator=operator.value))
                self._log_sender.send_log(phone_number, source=self.TAG, subject=SUBJECT_SIM_INFO,
                                          type=LOG_APPOINTMENT_SUCCESS)
                # NOTE: writing the number back to the SIM phonebook at index
                # `phone_number_position` (AT+CPBW) is currently disabled.

    def _select_sim_storage(self, ser) -> bool:
        """Select the SIM card phonebook storage; return False if the modem reports ERROR."""
        # use SIM Card storage
        result = self._send_command('AT+CPBS="SM"\r', ser)
        return "ERROR" not in str(result)

    def _send_command(self, cmd: str, ser, wait_time_in_s: int = 0) -> bytes:
        """Write *cmd* to *ser* and return everything read back.

        After the first read, the port is polled up to *wait_time_in_s* more
        times (sleeping 1 s before each poll) until ``OK`` shows up in the
        reply.  All chunks are accumulated, so a reply that arrives in several
        pieces is returned whole.  Read errors are printed and whatever was
        received so far (possibly ``b""``) is returned.
        """
        ser.write(cmd.encode())
        response = b""
        try:
            response += ser.read(100)
            for _ in range(wait_time_in_s):
                if b"OK" in response:
                    break
                time.sleep(1)
                response += ser.read(100)
        except Exception as exc:
            print(exc)
        print(response)
        return response

    def _execut_USSD_cmd(self, cmd, ser) -> bytes:
        """Send a USSD command, polling for the reply up to 120 times."""
        # the timeout for ussd command can be 120 s in mac
        return self._send_command(cmd, ser, 120)