Files
appointment_tool/modems/ModemPool.py
T

126 lines
5.3 KiB
Python

import re
import time
import serial
from gsmmodem import GsmModem
from serial import Serial
from definitions import BAUDRATE
from error.SIMError import SIMError
from logs.LogSender import LOG_APPOINTMENT_SUCCESS, SUBJECT_SIM_INFO
from params import firebase_store_manager, oracle_log_sender
from pojo.SimInfoPojo import SimInfoPojo
from pojo.serial_modem import SerialModem
from utils.excel_reader import ExcelHelper
from utils.operator import check_operator, Operator
class ModemPool:
    """Drive a pool of serial GSM modems to discover the phone number of each SIM.

    One serial port is opened per entry of ``port_list``. For every port the
    pool unlocks the SIM, reads its CCID and then resolves the phone number
    either from the contacts spreadsheet or through a USSD request, persisting
    the result through the database manager.
    """

    # Phonebook index used when writing the discovered number with AT+CPBW.
    phone_number_position = 10
    TAG = "ModemPool"

    # Maximum number of bytes requested from the serial port per read.
    _READ_SIZE = 100
    # Pause (seconds) between two polls while waiting for "OK".
    _POLL_INTERVAL_S = 1
    # First run of digits/"F" in the AT+CCID reply is taken as the CCID.
    _CCID_PATTERN = re.compile(r"[0-9F]+")
    # "33" followed by nine digits (presumably a French number — confirm).
    _PHONE_PATTERN = re.compile(r"33\d{9}")

    def __init__(self, port_list: list):
        """Open one serial connection per port in *port_list*.

        If opening any port fails, the ports opened so far are closed before
        the exception is re-raised, so no handle is leaked.
        """
        self._port_list = port_list
        self._serial_list = []
        self._excel_helper = ExcelHelper()
        self._log_sender = oracle_log_sender
        self._db_manager = firebase_store_manager
        for port in self._port_list:
            try:
                ser = serial.Serial(port, BAUDRATE, timeout=1)
            except Exception:
                # Release the ports that were already opened, then propagate.
                for opened in self._serial_list:
                    opened.close()
                self._serial_list.clear()
                raise
            self._serial_list.append(ser)

    def reset_all_modems(self):
        """Send ``AT+CFUN=1,1`` to every modem, then wait for the SIMs to initialise."""
        print("will reset modem pool")
        for ser in self._serial_list:
            # May encounter an exception here: concurrent access to the serial port.
            time.sleep(2)
            self._send_command("AT+CFUN=1,1\r", ser)
        # Wait 20 seconds so that the modems can initialise all the SIMs.
        # NOTE(review): assumed to run once after the loop — confirm against
        # the original layout.
        time.sleep(20)

    def _generate_error_msg(self, slot_position, index, error: "SIMError"):
        """Build the error message for a SIM, send it to the log sender and return it.

        *index* is the zero-based port index; the message shows it one-based.
        """
        msg = "slot({}) SIM({}), error:{}".format(slot_position, index + 1,
                                                  error.value)
        self._log_sender.send_log(msg, source=self.TAG, subject=SUBJECT_SIM_INFO, type=error.value)
        return msg

    def get_raw_phone_number(self, slot_position):
        """Resolve and store the phone number of the SIM behind every serial port.

        For each port: unlock the SIM, read its CCID, and depending on the
        operator returned by ``check_operator`` either look the number up in
        the contacts spreadsheet or request it over USSD. A SIM whose modem
        reply cannot be parsed is reported and skipped; it does not abort the
        remaining SIMs.
        """
        for index, ser in enumerate(self._serial_list):
            sim_position = index + 1
            position = (slot_position - 1) * len(self._port_list) + sim_position
            self._unlock_sim(ser)
            ccid = self._read_ccid(ser)
            if ccid is None:
                # Without a CCID neither lookup path can work for this SIM.
                print("slot({}) SIM({}), error:no CCID in modem response".format(slot_position, sim_position))
                continue
            operator = check_operator(ccid)
            if operator in (Operator.SFR, Operator.CHINA_TELECOM):
                self._save_number_from_contacts(slot_position, sim_position, position, ccid)
            else:
                self._save_number_from_ussd(ser, slot_position, index, position, ccid)

    def _unlock_sim(self, ser, pin="0000"):
        """Send the SIM PIN; the carriage return must follow the closing quote."""
        unlock_cmd = 'AT+CPIN="{0}"\r'.format(pin)
        self._send_command(unlock_cmd, ser, 10)

    def _read_ccid(self, ser):
        """Return the CCID reported by ``AT+CCID``, or ``None`` if none is found."""
        response = self._to_text(self._send_command("AT+CCID\r", ser))
        match = self._CCID_PATTERN.search(response)
        return None if match is None else match.group(0)

    def _save_number_from_contacts(self, slot_position, sim_position, position, ccid):
        """Look *ccid* up in the contacts spreadsheet and store the matching phone."""
        contacts = self._excel_helper.read_contacts()
        # CCIDs are compared with every "F" removed on both sides.
        wanted = ccid.replace("F", "")
        matches = [contact for contact in contacts
                   if contact.ccid.replace("F", "") == wanted]
        if matches:
            phone_number = matches[0].phone
            self._db_manager.save_sim_info(SimInfoPojo(phone=str(phone_number), ccid=ccid, position=position))
        else:
            error_msg = "slot({}),sim({})".format(slot_position, sim_position)
            self._log_sender.send_contact_not_found(error_msg)

    def _save_number_from_ussd(self, ser, slot_position, index, position, ccid):
        """Ask the network for the number via USSD, store it and write it to the SIM."""
        print("will get phone number for slot({}) SIM({}), port:{}".format(slot_position, index + 1, ser.port))
        if not self._select_sim_storage(ser):
            print(self._generate_error_msg(slot_position, index, SIMError.STORAGE_ERROR))
            return
        msg = self._execut_USSD_cmd("AT+CUSD=1, *132#\r", ser)
        text = self._to_text(msg)
        if "Unfortunately" in text:
            print(self._generate_error_msg(slot_position, index, SIMError.SIM_DISABLED))
            return
        if "CME ERROR" in text:
            print(self._generate_error_msg(slot_position, index, SIMError.CME_ERROR))
            return
        if not msg:
            print(self._generate_error_msg(slot_position, index, SIMError.TIMEOUT))
            return
        # Find the phone number in the USSD reply.
        match = self._PHONE_PATTERN.search(text)
        if match is None:
            print("slot({}) SIM({}), error:no phone number in USSD response".format(slot_position, index + 1))
            return
        phone_number = match.group(0)
        print("phone is " + phone_number)
        self._db_manager.save_sim_info(SimInfoPojo(phone=phone_number, ccid=ccid, position=position))
        self._log_sender.send_log(phone_number, source=self.TAG, subject=SUBJECT_SIM_INFO,
                                  type=LOG_APPOINTMENT_SUCCESS)
        # Write the number to the SIM card's phonebook.
        cmd = f'AT+CPBW={self.phone_number_position},"{phone_number}"\r'
        self._send_command(cmd, ser, wait_time_in_s=2)

    def _select_sim_storage(self, ser) -> bool:
        """Select the SIM card phonebook storage; return ``False`` on an ERROR reply."""
        result = self._send_command("AT+CPBS=\"SM\"\r", ser)
        return "ERROR" not in self._to_text(result)

    @staticmethod
    def _to_text(raw) -> str:
        """Decode a modem reply to text, dropping non-ASCII bytes; empty/None gives ``""``."""
        if not raw:
            return ""
        return raw.decode("ascii", errors="ignore")

    def _send_command(self, cmd: str, ser, wait_time_in_s: int = 0) -> bytes:
        """Write *cmd* to *ser* and return everything read back.

        After the first read, the port is polled up to *wait_time_in_s* more
        times (pausing ``_POLL_INTERVAL_S`` between polls) until ``OK`` shows
        up. All chunks are accumulated, so data received before the chunk
        containing ``OK`` is kept. Read errors are printed and whatever was
        collected so far is returned; the result is always ``bytes``.
        """
        ser.write(cmd.encode())
        response = b""
        try:
            response += ser.read(self._READ_SIZE)
            for _ in range(wait_time_in_s):
                if b"OK" in response:
                    break
                time.sleep(self._POLL_INTERVAL_S)
                response += ser.read(self._READ_SIZE)
        except Exception as exc:
            # Best effort: report the failure and return what was read so far.
            print(exc)
        print(response)
        return response

    def _execut_USSD_cmd(self, cmd, ser) -> bytes:
        """Send a USSD command, polling up to 120 times for the reply."""
        # The timeout for a USSD command can be 120 s on mac.
        return self._send_command(cmd, ser, 120)