diff --git a/gsmmodem/__init__.py b/gsmmodem/__init__.py deleted file mode 100644 index 9b65886..0000000 --- a/gsmmodem/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -""" Package that allows easy control of an attached GSM modem - -The main class for controlling a modem is GsmModem, which can be imported -directly from this module. - -Other important and useful classes are: -gsmmodem.modem.IncomingCall: wraps an incoming call and passed to the incoming call hanndler callback function -gsmmodem.modem.ReceivedSms: wraps a received SMS message and passed to the sms received hanndler callback function -gsmmodem.modem.SentSms: returned when sending SMS messages; used for tracking the status of the SMS message - -All python-gsmmodem-specific exceptions are defined in the gsmmodem.modem.exceptions package. - -@author: Francois Aucamp -@license: LGPLv3+ -""" - -from .modem import GsmModem diff --git a/gsmmodem/compat.py b/gsmmodem/compat.py deleted file mode 100644 index 46d53ea..0000000 --- a/gsmmodem/compat.py +++ /dev/null @@ -1,22 +0,0 @@ -""" Contains monkey-patched equivalents for a few commonly-used Python 2.7-and-higher functions. -Used to provide backwards-compatibility with Python 2.6 -""" -import sys -if sys.version_info[0] == 2 and sys.version_info[1] < 7: - import threading - - # threading.Event.wait() always returns None in Python < 2.7 so we need to patch it - if hasattr(threading, '_Event'): # threading.Event is a function that return threading._Event - # This is heavily Python-implementation-specific, so patch where we can, otherwise leave it - def wrapWait(func): - def newWait(self, timeout=None): - func(self, timeout) - return self.is_set() - return newWait - threading._Event.wait = wrapWait(threading._Event.wait) - else: - raise ImportError('Could not patch this version of Python 2.{0} for compatibility with python-gsmmodem.'.format(sys.version_info[1])) -if sys.version_info[0] == 2: - str = str -else: - str = lambda x: x \ No newline at end of file diff --git a/gsmmodem/exceptions.py b/gsmmodem/exceptions.py deleted file mode 100644 index 6882524..0000000 --- a/gsmmodem/exceptions.py +++ /dev/null @@ -1,134 +0,0 @@ -""" Module defines exceptions used by gsmmodem """ - -class GsmModemException(Exception): - """ Base exception raised for error conditions when interacting with the GSM modem """ - - -class TimeoutException(GsmModemException): - """ Raised when a write command times out """ - - def __init__(self, data=None): - """ @param data: Any data that was read was read before timeout occurred (if applicable) """ - super(TimeoutException, self).__init__(data) - self.data = data - - -class InvalidStateException(GsmModemException): - """ Raised when an API method call is invoked on an object that is in an incorrect state """ - - -class InterruptedException(InvalidStateException): - """ Raised when execution of an AT command is interrupt by a state change. - May contain another exception that was the cause of the interruption """ - - def __init__(self, message, cause=None): - """ @param cause: the exception that caused this interruption (usually a CmeError) """ - super(InterruptedException, self).__init__(message) - self.cause = cause - - -class CommandError(GsmModemException): - """ Raised if the modem returns an error in response to an AT command - - May optionally include an error type (CME or CMS) and -code (error-specific). - """ - - _description = '' - - def __init__(self, command=None, type=None, code=None): - self.command = command - self.type = type - self.code = code - if type != None and code != None: - super(CommandError, self).__init__('{0} {1}{2}'.format(type, code, ' ({0})'.format(self._description) if len(self._description) > 0 else '')) - elif command != None: - super(CommandError, self).__init__(command) - else: - super(CommandError, self).__init__() - - -class CmeError(CommandError): - """ ME error result code : +CME ERROR: - - Issued in response to an AT command - """ - - def __new__(cls, *args, **kwargs): - # Return a specialized version of this class if possible - if len(args) >= 2: - code = args[1] - if code == 11: - return PinRequiredError(args[0]) - elif code == 16: - return IncorrectPinError(args[0]) - elif code == 12: - return PukRequiredError(args[0]) - return super(CmeError, cls).__new__(cls, *args, **kwargs) - - def __init__(self, command, code): - super(CmeError, self).__init__(command, 'CME', code) - - -class SecurityException(CmeError): - """ Security-related CME error """ - - def __init__(self, command, code): - super(SecurityException, self).__init__(command, code) - - -class PinRequiredError(SecurityException): - """ Raised if an operation failed because the SIM card's PIN has not been entered """ - - _description = 'SIM card PIN is required' - - def __init__(self, command, code=11): - super(PinRequiredError, self).__init__(command, code) - - -class IncorrectPinError(SecurityException): - """ Raised if an incorrect PIN is entered """ - - _description = 'Incorrect PIN entered' - - def __init__(self, command, code=16): - super(IncorrectPinError, self).__init__(command, code) - - -class PukRequiredError(SecurityException): - """ Raised an operation failed because the SIM card's PUK is required (SIM locked) """ - - _description = "PUK required (SIM locked)" - - def __init__(self, command, code=12): - super(PukRequiredError, self).__init__(command, code) - - -class CmsError(CommandError): - """ Message service failure result code: +CMS ERROR : - - Issued in response to an AT command - """ - - def __new__(cls, *args, **kwargs): - # Return a specialized version of this class if possible - if len(args) >= 2: - code = args[1] - if code == 330: - return SmscNumberUnknownError(args[0]) - return super(CmsError, cls).__new__(cls, *args, **kwargs) - - def __init__(self, command, code): - super(CmsError, self).__init__(command, 'CMS', code) - - -class SmscNumberUnknownError(CmsError): - """ Raised if the SMSC (service centre) address is missing when trying to send an SMS message """ - - _description = 'SMSC number not set' - - def __init__(self, command, code=330): - super(SmscNumberUnknownError, self).__init__(command, code) - - -class EncodingError(GsmModemException): - """ Raised if a decoding- or encoding operation failed """ diff --git a/gsmmodem/gprs.py b/gsmmodem/gprs.py deleted file mode 100644 index af164f4..0000000 --- a/gsmmodem/gprs.py +++ /dev/null @@ -1,96 +0,0 @@ -# -*- coding: utf8 -*- - -""" GPRS/Data-specific classes - -BRANCH: mms - -PLEASE NOTE: *Everything* in this file (PdpContext, GprsModem class, etc) is experimental. -This is NOT meant to be used in production in any way; the API is completely unstable, -no unit tests will be written for this in the forseeable future, and stuff may generally -break and cause riots. Please do not file bug reports against this branch unless you -have a patch to go along with it, but even then: remember that this entire "mms" branch -is exploratory; I simply want to see what the possibilities are with it. - -Use the "main" branch, and the GsmModem class if you want to build normal applications. -""" - -import re - -from .util import allLinesMatchingPattern -from .modem import GsmModem - -class PdpContext(object): - """ Packet Data Protocol (PDP) context parameter values """ - def __init__(self, cid, pdpType, apn, pdpAddress=None, dataCompression=0, headerCompression=0): - """ Construct a new Packet Data Protocol context - - @param cid: PDP Context Identifier - specifies a particular PDP context definition - @type cid: int - @param pdpType: the type of packet data protocol (IP, PPP, IPV6, etc) - @type pdpType: str - @param apn: Access Point Name; logical name used to select the GGSN or external packet data network - @type apn: str - @param pdpAddress: identifies the MT in the address space applicable to the PDP. If None, a dynamic address may be requested. - @type pdpAddress: str - @param dataCompression: PDP data compression; 0 == off, 1 == on - @type dataCompression: int - @param headerCompression: PDP header compression; 0 == off, 1 == on - @type headerCompression: int - """ - self.cid = cid - self.pdpType = pdpType - self.apn = apn - self.pdpAddress = pdpAddress - self.dataCompression = dataCompression - self.headerCompression = headerCompression - - -class GprsModem(GsmModem): - """ EXPERIMENTAL: Specialized version of GsmModem that includes GPRS/data-specific commands """ - - @property - def pdpContexts(self): - """ Currently-defined Packet Data Protocol (PDP) context list - - PDP paramter values returned include PDP type (IP, IPV6, PPP, X.25 etc), APN, - data compression, header compression, etc. - - @return: a list of currently-defined PDP contexts - """ - result = [] - cgdContResult = self.write('AT+CGDCONT?') - matches = allLinesMatchingPattern(re.compile(r'^\+CGDCONT:\s*(\d+),"([^"]+)","([^"]+)","([^"]+)",(\d+),(\d+)'), cgdContResult) - for cgdContMatch in matches: - cid, pdpType, apn, pdpAddress, dataCompression, headerCompression = cgdContMatch.groups() - pdpContext = PdpContext(cid, pdpType, apn, pdpAddress, dataCompression, headerCompression) - result.append(pdpContext) - return result - - @property - def defaultPdpContext(self): - """ @return: the default PDP context, or None if not defined """ - pdpContexts = self.pdpContexts - return pdpContexts[0] if len(pdpContexts) > 0 else None - @defaultPdpContext.setter - def defaultPdpContext(self, pdpContext): - """ Set the default PDP context (or clear it by setting it to None) """ - self.write('AT+CGDCONT=,"{0}","{1}","{2}",{3},{4}'.format(pdpContext.pdpType, pdpContext.apn, pdpContext.pdpAddress or '', pdpContext.dataCompression, pdpContext.headerCompression)) - - def definePdpContext(self, pdpContext): - """ Define a new Packet Data Protocol context, or overwrite an existing one - - @param pdpContext: The PDP context to define - @type pdpContext: gsmmodem.gprs.PdpContext - """ - self.write('AT+CGDCONT={0},"{1}","{2}","{3}",{4},{5}'.format(pdpContext.cid or '', pdpContext.pdpType, pdpContext.apn, pdpContext.pdpAddress or '', pdpContext.dataCompression, pdpContext.headerCompression)) - - def initDataConnection(self, pdpCid=1): - """ Initializes a packet data (GPRS) connection using the specified PDP Context ID """ - # From this point on, we don't want the read thread interfering - #self.log.debug('Stopping read thread') - #self.alive = False - #self.rxThread.join() - self.log.debug('Init data connection') - self.write('ATD*99#', expectedResponseTermSeq="CONNECT\r") - self.log.debug('Data connection open; ready for PPP comms') - # From here on we use PPP to communicate with the network diff --git a/gsmmodem/models/Call.py b/gsmmodem/models/Call.py deleted file mode 100644 index 757f9e2..0000000 --- a/gsmmodem/models/Call.py +++ /dev/null @@ -1,80 +0,0 @@ -import weakref - -from gsmmodem.exceptions import CmeError, InterruptedException, InvalidStateException - - -class Call(object): - """ A voice call """ - - DTMF_COMMAND_BASE = '+VTS=' - dtmfSupport = False # Indicates whether or not DTMF tones can be sent in calls - - def __init__(self, gsmModem, callId, callType, number, callStatusUpdateCallbackFunc=None): - """ - :param gsmModem: GsmModem instance that created this object - :param number: The number that is being called - """ - self._gsmModem = weakref.proxy(gsmModem) - self._callStatusUpdateCallbackFunc = callStatusUpdateCallbackFunc - # Unique ID of this call - self.id = callId - # Call type (VOICE == 0, etc) - self.type = callType - # The remote number of this call (destination or origin) - self.number = number - # Flag indicating whether the call has been answered or not (backing field for "answered" property) - self._answered = False - # Flag indicating whether or not the call is active - # (meaning it may be ringing or answered, but not ended because of a hangup event) - self.active = True - - @property - def answered(self): - return self._answered - - @answered.setter - def answered(self, answered): - self._answered = answered - if self._callStatusUpdateCallbackFunc: - self._callStatusUpdateCallbackFunc(self) - - def sendDtmfTone(self, tones): - """ Send one or more DTMF tones to the remote party (only allowed for an answered call) - - Note: this is highly device-dependent, and might not work - - :param digits: A str containining one or more DTMF tones to play, e.g. "3" or "\*123#" - - :raise CommandError: if the command failed/is not supported - :raise InvalidStateException: if the call has not been answered, or is ended while the command is still executing - """ - if self.answered: - dtmfCommandBase = self.DTMF_COMMAND_BASE.format(cid=self.id) - toneLen = len(tones) - for tone in list(tones): - try: - self._gsmModem.write('AT{0}{1}'.format(dtmfCommandBase, tone), timeout=(5 + toneLen)) - - except CmeError as e: - if e.code == 30: - # No network service - can happen if call is ended during DTMF transmission (but also if DTMF is sent immediately after call is answered) - raise InterruptedException('No network service', e) - elif e.code == 3: - # Operation not allowed - can happen if call is ended during DTMF transmission - raise InterruptedException('Operation not allowed', e) - else: - raise e - else: - raise InvalidStateException('Call is not active (it has not yet been answered, or it has ended).') - - def hangup(self): - """ End the phone call. - - Does nothing if the call is already inactive. - """ - if self.active: - self._gsmModem.write('ATH') - self.answered = False - self.active = False - if self.id in self._gsmModem.activeCalls: - del self._gsmModem.activeCalls[self.id] diff --git a/gsmmodem/models/IncomingCall.py b/gsmmodem/models/IncomingCall.py deleted file mode 100644 index 917ef9e..0000000 --- a/gsmmodem/models/IncomingCall.py +++ /dev/null @@ -1,40 +0,0 @@ -from gsmmodem.modem import Call - - -class IncomingCall(Call): - - CALL_TYPE_MAP = {'VOICE': 0} - - """ Represents an incoming call, conveniently allowing access to call meta information and -control """ - def __init__(self, gsmModem, number, ton, callerName, callId, callType): - """ - :param gsmModem: GsmModem instance that created this object - :param number: Caller number - :param ton: TON (type of number/address) in integer format - :param callType: Type of the incoming call (VOICE, FAX, DATA, etc) - """ - if callType in self.CALL_TYPE_MAP: - callType = self.CALL_TYPE_MAP[callType] - super(IncomingCall, self).__init__(gsmModem, callId, callType, number) - # Type attribute of the incoming call - self.ton = ton - self.callerName = callerName - # Flag indicating whether the call is ringing or not - self.ringing = True - # Amount of times this call has rung (before answer/hangup) - self.ringCount = 1 - - def answer(self): - """ Answer the phone call. - :return: self (for chaining method calls) - """ - if self.ringing: - self._gsmModem.write('ATA') - self.ringing = False - self.answered = True - return self - - def hangup(self): - """ End the phone call. """ - self.ringing = False - super(IncomingCall, self).hangup() \ No newline at end of file diff --git a/gsmmodem/models/ReceivedSms.py b/gsmmodem/models/ReceivedSms.py deleted file mode 100644 index 60edd23..0000000 --- a/gsmmodem/models/ReceivedSms.py +++ /dev/null @@ -1,27 +0,0 @@ -import weakref - -from gsmmodem.models.Sms import Sms - - -class ReceivedSms(Sms): - """ An SMS message that has been received (MT) """ - - def __init__(self, gsmModem, status, number, time, text, smsc=None, udh=[], index=None): - super(ReceivedSms, self).__init__(number, text, smsc) - self._gsmModem = weakref.proxy(gsmModem) - self.status = status - self.time = time - self.udh = udh - self.index = index - - def reply(self, message): - """ Convenience method that sends a reply SMS to the sender of this message """ - return self._gsmModem.sendSms(self.number, message) - - def sendSms(self, dnumber, message): - """ Convenience method that sends a SMS to someone else """ - return self._gsmModem.sendSms(dnumber, message) - - def getModem(self): - """ Convenience method that returns the gsm modem instance """ - return self._gsmModem diff --git a/gsmmodem/models/SentSms.py b/gsmmodem/models/SentSms.py deleted file mode 100644 index ff032c8..0000000 --- a/gsmmodem/models/SentSms.py +++ /dev/null @@ -1,27 +0,0 @@ -from gsmmodem.models.Sms import Sms -from gsmmodem.models.StatusReport import StatusReport - - -class SentSms(Sms): - """ An SMS message that has been sent (MO) """ - - ENROUTE = 0 # Status indicating message is still enroute to destination - DELIVERED = 1 # Status indicating message has been received by destination handset - FAILED = 2 # Status indicating message delivery has failed - - def __init__(self, number, text, reference, smsc=None): - super(SentSms, self).__init__(number, text, smsc) - self.report = None # Status report for this SMS (StatusReport object) - self.reference = reference - - @property - def status(self): - """ Status of this SMS. Can be ENROUTE, DELIVERED or FAILED - - The actual status report object may be accessed via the 'report' attribute - if status is 'DELIVERED' or 'FAILED' - """ - if self.report == None: - return SentSms.ENROUTE - else: - return SentSms.DELIVERED if self.report.deliveryStatus == StatusReport.DELIVERED else SentSms.FAILED diff --git a/gsmmodem/models/Sms.py b/gsmmodem/models/Sms.py deleted file mode 100644 index fc7f711..0000000 --- a/gsmmodem/models/Sms.py +++ /dev/null @@ -1,24 +0,0 @@ -import abc - - -class Sms(object): - """ Abstract SMS message base class """ - __metaclass__ = abc.ABCMeta - - # Some constants to ease handling SMS statuses - STATUS_RECEIVED_UNREAD = 0 - STATUS_RECEIVED_READ = 1 - STATUS_STORED_UNSENT = 2 - STATUS_STORED_SENT = 3 - STATUS_ALL = 4 - # ...and a handy converter for text mode statuses - TEXT_MODE_STATUS_MAP = {'REC UNREAD': STATUS_RECEIVED_UNREAD, - 'REC READ': STATUS_RECEIVED_READ, - 'STO UNSENT': STATUS_STORED_UNSENT, - 'STO SENT': STATUS_STORED_SENT, - 'ALL': STATUS_ALL} - - def __init__(self, number, text, smsc=None): - self.number = number - self.text = text - self.smsc = smsc \ No newline at end of file diff --git a/gsmmodem/models/StatusReport.py b/gsmmodem/models/StatusReport.py deleted file mode 100644 index b58a2d6..0000000 --- a/gsmmodem/models/StatusReport.py +++ /dev/null @@ -1,24 +0,0 @@ -import weakref - -from gsmmodem.models.Sms import Sms - - -class StatusReport(Sms): - """ An SMS status/delivery report - - Note: the 'status' attribute of this class refers to this status report SM's status (whether - it has been read, etc). To find the status of the message that caused this status report, - use the 'deliveryStatus' attribute. - """ - - DELIVERED = 0 # SMS delivery status: delivery successful - FAILED = 68 # SMS delivery status: delivery failed - - def __init__(self, gsmModem, status, reference, number, timeSent, timeFinalized, deliveryStatus, smsc=None): - super(StatusReport, self).__init__(number, None, smsc) - self._gsmModem = weakref.proxy(gsmModem) - self.status = status - self.reference = reference - self.timeSent = timeSent - self.timeFinalized = timeFinalized - self.deliveryStatus = deliveryStatus diff --git a/gsmmodem/models/__init__.py b/gsmmodem/models/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/gsmmodem/modem.py b/gsmmodem/modem.py deleted file mode 100644 index efacd14..0000000 --- a/gsmmodem/modem.py +++ /dev/null @@ -1,1548 +0,0 @@ -#!/usr/bin/env python - -""" High-level API classes for an attached GSM modem """ - -import logging -import re -import sys -import threading -import time -import weakref - -from gsmmodem.exceptions import EncodingError -from gsmmodem.util import lineMatching -from .exceptions import CommandError, InvalidStateException, CmeError, CmsError, TimeoutException, PinRequiredError, \ - SmscNumberUnknownError -from .models.Call import Call -from .models.IncomingCall import IncomingCall -from .models.ReceivedSms import ReceivedSms -from .models.SentSms import SentSms -from .models.Sms import Sms -from .models.StatusReport import StatusReport -from .pdu import encodeSmsSubmitPdu, decodeSmsPdu, encodeGsm7, encodeTextMode -from .serial_comms import SerialComms -from .util import lineStartingWith, parseTextModeTimeStr, removeAtPrefix - -PYTHON_VERSION = sys.version_info[0] - -CTRLZ = '\x1a' -TERMINATOR = '\r' - -if PYTHON_VERSION >= 3: - xrange = range - dictValuesIter = dict.values - dictItemsIter = dict.items - -class GsmModem(SerialComms): - """ Main class for interacting with an attached GSM modem """ - - log = logging.getLogger('gsmmodem.modem.GsmModem') - - # Used for parsing AT command errors - CM_ERROR_REGEX = re.compile('^\+(CM[ES]) ERROR: (\d+)$') - # Used for parsing signal strength query responses - CSQ_REGEX = re.compile('^\+CSQ:\s*(\d+),') - # Used for parsing caller ID announcements for incoming calls. Group 1 is the number - CLIP_REGEX = re.compile('^\+CLIP:\s*"\+{0,1}(\d+)",(\d+).*$') - # Used for parsing own number. Group 1 is the number - CNUM_REGEX = re.compile('^\+CNUM:\s*".*?","(\+{0,1}\d+)",(\d+).*$') - # Used for parsing new SMS message indications - CMTI_REGEX = re.compile('^\+CMTI:\s*"([^"]+)",\s*(\d+)$') - # Used for parsing SMS message reads (text mode) - CMGR_SM_DELIVER_REGEX_TEXT = None - # Used for parsing SMS status report message reads (text mode) - CMGR_SM_REPORT_REGEXT_TEXT = None - # Used for parsing SMS message reads (PDU mode) - CMGR_REGEX_PDU = None - # Used for parsing USSD event notifications - CUSD_REGEX = re.compile('\+CUSD:\s*(\d),\s*"(.*?)",\s*(\d+)', re.DOTALL) - # Used for parsing SMS status reports - CDSI_REGEX = re.compile('\+CDSI:\s*"([^"]+)",(\d+)$') - CDS_REGEX = re.compile('\+CDS:\s*([0-9]+)"$') - - def __init__(self, port, baudrate=115200, incomingCallCallbackFunc=None, smsReceivedCallbackFunc=None, smsStatusReportCallback=None, requestDelivery=True, AT_CNMI="", *a, **kw): - super(GsmModem, self).__init__(port, baudrate, notifyCallbackFunc=self._handleModemNotification, *a, **kw) - self.incomingCallCallback = incomingCallCallbackFunc or self._placeholderCallback - self.smsReceivedCallback = smsReceivedCallbackFunc or self._placeholderCallback - self.smsStatusReportCallback = smsStatusReportCallback or self._placeholderCallback - self.requestDelivery = requestDelivery - self.AT_CNMI = AT_CNMI or "2,1,0,2" - # Flag indicating whether caller ID for incoming call notification has been set up - self._callingLineIdentification = False - # Flag indicating whether incoming call notifications have extended information - self._extendedIncomingCallIndication = False - # Current active calls (ringing and/or answered), key is the unique call ID (not the remote number) - self.activeCalls = {} - # Dict containing sent SMS messages (for auto-tracking their delivery status) - self.sentSms = weakref.WeakValueDictionary() - self._ussdSessionEvent = None # threading.Event - self._ussdResponse = None # gsmmodem.modem.Ussd - self._smsStatusReportEvent = None # threading.Event - self._dialEvent = None # threading.Event - self._dialResponse = None # gsmmodem.modem.Call - self._waitForAtdResponse = True # Flag that controls if we should wait for an immediate response to ATD, or not - self._waitForCallInitUpdate = True # Flag that controls if we should wait for a ATD "call initiated" message - self._callStatusUpdates = [] # populated during connect() - contains regexes and handlers for detecting/handling call status updates - self._mustPollCallStatus = False # whether or not the modem must be polled for outgoing call status updates - self._pollCallStatusRegex = None # Regular expression used when polling outgoing call status - self._writeWait = 0 # Time (in seconds to wait after writing a command (adjusted when 515 errors are detected) - self._smsTextMode = False # Storage variable for the smsTextMode property - self._gsmBusy = 0 # Storage variable for the GSMBUSY property - self._smscNumber = None # Default SMSC number - self._smsRef = 0 # Sent SMS reference counter - self._smsMemReadDelete = None # Preferred message storage memory for reads/deletes ( parameter used for +CPMS) - self._smsMemWrite = None # Preferred message storage memory for writes ( parameter used for +CPMS) - self._smsReadSupported = True # Whether or not reading SMS messages is supported via AT commands - self._smsEncoding = 'GSM' # Default SMS encoding - self._smsSupportedEncodingNames = None # List of available encoding names - self._commands = None # List of supported AT commands - #Pool of detected DTMF - self.dtmfpool = [] - - def connect(self, pin=None, waitingForModemToStartInSeconds=0): - """ Opens the port and initializes the modem and SIM card - - :param pin: The SIM card PIN code, if any - :type pin: str - - :raise PinRequiredError: if the SIM card requires a PIN but none was provided - :raise IncorrectPinError: if the specified PIN is incorrect - """ - self.log.info('Connecting to modem on port %s at %dbps', self.port, self.baudrate) - super(GsmModem, self).connect() - - if waitingForModemToStartInSeconds > 0: - while waitingForModemToStartInSeconds > 0: - try: - self.write('AT', waitForResponse=True, timeout=0.5) - break - except TimeoutException: - waitingForModemToStartInSeconds -= 0.5 - - # Send some initialization commands to the modem - try: - self.write('ATZ') # reset configuration - except CommandError: - # Some modems require a SIM PIN at this stage already; unlock it now - # Attempt to enable detailed error messages (to catch incorrect PIN error) - # but ignore if it fails - self.write('AT+CMEE=1', parseError=False) - self._unlockSim(pin) - pinCheckComplete = True - self.write('ATZ') # reset configuration - else: - pinCheckComplete = False - self.write('ATE0') # echo off - try: - cfun = lineStartingWith('+CFUN:', self.write('AT+CFUN?'))[7:] # example response: +CFUN: 1 or +CFUN: 1,0 - cfun = int(cfun.split(",")[0]) - if cfun != 1: - self.write('AT+CFUN=1') - except CommandError: - pass # just ignore if the +CFUN command isn't supported - - self.write('AT+CMEE=1') # enable detailed error messages (even if it has already been set - ATZ may reset this) - if not pinCheckComplete: - self._unlockSim(pin) - - # Get list of supported commands from modem - commands = self.supportedCommands - self._commands = commands - - # Device-specific settings - callUpdateTableHint = 0 # unknown modem - enableWind = False - if commands != None: - if '^CVOICE' in commands: - self.write('AT^CVOICE=0', parseError=False) # Enable voice calls - if '+VTS' in commands: # Check for DTMF sending support - Call.dtmfSupport = True - elif '^DTMF' in commands: - # Huawei modems use ^DTMF to send DTMF tones - callUpdateTableHint = 1 # Huawei - if '^USSDMODE' in commands: - # Enable Huawei text-mode USSD - self.write('AT^USSDMODE=0', parseError=False) - if '+WIND' in commands: - callUpdateTableHint = 2 # Wavecom - enableWind = True - elif '+ZPAS' in commands: - callUpdateTableHint = 3 # ZTE - else: - # Try to enable general notifications on Wavecom-like device - enableWind = True - - if enableWind: - try: - wind = lineStartingWith('+WIND:', self.write('AT+WIND?')) # Check current WIND value; example response: +WIND: 63 - except CommandError: - # Modem does not support +WIND notifications. See if we can detect other known call update notifications - pass - else: - # Enable notifications for call setup, hangup, etc - # if int(wind[7:]) != 50: - # self.write('AT+WIND=50') - callUpdateTableHint = 2 # Wavecom - - # Attempt to identify modem type directly (if not already) - for outgoing call status updates - if callUpdateTableHint == 0: - if 'simcom' in self.manufacturer.lower() : #simcom modems support DTMF and don't support AT+CLAC - Call.dtmfSupport = True - try: - self.write('AT+DDET=1') # enable detect incoming DTMF - except CommandError: - # simcom 7000E for example doesn't support the DDET command - Call.dtmfSupport = False - - if self.manufacturer.lower() == 'huawei': - callUpdateTableHint = 1 # huawei - else: - # See if this is a ZTE modem that has not yet been identified based on supported commands - try: - self.write('AT+ZPAS?') - except CommandError: - pass # Not a ZTE modem - else: - callUpdateTableHint = 3 # ZTE - # Load outgoing call status updates based on identified modem features - if callUpdateTableHint == 1: - # Use Hauwei's ^NOTIFICATIONs - self.log.info('Loading Huawei call state update table') - self._callStatusUpdates = ((re.compile('^\^ORIG:(\d),(\d)$'), self._handleCallInitiated), - (re.compile('^\^CONN:(\d),(\d)$'), self._handleCallAnswered), - (re.compile('^\^CEND:(\d),(\d+),(\d)+,(\d)+$'), self._handleCallEnded)) - self._mustPollCallStatus = False - # Huawei modems use ^DTMF to send DTMF tones; use that instead - Call.DTMF_COMMAND_BASE = '^DTMF={cid},' - Call.dtmfSupport = True - elif callUpdateTableHint == 2: - # Wavecom modem: +WIND notifications supported - self.log.info('Loading Wavecom call state update table') - self._callStatusUpdates = ((re.compile('^\+WIND: 5,(\d)$'), self._handleCallInitiated), - (re.compile('^OK$'), self._handleCallAnswered), - (re.compile('^\+WIND: 6,(\d)$'), self._handleCallEnded)) - self._waitForAtdResponse = False # Wavecom modems return OK only when the call is answered - self._mustPollCallStatus = False - if commands == None: # older modem, assume it has standard DTMF support - Call.dtmfSupport = True - elif callUpdateTableHint == 3: # ZTE - # Use ZTE notifications ("CONNECT"/"HANGUP", but no "call initiated" notification) - self.log.info('Loading ZTE call state update table') - self._callStatusUpdates = ((re.compile('^CONNECT$'), self._handleCallAnswered), - (re.compile('^HANGUP:\s*(\d+)$'), self._handleCallEnded), - (re.compile('^OK$'), self._handleCallRejected)) - self._waitForAtdResponse = False # ZTE modems do not return an immediate OK only when the call is answered - self._mustPollCallStatus = False - self._waitForCallInitUpdate = False # ZTE modems do not provide "call initiated" updates - if commands == None: # ZTE uses standard +VTS for DTMF - Call.dtmfSupport = True - else: - # Unknown modem - we do not know what its call updates look like. Use polling instead - self.log.info('Unknown/generic modem type - will use polling for call state updates') - self._mustPollCallStatus = True - self._pollCallStatusRegex = re.compile('^\+CLCC:\s+(\d+),(\d),(\d),(\d),([^,]),"([^,]*)",(\d+)$') - self._waitForAtdResponse = True # Most modems return OK immediately after issuing ATD - - # General meta-information setup - self.write('AT+COPS=3,0', parseError=False) # Use long alphanumeric name format - - # SMS setup - self.write('AT+CMGF={0}'.format(1 if self.smsTextMode else 0)) # Switch to text or PDU mode for SMS messages - self._compileSmsRegexes() - if self._smscNumber != None: - self.write('AT+CSCA="{0}"'.format(self._smscNumber)) # Set default SMSC number - currentSmscNumber = self._smscNumber - else: - currentSmscNumber = self.smsc - # Some modems delete the SMSC number when setting text-mode SMS parameters; preserve it if needed - if currentSmscNumber != None: - self._smscNumber = None # clear cache - if self.requestDelivery: - self.write('AT+CSMP=49,167,0,0', parseError=False) # Enable delivery reports - else: - self.write('AT+CSMP=17,167,0,0', parseError=False) # Not enable delivery reports - # ...check SMSC again to ensure it did not change - if currentSmscNumber != None and self.smsc != currentSmscNumber: - self.smsc = currentSmscNumber - - # Set message storage, but first check what the modem supports - example response: +CPMS: (("SM","BM","SR"),("SM")) - try: - cpmsLine = lineStartingWith('+CPMS', self.write('AT+CPMS=?')) - except CommandError: - # Modem does not support AT+CPMS; SMS reading unavailable - self._smsReadSupported = False - self.log.warning('SMS preferred message storage query not supported by modem. SMS reading unavailable.') - else: - cpmsSupport = cpmsLine.split(' ', 1)[1].split('),(') - # Do a sanity check on the memory types returned - Nokia S60 devices return empty strings, for example - for memItem in cpmsSupport: - if len(memItem) == 0: - # No support for reading stored SMS via AT commands - probably a Nokia S60 - self._smsReadSupported = False - self.log.warning('Invalid SMS message storage support returned by modem. SMS reading unavailable. Response was: "%s"', cpmsLine) - break - else: - # Suppported memory types look fine, continue - preferredMemoryTypes = ('"ME"', '"SM"', '"SR"') - cpmsItems = [''] * len(cpmsSupport) - for i in xrange(len(cpmsSupport)): - for memType in preferredMemoryTypes: - if memType in cpmsSupport[i]: - if i == 0: - self._smsMemReadDelete = memType - cpmsItems[i] = memType - break - self.write('AT+CPMS={0}'.format(','.join(cpmsItems))) # Set message storage - del cpmsSupport - del cpmsLine - - if self._smsReadSupported and (self.smsReceivedCallback or self.smsStatusReportCallback): - try: - self.write('AT+CNMI=' + self.AT_CNMI) # Set message notifications - except CommandError: - try: - self.write('AT+CNMI=2,1,0,1,0') # Set message notifications, using TE for delivery reports - except CommandError: - # Message notifications not supported - self._smsReadSupported = False - self.log.warning('Incoming SMS notifications not supported by modem. SMS receiving unavailable.') - - # Incoming call notification setup - try: - self.write('AT+CLIP=1') # Enable calling line identification presentation - except CommandError as clipError: - self._callingLineIdentification = False - self.log.warning('Incoming call calling line identification (caller ID) not supported by modem. Error: {0}'.format(clipError)) - else: - self._callingLineIdentification = True - try: - self.write('AT+CRC=1') # Enable extended format of incoming indication (optional) - except CommandError as crcError: - self._extendedIncomingCallIndication = False - self.log.warning('Extended format incoming call indication not supported by modem. Error: {0}'.format(crcError)) - else: - self._extendedIncomingCallIndication = True - - # Call control setup - self.write('AT+CVHU=0', parseError=False) # Enable call hang-up with ATH command (ignore if command not supported) - - def _unlockSim(self, pin): - """ Unlocks the SIM card using the specified PIN (if necessary, else does nothing) """ - # Unlock the SIM card if needed - try: - cpinResponse = lineStartingWith('+CPIN', self.write('AT+CPIN?', timeout=15)) - except TimeoutException as timeout: - # Wavecom modems do not end +CPIN responses with "OK" (github issue #19) - see if just the +CPIN response was returned - if timeout.data != None: - cpinResponse = lineStartingWith('+CPIN', timeout.data) - if cpinResponse == None: - # No useful response read - raise timeout - else: - # Nothing read (real timeout) - raise timeout - if cpinResponse != '+CPIN: READY': - if pin != None: - self.write('AT+CPIN="{0}"'.format(pin)) - else: - raise PinRequiredError('AT+CPIN') - - def write(self, data, waitForResponse=True, timeout:float =10, parseError=True, writeTerm=TERMINATOR, expectedResponseTermSeq=None): - """ Write data to the modem. - - This method adds the ``\\r\\n`` end-of-line sequence to the data parameter, and - writes it to the modem. - - :param data: Command/data to be written to the modem - :type data: str - :param waitForResponse: Whether this method should block and return the response from the modem or not - :type waitForResponse: bool - :param timeout: Maximum amount of time in seconds to wait for a response from the modem - :type timeout: int - :param parseError: If True, a CommandError is raised if the modem responds with an error (otherwise the response is returned as-is) - :type parseError: bool - :param writeTerm: The terminating sequence to append to the written data - :type writeTerm: str - :param expectedResponseTermSeq: The expected terminating sequence that marks the end of the modem's response (defaults to ``\\r\\n``) - :type expectedResponseTermSeq: str - - :raise CommandError: if the command returns an error (only if parseError parameter is True) - :raise TimeoutException: if no response to the command was received from the modem - - :return: A list containing the response lines from the modem, or None if waitForResponse is False - :rtype: list - """ - - self.log.debug('write: %s', data) - responseLines = super(GsmModem, self).write(data + writeTerm, waitForResponse=waitForResponse, timeout=timeout, expectedResponseTermSeq=expectedResponseTermSeq) - if self._writeWait > 0: # Sleep a bit if required (some older modems suffer under load) - time.sleep(self._writeWait) - if waitForResponse: - cmdStatusLine = responseLines[-1] - if parseError: - if 'ERROR' in cmdStatusLine: - cmErrorMatch = self.CM_ERROR_REGEX.match(cmdStatusLine) - if cmErrorMatch: - errorType = cmErrorMatch.group(1) - errorCode = int(cmErrorMatch.group(2)) - if errorCode == 515 or errorCode == 14: - # 515 means: "Please wait, init or command processing in progress." - # 14 means "SIM busy" - self._writeWait += 0.2 # Increase waiting period temporarily - # Retry the command after waiting a bit - self.log.debug('Device/SIM busy error detected; self._writeWait adjusted to %fs', self._writeWait) - time.sleep(self._writeWait) - result = self.write(data, waitForResponse, timeout, parseError, writeTerm, expectedResponseTermSeq) - self.log.debug('self_writeWait set to 0.1 because of recovering from device busy (515) error') - if errorCode == 515: - self._writeWait = 0.1 # Set this to something sane for further commands (slow modem) - else: - self._writeWait = 0 # The modem was just waiting for the SIM card - return result - if errorType == 'CME': - raise CmeError(data, int(errorCode)) - else: # CMS error - raise CmsError(data, int(errorCode)) - else: - raise CommandError(data) - elif cmdStatusLine == 'COMMAND NOT SUPPORT': # Some Huawei modems respond with this for unknown commands - raise CommandError('{} ({})'.format(data,cmdStatusLine)) - return responseLines - - @property - def signalStrength(self): - """ Checks the modem's cellular network signal strength - - :raise CommandError: if an error occurs - - :return: The network signal strength as an integer between 0 and 99, or -1 if it is unknown - :rtype: int - """ - csq = self.CSQ_REGEX.match(self.write('AT+CSQ')[0]) - if csq: - ss = int(csq.group(1)) - return ss if ss != 99 else -1 - else: - raise CommandError() - - @property - def manufacturer(self): - """ :return: The modem's manufacturer's name """ - return self.write('AT+CGMI')[0] - - @property - def model(self): - """ :return: The modem's model name """ - return self.write('AT+CGMM')[0] - - @property - def revision(self): - """ :return: The modem's software revision, or None if not known/supported """ - try: - return self.write('AT+CGMR')[0] - except CommandError: - return None - - @property - def imei(self): - """ :return: The modem's serial number (IMEI number) """ - return self.write('AT+CGSN')[0] - - @property - def imsi(self): - """ :return: The IMSI (International Mobile Subscriber Identity) of the SIM card. The PIN may need to be entered before reading the IMSI """ - return self.write('AT+CIMI')[0] - - @property - def networkName(self): - """ :return: the name of the GSM Network Operator to which the modem is connected """ - copsMatch = lineMatching('^\+COPS: (\d),(\d),"(.+)",{0,1}\d*$', self.write('AT+COPS?')) # response format: +COPS: mode,format,"operator_name",x - if copsMatch: - return copsMatch.group(3) - - @property - def supportedCommands(self): - """ :return: list of AT commands supported by this modem (without the AT prefix). Returns None if not known """ - try: - # AT+CLAC responses differ between modems. Most respond with +CLAC: and then a comma-separated list of commands - # while others simply return each command on a new line, with no +CLAC: prefix - response = self.write('AT+CLAC', timeout=10) - if len(response) == 2: # Single-line response, comma separated - commands = response[0] - if commands.startswith('+CLAC'): - commands = commands[6:] # remove the +CLAC: prefix before splitting - return commands.split(',') - elif len(response) > 2: # Multi-line response - return [removeAtPrefix(cmd.strip()) for cmd in response[:-1]] - else: - self.log.debug('Unhandled +CLAC response: {0}'.format(response)) - return None - except (TimeoutException, CommandError): - # Try interactive command recognition - commands = [] - checkable_commands = ['^CVOICE', '+VTS', '^DTMF', '^USSDMODE', '+WIND', '+ZPAS', '+CSCS', '+CNUM'] - - # Check if modem is still alive - try: - response = self.write('AT') - except: - raise TimeoutException - - # Check all commands that will by considered - for command in checkable_commands: - try: - # Compose AT command that will read values under specified function - at_command='AT'+command+'=?' - response = self.write(at_command) - # If there are values inside response - add command to the list - commands.append(command) - except: - continue - - # Return found commands - if len(commands) == 0: - return None - else: - return commands - - @property - def smsTextMode(self): - """ :return: True if the modem is set to use text mode for SMS, False if it is set to use PDU mode """ - return self._smsTextMode - @smsTextMode.setter - def smsTextMode(self, textMode): - """ Set to True for the modem to use text mode for SMS, or False for it to use PDU mode """ - if textMode != self._smsTextMode: - if self.alive: - self.write('AT+CMGF={0}'.format(1 if textMode else 0)) - self._smsTextMode = textMode - self._compileSmsRegexes() - - @property - def smsSupportedEncoding(self): - """ - :raise NotImplementedError: If an error occures during AT command response parsing. - :return: List of supported encoding names. """ - - # Check if command is available - if self._commands == None: - self._commands = self.supportedCommands - - if self._commands == None: - self._smsSupportedEncodingNames = [] - return self._smsSupportedEncodingNames - - if not '+CSCS' in self._commands: - self._smsSupportedEncodingNames = [] - return self._smsSupportedEncodingNames - - # Get available encoding names - response = self.write('AT+CSCS=?') - - # Check response length (should be 2 - list of options and command status) - if len(response) != 2: - self.log.debug('Unhandled +CSCS response: {0}'.format(response)) - self._smsSupportedEncodingNames = [] - raise NotImplementedError - - # Extract encoding names list - try: - enc_list = response[0] # Get the first line - enc_list = enc_list[6:] # Remove '+CSCS: ' prefix - # Extract AT list in format ("str", "str2", "str3") - enc_list = enc_list.split('(')[1] - enc_list = enc_list.split(')')[0] - enc_list = enc_list.split(',') - enc_list = [x.split('"')[1] for x in enc_list] - except: - self.log.debug('Unhandled +CSCS response: {0}'.format(response)) - self._smsSupportedEncodingNames = [] - raise NotImplementedError - - self._smsSupportedEncodingNames = enc_list - return self._smsSupportedEncodingNames - - @property - def smsEncoding(self): - """ :return: Encoding name if encoding command is available, else GSM. """ - if self._commands == None: - self._commands = self.supportedCommands - - if self._commands == None: - return self._smsEncoding - - if '+CSCS' in self._commands: - response = self.write('AT+CSCS?') - - if len(response) == 2: - encoding = response[0] - if encoding.startswith('+CSCS'): - encoding = encoding[6:].split('"') # remove the +CSCS: prefix before splitting - if len(encoding) == 3: - self._smsEncoding = encoding[1] - else: - self.log.debug('Unhandled +CSCS response: {0}'.format(response)) - else: - self.log.debug('Unhandled +CSCS response: {0}'.format(response)) - - return self._smsEncoding - @smsEncoding.setter - def smsEncoding(self, encoding): - """ Set encoding for SMS inside PDU mode. - - :raise CommandError: if unable to set encoding - :raise ValueError: if encoding is not supported by modem - """ - # Check if command is available - if self._commands == None: - self._commands = self.supportedCommands - - if self._commands == None: - if encoding != self._smsEncoding: - raise CommandError('Unable to set SMS encoding (no supported commands)') - else: - return - - if not '+CSCS' in self._commands: - if encoding != self._smsEncoding: - raise CommandError('Unable to set SMS encoding (+CSCS command not supported)') - else: - return - - # Check if command is available - if self._smsSupportedEncodingNames == None: - self.smsSupportedEncoding - - # Check if desired encoding is available - if encoding in self._smsSupportedEncodingNames: - # Set encoding - response = self.write('AT+CSCS="{0}"'.format(encoding)) - if len(response) == 1: - if response[0].lower() == 'ok': - self._smsEncoding = encoding - return - - if encoding != self._smsEncoding: - raise ValueError('Unable to set SMS encoding (enocoding {0} not supported)'.format(encoding)) - else: - return - - def _setSmsMemory(self, readDelete=None, write=None): - """ Set the current SMS memory to use for read/delete/write operations """ - # Switch to the correct memory type if required - if write != None and write != self._smsMemWrite: - readDel = readDelete or self._smsMemReadDelete - self.write('AT+CPMS="{0}","{1}"'.format(readDel, write)) - self._smsMemReadDelete = readDel - self._smsMemWrite = write - elif readDelete != None and readDelete != self._smsMemReadDelete: - self.write('AT+CPMS="{0}"'.format(readDelete)) - self._smsMemReadDelete = readDelete - - def _compileSmsRegexes(self): - """ Compiles regular expression used for parsing SMS messages based on current mode """ - if self.smsTextMode: - if self.CMGR_SM_DELIVER_REGEX_TEXT == None: - self.CMGR_SM_DELIVER_REGEX_TEXT = re.compile('^\+CMGR: "([^"]+)","([^"]+)",[^,]*,"([^"]+)"$') - self.CMGR_SM_REPORT_REGEXT_TEXT = re.compile('^\+CMGR: ([^,]*),\d+,(\d+),"{0,1}([^"]*)"{0,1},\d*,"([^"]+)","([^"]+)",(\d+)$') - elif self.CMGR_REGEX_PDU == None: - self.CMGR_REGEX_PDU = re.compile('^\+CMGR:\s*(\d*),\s*"{0,1}([^"]*)"{0,1},\s*(\d+)$') - - @property - def gsmBusy(self): - """ :return: Current GSMBUSY state """ - try: - response = self.write('AT+GSMBUSY?') - response = response[0] # Get the first line - response = response[10] # Remove '+GSMBUSY: ' prefix - self._gsmBusy = response - except: - pass # If error is related to ME funtionality: +CME ERROR: - return self._gsmBusy - @gsmBusy.setter - def gsmBusy(self, gsmBusy): - """ Sete GSMBUSY state """ - if gsmBusy != self._gsmBusy: - if self.alive: - self.write('AT+GSMBUSY="{0}"'.format(gsmBusy)) - self._gsmBusy = gsmBusy - - @property - def smsc(self): - """ :return: The default SMSC number stored on the SIM card """ - if self._smscNumber == None: - try: - readSmsc = self.write('AT+CSCA?') - except SmscNumberUnknownError: - pass # Some modems return a CMS 330 error if the value isn't set - else: - cscaMatch = lineMatching('\+CSCA:\s*"([^,]+)",(\d+)$', readSmsc) - if cscaMatch: - self._smscNumber = cscaMatch.group(1) - return self._smscNumber - @smsc.setter - def smsc(self, smscNumber): - """ Set the default SMSC number to use when sending SMS messages """ - if smscNumber != self._smscNumber: - if self.alive: - self.write('AT+CSCA="{0}"'.format(smscNumber)) - self._smscNumber = smscNumber - - @property - def ownNumber(self): - """ Query subscriber phone number. - - It must be stored on SIM by operator. - If is it not stored already, it usually is possible to store the number by user. - - :raise TimeoutException: if a timeout was specified and reached - - - :return: Subscriber SIM phone number. Returns None if not known - :rtype: int - """ - - try: - if "+CNUM" in self._commands: - response = self.write('AT+CNUM') - else: - response = self.retrevie_first_number() - - if len(response)==1 and response[0] == "OK": # command is supported, but no number is set - response = self.retrevie_first_number() - return None - elif len(response) == 2: # OK and phone number. Actual number is in the first line, second parameter, and is placed inside quotation marks - cnumLine = response[0] - cnumMatch = self.CNUM_REGEX.match(cnumLine) - if cnumMatch: - return cnumMatch.group(1) - else: - self.log.debug('Error parse +CNUM response: {0}'.format(response)) - return None - elif len(response) > 2: # Multi-line response - self.log.debug('Unhandled +CNUM/+CPBS response: {0}'.format(response)) - return None - - except (TimeoutException, CommandError): - raise - - def retrevie_first_number(self): - # temporarily switch to "own numbers" phonebook, read position 1 and than switch back - response = self.write('AT+CPBS?') - selected_phonebook = response[0][6:].split('"')[ - 1] # first line, remove the +CSCS: prefix, split, first parameter - if selected_phonebook is not "ON": - self.write('AT+CPBS="ON"') - response = self.write("AT+CPBR=1") - self.write('AT+CPBS="{0}"'.format(selected_phonebook)) - return response - - @ownNumber.setter - def ownNumber(self, phone_number): - actual_phonebook = self.write('AT+CPBS?') - if actual_phonebook is not "ON": - self.write('AT+CPBS="ON"') - self.write('AT+CPBW=1,"' + phone_number + '"') - - - def waitForNetworkCoverage(self, timeout=None): - """ Block until the modem has GSM network coverage. - - This method blocks until the modem is registered with the network - and the signal strength is greater than 0, optionally timing out - if a timeout was specified - - :param timeout: Maximum time to wait for network coverage, in seconds - :type timeout: int or float - - :raise TimeoutException: if a timeout was specified and reached - :raise InvalidStateException: if the modem is not going to receive network coverage (SIM blocked, etc) - - :return: the current signal strength - """ - block = [True] - if timeout != None: - # Set up a timeout mechanism - def _cancelBlock(): - block[0] = False - t = threading.Timer(timeout, _cancelBlock) - t.start() - ss = -1 - checkCreg = True - while block[0]: - if checkCreg: - cregResult = lineMatching('^\+CREG:\s*(\d),(\d)(,[^,]*,[^,]*)?$', self.write('AT+CREG?', parseError=False)) # example result: +CREG: 0,1 - if cregResult: - status = int(cregResult.group(2)) - if status in (1, 5): - # 1: registered, home network, 5: registered, roaming - # Now simply check and return network signal strength - checkCreg = False - elif status == 3: - raise InvalidStateException('Network registration denied') - elif status == 0: - raise InvalidStateException('Device not searching for network operator') - else: - # Disable network registration check; only use signal strength - self.log.info('+CREG check disabled due to invalid response or unsupported command') - checkCreg = False - else: - # Check signal strength - ss = self.signalStrength - if ss > 0: - return ss - time.sleep(1) - else: - # If this is reached, the timer task has triggered - raise TimeoutException() - - def sendSms(self, destination, text, waitForDeliveryReport=False, deliveryTimeout=15, sendFlash=False): - """ Send an SMS text message - - :param destination: the recipient's phone number - :type destination: str - :param text: the message text - :type text: str - :param waitForDeliveryReport: if True, this method blocks until a delivery report is received for the sent message - :type waitForDeliveryReport: boolean - :param deliveryTimeout: the maximum time in seconds to wait for a delivery report (if "waitForDeliveryReport" is True) - :type deliveryTimeout: int or float - - :raise CommandError: if an error occurs while attempting to send the message - :raise TimeoutException: if the operation times out - """ - - # Check input text to select appropriate mode (text or PDU) - if self.smsTextMode: - try: - encodedText = encodeTextMode(text) - except ValueError: - self.smsTextMode = False - - if self.smsTextMode: - # Send SMS via AT commands - self.write('AT+CMGS="{0}"'.format(destination), timeout=5, expectedResponseTermSeq='> ') - result = lineStartingWith('+CMGS:', self.write(text, timeout=35, writeTerm=CTRLZ)) - else: - # Check encoding - try: - encodedText = encodeGsm7(text) - except ValueError: - encodedText = None - - # Set GSM modem SMS encoding format - # Encode message text and set data coding scheme based on text contents - if encodedText == None: - # Cannot encode text using GSM-7; use UCS2 instead - self.smsEncoding = 'UCS2' - else: - self.smsEncoding = 'GSM' - - # Encode text into PDUs - pdus = encodeSmsSubmitPdu(destination, text, reference=self._smsRef, sendFlash=sendFlash) - - # Send SMS PDUs via AT commands - for pdu in pdus: - self.write('AT+CMGS={0}'.format(pdu.tpduLength), timeout=5, expectedResponseTermSeq='> ') - result = lineStartingWith('+CMGS:', self.write(str(pdu), timeout=35, writeTerm=CTRLZ)) # example: +CMGS: xx - - if result == None: - raise CommandError('Modem did not respond with +CMGS response') - - # Keep SMS reference number in order to pair delivery reports with sent message - reference = int(result[7:]) - self._smsRef = reference + 1 - if self._smsRef > 255: - self._smsRef = 0 - - # Create sent SMS object for future delivery checks - sms = SentSms(destination, text, reference) - - # Add a weak-referenced entry for this SMS (allows us to update the SMS state if a status report is received) - self.sentSms[reference] = sms - if waitForDeliveryReport: - self._smsStatusReportEvent = threading.Event() - if self._smsStatusReportEvent.wait(deliveryTimeout): - self._smsStatusReportEvent = None - else: # Response timed out - self._smsStatusReportEvent = None - raise TimeoutException() - return sms - - def sendUssd(self, ussdString, responseTimeout=60): - """ Starts a USSD session by dialing the the specified USSD string, or \ - sends the specified string in the existing USSD session (if any) - - :param ussdString: The USSD access number to dial - :param responseTimeout: Maximum time to wait a response, in seconds - - :raise TimeoutException: if no response is received in time - - :return: The USSD response message/session (as a Ussd object) - :rtype: gsmmodem.modem.Ussd - """ - self._ussdSessionEvent = threading.Event() - try: - cusdResponse = self.write('AT+CUSD=1,"{0}",15'.format(ussdString), timeout=responseTimeout) # Should respond with "OK" - except Exception: - self._ussdSessionEvent = None # Cancel the thread sync lock - raise - - # Some modems issue the +CUSD response before the acknowledgment "OK" - check for that - if len(cusdResponse) > 1: - cusdResponseFound = lineStartingWith('+CUSD', cusdResponse) != None - if cusdResponseFound: - self._ussdSessionEvent = None # Cancel thread sync lock - return self._parseCusdResponse(cusdResponse) - # Wait for the +CUSD notification message - if self._ussdSessionEvent.wait(responseTimeout): - self._ussdSessionEvent = None - return self._ussdResponse - else: # Response timed out - self._ussdSessionEvent = None - raise TimeoutException() - - - def checkForwarding(self, querytype, responseTimeout=15): - """ Check forwarding status: 0=Unconditional, 1=Busy, 2=NoReply, 3=NotReach, 4=AllFwd, 5=AllCondFwd - :param querytype: The type of forwarding to check - - :return: Status - :rtype: Boolean - """ - try: - queryResponse = self.write('AT+CCFC={0},2'.format(querytype), timeout=responseTimeout) # Should respond with "OK" - except Exception: - raise - print(queryResponse) - return True - - - def setForwarding(self, fwdType, fwdEnable, fwdNumber, responseTimeout=15): - """ Check forwarding status: 0=Unconditional, 1=Busy, 2=NoReply, 3=NotReach, 4=AllFwd, 5=AllCondFwd - :param fwdType: The type of forwarding to set - :param fwdEnable: 1 to enable, 0 to disable, 2 to query, 3 to register, 4 to erase - :param fwdNumber: Number to forward to - - :return: Success or not - :rtype: Boolean - """ - try: - queryResponse = self.write('AT+CCFC={0},{1},"{2}"'.format(fwdType, fwdEnable, fwdNumber), timeout=responseTimeout) # Should respond with "OK" - except Exception: - raise - return False - print(queryResponse) - return queryResponse - - def dial(self, number, timeout=5, callStatusUpdateCallbackFunc=None): - """ Calls the specified phone number using a voice phone call - - :param number: The phone number to dial - :param timeout: Maximum time to wait for the call to be established - :param callStatusUpdateCallbackFunc: Callback function that is executed if the call's status changes due to - remote events (i.e. when it is answered, the call is ended by the remote party) - - :return: The outgoing call - :rtype: gsmmodem.modem.Call - """ - if self._waitForCallInitUpdate: - # Wait for the "call originated" notification message - self._dialEvent = threading.Event() - try: - self.write('ATD{0};'.format(number), timeout=timeout, waitForResponse=self._waitForAtdResponse) - except Exception: - self._dialEvent = None # Cancel the thread sync lock - raise - else: - # Don't wait for a call init update - base the call ID on the number of active calls - self.write('ATD{0};'.format(number), timeout=timeout, waitForResponse=self._waitForAtdResponse) - self.log.debug("Not waiting for outgoing call init update message") - callId = len(self.activeCalls) + 1 - callType = 0 # Assume voice - call = Call(self, callId, callType, number, callStatusUpdateCallbackFunc) - self.activeCalls[callId] = call - return call - - if self._mustPollCallStatus: - # Fake a call notification by polling call status until the status indicates that the call is being dialed - threading.Thread(target=self._pollCallStatus, kwargs={'expectedState': 0, 'timeout': timeout}).start() - - if self._dialEvent.wait(timeout): - self._dialEvent = None - callId, callType = self._dialResponse - call = Call(self, callId, callType, number, callStatusUpdateCallbackFunc) - self.activeCalls[callId] = call - return call - else: # Call establishing timed out - self._dialEvent = None - raise TimeoutException() - - def processStoredSms(self, unreadOnly=False): - """ Process all SMS messages currently stored on the device/SIM card. - - Reads all (or just unread) received SMS messages currently stored on the - device/SIM card, initiates "SMS received" events for them, and removes - them from the SIM card. - This is useful if SMS messages were received during a period that - python-gsmmodem was not running but the modem was powered on. - - :param unreadOnly: If True, only process unread SMS messages - :type unreadOnly: boolean - """ - if self.smsReceivedCallback: - states = [Sms.STATUS_RECEIVED_UNREAD] - if not unreadOnly: - states.insert(0, Sms.STATUS_RECEIVED_READ) - for msgStatus in states: - messages = self.listStoredSms(status=msgStatus, delete=True) - for sms in messages: - self.smsReceivedCallback(sms) - else: - raise ValueError('GsmModem.smsReceivedCallback not set') - - def listStoredSms(self, status=Sms.STATUS_ALL, memory=None, delete=False): - """ Returns SMS messages currently stored on the device/SIM card. - - The messages are read from the memory set by the "memory" parameter. - - :param status: Filter messages based on this read status; must be 0-4 (see Sms class) - :type status: int - :param memory: The memory type to read from. If None, use the current default SMS read memory - :type memory: str or None - :param delete: If True, delete returned messages from the device/SIM card - :type delete: bool - - :return: A list of Sms objects containing the messages read - :rtype: list - """ - self._setSmsMemory(readDelete=memory) - messages = [] - delMessages = set() - if self.smsTextMode: - cmglRegex= re.compile('^\+CMGL: (\d+),"([^"]+)","([^"]+)",[^,]*,"([^"]+)"$') - for key, val in dictItemsIter(Sms.TEXT_MODE_STATUS_MAP): - if status == val: - statusStr = key - break - else: - raise ValueError('Invalid status value: {0}'.format(status)) - result = self.write('AT+CMGL="{0}"'.format(statusStr)) - msgLines = [] - msgIndex = msgStatus = number = msgTime = None - for line in result: - cmglMatch = cmglRegex.match(line) - if cmglMatch: - # New message; save old one if applicable - if msgIndex != None and len(msgLines) > 0: - msgText = '\n'.join(msgLines) - msgLines = [] - messages.append(ReceivedSms(self, Sms.TEXT_MODE_STATUS_MAP[msgStatus], number, parseTextModeTimeStr(msgTime), msgText, None, [], msgIndex)) - delMessages.add(int(msgIndex)) - msgIndex, msgStatus, number, msgTime = cmglMatch.groups() - msgLines = [] - else: - if line != 'OK': - msgLines.append(line) - if msgIndex != None and len(msgLines) > 0: - msgText = '\n'.join(msgLines) - msgLines = [] - messages.append(ReceivedSms(self, Sms.TEXT_MODE_STATUS_MAP[msgStatus], number, parseTextModeTimeStr(msgTime), msgText, None, [], msgIndex)) - delMessages.add(int(msgIndex)) - else: - cmglRegex = re.compile('^\+CMGL:\s*(\d+),\s*(\d+),.*$') - readPdu = False - result = self.write('AT+CMGL={0}'.format(status)) - for line in result: - if not readPdu: - cmglMatch = cmglRegex.match(line) - if cmglMatch: - msgIndex = int(cmglMatch.group(1)) - msgStat = int(cmglMatch.group(2)) - readPdu = True - else: - try: - smsDict = decodeSmsPdu(line) - except EncodingError: - self.log.debug('Discarding line from +CMGL response: %s', line) - except: - pass - # dirty fix warning: https://github.com/yuriykashin/python-gsmmodem/issues/1 - # todo: make better fix - else: - if smsDict['type'] == 'SMS-DELIVER': - sms = ReceivedSms(self, int(msgStat), smsDict['number'], smsDict['time'], smsDict['text'], smsDict['smsc'], smsDict.get('udh', []), msgIndex) - elif smsDict['type'] == 'SMS-STATUS-REPORT': - sms = StatusReport(self, int(msgStat), smsDict['reference'], smsDict['number'], smsDict['time'], smsDict['discharge'], smsDict['status']) - else: - raise CommandError('Invalid PDU type for readStoredSms(): {0}'.format(smsDict['type'])) - messages.append(sms) - delMessages.add(msgIndex) - readPdu = False - if delete: - if status == Sms.STATUS_ALL: - # Delete all messages - self.deleteMultipleStoredSms() - else: - for msgIndex in delMessages: - self.deleteStoredSms(msgIndex) - return messages - - def _handleModemNotification(self, lines): - """ Handler for unsolicited notifications from the modem - - This method simply spawns a separate thread to handle the actual notification - (in order to release the read thread so that the handlers are able to write back to the modem, etc) - - :param lines The lines that were read - """ - threading.Thread(target=self.__threadedHandleModemNotification, kwargs={'lines': lines}).start() - - def __threadedHandleModemNotification(self, lines): - """ Implementation of _handleModemNotification() to be run in a separate thread - - :param lines The lines that were read - """ - next_line_is_te_statusreport = False - for line in lines: - if 'RING' in line: - # Incoming call (or existing call is ringing) - self._handleIncomingCall(lines) - return - elif line.startswith('+CMTI'): - # New SMS message indication - self._handleSmsReceived(line) - return - elif line.startswith('+CUSD'): - # USSD notification - either a response or a MT-USSD ("push USSD") message - self._handleUssd(lines) - return - elif line.startswith('+CDSI'): - # SMS status report - self._handleSmsStatusReport(line) - return - elif line.startswith('+CDS'): - # SMS status report at next line - next_line_is_te_statusreport = True - cdsMatch = self.CDS_REGEX.match(line) - if cdsMatch: - next_line_is_te_statusreport_length = int(cdsMatch.group(1)) - else: - next_line_is_te_statusreport_length = -1 - elif next_line_is_te_statusreport: - self._handleSmsStatusReportTe(next_line_is_te_statusreport_length, line) - return - elif line.startswith('+DTMF'): - # New incoming DTMF - self._handleIncomingDTMF(line) - return - else: - # Check for call status updates - for updateRegex, handlerFunc in self._callStatusUpdates: - match = updateRegex.match(line) - if match: - # Handle the update - handlerFunc(match) - return - # If this is reached, the notification wasn't handled - self.log.debug('Unhandled unsolicited modem notification: %s', lines) - - #Simcom modem able detect incoming DTMF - def _handleIncomingDTMF(self,line): - self.log.debug('Handling incoming DTMF') - - try: - dtmf_num=line.split(':')[1].replace(" ","") - self.dtmfpool.append(dtmf_num) - self.log.debug('DTMF number is {0}'.format(dtmf_num)) - except: - self.log.debug('Error parse DTMF number on line {0}'.format(line)) - def GetIncomingDTMF(self): - if (len(self.dtmfpool)==0): - return None - else: - return self.dtmfpool.pop(0) - - def _handleIncomingCall(self, lines): - self.log.debug('Handling incoming call') - ringLine = lines.pop(0) - if self._extendedIncomingCallIndication: - try: - callType = ringLine.split(' ', 1)[1] - except IndexError: - # Some external 3G scripts modify incoming call indication settings (issue #18) - self.log.debug('Extended incoming call indication format changed externally; re-enabling...') - callType = None - try: - # Re-enable extended format of incoming indication (optional) - self.write('AT+CRC=1') - except CommandError: - self.log.warning('Extended incoming call indication format changed externally; unable to re-enable') - self._extendedIncomingCallIndication = False - else: - callType = None - if self._callingLineIdentification and len(lines) > 0: - clipLine = lines.pop(0) - clipMatch = self.CLIP_REGEX.match(clipLine) - if clipMatch: - callerNumber = '+' + clipMatch.group(1) - ton = clipMatch.group(2) - #TODO: re-add support for this - callerName = None - #callerName = clipMatch.group(3) - #if callerName != None and len(callerName) == 0: - # callerName = None - else: - callerNumber = ton = callerName = None - else: - callerNumber = ton = callerName = None - - call = None - for activeCall in dictValuesIter(self.activeCalls): - if activeCall.number == callerNumber: - call = activeCall - call.ringCount += 1 - if call == None: - callId = len(self.activeCalls) + 1 - call = IncomingCall(self, callerNumber, ton, callerName, callId, callType) - self.activeCalls[callId] = call - self.incomingCallCallback(call) - - def _handleCallInitiated(self, regexMatch, callId=None, callType=1): - """ Handler for "outgoing call initiated" event notification line """ - if self._dialEvent: - if regexMatch: - groups = regexMatch.groups() - # Set self._dialReponse to (callId, callType) - if len(groups) >= 2: - self._dialResponse = (int(groups[0]) , int(groups[1])) - else: - self._dialResponse = (int(groups[0]), 1) # assume call type: VOICE - else: - self._dialResponse = callId, callType - self._dialEvent.set() - - def _handleCallAnswered(self, regexMatch, callId=None): - """ Handler for "outgoing call answered" event notification line """ - if regexMatch: - groups = regexMatch.groups() - if len(groups) > 1: - callId = int(groups[0]) - self.activeCalls[callId].answered = True - else: - # Call ID not available for this notificition - check for the first outgoing call that has not been answered - for call in dictValuesIter(self.activeCalls): - if call.answered == False and type(call) == Call: - call.answered = True - return - else: - # Use supplied values - self.activeCalls[callId].answered = True - - def _handleCallEnded(self, regexMatch, callId=None, filterUnanswered=False): - if regexMatch: - groups = regexMatch.groups() - if len(groups) > 0: - callId = int(groups[0]) - else: - # Call ID not available for this notification - check for the first outgoing call that is active - for call in dictValuesIter(self.activeCalls): - if type(call) == Call: - if not filterUnanswered or (filterUnanswered == True and call.answered == False): - callId = call.id - break - if callId and callId in self.activeCalls: - self.activeCalls[callId].answered = False - self.activeCalls[callId].active = False - del self.activeCalls[callId] - - def _handleCallRejected(self, regexMatch, callId=None): - """ Handler for rejected (unanswered calls being ended) - - Most modems use _handleCallEnded for handling both call rejections and remote hangups. - This method does the same, but filters for unanswered calls only. - """ - return self._handleCallEnded(regexMatch, callId, True) - - def _handleSmsReceived(self, notificationLine): - """ Handler for "new SMS" unsolicited notification line """ - self.log.info('SMS message received:'+str(notificationLine)) - if self.smsReceivedCallback is not None: - cmtiMatch = self.CMTI_REGEX.match(notificationLine) - if cmtiMatch: - msgMemory = cmtiMatch.group(1) - msgIndex = cmtiMatch.group(2) - sms = self.readStoredSms(msgIndex, msgMemory) - try: - self.smsReceivedCallback(sms) - except Exception: - self.log.error('error in smsReceivedCallback', exc_info=True) - else: - self.deleteStoredSms(msgIndex) - - def _handleSmsStatusReport(self, notificationLine): - """ Handler for SMS status reports """ - self.log.debug('SMS status report received') - cdsiMatch = self.CDSI_REGEX.match(notificationLine) - if cdsiMatch: - msgMemory = cdsiMatch.group(1) - msgIndex = cdsiMatch.group(2) - report = self.readStoredSms(msgIndex, msgMemory) - self.deleteStoredSms(msgIndex) - # Update sent SMS status if possible - if report.reference in self.sentSms: - self.sentSms[report.reference].report = report - if self._smsStatusReportEvent: - # A sendSms() call is waiting for this response - notify waiting thread - self._smsStatusReportEvent.set() - elif self.smsStatusReportCallback: - # Nothing is waiting for this report directly - use callback - try: - self.smsStatusReportCallback(report) - except Exception: - self.log.error('error in smsStatusReportCallback', exc_info=True) - - def _handleSmsStatusReportTe(self, length, notificationLine): - """ Handler for TE SMS status reports """ - self.log.info('TE SMS status report received') - try: - smsDict = decodeSmsPdu(notificationLine) - except EncodingError: - self.log.debug('Discarding notification line from +CDS response: %s', notificationLine) - else: - if smsDict['type'] == 'SMS-STATUS-REPORT': - report = StatusReport(self, int(smsDict['status']), smsDict['reference'], smsDict['number'], smsDict['time'], smsDict['discharge'], smsDict['status']) - else: - raise CommandError('Invalid PDU type for readStoredSms(): {0}'.format(smsDict['type'])) - # Update sent SMS status if possible - if report.reference in self.sentSms: - self.sentSms[report.reference].report = report - if self._smsStatusReportEvent: - # A sendSms() call is waiting for this response - notify waiting thread - self._smsStatusReportEvent.set() - else: - # Nothing is waiting for this report directly - use callback - try: - self.smsStatusReportCallback(report) - except Exception: - self.log.error('error in smsStatusReportCallback', exc_info=True) - - def readStoredSms(self, index, memory=None): - """ Reads and returns the SMS message at the specified index - - :param index: The index of the SMS message in the specified memory - :type index: int - :param memory: The memory type to read from. If None, use the current default SMS read memory - :type memory: str or None - - :raise CommandError: if unable to read the stored message - - :return: The SMS message - :rtype: subclass of gsmmodem.modem.Sms (either ReceivedSms or StatusReport) - """ - # Switch to the correct memory type if required - self._setSmsMemory(readDelete=memory) - msgData = self.write('AT+CMGR={0}'.format(index)) - # Parse meta information - if self.smsTextMode: - cmgrMatch = self.CMGR_SM_DELIVER_REGEX_TEXT.match(msgData[0]) - if cmgrMatch: - msgStatus, number, msgTime = cmgrMatch.groups() - msgText = '\n'.join(msgData[1:-1]) - return ReceivedSms(self, Sms.TEXT_MODE_STATUS_MAP[msgStatus], number, parseTextModeTimeStr(msgTime), msgText) - else: - # Try parsing status report - cmgrMatch = self.CMGR_SM_REPORT_REGEXT_TEXT.match(msgData[0]) - if cmgrMatch: - msgStatus, reference, number, sentTime, deliverTime, deliverStatus = cmgrMatch.groups() - if msgStatus.startswith('"'): - msgStatus = msgStatus[1:-1] - if len(msgStatus) == 0: - msgStatus = "REC UNREAD" - return StatusReport(self, Sms.TEXT_MODE_STATUS_MAP[msgStatus], int(reference), number, parseTextModeTimeStr(sentTime), parseTextModeTimeStr(deliverTime), int(deliverStatus)) - else: - raise CommandError('Failed to parse text-mode SMS message +CMGR response: {0}'.format(msgData)) - else: - cmgrMatch = self.CMGR_REGEX_PDU.match(msgData[0]) - if not cmgrMatch: - raise CommandError('Failed to parse PDU-mode SMS message +CMGR response: {0}'.format(msgData)) - stat, alpha, length = cmgrMatch.groups() - try: - stat = int(stat) - except Exception: - # Some modems (ZTE) do not always read return status - default to RECEIVED UNREAD - stat = Sms.STATUS_RECEIVED_UNREAD - pdu = msgData[1] - smsDict = decodeSmsPdu(pdu) - if smsDict['type'] == 'SMS-DELIVER': - return ReceivedSms(self, int(stat), smsDict['number'], smsDict['time'], smsDict['text'], smsDict['smsc'], smsDict.get('udh', [])) - elif smsDict['type'] == 'SMS-STATUS-REPORT': - return StatusReport(self, int(stat), smsDict['reference'], smsDict['number'], smsDict['time'], smsDict['discharge'], smsDict['status']) - else: - raise CommandError('Invalid PDU type for readStoredSms(): {0}'.format(smsDict['type'])) - - def deleteStoredSms(self, index, memory=None): - """ Deletes the SMS message stored at the specified index in modem/SIM card memory - - :param index: The index of the SMS message in the specified memory - :type index: int - :param memory: The memory type to delete from. If None, use the current default SMS read/delete memory - :type memory: str or None - - :raise CommandError: if unable to delete the stored message - """ - self._setSmsMemory(readDelete=memory) - self.write('AT+CMGD={0},0'.format(index)) - # TODO: make a check how many params are supported by the modem and use the right command. For example, Siemens MC35, TC35 take only one parameter. - #self.write('AT+CMGD={0}'.format(index)) - - def deleteMultipleStoredSms(self, delFlag=4, memory=None): - """ Deletes all SMS messages that have the specified read status. - - The messages are read from the memory set by the "memory" parameter. - The value of the "delFlag" paramater is the same as the "DelFlag" parameter of the +CMGD command: - 1: Delete All READ messages - 2: Delete All READ and SENT messages - 3: Delete All READ, SENT and UNSENT messages - 4: Delete All messages (this is the default) - - :param delFlag: Controls what type of messages to delete; see description above. - :type delFlag: int - :param memory: The memory type to delete from. If None, use the current default SMS read/delete memory - :type memory: str or None - :param delete: If True, delete returned messages from the device/SIM card - :type delete: bool - - :raise ValueErrror: if "delFlag" is not in range [1,4] - :raise CommandError: if unable to delete the stored messages - """ - if 0 < delFlag <= 4: - self._setSmsMemory(readDelete=memory) - self.write('AT+CMGD=1,{0}'.format(delFlag)) - else: - raise ValueError('"delFlag" must be in range [1,4]') - - def _handleUssd(self, lines): - """ Handler for USSD event notification line(s) """ - if self._ussdSessionEvent: - # A sendUssd() call is waiting for this response - parse it - self._ussdResponse = self._parseCusdResponse(lines) - # Notify waiting thread - self._ussdSessionEvent.set() - - def _parseCusdResponse(self, lines): - """ Parses one or more +CUSD notification lines (for USSD) - :return: USSD response object - :rtype: gsmmodem.modem.Ussd - """ - if len(lines) > 1: - # Issue #20: Some modem/network combinations use \r\n as in-message EOL indicators; - # - join lines to compensate for that (thanks to davidjb for the fix) - # Also, look for more than one +CUSD response because of certain modems' strange behaviour - cusdMatches = list(self.CUSD_REGEX.finditer('\r\n'.join(lines))) - else: - # Single standard +CUSD response - cusdMatches = [self.CUSD_REGEX.match(lines[0])] - message = None - sessionActive = True - if len(cusdMatches) > 1: - self.log.debug('Multiple +CUSD responses received; filtering...') - # Some modems issue a non-standard "extra" +CUSD notification for releasing the session - for cusdMatch in cusdMatches: - if cusdMatch.group(1) == '2': - # Set the session to inactive, but ignore the message - self.log.debug('Ignoring "session release" message: %s', cusdMatch.group(2)) - sessionActive = False - else: - # Not a "session release" message - message = cusdMatch.group(2) - if sessionActive and cusdMatch.group(1) != '1': - sessionActive = False - else: - sessionActive = cusdMatches[0].group(1) == '1' - message = cusdMatches[0].group(2) - return Ussd(self, sessionActive, message) - - def _placeHolderCallback(self, *args): - """ Does nothing """ - self.log.debug('called with args: {0}'.format(args)) - - def _pollCallStatus(self, expectedState, callId=None, timeout=None): - """ Poll the status of outgoing calls. - This is used for modems that do not have a known set of call status update notifications. - - :param expectedState: The internal state we are waiting for. 0 == initiated, 1 == answered, 2 = hangup - :type expectedState: int - - :raise TimeoutException: If a timeout was specified, and has occurred - """ - callDone = False - timeLeft = timeout or 999999 - while self.alive and not callDone and timeLeft > 0: - time.sleep(0.5) - if expectedState == 0: # Only call initializing can timeout - timeLeft -= 0.5 - try: - clcc = self._pollCallStatusRegex.match(self.write('AT+CLCC')[0]) - except TimeoutException as timeout: - # Can happend if the call was ended during our time.sleep() call - clcc = None - if clcc: - direction = int(clcc.group(2)) - if direction == 0: # Outgoing call - # Determine call state - stat = int(clcc.group(3)) - if expectedState == 0: # waiting for call initiated - if stat == 2 or stat == 3: # Dialing or ringing ("alerting") - callId = int(clcc.group(1)) - callType = int(clcc.group(4)) - self._handleCallInitiated(None, callId, callType) # if self_dialEvent is None, this does nothing - expectedState = 1 # Now wait for call answer - elif expectedState == 1: # waiting for call to be answered - if stat == 0: # Call active - callId = int(clcc.group(1)) - self._handleCallAnswered(None, callId) - expectedState = 2 # Now wait for call hangup - elif expectedState == 2 : # waiting for remote hangup - # Since there was no +CLCC response, the call is no longer active - callDone = True - self._handleCallEnded(None, callId=callId) - elif expectedState == 1: # waiting for call to be answered - # Call was rejected - callDone = True - self._handleCallRejected(None, callId=callId) - if timeLeft <= 0: - raise TimeoutException() - -class Ussd(object): - """ Unstructured Supplementary Service Data (USSD) message. - - This class contains convenient methods for replying to a USSD prompt - and to cancel the USSD session - """ - - def __init__(self, gsmModem, sessionActive, message): - self._gsmModem = weakref.proxy(gsmModem) - # Indicates if the session is active (True) or has been closed (False) - self.sessionActive = sessionActive - self.message = message - - def reply(self, message): - """ Sends a reply to this USSD message in the same USSD session - - :raise InvalidStateException: if the USSD session is not active (i.e. it has ended) - - :return: The USSD response message/session (as a Ussd object) - """ - if self.sessionActive: - return self._gsmModem.sendUssd(message) - else: - raise InvalidStateException('USSD session is inactive') - - def cancel(self): - """ Terminates/cancels the USSD session (without sending a reply) - - Does nothing if the USSD session is inactive. - """ - if self.sessionActive: - self._gsmModem.write('AT+CUSD=2') diff --git a/gsmmodem/pdu.py b/gsmmodem/pdu.py deleted file mode 100644 index 167ce2f..0000000 --- a/gsmmodem/pdu.py +++ /dev/null @@ -1,953 +0,0 @@ -# -*- coding: utf8 -*- - -""" SMS PDU encoding methods """ - -from __future__ import unicode_literals - -import sys, codecs -from datetime import datetime, timedelta, tzinfo -from copy import copy -from .exceptions import EncodingError - -# For Python 3 support -PYTHON_VERSION = sys.version_info[0] -if PYTHON_VERSION >= 3: - MAX_INT = sys.maxsize - dictItemsIter = dict.items - xrange = range - unichr = chr - toByteArray = lambda x: bytearray(codecs.decode(x, 'hex_codec')) if type(x) == bytes else bytearray(codecs.decode(bytes(x, 'ascii'), 'hex_codec')) if type(x) == str else x - rawStrToByteArray = lambda x: bytearray(bytes(x, 'latin-1')) - -TEXT_MODE = ('\n\r !\"#%&\'()*+,-./0123456789:;<=>?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz') # TODO: Check if all of them are supported inside text mode -# Tables can be found at: http://en.wikipedia.org/wiki/GSM_03.38#GSM_7_bit_default_alphabet_and_extension_table_of_3GPP_TS_23.038_.2F_GSM_03.38 -GSM7_BASIC = ('@£$¥èéùìòÇ\nØø\rÅåΔ_ΦΓΛΩΠΨΣΘΞ\x1bÆæßÉ !\"#¤%&\'()*+,-./0123456789:;<=>?¡ABCDEFGHIJKLMNOPQRSTUVWXYZÄÖÑÜ`¿abcdefghijklmnopqrstuvwxyzäöñüà') -GSM7_EXTENDED = {chr(0xFF): 0x0A, - #CR2: chr(0x0D), - '^': chr(0x14), - #SS2: chr(0x1B), - '{': chr(0x28), - '}': chr(0x29), - '\\': chr(0x2F), - '[': chr(0x3C), - '~': chr(0x3D), - ']': chr(0x3E), - '|': chr(0x40), - '€': chr(0x65)} -# Maximum message sizes for each data coding -MAX_MESSAGE_LENGTH = {0x00: 160, # GSM-7 - 0x04: 140, # 8-bit - 0x08: 70} # UCS2 - -# Maximum message sizes for each data coding for multipart messages -MAX_MULTIPART_MESSAGE_LENGTH = {0x00: 153, # GSM-7 - 0x04: 133, # 8-bit TODO: Check this value! - 0x08: 67} # UCS2 - -class SmsPduTzInfo(tzinfo): - """ Simple implementation of datetime.tzinfo for handling timestamp GMT offsets specified in SMS PDUs """ - - def __init__(self, pduOffsetStr=None): - """ - :param pduOffset: 2 semi-octet timezone offset as specified by PDU (see GSM 03.40 spec) - :type pduOffset: str - - Note: pduOffsetStr is optional in this constructor due to the special requirement for pickling - mentioned in the Python docs. It should, however, be used (or otherwise pduOffsetStr must be - manually set) - """ - self._offset = None - if pduOffsetStr != None: - self._setPduOffsetStr(pduOffsetStr) - - def _setPduOffsetStr(self, pduOffsetStr): - # See if the timezone difference is positive/negative by checking MSB of first semi-octet - tzHexVal = int(pduOffsetStr, 16) - # In order to read time zone 'minute' shift: - # - Remove MSB (sign) - # - Read HEX value as decimal - # - Multiply by 15 - # See: https://en.wikipedia.org/wiki/GSM_03.40#Time_Format - - # Possible fix for #15 - convert invalid character to BCD-value - if (tzHexVal & 0x0F) > 0x9: - tzHexVal +=0x06 - - tzOffsetMinutes = int('{0:0>2X}'.format(tzHexVal & 0x7F)) * 15 - - if tzHexVal & 0x80 == 0: # positive - self._offset = timedelta(minutes=(tzOffsetMinutes)) - else: # negative - self._offset = timedelta(minutes=(-tzOffsetMinutes)) - - def utcoffset(self, dt): - return self._offset - - def dst(self, dt): - """ We do not have enough info in the SMS PDU to implement daylight savings time """ - return timedelta(0) - - -class InformationElement(object): - """ User Data Header (UDH) Information Element (IE) implementation - - This represents a single field ("information element") in the PDU's - User Data Header. The UDH itself contains one or more of these - information elements. - - If the IEI (IE identifier) is recognized, the class will automatically - specialize into one of the subclasses of InformationElement, - e.g. Concatenation or PortAddress, allowing the user to easily - access the specific (and useful) attributes of these special cases. - """ - - def __new__(cls, *args, **kwargs): #iei, ieLen, ieData): - """ Causes a new InformationElement class, or subclass - thereof, to be created. If the IEI is recognized, a specific - subclass of InformationElement is returned """ - if len(args) > 0: - targetClass = IEI_CLASS_MAP.get(args[0], cls) - elif 'iei' in kwargs: - targetClass = IEI_CLASS_MAP.get(kwargs['iei'], cls) - else: - return super(InformationElement, cls).__new__(cls) - return super(InformationElement, targetClass).__new__(targetClass) - - def __init__(self, iei, ieLen=0, ieData=None): - self.id = iei # IEI - self.dataLength = ieLen # IE Length - self.data = ieData or [] # raw IE data - - @classmethod - def decode(cls, byteIter): - """ Decodes a single IE at the current position in the specified - byte iterator - - :return: An InformationElement (or subclass) instance for the decoded IE - :rtype: InformationElement, or subclass thereof - """ - iei = next(byteIter) - ieLen = next(byteIter) - ieData = [] - for i in xrange(ieLen): - ieData.append(next(byteIter)) - return InformationElement(iei, ieLen, ieData) - - def encode(self): - """ Encodes this IE and returns the resulting bytes """ - result = bytearray() - result.append(self.id) - result.append(self.dataLength) - result.extend(self.data) - return result - - def __len__(self): - """ Exposes the IE's total length (including the IEI and IE length octet) in octets """ - return self.dataLength + 2 - - -class Concatenation(InformationElement): - """ IE that indicates SMS concatenation. - - This implementation handles both 8-bit and 16-bit concatenation - indication, and exposes the specific useful details of this - IE as instance variables. - - Exposes: - - reference - CSMS reference number, must be same for all the SMS parts in the CSMS - parts - total number of parts. The value shall remain constant for every short - message which makes up the concatenated short message. If the value is zero then - the receiving entity shall ignore the whole information element - number - this part's number in the sequence. The value shall start at 1 and - increment for every short message which makes up the concatenated short message - """ - - def __init__(self, iei=0x00, ieLen=0, ieData=None): - super(Concatenation, self).__init__(iei, ieLen, ieData) - if ieData != None: - if iei == 0x00: # 8-bit reference - self.reference, self.parts, self.number = ieData - else: # 0x08: 16-bit reference - self.reference = ieData[0] << 8 | ieData[1] - self.parts = ieData[2] - self.number = ieData[3] - - def encode(self): - if self.reference > 0xFF: - self.id = 0x08 # 16-bit reference - self.data = [self.reference >> 8, self.reference & 0xFF, self.parts, self.number] - else: - self.id = 0x00 # 8-bit reference - self.data = [self.reference, self.parts, self.number] - self.dataLength = len(self.data) - return super(Concatenation, self).encode() - - -class PortAddress(InformationElement): - """ IE that indicates an Application Port Addressing Scheme. - - This implementation handles both 8-bit and 16-bit concatenation - indication, and exposes the specific useful details of this - IE as instance variables. - - Exposes: - destination: The destination port number - source: The source port number - """ - - def __init__(self, iei=0x04, ieLen=0, ieData=None): - super(PortAddress, self).__init__(iei, ieLen, ieData) - if ieData != None: - if iei == 0x04: # 8-bit port addressing scheme - self.destination, self.source = ieData - else: # 0x05: 16-bit port addressing scheme - self.destination = ieData[0] << 8 | ieData[1] - self.source = ieData[2] << 8 | ieData[3] - - def encode(self): - if self.destination > 0xFF or self.source > 0xFF: - self.id = 0x05 # 16-bit - self.data = [self.destination >> 8, self.destination & 0xFF, self.source >> 8, self.source & 0xFF] - else: - self.id = 0x04 # 8-bit - self.data = [self.destination, self.source] - self.dataLength = len(self.data) - return super(PortAddress, self).encode() - - -# Map of recognized IEIs -IEI_CLASS_MAP = {0x00: Concatenation, # Concatenated short messages, 8-bit reference number - 0x08: Concatenation, # Concatenated short messages, 16-bit reference number - 0x04: PortAddress, # Application port addressing scheme, 8 bit address - 0x05: PortAddress # Application port addressing scheme, 16 bit address - } - - -class Pdu(object): - """ Encoded SMS PDU. Contains raw PDU data and related meta-information """ - - def __init__(self, data, tpduLength): - """ Constructor - :param data: the raw PDU data (as bytes) - :type data: bytearray - :param tpduLength: Length (in bytes) of the TPDU - :type tpduLength: int - """ - self.data = data - self.tpduLength = tpduLength - - def __str__(self): - global PYTHON_VERSION - if PYTHON_VERSION < 3: - return str(self.data).encode('hex').upper() - else: #pragma: no cover - return str(codecs.encode(self.data, 'hex_codec'), 'ascii').upper() - - -def encodeSmsSubmitPdu(number, text, reference=0, validity=None, smsc=None, requestStatusReport=True, rejectDuplicates=False, sendFlash=False): - """ Creates an SMS-SUBMIT PDU for sending a message with the specified text to the specified number - - :param number: the destination mobile number - :type number: str - :param text: the message text - :type text: str - :param reference: message reference number (see also: rejectDuplicates parameter) - :type reference: int - :param validity: message validity period (absolute or relative) - :type validity: datetime.timedelta (relative) or datetime.datetime (absolute) - :param smsc: SMSC number to use (leave None to use default) - :type smsc: str - :param rejectDuplicates: Flag that controls the TP-RD parameter (messages with same destination and reference may be rejected if True) - :type rejectDuplicates: bool - - :return: A list of one or more tuples containing the SMS PDU (as a bytearray, and the length of the TPDU part - :rtype: list of tuples - """ - if PYTHON_VERSION < 3: - if type(text) == str: - text = text.decode('UTF-8') - - tpduFirstOctet = 0x01 # SMS-SUBMIT PDU - if validity != None: - # Validity period format (TP-VPF) is stored in bits 4,3 of the first TPDU octet - if type(validity) == timedelta: - # Relative (TP-VP is integer) - tpduFirstOctet |= 0x10 # bit4 == 1, bit3 == 0 - validityPeriod = [_encodeRelativeValidityPeriod(validity)] - elif type(validity) == datetime: - # Absolute (TP-VP is semi-octet encoded date) - tpduFirstOctet |= 0x18 # bit4 == 1, bit3 == 1 - validityPeriod = _encodeTimestamp(validity) - else: - raise TypeError('"validity" must be of type datetime.timedelta (for relative value) or datetime.datetime (for absolute value)') - else: - validityPeriod = None - if rejectDuplicates: - tpduFirstOctet |= 0x04 # bit2 == 1 - if requestStatusReport: - tpduFirstOctet |= 0x20 # bit5 == 1 - - # Encode message text and set data coding scheme based on text contents - try: - encodedTextLength = len(encodeGsm7(text)) - except ValueError: - # Cannot encode text using GSM-7; use UCS2 instead - encodedTextLength = len(text) - alphabet = 0x08 # UCS2 - else: - alphabet = 0x00 # GSM-7 - - # Check if message should be concatenated - if encodedTextLength > MAX_MESSAGE_LENGTH[alphabet]: - # Text too long for single PDU - add "concatenation" User Data Header - concatHeaderPrototype = Concatenation() - concatHeaderPrototype.reference = reference - - # Devide whole text into parts - if alphabet == 0x00: - pduTextParts = divideTextGsm7(text) - elif alphabet == 0x08: - pduTextParts = divideTextUcs2(text) - else: - raise NotImplementedError - - pduCount = len(pduTextParts) - concatHeaderPrototype.parts = pduCount - tpduFirstOctet |= 0x40 - else: - concatHeaderPrototype = None - pduCount = 1 - - # Construct required PDU(s) - pdus = [] - for i in xrange(pduCount): - pdu = bytearray() - if smsc: - pdu.extend(_encodeAddressField(smsc, smscField=True)) - else: - pdu.append(0x00) # Don't supply an SMSC number - use the one configured in the device - - udh = bytearray() - if concatHeaderPrototype != None: - concatHeader = copy(concatHeaderPrototype) - concatHeader.number = i + 1 - pduText = pduTextParts[i] - pduTextLength = len(pduText) - udh.extend(concatHeader.encode()) - else: - pduText = text - - udhLen = len(udh) - - pdu.append(tpduFirstOctet) - pdu.append(reference) # message reference - # Add destination number - pdu.extend(_encodeAddressField(number)) - pdu.append(0x00) # Protocol identifier - no higher-level protocol - - pdu.append(alphabet if not sendFlash else (0x10 if alphabet == 0x00 else 0x18)) - if validityPeriod: - pdu.extend(validityPeriod) - - if alphabet == 0x00: # GSM-7 - encodedText = encodeGsm7(pduText) - userDataLength = len(encodedText) # Payload size in septets/characters - if udhLen > 0: - shift = ((udhLen + 1) * 8) % 7 # "fill bits" needed to make the UDH end on a septet boundary - userData = packSeptets(encodedText, padBits=shift) - if shift > 0: - userDataLength += 1 # take padding bits into account - else: - userData = packSeptets(encodedText) - elif alphabet == 0x08: # UCS2 - userData = encodeUcs2(pduText) - userDataLength = len(userData) - - if udhLen > 0: - userDataLength += udhLen + 1 # +1 for the UDH length indicator byte - pdu.append(userDataLength) - pdu.append(udhLen) - pdu.extend(udh) # UDH - else: - pdu.append(userDataLength) - pdu.extend(userData) # User Data (message payload) - tpdu_length = len(pdu) - 1 - pdus.append(Pdu(pdu, tpdu_length)) - return pdus - -def decodeSmsPdu(pdu): - """ Decodes SMS pdu data and returns a tuple in format (number, text) - - :param pdu: PDU data as a hex string, or a bytearray containing PDU octects - :type pdu: str or bytearray - - :raise EncodingError: If the specified PDU data cannot be decoded - - :return: The decoded SMS data as a dictionary - :rtype: dict - """ - try: - pdu = toByteArray(pdu) - except Exception as e: - # Python 2 raises TypeError, Python 3 raises binascii.Error - raise EncodingError(e) - result = {} - pduIter = iter(pdu) - - smscNumber, smscBytesRead = _decodeAddressField(pduIter, smscField=True) - result['smsc'] = smscNumber - result['tpdu_length'] = len(pdu) - smscBytesRead - - tpduFirstOctet = next(pduIter) - - pduType = tpduFirstOctet & 0x03 # bits 1-0 - if pduType == 0x00: # SMS-DELIVER or SMS-DELIVER REPORT - result['type'] = 'SMS-DELIVER' - result['number'] = _decodeAddressField(pduIter)[0] - result['protocol_id'] = next(pduIter) - dataCoding = _decodeDataCoding(next(pduIter)) - result['time'] = _decodeTimestamp(pduIter) - userDataLen = next(pduIter) - udhPresent = (tpduFirstOctet & 0x40) != 0 - ud = _decodeUserData(pduIter, userDataLen, dataCoding, udhPresent) - result.update(ud) - elif pduType == 0x01: # SMS-SUBMIT or SMS-SUBMIT-REPORT - result['type'] = 'SMS-SUBMIT' - result['reference'] = next(pduIter) # message reference - we don't really use this - result['number'] = _decodeAddressField(pduIter)[0] - result['protocol_id'] = next(pduIter) - dataCoding = _decodeDataCoding(next(pduIter)) - validityPeriodFormat = (tpduFirstOctet & 0x18) >> 3 # bits 4,3 - if validityPeriodFormat == 0x02: # TP-VP field present and integer represented (relative) - result['validity'] = _decodeRelativeValidityPeriod(next(pduIter)) - elif validityPeriodFormat == 0x03: # TP-VP field present and semi-octet represented (absolute) - result['validity'] = _decodeTimestamp(pduIter) - userDataLen = next(pduIter) - udhPresent = (tpduFirstOctet & 0x40) != 0 - ud = _decodeUserData(pduIter, userDataLen, dataCoding, udhPresent) - result.update(ud) - elif pduType == 0x02: # SMS-STATUS-REPORT or SMS-COMMAND - result['type'] = 'SMS-STATUS-REPORT' - result['reference'] = next(pduIter) - result['number'] = _decodeAddressField(pduIter)[0] - result['time'] = _decodeTimestamp(pduIter) - result['discharge'] = _decodeTimestamp(pduIter) - result['status'] = next(pduIter) - else: - raise EncodingError('Unknown SMS message type: {0}. First TPDU octet was: {1}'.format(pduType, tpduFirstOctet)) - - return result - -def _decodeUserData(byteIter, userDataLen, dataCoding, udhPresent): - """ Decodes PDU user data (UDHI (if present) and message text) """ - result = {} - if udhPresent: - # User Data Header is present - result['udh'] = [] - udhLen = next(byteIter) - ieLenRead = 0 - # Parse and store UDH fields - while ieLenRead < udhLen: - ie = InformationElement.decode(byteIter) - ieLenRead += len(ie) - result['udh'].append(ie) - del ieLenRead - if dataCoding == 0x00: # GSM-7 - # Since we are using 7-bit data, "fill bits" may have been added to make the UDH end on a septet boundary - shift = ((udhLen + 1) * 8) % 7 # "fill bits" needed to make the UDH end on a septet boundary - # Simulate another "shift" in the unpackSeptets algorithm in order to ignore the fill bits - prevOctet = next(byteIter) - shift += 1 - - if dataCoding == 0x00: # GSM-7 - if udhPresent: - userDataSeptets = unpackSeptets(byteIter, userDataLen, prevOctet, shift) - else: - userDataSeptets = unpackSeptets(byteIter, userDataLen) - result['text'] = decodeGsm7(userDataSeptets) - elif dataCoding == 0x02: # UCS2 - result['text'] = decodeUcs2(byteIter, userDataLen) - else: # 8-bit (data) - userData = [] - for b in byteIter: - userData.append(unichr(b)) - result['text'] = ''.join(userData) - return result - -def _decodeRelativeValidityPeriod(tpVp): - """ Calculates the relative SMS validity period (based on the table in section 9.2.3.12 of GSM 03.40) - :rtype: datetime.timedelta - """ - if tpVp <= 143: - return timedelta(minutes=((tpVp + 1) * 5)) - elif 144 <= tpVp <= 167: - return timedelta(hours=12, minutes=((tpVp - 143) * 30)) - elif 168 <= tpVp <= 196: - return timedelta(days=(tpVp - 166)) - elif 197 <= tpVp <= 255: - return timedelta(weeks=(tpVp - 192)) - else: - raise ValueError('tpVp must be in range [0, 255]') - -def _encodeRelativeValidityPeriod(validityPeriod): - """ Encodes the specified relative validity period timedelta into an integer for use in an SMS PDU - (based on the table in section 9.2.3.12 of GSM 03.40) - - :param validityPeriod: The validity period to encode - :type validityPeriod: datetime.timedelta - :rtype: int - """ - # Python 2.6 does not have timedelta.total_seconds(), so compute it manually - #seconds = validityPeriod.total_seconds() - seconds = validityPeriod.seconds + (validityPeriod.days * 24 * 3600) - if seconds <= 43200: # 12 hours - tpVp = int(seconds / 300) - 1 # divide by 5 minutes, subtract 1 - elif seconds <= 86400: # 24 hours - tpVp = int((seconds - 43200) / 1800) + 143 # subtract 12 hours, divide by 30 minutes. add 143 - elif validityPeriod.days <= 30: # 30 days - tpVp = validityPeriod.days + 166 # amount of days + 166 - elif validityPeriod.days <= 441: # max value of tpVp is 255 - tpVp = int(validityPeriod.days / 7) + 192 # amount of weeks + 192 - else: - raise ValueError('Validity period too long; tpVp limited to 1 octet (max value: 255)') - return tpVp - -def _decodeTimestamp(byteIter): - """ Decodes a 7-octet timestamp """ - dateStr = decodeSemiOctets(byteIter, 7) - timeZoneStr = dateStr[-2:] - return datetime.strptime(dateStr[:-2], '%y%m%d%H%M%S').replace(tzinfo=SmsPduTzInfo(timeZoneStr)) - -def _encodeTimestamp(timestamp): - """ Encodes a 7-octet timestamp from the specified date - - Note: the specified timestamp must have a UTC offset set; you can use gsmmodem.util.SimpleOffsetTzInfo for simple cases - - :param timestamp: The timestamp to encode - :type timestamp: datetime.datetime - - :return: The encoded timestamp - :rtype: bytearray - """ - if timestamp.tzinfo == None: - raise ValueError('Please specify time zone information for the timestamp (e.g. by using gsmmodem.util.SimpleOffsetTzInfo)') - - # See if the timezone difference is positive/negative - tzDelta = timestamp.utcoffset() - if tzDelta.days >= 0: - tzValStr = '{0:0>2}'.format(int(tzDelta.seconds / 60 / 15)) - else: # negative - tzVal = int((tzDelta.days * -3600 * 24 - tzDelta.seconds) / 60 / 15) # calculate offset in 0.25 hours - # Cast as literal hex value and set MSB of first semi-octet of timezone to 1 to indicate negative value - tzVal = int('{0:0>2}'.format(tzVal), 16) | 0x80 - tzValStr = '{0:0>2X}'.format(tzVal) - - dateStr = timestamp.strftime('%y%m%d%H%M%S') + tzValStr - - return encodeSemiOctets(dateStr) - -def _decodeDataCoding(octet): - if octet & 0xC0 == 0: - #compressed = octect & 0x20 - alphabet = (octet & 0x0C) >> 2 - return alphabet # 0x00 == GSM-7, 0x01 == 8-bit data, 0x02 == UCS2 - # We ignore other coding groups - return 0 - -def nibble2octet(addressLen): - return int((addressLen + 1) / 2) - -def _decodeAddressField(byteIter, smscField=False, log=False): - """ Decodes the address field at the current position of the bytearray iterator - - :param byteIter: Iterator over bytearray - :type byteIter: iter(bytearray) - - :return: Tuple containing the address value and amount of bytes read (value is or None if it is empty (zero-length)) - :rtype: tuple - """ - addressLen = next(byteIter) - if addressLen > 0: - toa = next(byteIter) - ton = (toa & 0x70) # bits 6,5,4 of type-of-address == type-of-number - if ton == 0x50: - # Alphanumberic number - addressLen = nibble2octet(addressLen) - septets = unpackSeptets(byteIter, addressLen) - addressValue = decodeGsm7(septets) - return (addressValue, (addressLen + 2)) - else: - # ton == 0x00: Unknown (might be international, local, etc) - leave as is - # ton == 0x20: National number - if smscField: - addressValue = decodeSemiOctets(byteIter, addressLen-1) - else: - addressLen = nibble2octet(addressLen) - addressValue = decodeSemiOctets(byteIter, addressLen) - addressLen += 1 # for the return value, add the toa byte - if ton == 0x10: # International number - addressValue = '+' + addressValue - return (addressValue, (addressLen + 1)) - else: - return (None, 1) - -def _encodeAddressField(address, smscField=False): - """ Encodes the address into an address field - - :param address: The address to encode (phone number or alphanumeric) - :type byteIter: str - - :return: Encoded SMS PDU address field - :rtype: bytearray - """ - # First, see if this is a number or an alphanumeric string - toa = 0x80 | 0x00 | 0x01 # Type-of-address start | Unknown type-of-number | ISDN/tel numbering plan - alphaNumeric = False - if address.isalnum(): - # Might just be a local number - if address.isdigit(): - # Local number - toa |= 0x20 - else: - # Alphanumeric address - toa |= 0x50 - toa &= 0xFE # switch to "unknown" numbering plan - alphaNumeric = True - else: - if address[0] == '+' and address[1:].isdigit(): - # International number - toa |= 0x10 - # Remove the '+' prefix - address = address[1:] - else: - # Alphanumeric address - toa |= 0x50 - toa &= 0xFE # switch to "unknown" numbering plan - alphaNumeric = True - if alphaNumeric: - addressValue = packSeptets(encodeGsm7(address, False)) - addressLen = len(addressValue) * 2 - else: - addressValue = encodeSemiOctets(address) - if smscField: - addressLen = len(addressValue) + 1 - else: - addressLen = len(address) - result = bytearray() - result.append(addressLen) - result.append(toa) - result.extend(addressValue) - return result - -def encodeSemiOctets(number): - """ Semi-octet encoding algorithm (e.g. for phone numbers) - - :return: bytearray containing the encoded octets - :rtype: bytearray - """ - if len(number) % 2 == 1: - number = number + 'F' # append the "end" indicator - octets = [int(number[i+1] + number[i], 16) for i in xrange(0, len(number), 2)] - return bytearray(octets) - -def decodeSemiOctets(encodedNumber, numberOfOctets=None): - """ Semi-octet decoding algorithm(e.g. for phone numbers) - - :param encodedNumber: The semi-octet-encoded telephone number (in bytearray format or hex string) - :type encodedNumber: bytearray, str or iter(bytearray) - :param numberOfOctets: The expected amount of octets after decoding (i.e. when to stop) - :type numberOfOctets: int - - :return: decoded telephone number - :rtype: string - """ - number = [] - if type(encodedNumber) in (str, bytes): - encodedNumber = bytearray(codecs.decode(encodedNumber, 'hex_codec')) - i = 0 - for octet in encodedNumber: - hexVal = hex(octet)[2:].zfill(2) - number.append(hexVal[1]) - if hexVal[0] != 'f': - number.append(hexVal[0]) - else: - break - if numberOfOctets != None: - i += 1 - if i == numberOfOctets: - break - return ''.join(number) - -def encodeTextMode(plaintext): - """ Text mode checker - - Tests whther SMS could be sent in text mode - - :param text: the text string to encode - - :raise ValueError: if the text string cannot be sent in text mode - - :return: Passed string - :rtype: str - """ - if PYTHON_VERSION >= 3: - plaintext = str(plaintext) - elif type(plaintext) == str: - plaintext = plaintext.decode('UTF-8') - - for char in plaintext: - idx = TEXT_MODE.find(char) - if idx != -1: - continue - else: - raise ValueError('Cannot encode char "{0}" inside text mode'.format(char)) - - if len(plaintext) > MAX_MESSAGE_LENGTH[0x00]: - raise ValueError('Message is too long for text mode (maximum {0} characters)'.format(MAX_MESSAGE_LENGTH[0x00])) - - return plaintext - -def encodeGsm7(plaintext, discardInvalid=False): - """ GSM-7 text encoding algorithm - - Encodes the specified text string into GSM-7 octets (characters). This method does not pack - the characters into septets. - - :param text: the text string to encode - :param discardInvalid: if True, characters that cannot be encoded will be silently discarded - - :raise ValueError: if the text string cannot be encoded using GSM-7 encoding (unless discardInvalid == True) - - :return: A bytearray containing the string encoded in GSM-7 encoding - :rtype: bytearray - """ - result = bytearray() - if PYTHON_VERSION >= 3: - plaintext = str(plaintext) - elif type(plaintext) == str: - plaintext = plaintext.decode('UTF-8') - - for char in plaintext: - idx = GSM7_BASIC.find(char) - if idx != -1: - result.append(idx) - elif char in GSM7_EXTENDED: - result.append(0x1B) # ESC - switch to extended table - result.append(ord(GSM7_EXTENDED[char])) - elif not discardInvalid: - raise ValueError('Cannot encode char "{0}" using GSM-7 encoding'.format(char)) - return result - -def decodeGsm7(encodedText): - """ GSM-7 text decoding algorithm - - Decodes the specified GSM-7-encoded string into a plaintext string. - - :param encodedText: the text string to encode - :type encodedText: bytearray or str - - :return: A string containing the decoded text - :rtype: str - """ - result = [] - if type(encodedText) == str: - encodedText = rawStrToByteArray(encodedText) #bytearray(encodedText) - iterEncoded = iter(encodedText) - for b in iterEncoded: - if b == 0x1B: # ESC - switch to extended table - c = chr(next(iterEncoded)) - for char, value in dictItemsIter(GSM7_EXTENDED): - if c == value: - result.append(char) - break - else: - result.append(GSM7_BASIC[b]) - return ''.join(result) - -def divideTextGsm7(plainText): - """ GSM7 message dividing algorithm - - Divides text into list of chunks that could be stored in a single, GSM7-encoded SMS message. - - :param plainText: the text string to divide - :type plainText: str - - :return: A list of strings - :rtype: list of str - """ - result = [] - - plainStartPtr = 0 - plainStopPtr = 0 - chunkByteSize = 0 - - if PYTHON_VERSION >= 3: - plainText = str(plainText) - while plainStopPtr < len(plainText): - char = plainText[plainStopPtr] - idx = GSM7_BASIC.find(char) - if idx != -1: - chunkByteSize = chunkByteSize + 1; - elif char in GSM7_EXTENDED: - chunkByteSize = chunkByteSize + 2; - else: - raise ValueError('Cannot encode char "{0}" using GSM-7 encoding'.format(char)) - - plainStopPtr = plainStopPtr + 1 - if chunkByteSize > MAX_MULTIPART_MESSAGE_LENGTH[0x00]: - plainStopPtr = plainStopPtr - 1 - - if chunkByteSize >= MAX_MULTIPART_MESSAGE_LENGTH[0x00]: - result.append(plainText[plainStartPtr:plainStopPtr]) - plainStartPtr = plainStopPtr - chunkByteSize = 0 - - if chunkByteSize > 0: - result.append(plainText[plainStartPtr:]) - - return result - -def packSeptets(octets, padBits=0): - """ Packs the specified octets into septets - - Typically the output of encodeGsm7 would be used as input to this function. The resulting - bytearray contains the original GSM-7 characters packed into septets ready for transmission. - - :rtype: bytearray - """ - result = bytearray() - if type(octets) == str: - octets = iter(rawStrToByteArray(octets)) - elif type(octets) == bytearray: - octets = iter(octets) - shift = padBits - if padBits == 0: - try: - prevSeptet = next(octets) - except StopIteration: - return result - else: - prevSeptet = 0x00 - for octet in octets: - septet = octet & 0x7f; - if shift == 7: - # prevSeptet has already been fully added to result - shift = 0 - prevSeptet = septet - continue - b = ((septet << (7 - shift)) & 0xFF) | (prevSeptet >> shift) - prevSeptet = septet - shift += 1 - result.append(b) - if shift != 7: - # There is a bit "left over" from prevSeptet - result.append(prevSeptet >> shift) - return result - -def unpackSeptets(septets, numberOfSeptets=None, prevOctet=None, shift=7): - """ Unpacks the specified septets into octets - - :param septets: Iterator or iterable containing the septets packed into octets - :type septets: iter(bytearray), bytearray or str - :param numberOfSeptets: The amount of septets to unpack (or None for all remaining in "septets") - :type numberOfSeptets: int or None - - :return: The septets unpacked into octets - :rtype: bytearray - """ - result = bytearray() - if type(septets) == str: - septets = iter(rawStrToByteArray(septets)) - elif type(septets) == bytearray: - septets = iter(septets) - if numberOfSeptets == None: - numberOfSeptets = MAX_INT # Loop until StopIteration - if numberOfSeptets == 0: - return result - i = 0 - for octet in septets: - i += 1 - if shift == 7: - shift = 1 - if prevOctet != None: - result.append(prevOctet >> 1) - if i <= numberOfSeptets: - result.append(octet & 0x7F) - prevOctet = octet - if i == numberOfSeptets: - break - else: - continue - b = ((octet << shift) & 0x7F) | (prevOctet >> (8 - shift)) - - prevOctet = octet - result.append(b) - shift += 1 - - if i == numberOfSeptets: - break - if shift == 7 and prevOctet: - b = prevOctet >> (8 - shift) - if b: - # The final septet value still needs to be unpacked - result.append(b) - return result - -def decodeUcs2(byteIter, numBytes): - """ Decodes UCS2-encoded text from the specified byte iterator, up to a maximum of numBytes """ - userData = [] - i = 0 - try: - while i < numBytes: - userData.append(unichr((next(byteIter) << 8) | next(byteIter))) - i += 2 - except StopIteration: - # Not enough bytes in iterator to reach numBytes; return what we have - pass - return ''.join(userData) - -def encodeUcs2(text): - """ UCS2 text encoding algorithm - - Encodes the specified text string into UCS2-encoded bytes. - - :param text: the text string to encode - - :return: A bytearray containing the string encoded in UCS2 encoding - :rtype: bytearray - """ - result = bytearray() - - for b in map(ord, text): - result.append(b >> 8) - result.append(b & 0xFF) - return result - -def divideTextUcs2(plainText): - """ UCS-2 message dividing algorithm - - Divides text into list of chunks that could be stored in a single, UCS-2 -encoded SMS message. - - :param plainText: the text string to divide - :type plainText: str - - :return: A list of strings - :rtype: list of str - """ - result = [] - resultLength = 0 - - fullChunksCount = int(len(plainText) / MAX_MULTIPART_MESSAGE_LENGTH[0x08]) - for i in range(fullChunksCount): - result.append(plainText[i * MAX_MULTIPART_MESSAGE_LENGTH[0x08] : (i + 1) * MAX_MULTIPART_MESSAGE_LENGTH[0x08]]) - resultLength = resultLength + MAX_MULTIPART_MESSAGE_LENGTH[0x08] - - # Add last, not fully filled chunk - if resultLength < len(plainText): - result.append(plainText[resultLength:]) - - return result diff --git a/gsmmodem/serial_comms.py b/gsmmodem/serial_comms.py deleted file mode 100644 index 44544fe..0000000 --- a/gsmmodem/serial_comms.py +++ /dev/null @@ -1,147 +0,0 @@ -#!/usr/bin/env python - -""" Low-level serial communications handling """ - -import sys, threading, logging - -import re -import serial # pyserial: http://pyserial.sourceforge.net - -from .exceptions import TimeoutException -from . import compat # For Python 2.6 compatibility - - -class SerialComms(object): - """ Wraps all low-level serial communications (actual read/write operations) """ - - log = logging.getLogger('gsmmodem.serial_comms.SerialComms') - - # End-of-line read terminator - RX_EOL_SEQ = b'\r\n' - # End-of-response terminator - RESPONSE_TERM = re.compile('^OK|ERROR|(\+CM[ES] ERROR: \d+)|(COMMAND NOT SUPPORT)$') - # Default timeout for serial port reads (in seconds) - timeout = 1 - - def __init__(self, port, baudrate=115200, notifyCallbackFunc=None, fatalErrorCallbackFunc=None, *args, **kwargs): - """ Constructor - - :param fatalErrorCallbackFunc: function to call if a fatal error occurs in the serial device reading thread - :type fatalErrorCallbackFunc: func - """ - self.alive = False - self.port = port - self.baudrate = baudrate - - self._responseEvent = None # threading.Event() - self._expectResponseTermSeq = None # expected response terminator sequence - self._response = None # Buffer containing response to a written command - self._notification = [] # Buffer containing lines from an unsolicited notification from the modem - # Reentrant lock for managing concurrent write access to the underlying serial port - self._txLock = threading.RLock() - - self.notifyCallback = notifyCallbackFunc or self._placeholderCallback - self.fatalErrorCallback = fatalErrorCallbackFunc or self._placeholderCallback - - self.com_args = args - self.com_kwargs = kwargs - - def connect(self): - """ Connects to the device and starts the read thread """ - self.serial = serial.Serial(dsrdtr=True, rtscts=True, port=self.port, baudrate=self.baudrate, - timeout=self.timeout, *self.com_args, **self.com_kwargs) - # Start read thread - self.alive = True - self.rxThread = threading.Thread(target=self._readLoop) - self.rxThread.daemon = True - self.rxThread.start() - - def close(self): - """ Stops the read thread, waits for it to exit cleanly, then closes the underlying serial port """ - self.alive = False - self.rxThread.join() - self.serial.close() - - def _handleLineRead(self, line, checkForResponseTerm=True): - # print 'sc.hlineread:',line - if self._responseEvent and not self._responseEvent.is_set(): - # A response event has been set up (another thread is waiting for this response) - self._response.append(line) - if not checkForResponseTerm or self.RESPONSE_TERM.match(line): - # End of response reached; notify waiting thread - # print 'response:', self._response - self.log.debug('response: %s', self._response) - self._responseEvent.set() - else: - # Nothing was waiting for this - treat it as a notification - self._notification.append(line) - if self.serial.inWaiting() == 0: - # No more chars on the way for this notification - notify higher-level callback - # print 'notification:', self._notification - self.log.debug('notification: %s', self._notification) - self.notifyCallback(self._notification) - self._notification = [] - - def _placeholderCallback(self, *args, **kwargs): - """ Placeholder callback function (does nothing) """ - - def _readLoop(self): - """ Read thread main loop - - Reads lines from the connected device - """ - try: - readTermSeq = bytearray(self.RX_EOL_SEQ) - readTermLen = len(readTermSeq) - rxBuffer = bytearray() - while self.alive: - data = self.serial.read(1) - if len(data) != 0: # check for timeout - # print >> sys.stderr, ' RX:', data,'({0})'.format(ord(data)) - rxBuffer.append(ord(data)) - if rxBuffer[-readTermLen:] == readTermSeq: - # A line (or other logical segment) has been read - line = rxBuffer[:-readTermLen].decode() - rxBuffer = bytearray() - if len(line) > 0: - # print 'calling handler' - self._handleLineRead(line) - elif self._expectResponseTermSeq: - if rxBuffer[-len(self._expectResponseTermSeq):] == self._expectResponseTermSeq: - line = rxBuffer.decode() - rxBuffer = bytearray() - self._handleLineRead(line, checkForResponseTerm=False) - # else: - # ' ' - except serial.SerialException as e: - self.alive = False - try: - self.serial.close() - except Exception: # pragma: no cover - pass - # Notify the fatal error handler - self.fatalErrorCallback(e) - - def write(self, data, waitForResponse=True, timeout=5, expectedResponseTermSeq=None): - data = data.encode() - with self._txLock: - if waitForResponse: - if expectedResponseTermSeq: - self._expectResponseTermSeq = bytearray(expectedResponseTermSeq.encode()) - self._response = [] - self._responseEvent = threading.Event() - self.serial.write(data) - if self._responseEvent.wait(timeout): - self._responseEvent = None - self._expectResponseTermSeq = False - return self._response - else: # Response timed out - self._responseEvent = None - self._expectResponseTermSeq = False - if len(self._response) > 0: - # Add the partial response to the timeout exception - raise TimeoutException(self._response) - else: - raise TimeoutException() - else: - self.serial.write(data) diff --git a/gsmmodem/util.py b/gsmmodem/util.py deleted file mode 100644 index a61ba8d..0000000 --- a/gsmmodem/util.py +++ /dev/null @@ -1,130 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" Some common utility classes used by tests """ - -from datetime import datetime, timedelta, tzinfo -import re - - -class SimpleOffsetTzInfo(tzinfo): - """ Very simple implementation of datetime.tzinfo offering set timezone offset for datetime instances """ - - def __init__(self, offsetInHours=None): - """ Constructs a new tzinfo instance using an amount of hours as an offset - - :param offsetInHours: The timezone offset, in hours (may be negative) - :type offsetInHours: int or float - """ - if offsetInHours != None: # pragma: no cover - self.offsetInHours = offsetInHours - - def utcoffset(self, dt): - return timedelta(hours=self.offsetInHours) - - def dst(self, dt): - return timedelta(0) - - def __repr__(self): - return 'gsmmodem.util.SimpleOffsetTzInfo({0})'.format(self.offsetInHours) - - -def parseTextModeTimeStr(timeStr): - """ Parses the specified SMS text mode time string - - The time stamp format is "yy/MM/dd,hh:mm:ss±zz" - (yy = year, MM = month, dd = day, hh = hour, mm = minute, ss = second, zz = time zone - [Note: the unit of time zone is a quarter of an hour]) - - :param timeStr: The time string to parse - :type timeStr: str - - :return: datetime object representing the specified time string - :rtype: datetime.datetime - """ - msgTime = timeStr[:-3] - tzOffsetHours = int(int(timeStr[-3:]) * 0.25) - return datetime.strptime(msgTime, '%y/%m/%d,%H:%M:%S').replace(tzinfo=SimpleOffsetTzInfo(tzOffsetHours)) - - -def lineStartingWith(string, lines): - """ Searches through the specified list of strings and returns the - first line starting with the specified search string, or None if not found - """ - for line in lines: - if line.startswith(string): - return line - else: - return None - - -def lineMatching(regexStr, lines): - """ Searches through the specified list of strings and returns the regular expression - match for the first line that matches the specified regex string, or None if no match was found - - Note: if you have a pre-compiled regex pattern, use lineMatchingPattern() instead - - :type regexStr: Regular expression string to use - :type lines: List of lines to search - - :return: the regular expression match for the first line that matches the specified regex, or None if no match was found - :rtype: re.Match - """ - regex = re.compile(regexStr) - for line in lines: - m = regex.match(line) - if m: - return m - else: - return None - - -def lineMatchingPattern(pattern, lines): - """ Searches through the specified list of strings and returns the regular expression - match for the first line that matches the specified pre-compiled regex pattern, or None if no match was found - - Note: if you are using a regex pattern string (i.e. not already compiled), use lineMatching() instead - - :type pattern: Compiled regular expression pattern to use - :type lines: List of lines to search - - :return: the regular expression match for the first line that matches the specified regex, or None if no match was found - :rtype: re.Match - """ - for line in lines: - m = pattern.match(line) - if m: - return m - else: - return None - - -def allLinesMatchingPattern(pattern, lines): - """ Like lineMatchingPattern, but returns all lines that match the specified pattern - - :type pattern: Compiled regular expression pattern to use - :type lines: List of lines to search - - :return: list of re.Match objects for each line matched, or an empty list if none matched - :rtype: list - """ - result = [] - for line in lines: - m = pattern.match(line) - if m: - result.append(m) - return result - - -def removeAtPrefix(string): - """ Remove AT prefix from a specified string. - - :param string: An original string - :type string: str - - :return: A string with AT prefix removed - :rtype: str - """ - if string.startswith('AT'): - return string[2:] - return string diff --git a/logs/AppLogging.py b/logs/AppLogging.py index a9f9f49..ac9be75 100644 --- a/logs/AppLogging.py +++ b/logs/AppLogging.py @@ -8,4 +8,4 @@ def init_logger(): filemode='a', format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', datefmt='%D:%H:%M:%S', - level=logging.INFO) + level=logging.DEBUG) diff --git a/modems/ModemPool.py b/modems/ModemPool.py index 54685a6..f921baf 100644 --- a/modems/ModemPool.py +++ b/modems/ModemPool.py @@ -2,6 +2,7 @@ import re import time import serial +from gsmmodem import GsmModem from serial import Serial from definitions import BAUDRATE @@ -9,7 +10,9 @@ from error.SIMError import SIMError from logs.LogSender import LOG_APPOINTMENT_SUCCESS, SUBJECT_SIM_INFO from params import firebase_store_manager, oracle_log_sender from pojo.SimInfoPojo import SimInfoPojo +from pojo.serial_modem import SerialModem from utils.excel_reader import ExcelHelper +from utils.operator import check_operator, Operator class ModemPool: @@ -43,42 +46,55 @@ class ModemPool: return msg def get_raw_phone_number(self, slot_position): + for index, ser in enumerate(self._serial_list): - print("will get phone number for slot({}) SIM({}), port:{}".format(slot_position, index + 1, ser.port)) - if not self._select_sim_storage(ser): - print(self._generate_error_msg(slot_position, index, SIMError.STORAGE_ERROR)) - continue - msg = self._execut_USSD_cmd("AT+CUSD=1, *132#\r", ser) - if "Unfortunately" in str(msg): - print(self._generate_error_msg(slot_position, index, SIMError.SIM_DISABLED)) - continue - elif "CME ERROR" in str(msg): - print(self._generate_error_msg(slot_position, index, SIMError.CME_ERROR)) - continue - elif len(msg) == 0: - print(self._generate_error_msg(slot_position, index, SIMError.TIMEOUT)) - continue - # find phone number - match = re.search(r'33\d{9}', str(msg)) - phone_number = match.group(0) - print("phone is " + phone_number) + sim_position = index + 1 + position = (slot_position - 1) * len(self._port_list) + sim_position + # unlock sim + unlock_cmd = 'AT+CPIN="{0}\r"'.format("0000") + self._send_command(unlock_cmd, ser, 10) cmd = "AT+CCID\r" response = str(self._send_command(cmd, ser)) ccid_group = re.search("[0-9F]+", response) ccid = ccid_group.group(0) - sim_position = index + 1 - position = (slot_position - 1) * len(self._port_list) + sim_position - if phone_number: - self._db_manager.save_sim_info(SimInfoPojo(phone=phone_number, ccid=ccid, position=position)) - self._log_sender.send_log(phone_number, source=self.TAG, subject=SUBJECT_SIM_INFO, - type=LOG_APPOINTMENT_SUCCESS) - # write the number to sim card's phonebook - cmd = f'AT+CPBW={self.phone_number_position},\"{phone_number}\"\r' - self._send_command(cmd, ser, wait_time_in_s=2) - self.get_own_number(ser) + operator = check_operator(ccid) + if operator == Operator.SFR or operator == Operator.CHINA_TELECOM: + contacts = self._excel_helper.read_contacts() + contact = [contact for contact in contacts if + contact.ccid.replace("F", "") == ccid.replace("F", "")] + if len(contact) > 0: + phone_number = contact[0].phone + self._db_manager.save_sim_info(SimInfoPojo(phone=str(phone_number), ccid=ccid, position=position)) + else: + error_msg = "slot({}),sim({})".format(slot_position, sim_position) + oracle_log_sender.send_contact_not_found(error_msg) + else: + print("will get phone number for slot({}) SIM({}), port:{}".format(slot_position, index + 1, ser.port)) + if not self._select_sim_storage(ser): + print(self._generate_error_msg(slot_position, index, SIMError.STORAGE_ERROR)) + continue + msg = self._execut_USSD_cmd("AT+CUSD=1, *132#\r", ser) + if "Unfortunately" in str(msg): + print(self._generate_error_msg(slot_position, index, SIMError.SIM_DISABLED)) + continue + elif "CME ERROR" in str(msg): + print(self._generate_error_msg(slot_position, index, SIMError.CME_ERROR)) + continue + elif len(msg) == 0: + print(self._generate_error_msg(slot_position, index, SIMError.TIMEOUT)) + continue + # find phone number + match = re.search(r'33\d{9}', str(msg)) + phone_number = match.group(0) + print("phone is " + phone_number) - def get_own_number(self, ser: Serial): - print("saved phone number: " + str(self._send_command(f'AT+CPBR={self.phone_number_position}\r', ser))) + if phone_number: + self._db_manager.save_sim_info(SimInfoPojo(phone=phone_number, ccid=ccid, position=position)) + self._log_sender.send_log(phone_number, source=self.TAG, subject=SUBJECT_SIM_INFO, + type=LOG_APPOINTMENT_SUCCESS) + # write the number to sim card's phonebook + cmd = f'AT+CPBW={self.phone_number_position},\"{phone_number}\"\r' + self._send_command(cmd, ser, wait_time_in_s=2) def _select_sim_storage(self, ser) -> bool: # use SIM Card storage diff --git a/reset_all_sim_card.py b/reset_all_sim_card.py index 104f616..64be10d 100644 --- a/reset_all_sim_card.py +++ b/reset_all_sim_card.py @@ -1,15 +1,17 @@ import logging +import sys import params from modems.ModemPool import ModemPool from logs.AppLogging import init_logger from logs.LogSender import LOG_SUBJECT_EVENT, TYPE_EVENT_RESET_ALL_SIM_CARDS from main import card_pool, get_devices_ports +from wait_for_sms import start_waiting_sms def read_all_the_phone_number(): params.oracle_log_sender.send_log(msg="SIM卡自检开始", subject=LOG_SUBJECT_EVENT, type=TYPE_EVENT_RESET_ALL_SIM_CARDS) - slot_number = 16 + slot_number = 1 slot_sum = 27 params.firebase_store_manager.clear_all_sim_info() for i in range(slot_number, slot_sum + 1): @@ -24,4 +26,5 @@ def read_all_the_phone_number(): if __name__ == '__main__': init_logger() logger = logging.getLogger() + logger.addHandler(logging.StreamHandler(stream=sys.stdout)) read_all_the_phone_number() diff --git a/wait_for_sms.py b/wait_for_sms.py index 512aeef..58d494a 100644 --- a/wait_for_sms.py +++ b/wait_for_sms.py @@ -20,9 +20,6 @@ commandor = CommandorPage() thread_event = None current_gsm_modem = None card_pool = CardPool(CARD_POOL_PORT) -# used to save the current slot position -current_card_pool_slot = 1 -current_sim_position = 1 def get_devices_ports() -> list: @@ -49,9 +46,13 @@ def timeout_occurred(serial_modem: SerialModem): def start_to_handle_sms(serial_modem: SerialModem): global current_gsm_modem current_gsm_modem = serial_modem.modem - if check_operator(serial_modem.ccid) == Operator.LYCAMOBILE: + # if check_operator(serial_modem.ccid) == Operator.LYCAMOBILE: # lycamobile + try: current_gsm_modem.deleteMultipleStoredSms(memory="SM") + except Exception as error: + print(error) + serial_modem.modem.smsReceivedCallback = handle_sms serial_modem.modem.smsTextMode = False logger.info('Waiting for SMS message, for phone number ' + str(serial_modem.phone_number)) @@ -84,9 +85,10 @@ def init_modems() -> list: return modems -def start_book(): - slot_number = 2 - slot_sum = 2 +def start_waiting_sms(): + # logger = logging.getLogger() + slot_number = 1 + slot_sum = 28 for i in range(slot_number, slot_sum + 1): card_pool.reset() logger.info("will switch to " + str(i)) @@ -97,10 +99,7 @@ def start_book(): # read the contact, and merge the 2 objects together excel_reader = ExcelHelper() contacts = excel_reader.read_contacts() - global current_sim_position - current_sim_position = 1 for modem in modem_list: - current_sim_position = current_sim_position + 1 try: # get contact for current modem modem.get_ccid() @@ -119,4 +118,4 @@ if __name__ == '__main__': init_logger() logger = logging.getLogger() logger.addHandler(logging.StreamHandler(stream=sys.stdout)) - start_book() + start_waiting_sms()