Files
appointment_tool/main.py
T
2022-02-21 15:26:50 +01:00

130 lines
3.7 KiB
Python

import datetime
import logging
import re
import sys
import time
from gsmmodem import GsmModem
from commandor import Commandor
from excel_reader import ExcelReader
from pojo.serial_modem import SerialModem
from utils.logging import init_logger
BAUDRATE = 115200
OTP_TIMEOUT = 120
sms_received = False
def get_devices_ports() -> list:
return ["/dev/tty.usbmodem121101",
"/dev/tty.usbmodem121103",
"/dev/tty.usbmodem121105",
"/dev/tty.usbmodem121107",
"/dev/tty.usbmodem121201",
"/dev/tty.usbmodem121203",
"/dev/tty.usbmodem121205",
"/dev/tty.usbmodem121207",
"/dev/tty.usbmodem121301",
# "/dev/tty.usbmodem1121303",
"/dev/tty.usbmodem121305",
"/dev/tty.usbmodem121307",
"/dev/tty.usbmodem121401",
"/dev/tty.usbmodem121403",
"/dev/tty.usbmodem121405",
"/dev/tty.usbmodem121407"]
def has_sim(ser) -> bool:
# check pin
cmd_check_pin = "AT+cpin?\r"
msg = send_command(cmd_check_pin, ser)
if b'OK' in msg:
return True
else:
return False
def send_command(cmd: str, ser) -> bytes:
print("send command {}".format(cmd))
ser.write(cmd.encode())
msg = ser.read(100)
logger.info(msg)
return msg
def get_phone_number():
cmd = "AT+CNUM\r"
send_command(cmd)
def create_modem_for_port(port: str) -> SerialModem:
logger.info('Initializing modem... for ' + port)
# Uncomment the following line to see what the modem is doing:
init_logger()
modem = GsmModem(port, BAUDRATE)
modem.connect('0000')
number = modem.ownNumber
logger.info("The SIM card phone number is:")
logger.info(number)
cmd = "AT+CCID\r"
response = modem.write(cmd, True)
ccid = response[0].split(" ")[1].replace("\"", "")
logger.info("The SIM card ccid is:" + ccid)
serial_modem = SerialModem(modem=modem, ccid=ccid)
return serial_modem
def start_to_handle_sms(serial_modem: SerialModem):
serial_modem.modem.smsReceivedCallback = handle_sms
global sms_received
sms_received = False
serial_modem.modem.smsTextMode = False
logger.info('Waiting for SMS message, for phone number ' + str(serial_modem.phone_number))
listen_at = time.time()
while not sms_received:
time.sleep(2)
# check whether timeout
now = time.time()
if (listen_at + OTP_TIMEOUT) < now:
logger.info("time out for {}, switch to next contact".format(serial_modem.phone_number))
return
return
def handle_sms(sms):
logger.info(u'== SMS message received ==\nFrom: {0}\nTime: {1}\nMessage:\n{2}\n'.format(sms.number, sms.time, sms.text))
global sms_received
sms_received = True
# extract the otp number
logger.info("try to extract the otp")
match = re.search(r'\d{6,8}', sms.text)
logger.info("otp is " + match.group(0))
def init_modems() -> list:
modems = []
for port in get_devices_ports():
modems.append(create_modem_for_port(port))
# read the contact, and contact the 2 objects together
excel_reader = ExcelReader()
contacts = excel_reader.read_contacts()
for modem in modems:
contact = [contact for contact in contacts if contact.ccid == modem.ccid][0]
modem.phone_number = contact.phone
modem.contact = contact
return modems
if __name__ == '__main__':
# enable verbose logs for all port
init_logger()
logger = logging.getLogger()
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
# create modems for all the available port
modem_list = init_modems()
# create listeners for chaque modem
for modem in modem_list:
start_to_handle_sms(modem)