Files
appointment_tool/modems/ModemPool.py
T
2022-04-15 15:54:54 +02:00

131 lines
5.6 KiB
Python

import re
import time
import serial
from definitions import BAUDRATE
from error.SIMError import SIMError
from logs.LogSender import LOG_APPOINTMENT_SUCCESS, SUBJECT_SIM_INFO
from params import firebase_store_manager, oracle_log_sender
from pojo.SimInfoPojo import SimInfoPojo
from utils.excel_reader import ExcelHelper
from utils.operator import check_operator, Operator
class ModemPool:
phone_number_position = 10
TAG = "ModemPool"
def __init__(self, port_list: list):
self._port_list = port_list
self._serial_list = []
self._excel_helper = ExcelHelper()
self._log_sender = oracle_log_sender
self._db_manager = firebase_store_manager
for port in self._port_list:
ser = serial.Serial(port, BAUDRATE, timeout=1)
self._serial_list.append(ser)
def reset_all_modems(self):
print("will reset modem pool")
for ser in self._serial_list:
# may encontre exception here, multi-access to serial port
time.sleep(2)
self._send_command("AT+CFUN=1,1\r", ser)
# wait for 20 second, so that the modem can init all the sims
time.sleep(20)
def _generate_error_msg(self, slot_position, index, error: SIMError):
msg = "slot({}) SIM({}), error:{}".format(slot_position, index + 1,
error.value)
self._log_sender.send_log(msg, source=self.TAG, subject=SUBJECT_SIM_INFO, type=error.value)
return msg
def get_raw_phone_number(self, slot_position):
for index, ser in enumerate(self._serial_list):
sim_position = index + 1
position = (slot_position - 1) * len(self._port_list) + sim_position
# unlock sim
unlock_cmd = 'AT+CPIN="{0}\r"'.format("0000")
self._send_command(unlock_cmd, ser, 10)
cmd = "AT+CCID\r"
response = str(self._send_command(cmd, ser))
ccid_group = re.search("[0-9F]+", response)
ccid = ccid_group.group(0)
operator = check_operator(ccid)
if operator == Operator.SFR or operator == Operator.CHINA_TELECOM:
contacts = self._excel_helper.read_contacts("/contact_all.xlsx")
contact = [contact for contact in contacts if
contact.ccid.replace("F", "") == ccid.replace("F", "")]
if len(contact) > 0:
phone_number = contact[0].phone
self._db_manager.save_sim_info(
SimInfoPojo(phone=str(phone_number), ccid=ccid, position=position, sim_position=sim_position,
slot_position=slot_position, operator=operator.value))
else:
error_msg = "slot({}),sim({})".format(slot_position, sim_position)
oracle_log_sender.send_contact_not_found(error_msg)
else:
print("will get phone number for slot({}) SIM({}), port:{}".format(slot_position, index + 1, ser.port))
if not self._select_sim_storage(ser):
print(self._generate_error_msg(slot_position, index, SIMError.STORAGE_ERROR))
continue
msg = self._execut_USSD_cmd("AT+CUSD=1, *132#\r", ser)
if "Unfortunately" in str(msg):
print(self._generate_error_msg(slot_position, index, SIMError.SIM_DISABLED))
continue
elif "CME ERROR" in str(msg):
print(self._generate_error_msg(slot_position, index, SIMError.CME_ERROR))
continue
elif len(msg) == 0:
print(self._generate_error_msg(slot_position, index, SIMError.TIMEOUT))
continue
# find phone number
match = re.search(r'33\d{9}', str(msg))
phone_number = match.group(0)
print("phone is " + phone_number)
if phone_number:
self._db_manager.save_sim_info(
SimInfoPojo(phone=phone_number, ccid=ccid, position=position, slot_position=slot_position,
sim_position=sim_position, operator=operator.value))
self._log_sender.send_log(phone_number, source=self.TAG, subject=SUBJECT_SIM_INFO,
type=LOG_APPOINTMENT_SUCCESS)
# write the number to sim card's phonebook
# cmd = f'AT+CPBW={self.phone_number_position},\"{phone_number}\"\r'
# self._send_command(cmd, ser, wait_time_in_s=2)
def _select_sim_storage(self, ser) -> bool:
# use SIM Card storage
cmd_sm = "AT+CPBS=\"SM\"\r"
result = self._send_command(cmd_sm, ser)
if "ERROR" in str(result):
return False
else:
return True
def _send_command(self, cmd: str, ser, wait_time_in_s: int = 0) -> bytes:
ser.write(cmd.encode())
msg = None
try:
msg = ser.read(100)
count = 0
while 'OK' not in str(msg) and count < wait_time_in_s:
time.sleep(1)
count = count + 1
msg = ser.read(100)
except Exception as exc:
print(exc)
print(msg)
return msg
def _execut_USSD_cmd(self, cmd, ser) -> bytes:
# the timeout for ussd command can be 120 s in mac
return self._send_command(cmd, ser, 120)
def close(self):
for serial in self._serial_list:
serial.close()