import logging

import serial

import params
from logs.LogSender import LOG_SUBJECT_EVENT, TYPE_EVENT_CHANGE_SLOT

# Baud rate used for every serial port opened by CardPool.
BAUDRATE = 115200


class CardPool:
    """Drive one or more card-pool devices over serial ports with AT commands.

    Every public command is broadcast to all ports given at construction.
    The instance owns the opened ports; call :meth:`close` (or use the
    instance as a context manager) to release them.
    """

    def __init__(self, port_list):
        """Open one serial connection per entry in ``port_list``.

        Args:
            port_list: Iterable of port identifiers accepted by
                ``serial.Serial`` (e.g. device paths / COM names).

        Raises:
            Exception: Whatever ``serial.Serial`` raises while opening a
                port. Ports opened before the failure are closed first so
                they are not leaked.
        """
        self.logger = logging.getLogger("CardPool")
        self._serial_list = []
        try:
            for port in port_list:
                self._serial_list.append(
                    serial.Serial(port, BAUDRATE, timeout=1)
                )
        except Exception:
            # Do not leave already-opened ports dangling when a later one
            # fails to open; the caller never receives this instance.
            self.close()
            raise

    def __enter__(self):
        """Return ``self`` so the pool can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close all ports on leaving the ``with`` block."""
        self.close()

    def close(self) -> None:
        """Close every opened serial port and forget them.

        Safe to call more than once: after the first call the internal
        list is empty, so subsequent calls do nothing.
        """
        for port in self._serial_list:
            port.close()
        self._serial_list.clear()

    def _send_command(self, serial, cmd: str) -> bytes:
        """Write ``cmd`` to one port and return the raw response.

        Args:
            serial: An opened serial connection (note: this parameter name
                shadows the ``serial`` module inside this method only).
            cmd: Command text, including its trailing terminator.

        Returns:
            Up to 100 bytes read back from the port; may be shorter or
            empty if the read times out (ports are opened with
            ``timeout=1``).
        """
        print("send command {}".format(cmd))
        serial.write(cmd.encode())
        msg = serial.read(100)
        self.logger.info(msg)
        return msg

    def _broadcast(self, cmd: str) -> None:
        """Send ``cmd`` to every port in order, discarding the responses."""
        for port in self._serial_list:
            self._send_command(port, cmd)

    # info: after reset, the modem pool needs to be restarted
    def reset(self):
        """Send ``AT+NEXT00`` to every port."""
        self._broadcast("AT+NEXT00\r")

    def switch_to_next(self):
        """Send ``AT+NEXT11`` to every port."""
        self._broadcast("AT+NEXT11\r")

    def switch_to_slot(self, slot_number: int):
        """Report the slot change, then send the switch command to every port.

        The slot number is appended after a zero prefix: three zeros when
        ``slot_number < 10``, otherwise two (so 0-99 render as four digits).

        Args:
            slot_number: Slot index to switch to.
        """
        params.oracle_log_sender.send_log(
            msg="换到" + str(slot_number) + "行",
            source=LOG_SUBJECT_EVENT,
            subject=LOG_SUBJECT_EVENT,
            type=TYPE_EVENT_CHANGE_SLOT,
        )
        # Build the command once instead of duplicating the send loop in
        # each branch; the padding rule is unchanged.
        zero_prefix = "000" if slot_number < 10 else "00"
        self._broadcast("AT+SWIT00-{}{}\r".format(zero_prefix, slot_number))

    # NOTE: does not work for the pool
    def find_current_slot(self):
        """Send ``AT+USIM`` to every port.

        The responses are only logged by ``_send_command``; nothing is
        returned.
        """
        self._broadcast("AT+USIM\r")