56 lines
1.7 KiB
Python
56 lines
1.7 KiB
Python
import logging
|
|
|
|
import serial
|
|
|
|
import params
|
|
from logs.LogSender import LOG_SUBJECT_EVENT, TYPE_EVENT_CHANGE_SLOT
|
|
|
|
# Baud rate passed to serial.Serial when CardPool opens its port.
BAUDRATE = 115200
|
|
|
|
|
|
class CardPool:
    """Controller for a SIM card pool driven by AT commands over a serial port.

    The serial port is opened by the constructor. Release it with
    :meth:`close`, or use the instance as a context manager::

        with CardPool(port) as pool:
            pool.switch_to_next()
    """

    def __init__(self, port):
        """Open the serial connection to the card pool.

        Args:
            port: Serial device name handed to ``serial.Serial``.
        """
        self.logger = logging.getLogger("CardPool")
        # timeout=1 bounds every read in _send_command to about one second.
        self._serial = serial.Serial(port, BAUDRATE, timeout=1)

    def __enter__(self):
        """Return the pool itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the serial port when leaving a ``with`` block."""
        self.close()

    def close(self):
        """Close the underlying serial port."""
        self._serial.close()

    def _send_command(self, cmd: str) -> bytes:
        """Write one command to the pool and return whatever it answers.

        Args:
            cmd: Command text, including its trailing carriage return.

        Returns:
            Up to 100 bytes read back from the port; may be empty if the
            device sends nothing before the read timeout expires.
        """
        # %r keeps the trailing "\r" visible instead of mangling the log line.
        self.logger.info("send command %r", cmd)
        self._serial.write(cmd.encode())
        response = self._serial.read(100)
        self.logger.info(response)
        return response

    def reset(self):
        """Send the reset command ("AT+NEXT00") and return the response.

        Note: after a reset, the modem pool needs to be restarted.
        """
        return self._send_command("AT+NEXT00\r")

    def switch_to_next(self):
        """Send the switch-to-next command ("AT+NEXT11") and return the response."""
        return self._send_command("AT+NEXT11\r")

    def switch_to_slot(self, slot_number: int):
        """Report the slot change to the log sender, then switch to that slot.

        Args:
            slot_number: Non-negative slot index; it is zero-padded to at
                least two digits in the command.

        Returns:
            The raw response bytes from the card pool.

        Raises:
            ValueError: If ``slot_number`` is negative, which would otherwise
                produce a malformed command.
        """
        if slot_number < 0:
            raise ValueError(
                "slot_number must be non-negative, got {}".format(slot_number)
            )
        params.oracle_log_sender.send_log(
            msg="换到" + str(slot_number) + "行",
            source=LOG_SUBJECT_EVENT,
            subject=LOG_SUBJECT_EVENT,
            type=TYPE_EVENT_CHANGE_SLOT,
        )
        # "00" + two-digit padding yields 000N for 0-9 and 00NN for 10-99,
        # the same strings the former if/else branches produced.
        return self._send_command("AT+SWIT00-00{:02d}\r".format(slot_number))

    def find_current_slot(self):
        """Send "AT+USIM" and return the raw response.

        Note from the original author: this command does not work for the pool.
        """
        return self._send_command("AT+USIM\r")
|
|
|
|
|
|
if __name__ == '__main__':
    # Manual check: open the card pool on the configured port and send the
    # reset command.
    pool = CardPool(params.CARD_POOL_PORT)
    pool.reset()

    # Other manual experiments, left disabled:
    #   print(pool.find_current_slot())
    #   pool.switch_to_next()
    #   pool.switch_to_slot(12)
    #
    # Resetting the modem pool:
    #   for port in get_devices_ports():
    #       ser = serial.Serial(port, BAUDRATE, timeout=1)
    #       send_command("AT+RESET\r", ser)
    #       ser.close()
|