import datetime import logging import re import sys import time from gsmmodem import GsmModem from commandor import Commandor from excel_reader import ExcelReader from pojo.serial_modem import SerialModem from utils.logging import init_logger BAUDRATE = 115200 OTP_TIMEOUT = 60 sms_received = False commandor = Commandor() def get_devices_ports() -> list: return ["/dev/tty.usbmodem121101", "/dev/tty.usbmodem121103", "/dev/tty.usbmodem121105", "/dev/tty.usbmodem121107", "/dev/tty.usbmodem121201", "/dev/tty.usbmodem121203", "/dev/tty.usbmodem121205", "/dev/tty.usbmodem121207", "/dev/tty.usbmodem121301", # "/dev/tty.usbmodem1121303", "/dev/tty.usbmodem121305", "/dev/tty.usbmodem121307", "/dev/tty.usbmodem121401", "/dev/tty.usbmodem121403", "/dev/tty.usbmodem121405", "/dev/tty.usbmodem121407"] def has_sim(ser) -> bool: # check pin cmd_check_pin = "AT+cpin?\r" msg = send_command(cmd_check_pin, ser) if b'OK' in msg: return True else: return False def send_command(cmd: str, ser) -> bytes: print("send command {}".format(cmd)) ser.write(cmd.encode()) msg = ser.read(100) logger.info(msg) return msg def get_phone_number(): cmd = "AT+CNUM\r" send_command(cmd) def create_modem_for_port(port: str) -> SerialModem: logger.info('Initializing modem... for ' + port) # Uncomment the following line to see what the modem is doing: init_logger() modem = GsmModem(port, BAUDRATE) modem.connect('0000') number = modem.ownNumber logger.info("The SIM card phone number is:") logger.info(number) cmd = "AT+CCID\r" response = modem.write(cmd, True) ccid = response[0].split(" ")[1].replace("\"", "") logger.info("The SIM card ccid is:" + ccid) serial_modem = SerialModem(modem=modem, ccid=ccid) return serial_modem def start_to_handle_sms(serial_modem: SerialModem): serial_modem.modem.smsReceivedCallback = handle_sms global sms_received sms_received = False serial_modem.modem.smsTextMode = False logger.info('Waiting for SMS message, for phone number ' + str(serial_modem.phone_number)) listen_at = time.time() while not sms_received: time.sleep(2) # check whether timeout now = time.time() if (listen_at + OTP_TIMEOUT) < now: logger.info("time out for {}, switch to next contact".format(serial_modem.phone_number)) return return def handle_sms(sms): logger.info( u'== SMS message received ==\nFrom: {0}\nTime: {1}\nMessage:\n{2}\n'.format(sms.number, sms.time, sms.text)) # extract the otp number logger.info("try to extract the otp") match = re.search(r'\d{6,8}', sms.text) otp = match.group(0) logger.info("otp is " + otp) commandor.send_otp(otp) time.sleep(20) global sms_received sms_received = True def init_modems() -> list: modems = [] for port in get_devices_ports(): modems.append(create_modem_for_port(port)) # read the contact, and contact the 2 objects together excel_reader = ExcelReader() contacts = excel_reader.read_contacts() for modem in modems: contact = [contact for contact in contacts if contact.ccid == modem.ccid][0] modem.phone_number = contact.phone modem.contact = contact return modems if __name__ == '__main__': # enable verbose logs for all port init_logger() logger = logging.getLogger() logger.addHandler(logging.StreamHandler(stream=sys.stdout)) # create modems for all the available port modem_list = init_modems() # create listeners for chaque modem for modem in modem_list: commandor.start_page(modem.contact) start_to_handle_sms(modem)