"""Cycle through the card-pool slots and log the SMS received by each modem.

For every slot the card pool is switched, the modems are reset and
re-initialised, each modem is matched to a contact by CCID and then listened
to for ``OTP_TIMEOUT`` seconds. Every received SMS is logged and forwarded to
``params.oracle_log_sender``.
"""
import logging
import random
import sys
import time
from typing import Union

from gsmmodem import GsmModem

import params
from commandor_page import CommandorPage
from logs.AppLogging import init_logger
from modems.ModemPool import ModemPool
from modems.card_pool import CardPool
from params import MODEM_POOL_PORTS, CARD_POOL_PORT
from pojo.serial_modem import SerialModem
from utils.excel_reader import ExcelHelper

# Seconds to listen on one modem before moving on to the next one.
OTP_TIMEOUT = 40

# Root logger, bound at import time so the functions below also work when this
# module is imported instead of being run as a script.
logger = logging.getLogger()

commandor = CommandorPage()
thread_event = None
# Modem currently being listened to; set by start_to_handle_sms().
current_gsm_modem = None
card_pool = CardPool(CARD_POOL_PORT)


def get_devices_ports() -> list:
    """Return the configured list of modem-pool serial ports."""
    return MODEM_POOL_PORTS


def create_modem_for_port(port: str) -> Union[SerialModem, None]:
    """Build a SerialModem wrapping a GsmModem for *port*.

    Returns:
        The new SerialModem, or None when construction raised; the error is
        logged in that case.
    """
    logger.info('Initializing modem... for ' + port)
    try:
        return SerialModem(modem=GsmModem(port))
    except Exception as ext:
        logger.error(ext)
        return None


def timeout_occurred(serial_modem: SerialModem):
    """Close the modem whose listening window expired and reset air-plane mode."""
    logger.info("will close timeout modem")
    serial_modem.modem.close()
    commandor.reset_air_plan_mode()


def start_to_handle_sms(serial_modem: SerialModem):
    """Listen for incoming SMS on *serial_modem* for OTP_TIMEOUT seconds.

    Stored SMS in the "SM" memory are deleted first (best effort), then
    handle_sms is installed as the receive callback. The call blocks until
    the timeout elapses; the modem is closed through timeout_occurred().
    """
    global current_gsm_modem
    current_gsm_modem = serial_modem.modem
    try:
        current_gsm_modem.deleteMultipleStoredSms(memory="SM")
    except Exception as error:
        # Best effort: a failed clean-up must not prevent listening.
        logger.warning("could not delete stored SMS: %s", error)

    serial_modem.modem.smsReceivedCallback = handle_sms
    serial_modem.modem.smsTextMode = False
    logger.info('Waiting for SMS message, for phone number ' + str(serial_modem.phone_number))

    # A monotonic clock keeps the timeout immune to wall-clock adjustments.
    deadline = time.monotonic() + OTP_TIMEOUT
    while True:
        time.sleep(2)
        if time.monotonic() > deadline:
            logger.info("time out for {}, switch to next contact".format(serial_modem.phone_number))
            # timeout_occurred() already closes the modem, which is the same
            # object as current_gsm_modem, so it is not closed a second time.
            timeout_occurred(serial_modem)
            return


def handle_sms(sms):
    """Log a received SMS and forward it to the oracle log sender."""
    logger.info(
        u'== SMS message received ==\nFrom: {0}\nTime: {1}\nMessage:\n{2}\n'.format(sms.number, sms.time, sms.text))
    params.oracle_log_sender.send_sms_reception_log(sms.number, sms.text)


def init_modems() -> list:
    """Create a SerialModem for every configured port, skipping failed ones."""
    candidates = (create_modem_for_port(port) for port in get_devices_ports())
    return [serial_modem for serial_modem in candidates if serial_modem]


def start_waiting_sms():
    """Visit every card-pool slot in random order and listen on each modem.

    For each slot the port list is shuffled, the card pool is reset and
    switched, all modems are reset and re-created, and the contacts are read
    again. Each modem is then matched to a contact by CCID (ignoring "F"
    characters) and listened to in turn.
    """
    params.oracle_log_sender.send_wait_sms_log()
    slot_number = 1
    slot_sum = 21
    slot_list = list(range(slot_number, slot_sum + 1))
    random.shuffle(slot_list)
    for i in slot_list:
        # Shuffles the shared list in place, so get_devices_ports() sees it.
        random.shuffle(params.MODEM_POOL_PORTS)
        card_pool.reset()
        logger.info("will switch to " + str(i))
        card_pool.switch_to_slot(i)
        modem_pool = ModemPool(get_devices_ports())
        modem_pool.reset_all_modems()
        modem_list = init_modems()
        # read the contacts so they can be merged with the modems
        excel_reader = ExcelHelper()
        contacts = excel_reader.read_contacts()
        for modem in modem_list:
            try:
                # get contact for current modem
                modem.get_ccid()
                modem_ccid = modem.ccid.replace("F", "")
                matches = [entry for entry in contacts
                           if entry.ccid.replace("F", "") == modem_ccid]
                # NOTE(review): the original layout was collapsed onto one
                # line; listening is assumed to happen only when a contact
                # matched - confirm against the intended behaviour.
                if matches:
                    modem.phone_number = matches[0].phone
                    modem.contact = matches[0]
                    start_to_handle_sms(modem)
                else:
                    logger.warning("no contact found for ccid %s", modem.ccid)
            except Exception as error:
                # One failing modem must not stop the remaining ones.
                logger.exception("failed to handle modem: %s", error)


if __name__ == '__main__':
    init_logger()
    logger.addHandler(logging.StreamHandler(stream=sys.stdout))
    start_waiting_sms()