111 lines
4.4 KiB
Python
111 lines
4.4 KiB
Python
import re
|
|
import time
|
|
|
|
import serial
|
|
from serial import Serial
|
|
|
|
from error.SIMError import SIMError
|
|
from logs.LogSender import LOG_APPOINTMENT_SUCCESS, SUBJECT_SIM_INFO
|
|
from params import firebase_store_manager, oracle_log_sender
|
|
from pojo.SimInfoPojo import SimInfoPojo
|
|
from utils.excel_reader import ExcelHelper
|
|
|
|
|
|
class ModemPool:
|
|
BAUDRATE = 115200
|
|
phone_number_position = 10
|
|
TAG = "ModemPool"
|
|
|
|
def __init__(self, port_list: list):
|
|
self._port_list = port_list
|
|
self._serial_list = []
|
|
self._excel_helper = ExcelHelper()
|
|
self._log_sender = oracle_log_sender
|
|
self._db_manager = firebase_store_manager
|
|
|
|
for port in self._port_list:
|
|
ser = serial.Serial(port, self.BAUDRATE, timeout=1)
|
|
self._serial_list.append(ser)
|
|
|
|
def reset_all_modems(self):
|
|
print("will reset modem pool")
|
|
for ser in self._serial_list:
|
|
# may encontre exception here, multi-access to serial port
|
|
time.sleep(2)
|
|
self._send_command("AT+CFUN=1,1\r", ser)
|
|
# wait for 20 second, so that the modem can init all the sims
|
|
time.sleep(20)
|
|
|
|
def _generate_error_msg(self, slot_position, index, error: SIMError):
|
|
msg = "slot({}) SIM({}), error:{}".format(slot_position, index + 1,
|
|
error.value)
|
|
self._log_sender.send_log(msg, source=self.TAG, subject=SUBJECT_SIM_INFO, type=error.value)
|
|
return msg
|
|
|
|
def get_raw_phone_number(self, slot_position):
|
|
for index, ser in enumerate(self._serial_list):
|
|
print("will get phone number for slot({}) SIM({}), port:{}".format(slot_position, index + 1, ser.port))
|
|
if not self._select_sim_storage(ser):
|
|
print(self._generate_error_msg(slot_position, index, SIMError.STORAGE_ERROR))
|
|
continue
|
|
msg = self._execut_USSD_cmd("AT+CUSD=1, *132#\r", ser)
|
|
if "Unfortunately" in str(msg):
|
|
print(self._generate_error_msg(slot_position, index, SIMError.SIM_DISABLED))
|
|
continue
|
|
elif "CME ERROR" in str(msg):
|
|
print(self._generate_error_msg(slot_position, index, SIMError.CME_ERROR))
|
|
continue
|
|
elif len(msg) == 0:
|
|
print(self._generate_error_msg(slot_position, index, SIMError.TIMEOUT))
|
|
continue
|
|
# find phone number
|
|
match = re.search(r'33\d{9}', str(msg))
|
|
phone_number = match.group(0)
|
|
print("phone is " + phone_number)
|
|
cmd = "AT+CCID\r"
|
|
response = str(self._send_command(cmd, ser))
|
|
ccid_group = re.search("[0-9F]+", response)
|
|
ccid = ccid_group.group(0)
|
|
sim_position = index + 1
|
|
position = (slot_position - 1) * 15 + sim_position
|
|
if phone_number:
|
|
self._db_manager.save_sim_info(SimInfoPojo(phone=phone_number, ccid=ccid, position=position))
|
|
self._log_sender.send_log(phone_number, source=self.TAG, subject=SUBJECT_SIM_INFO,
|
|
type=LOG_APPOINTMENT_SUCCESS)
|
|
# write the number to sim card's phonebook
|
|
cmd = f'AT+CPBW={self.phone_number_position},\"{phone_number}\"\r'
|
|
self._send_command(cmd, ser, wait_time_in_s=2)
|
|
self.get_own_number(ser)
|
|
|
|
def get_own_number(self, ser: Serial):
|
|
print("saved phone number: " + str(self._send_command(f'AT+CPBR={self.phone_number_position}\r', ser)))
|
|
|
|
def _select_sim_storage(self, ser) -> bool:
|
|
# use SIM Card storage
|
|
cmd_sm = "AT+CPBS=\"SM\"\r"
|
|
result = self._send_command(cmd_sm, ser)
|
|
if "ERROR" in str(result):
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
def _send_command(self, cmd: str, ser, wait_time_in_s: int = 0) -> bytes:
|
|
ser.write(cmd.encode())
|
|
msg = None
|
|
try:
|
|
msg = ser.read(100)
|
|
count = 0
|
|
while 'OK' not in str(msg) and count < wait_time_in_s:
|
|
time.sleep(1)
|
|
count = count + 1
|
|
msg = ser.read(100)
|
|
except Exception as exc:
|
|
print(exc)
|
|
# msg = ser.read(100)
|
|
print(msg)
|
|
return msg
|
|
|
|
def _execut_USSD_cmd(self, cmd, ser) -> bytes:
|
|
# the timeout for ussd command can be 120 s in mac
|
|
return self._send_command(cmd, ser, 120)
|