"""Drive a pool of serial-attached modems to discover and store SIM phone numbers."""

import re
import time

import serial
from serial import Serial

from db.DbManager import DataManager
from error.SIMError import SIMError
from logs.LogSender import LogSender, LOG_ERROR
from pojo.SimInfoPojo import SimInfoPojo
from utils.excel_reader import ExcelHelper


class ModemPool:
    """A group of modems, each reachable through its own serial port.

    One serial connection is opened per entry of ``port_list`` when the pool
    is constructed.  Call :meth:`close` (or use the pool as a context
    manager) to release the ports when done.
    """

    BAUDRATE = 115200
    # Phonebook index at which the SIM's own number is written / read back.
    phone_number_position = 10
    TAG = "ModemPool"

    # Maximum number of bytes requested from the serial port per read.
    _READ_CHUNK_SIZE = 100
    # Patterns applied to the textual form of a modem response.
    _PHONE_NUMBER_RE = re.compile(r'33\d{9}')
    _CCID_RE = re.compile("[0-9F]+")

    def __init__(self, port_list: list):
        """Open a serial connection for every port name in ``port_list``.

        Raises whatever ``serial.Serial`` raises when a port cannot be
        opened; ports opened before the failure are closed first so they
        are not leaked.
        """
        self._port_list = port_list
        self._serial_list = []
        self._excel_helper = ExcelHelper()
        self._log_sender = LogSender()
        self._db_manager = DataManager()
        for port in self._port_list:
            try:
                ser = serial.Serial(port, self.BAUDRATE, timeout=1)
            except (OSError, ValueError):
                # Do not keep half of the pool open if one port fails.
                self.close()
                raise
            self._serial_list.append(ser)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def close(self):
        """Close every serial port owned by the pool and forget them."""
        for ser in self._serial_list:
            ser.close()
        self._serial_list = []

    def reset_all_modems(self):
        """Send ``AT+CFUN=1,1`` to every modem, then wait for the SIMs to initialise."""
        for ser in self._serial_list:
            # May encounter an exception here if the serial port is being
            # accessed concurrently; pause briefly before each command.
            time.sleep(2)
            self._send_command("AT+CFUN=1,1\r", ser)
        # Wait 20 seconds so that the modems can initialise all the SIMs.
        # NOTE(review): assumed to be a single wait after all resets were sent;
        # confirm against the intended behaviour.
        time.sleep(20)

    def _log_failure(self, slot_position, index, reason) -> str:
        """Build the per-SIM failure message, send it to the log and return it."""
        msg = "slot({}) SIM({}), error:{}".format(slot_position, index + 1, reason)
        self._log_sender.send_log(msg, subject=self.TAG, type=LOG_ERROR)
        return msg

    def _generate_error_msg(self, slot_position, index, error: SIMError):
        """Log a failure described by a ``SIMError`` member and return the message.

        ``index`` is the zero-based position of the serial port in the pool;
        it is reported one-based in the message.
        """
        return self._log_failure(slot_position, index, error.value)

    def get_raw_phone_number(self, slot_position):
        """Query every modem for its SIM's phone number and persist it.

        For each serial port: select the SIM phonebook storage, run the USSD
        request, extract the phone number and the CCID, save both through the
        data manager and write the number into the SIM phonebook.  A SIM
        whose data cannot be obtained is logged and skipped; it never stops
        the processing of the remaining SIMs.
        """
        for index, ser in enumerate(self._serial_list):
            print("will get phone number for slot({}) SIM({}), port:{}".format(slot_position, index + 1, ser.port))
            if not self._select_sim_storage(ser):
                print(self._generate_error_msg(slot_position, index, SIMError.STORAGE_ERROR))
                continue

            msg = self._execut_USSD_cmd("AT+CUSD=1, *132#\r", ser)
            # str() of a bytes object yields its repr, which is sufficient
            # for the substring and digit matching done below.
            text = str(msg)
            if "Unfortunately" in text:
                print(self._generate_error_msg(slot_position, index, SIMError.SIM_DISABLED))
                continue
            if "CME ERROR" in text:
                print(self._generate_error_msg(slot_position, index, SIMError.CME_ERROR))
                continue
            if len(msg) == 0:
                print(self._generate_error_msg(slot_position, index, SIMError.TIMEOUT))
                continue

            # Find the phone number; the response may not contain one.
            phone_match = self._PHONE_NUMBER_RE.search(text)
            if phone_match is None:
                print(self._log_failure(slot_position, index, "phone number not found in USSD response"))
                continue
            phone_number = phone_match.group(0)
            print("phone is " + phone_number)

            response = str(self._send_command("AT+CCID\r", ser))
            ccid_match = self._CCID_RE.search(response)
            if ccid_match is None:
                print(self._log_failure(slot_position, index, "CCID not found in response"))
                continue
            ccid = ccid_match.group(0)

            self._db_manager.save_sim_info(SimInfoPojo(phone=phone_number, ccid=ccid))
            # Write the number to the SIM card's phonebook.
            cmd = f'AT+CPBW={self.phone_number_position},"{phone_number}"\r'
            self._send_command(cmd, ser, wait_time_in_s=2)
            self.get_own_number(ser)

    def get_own_number(self, ser: Serial):
        """Read back and print the phonebook entry holding the SIM's own number."""
        print("saved phone number: " + str(self._send_command(f'AT+CPBR={self.phone_number_position}\r', ser)))

    def _select_sim_storage(self, ser) -> bool:
        """Select the SIM card ("SM") phonebook storage.

        Returns ``False`` when the modem's response contains ``ERROR``,
        ``True`` otherwise.
        """
        result = self._send_command('AT+CPBS="SM"\r', ser)
        return "ERROR" not in str(result)

    def _send_command(self, cmd: str, ser, wait_time_in_s: int = 0) -> bytes:
        """Write ``cmd`` to ``ser`` and return everything the modem answered.

        After the first read, the port is polled once per second, up to
        ``wait_time_in_s`` times, until ``OK`` shows up in the response.
        All chunks are accumulated so that an answer longer than one read,
        or split across several polls, is returned in full.  An empty
        result means nothing at all was received.
        """
        ser.write(cmd.encode())
        response = ser.read(self._READ_CHUNK_SIZE)
        polls = 0
        while b"OK" not in response and polls < wait_time_in_s:
            time.sleep(1)
            polls += 1
            response += ser.read(self._READ_CHUNK_SIZE)
        print(response)
        return response

    def _execut_USSD_cmd(self, cmd, ser) -> bytes:
        """Send a USSD command, allowing up to 120 one-second polls for the answer."""
        # The timeout for a USSD command can be as long as 120 s.
        return self._send_command(cmd, ser, 120)