can read otp

This commit is contained in:
2022-02-21 15:26:50 +01:00
parent 921457b934
commit 4877fdcdde
5 changed files with 90 additions and 83 deletions
+1
View File
@@ -1,3 +1,4 @@
/.idea/ /.idea/
__pycache__ __pycache__
pojo/__pycache__
.~contact.xlsx .~contact.xlsx
+3 -3
View File
@@ -7,7 +7,7 @@ from pojo.contact_pojo import ContactPojo
class ExcelReader: class ExcelReader:
# read the contact list from the exel file # read the contact list from the exel file
def read_file(self) -> list: def read_contacts(self) -> list:
contact_list_in_json = pandas.read_excel(r'./contact.xlsx').to_json(orient='records') contact_list_in_json = pandas.read_excel(r'./contact.xlsx').to_json(orient='records')
contact_dict_list = json.loads(contact_list_in_json) contact_dict_list = json.loads(contact_list_in_json)
contact_list = [] contact_list = []
@@ -19,12 +19,12 @@ class ExcelReader:
last_name=last_name, last_name=last_name,
first_name=first_name, first_name=first_name,
ccid=contact_dict['ccid'], ccid=contact_dict['ccid'],
passport_number=contact_dict['passport']) passport_number=contact_dict['passport'], mail=contact_dict['email'])
contact_list.append(contact) contact_list.append(contact)
return contact_list return contact_list
if __name__ == '__main__': if __name__ == '__main__':
reader = ExcelReader() reader = ExcelReader()
data = reader.read_file() data = reader.read_contacts()
print(data) print(data)
+78 -76
View File
@@ -1,35 +1,38 @@
import datetime
import logging import logging
import re
import sys
import time
import serial
from gsmmodem import GsmModem from gsmmodem import GsmModem
from gsmmodem.modem import SentSms
from commandor import Commandor
from excel_reader import ExcelReader
from pojo.serial_modem import SerialModem
from utils.logging import init_logger from utils.logging import init_logger
PORT = "/dev/tty.usbmodem11101"
BAUDRATE = 115200 BAUDRATE = 115200
OTP_TIMEOUT = 120
sms_received = False
# ser = serial.Serial(PORT, BAUDRATE, timeout=1)
def get_devices_ports() -> list: def get_devices_ports() -> list:
return ["/dev/tty.usbmodem1121101", return ["/dev/tty.usbmodem121101",
"/dev/tty.usbmodem1121103", "/dev/tty.usbmodem121103",
"/dev/tty.usbmodem1121105", "/dev/tty.usbmodem121105",
"/dev/tty.usbmodem1121107", "/dev/tty.usbmodem121107",
"/dev/tty.usbmodem1121201", "/dev/tty.usbmodem121201",
"/dev/tty.usbmodem1121203", "/dev/tty.usbmodem121203",
"/dev/tty.usbmodem1121205", "/dev/tty.usbmodem121205",
"/dev/tty.usbmodem1121207", "/dev/tty.usbmodem121207",
"/dev/tty.usbmodem1121301", "/dev/tty.usbmodem121301",
# "/dev/tty.usbmodem1121303", # "/dev/tty.usbmodem1121303",
"/dev/tty.usbmodem1121305", "/dev/tty.usbmodem121305",
"/dev/tty.usbmodem1121307", "/dev/tty.usbmodem121307",
"/dev/tty.usbmodem1121401", "/dev/tty.usbmodem121401",
"/dev/tty.usbmodem1121403", "/dev/tty.usbmodem121403",
"/dev/tty.usbmodem1121405", "/dev/tty.usbmodem121405",
"/dev/tty.usbmodem1121407"] "/dev/tty.usbmodem121407"]
def has_sim(ser) -> bool: def has_sim(ser) -> bool:
@@ -46,7 +49,7 @@ def send_command(cmd: str, ser) -> bytes:
print("send command {}".format(cmd)) print("send command {}".format(cmd))
ser.write(cmd.encode()) ser.write(cmd.encode())
msg = ser.read(100) msg = ser.read(100)
print(msg) logger.info(msg)
return msg return msg
@@ -55,73 +58,72 @@ def get_phone_number():
send_command(cmd) send_command(cmd)
def get_sim_ccid(): def create_modem_for_port(port: str) -> SerialModem:
cmd = "AT+CCID\r" logger.info('Initializing modem... for ' + port)
send_command(cmd)
def send_sms(sms_destination: str, msg: str):
print('Initializing modem...')
modem = GsmModem(PORT, BAUDRATE)
modem.connect('0000')
# modem.waitForNetworkCoverage(10)
print('Sending SMS to: {0}'.format(sms_destination))
response = modem.sendSms(sms_destination, msg, True)
if type(response) == SentSms:
print('SMS Delivered.')
else:
print('SMS Could not be sent')
modem.close()
def create_modem_for_port(port: str) -> GsmModem:
print('Initializing modem... for ' + port)
# Uncomment the following line to see what the modem is doing: # Uncomment the following line to see what the modem is doing:
init_logger() init_logger()
modem = GsmModem(port, BAUDRATE) modem = GsmModem(port, BAUDRATE)
modem.connect('0000') modem.connect('0000')
number = modem.ownNumber number = modem.ownNumber
print("The SIM card phone number is:") logger.info("The SIM card phone number is:")
print(number) logger.info(number)
cmd = "AT+CCID\r" cmd = "AT+CCID\r"
ccid = modem.write(cmd, True) response = modem.write(cmd, True)
print("The SIM card ccid is:" + " ".join(ccid)) ccid = response[0].split(" ")[1].replace("\"", "")
return modem logger.info("The SIM card ccid is:" + ccid)
serial_modem = SerialModem(modem=modem, ccid=ccid)
return serial_modem
def start_to_handle_sms(modem: GsmModem): def start_to_handle_sms(serial_modem: SerialModem):
modem.smsReceivedCallback = handle_sms serial_modem.modem.smsReceivedCallback = handle_sms
# modem = GsmModem(PORT, BAUDRATE, smsReceivedCallbackFunc=handle_sms) global sms_received
modem.smsTextMode = False sms_received = False
# modem.connect('0000') serial_modem.modem.smsTextMode = False
print('Waiting for SMS message...') logger.info('Waiting for SMS message, for phone number ' + str(serial_modem.phone_number))
try: listen_at = time.time()
modem.rxThread.join( while not sms_received:
2 ** 31) # Specify a (huge) timeout so that it essentially blocks indefinitely, but still receives CTRL+C interrupt signal time.sleep(2)
finally: # check whether timeout
modem.close() now = time.time()
if (listen_at + OTP_TIMEOUT) < now:
logger.info("time out for {}, switch to next contact".format(serial_modem.phone_number))
return
return
def handle_sms(sms): def handle_sms(sms):
print(u'== SMS message received ==\nFrom: {0}\nTime: {1}\nMessage:\n{2}\n'.format(sms.number, sms.time, sms.text)) logger.info(u'== SMS message received ==\nFrom: {0}\nTime: {1}\nMessage:\n{2}\n'.format(sms.number, sms.time, sms.text))
global sms_received
sms_received = True
# extract the otp number
logger.info("try to extract the otp")
match = re.search(r'\d{6,8}', sms.text)
logger.info("otp is " + match.group(0))
def init_modems() -> list:
modems = []
for port in get_devices_ports():
modems.append(create_modem_for_port(port))
# read the contact, and contact the 2 objects together
excel_reader = ExcelReader()
contacts = excel_reader.read_contacts()
for modem in modems:
contact = [contact for contact in contacts if contact.ccid == modem.ccid][0]
modem.phone_number = contact.phone
modem.contact = contact
return modems
if __name__ == '__main__': if __name__ == '__main__':
# enable verbose logs for all port # enable verbose logs for all port
init_logger()
logger = logging.getLogger()
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
# create modems for all the available port # create modems for all the available port
modem_list = [] modem_list = init_modems()
for port in get_devices_ports(): # create listeners for chaque modem
modem_list.append(create_modem_for_port(port)) for modem in modem_list:
start_to_handle_sms(modem)
# cmd = "AT+CMEE=2\r"
# send_command(cmd)
# if has_sim():
# # send_sms("+33649614591", "Modem pool test")
# modem = show_own_phone_number()
# get_sim_ccid()
# # start_to_handle_sms(modem)
# else:
# print("no cart sim")
+7 -3
View File
@@ -1,13 +1,17 @@
from dataclasses import dataclass
from gsmmodem import GsmModem from gsmmodem import GsmModem
from pojo.contact_pojo import ContactPojo from pojo.contact_pojo import ContactPojo
@dataclass
class SerialModem(): class SerialModem():
ccid: str ccid: str
phone_number: str phone_number = None
modem: GsmModem modem: GsmModem
contact: ContactPojo contact = None
def __init__(self, modem: GsmModem): def __init__(self, modem: GsmModem, ccid: str = None):
self.modem = modem self.modem = modem
self.ccid = ccid
+1 -1
View File
@@ -6,4 +6,4 @@ def init_logger():
filemode='a', filemode='a',
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
datefmt='%D:%H:%M:%S', datefmt='%D:%H:%M:%S',
level=logging.DEBUG) level=logging.INFO)