diff --git a/gsmmodem/__init__.py b/gsmmodem/__init__.py new file mode 100644 index 0000000..9b65886 --- /dev/null +++ b/gsmmodem/__init__.py @@ -0,0 +1,17 @@ +""" Package that allows easy control of an attached GSM modem + +The main class for controlling a modem is GsmModem, which can be imported +directly from this module. + +Other important and useful classes are: +gsmmodem.modem.IncomingCall: wraps an incoming call and passed to the incoming call hanndler callback function +gsmmodem.modem.ReceivedSms: wraps a received SMS message and passed to the sms received hanndler callback function +gsmmodem.modem.SentSms: returned when sending SMS messages; used for tracking the status of the SMS message + +All python-gsmmodem-specific exceptions are defined in the gsmmodem.modem.exceptions package. + +@author: Francois Aucamp +@license: LGPLv3+ +""" + +from .modem import GsmModem diff --git a/gsmmodem/compat.py b/gsmmodem/compat.py new file mode 100644 index 0000000..46d53ea --- /dev/null +++ b/gsmmodem/compat.py @@ -0,0 +1,22 @@ +""" Contains monkey-patched equivalents for a few commonly-used Python 2.7-and-higher functions. +Used to provide backwards-compatibility with Python 2.6 +""" +import sys +if sys.version_info[0] == 2 and sys.version_info[1] < 7: + import threading + + # threading.Event.wait() always returns None in Python < 2.7 so we need to patch it + if hasattr(threading, '_Event'): # threading.Event is a function that return threading._Event + # This is heavily Python-implementation-specific, so patch where we can, otherwise leave it + def wrapWait(func): + def newWait(self, timeout=None): + func(self, timeout) + return self.is_set() + return newWait + threading._Event.wait = wrapWait(threading._Event.wait) + else: + raise ImportError('Could not patch this version of Python 2.{0} for compatibility with python-gsmmodem.'.format(sys.version_info[1])) +if sys.version_info[0] == 2: + str = str +else: + str = lambda x: x \ No newline at end of file diff --git a/gsmmodem/exceptions.py b/gsmmodem/exceptions.py new file mode 100644 index 0000000..6882524 --- /dev/null +++ b/gsmmodem/exceptions.py @@ -0,0 +1,134 @@ +""" Module defines exceptions used by gsmmodem """ + +class GsmModemException(Exception): + """ Base exception raised for error conditions when interacting with the GSM modem """ + + +class TimeoutException(GsmModemException): + """ Raised when a write command times out """ + + def __init__(self, data=None): + """ @param data: Any data that was read was read before timeout occurred (if applicable) """ + super(TimeoutException, self).__init__(data) + self.data = data + + +class InvalidStateException(GsmModemException): + """ Raised when an API method call is invoked on an object that is in an incorrect state """ + + +class InterruptedException(InvalidStateException): + """ Raised when execution of an AT command is interrupt by a state change. + May contain another exception that was the cause of the interruption """ + + def __init__(self, message, cause=None): + """ @param cause: the exception that caused this interruption (usually a CmeError) """ + super(InterruptedException, self).__init__(message) + self.cause = cause + + +class CommandError(GsmModemException): + """ Raised if the modem returns an error in response to an AT command + + May optionally include an error type (CME or CMS) and -code (error-specific). + """ + + _description = '' + + def __init__(self, command=None, type=None, code=None): + self.command = command + self.type = type + self.code = code + if type != None and code != None: + super(CommandError, self).__init__('{0} {1}{2}'.format(type, code, ' ({0})'.format(self._description) if len(self._description) > 0 else '')) + elif command != None: + super(CommandError, self).__init__(command) + else: + super(CommandError, self).__init__() + + +class CmeError(CommandError): + """ ME error result code : +CME ERROR: + + Issued in response to an AT command + """ + + def __new__(cls, *args, **kwargs): + # Return a specialized version of this class if possible + if len(args) >= 2: + code = args[1] + if code == 11: + return PinRequiredError(args[0]) + elif code == 16: + return IncorrectPinError(args[0]) + elif code == 12: + return PukRequiredError(args[0]) + return super(CmeError, cls).__new__(cls, *args, **kwargs) + + def __init__(self, command, code): + super(CmeError, self).__init__(command, 'CME', code) + + +class SecurityException(CmeError): + """ Security-related CME error """ + + def __init__(self, command, code): + super(SecurityException, self).__init__(command, code) + + +class PinRequiredError(SecurityException): + """ Raised if an operation failed because the SIM card's PIN has not been entered """ + + _description = 'SIM card PIN is required' + + def __init__(self, command, code=11): + super(PinRequiredError, self).__init__(command, code) + + +class IncorrectPinError(SecurityException): + """ Raised if an incorrect PIN is entered """ + + _description = 'Incorrect PIN entered' + + def __init__(self, command, code=16): + super(IncorrectPinError, self).__init__(command, code) + + +class PukRequiredError(SecurityException): + """ Raised an operation failed because the SIM card's PUK is required (SIM locked) """ + + _description = "PUK required (SIM locked)" + + def __init__(self, command, code=12): + super(PukRequiredError, self).__init__(command, code) + + +class CmsError(CommandError): + """ Message service failure result code: +CMS ERROR : + + Issued in response to an AT command + """ + + def __new__(cls, *args, **kwargs): + # Return a specialized version of this class if possible + if len(args) >= 2: + code = args[1] + if code == 330: + return SmscNumberUnknownError(args[0]) + return super(CmsError, cls).__new__(cls, *args, **kwargs) + + def __init__(self, command, code): + super(CmsError, self).__init__(command, 'CMS', code) + + +class SmscNumberUnknownError(CmsError): + """ Raised if the SMSC (service centre) address is missing when trying to send an SMS message """ + + _description = 'SMSC number not set' + + def __init__(self, command, code=330): + super(SmscNumberUnknownError, self).__init__(command, code) + + +class EncodingError(GsmModemException): + """ Raised if a decoding- or encoding operation failed """ diff --git a/gsmmodem/gprs.py b/gsmmodem/gprs.py new file mode 100644 index 0000000..af164f4 --- /dev/null +++ b/gsmmodem/gprs.py @@ -0,0 +1,96 @@ +# -*- coding: utf8 -*- + +""" GPRS/Data-specific classes + +BRANCH: mms + +PLEASE NOTE: *Everything* in this file (PdpContext, GprsModem class, etc) is experimental. +This is NOT meant to be used in production in any way; the API is completely unstable, +no unit tests will be written for this in the forseeable future, and stuff may generally +break and cause riots. Please do not file bug reports against this branch unless you +have a patch to go along with it, but even then: remember that this entire "mms" branch +is exploratory; I simply want to see what the possibilities are with it. + +Use the "main" branch, and the GsmModem class if you want to build normal applications. +""" + +import re + +from .util import allLinesMatchingPattern +from .modem import GsmModem + +class PdpContext(object): + """ Packet Data Protocol (PDP) context parameter values """ + def __init__(self, cid, pdpType, apn, pdpAddress=None, dataCompression=0, headerCompression=0): + """ Construct a new Packet Data Protocol context + + @param cid: PDP Context Identifier - specifies a particular PDP context definition + @type cid: int + @param pdpType: the type of packet data protocol (IP, PPP, IPV6, etc) + @type pdpType: str + @param apn: Access Point Name; logical name used to select the GGSN or external packet data network + @type apn: str + @param pdpAddress: identifies the MT in the address space applicable to the PDP. If None, a dynamic address may be requested. + @type pdpAddress: str + @param dataCompression: PDP data compression; 0 == off, 1 == on + @type dataCompression: int + @param headerCompression: PDP header compression; 0 == off, 1 == on + @type headerCompression: int + """ + self.cid = cid + self.pdpType = pdpType + self.apn = apn + self.pdpAddress = pdpAddress + self.dataCompression = dataCompression + self.headerCompression = headerCompression + + +class GprsModem(GsmModem): + """ EXPERIMENTAL: Specialized version of GsmModem that includes GPRS/data-specific commands """ + + @property + def pdpContexts(self): + """ Currently-defined Packet Data Protocol (PDP) context list + + PDP paramter values returned include PDP type (IP, IPV6, PPP, X.25 etc), APN, + data compression, header compression, etc. + + @return: a list of currently-defined PDP contexts + """ + result = [] + cgdContResult = self.write('AT+CGDCONT?') + matches = allLinesMatchingPattern(re.compile(r'^\+CGDCONT:\s*(\d+),"([^"]+)","([^"]+)","([^"]+)",(\d+),(\d+)'), cgdContResult) + for cgdContMatch in matches: + cid, pdpType, apn, pdpAddress, dataCompression, headerCompression = cgdContMatch.groups() + pdpContext = PdpContext(cid, pdpType, apn, pdpAddress, dataCompression, headerCompression) + result.append(pdpContext) + return result + + @property + def defaultPdpContext(self): + """ @return: the default PDP context, or None if not defined """ + pdpContexts = self.pdpContexts + return pdpContexts[0] if len(pdpContexts) > 0 else None + @defaultPdpContext.setter + def defaultPdpContext(self, pdpContext): + """ Set the default PDP context (or clear it by setting it to None) """ + self.write('AT+CGDCONT=,"{0}","{1}","{2}",{3},{4}'.format(pdpContext.pdpType, pdpContext.apn, pdpContext.pdpAddress or '', pdpContext.dataCompression, pdpContext.headerCompression)) + + def definePdpContext(self, pdpContext): + """ Define a new Packet Data Protocol context, or overwrite an existing one + + @param pdpContext: The PDP context to define + @type pdpContext: gsmmodem.gprs.PdpContext + """ + self.write('AT+CGDCONT={0},"{1}","{2}","{3}",{4},{5}'.format(pdpContext.cid or '', pdpContext.pdpType, pdpContext.apn, pdpContext.pdpAddress or '', pdpContext.dataCompression, pdpContext.headerCompression)) + + def initDataConnection(self, pdpCid=1): + """ Initializes a packet data (GPRS) connection using the specified PDP Context ID """ + # From this point on, we don't want the read thread interfering + #self.log.debug('Stopping read thread') + #self.alive = False + #self.rxThread.join() + self.log.debug('Init data connection') + self.write('ATD*99#', expectedResponseTermSeq="CONNECT\r") + self.log.debug('Data connection open; ready for PPP comms') + # From here on we use PPP to communicate with the network diff --git a/gsmmodem/models/Call.py b/gsmmodem/models/Call.py new file mode 100644 index 0000000..757f9e2 --- /dev/null +++ b/gsmmodem/models/Call.py @@ -0,0 +1,80 @@ +import weakref + +from gsmmodem.exceptions import CmeError, InterruptedException, InvalidStateException + + +class Call(object): + """ A voice call """ + + DTMF_COMMAND_BASE = '+VTS=' + dtmfSupport = False # Indicates whether or not DTMF tones can be sent in calls + + def __init__(self, gsmModem, callId, callType, number, callStatusUpdateCallbackFunc=None): + """ + :param gsmModem: GsmModem instance that created this object + :param number: The number that is being called + """ + self._gsmModem = weakref.proxy(gsmModem) + self._callStatusUpdateCallbackFunc = callStatusUpdateCallbackFunc + # Unique ID of this call + self.id = callId + # Call type (VOICE == 0, etc) + self.type = callType + # The remote number of this call (destination or origin) + self.number = number + # Flag indicating whether the call has been answered or not (backing field for "answered" property) + self._answered = False + # Flag indicating whether or not the call is active + # (meaning it may be ringing or answered, but not ended because of a hangup event) + self.active = True + + @property + def answered(self): + return self._answered + + @answered.setter + def answered(self, answered): + self._answered = answered + if self._callStatusUpdateCallbackFunc: + self._callStatusUpdateCallbackFunc(self) + + def sendDtmfTone(self, tones): + """ Send one or more DTMF tones to the remote party (only allowed for an answered call) + + Note: this is highly device-dependent, and might not work + + :param digits: A str containining one or more DTMF tones to play, e.g. "3" or "\*123#" + + :raise CommandError: if the command failed/is not supported + :raise InvalidStateException: if the call has not been answered, or is ended while the command is still executing + """ + if self.answered: + dtmfCommandBase = self.DTMF_COMMAND_BASE.format(cid=self.id) + toneLen = len(tones) + for tone in list(tones): + try: + self._gsmModem.write('AT{0}{1}'.format(dtmfCommandBase, tone), timeout=(5 + toneLen)) + + except CmeError as e: + if e.code == 30: + # No network service - can happen if call is ended during DTMF transmission (but also if DTMF is sent immediately after call is answered) + raise InterruptedException('No network service', e) + elif e.code == 3: + # Operation not allowed - can happen if call is ended during DTMF transmission + raise InterruptedException('Operation not allowed', e) + else: + raise e + else: + raise InvalidStateException('Call is not active (it has not yet been answered, or it has ended).') + + def hangup(self): + """ End the phone call. + + Does nothing if the call is already inactive. + """ + if self.active: + self._gsmModem.write('ATH') + self.answered = False + self.active = False + if self.id in self._gsmModem.activeCalls: + del self._gsmModem.activeCalls[self.id] diff --git a/gsmmodem/models/IncomingCall.py b/gsmmodem/models/IncomingCall.py new file mode 100644 index 0000000..917ef9e --- /dev/null +++ b/gsmmodem/models/IncomingCall.py @@ -0,0 +1,40 @@ +from gsmmodem.modem import Call + + +class IncomingCall(Call): + + CALL_TYPE_MAP = {'VOICE': 0} + + """ Represents an incoming call, conveniently allowing access to call meta information and -control """ + def __init__(self, gsmModem, number, ton, callerName, callId, callType): + """ + :param gsmModem: GsmModem instance that created this object + :param number: Caller number + :param ton: TON (type of number/address) in integer format + :param callType: Type of the incoming call (VOICE, FAX, DATA, etc) + """ + if callType in self.CALL_TYPE_MAP: + callType = self.CALL_TYPE_MAP[callType] + super(IncomingCall, self).__init__(gsmModem, callId, callType, number) + # Type attribute of the incoming call + self.ton = ton + self.callerName = callerName + # Flag indicating whether the call is ringing or not + self.ringing = True + # Amount of times this call has rung (before answer/hangup) + self.ringCount = 1 + + def answer(self): + """ Answer the phone call. + :return: self (for chaining method calls) + """ + if self.ringing: + self._gsmModem.write('ATA') + self.ringing = False + self.answered = True + return self + + def hangup(self): + """ End the phone call. """ + self.ringing = False + super(IncomingCall, self).hangup() \ No newline at end of file diff --git a/gsmmodem/models/ReceivedSms.py b/gsmmodem/models/ReceivedSms.py new file mode 100644 index 0000000..60edd23 --- /dev/null +++ b/gsmmodem/models/ReceivedSms.py @@ -0,0 +1,27 @@ +import weakref + +from gsmmodem.models.Sms import Sms + + +class ReceivedSms(Sms): + """ An SMS message that has been received (MT) """ + + def __init__(self, gsmModem, status, number, time, text, smsc=None, udh=[], index=None): + super(ReceivedSms, self).__init__(number, text, smsc) + self._gsmModem = weakref.proxy(gsmModem) + self.status = status + self.time = time + self.udh = udh + self.index = index + + def reply(self, message): + """ Convenience method that sends a reply SMS to the sender of this message """ + return self._gsmModem.sendSms(self.number, message) + + def sendSms(self, dnumber, message): + """ Convenience method that sends a SMS to someone else """ + return self._gsmModem.sendSms(dnumber, message) + + def getModem(self): + """ Convenience method that returns the gsm modem instance """ + return self._gsmModem diff --git a/gsmmodem/models/SentSms.py b/gsmmodem/models/SentSms.py new file mode 100644 index 0000000..ff032c8 --- /dev/null +++ b/gsmmodem/models/SentSms.py @@ -0,0 +1,27 @@ +from gsmmodem.models.Sms import Sms +from gsmmodem.models.StatusReport import StatusReport + + +class SentSms(Sms): + """ An SMS message that has been sent (MO) """ + + ENROUTE = 0 # Status indicating message is still enroute to destination + DELIVERED = 1 # Status indicating message has been received by destination handset + FAILED = 2 # Status indicating message delivery has failed + + def __init__(self, number, text, reference, smsc=None): + super(SentSms, self).__init__(number, text, smsc) + self.report = None # Status report for this SMS (StatusReport object) + self.reference = reference + + @property + def status(self): + """ Status of this SMS. Can be ENROUTE, DELIVERED or FAILED + + The actual status report object may be accessed via the 'report' attribute + if status is 'DELIVERED' or 'FAILED' + """ + if self.report == None: + return SentSms.ENROUTE + else: + return SentSms.DELIVERED if self.report.deliveryStatus == StatusReport.DELIVERED else SentSms.FAILED diff --git a/gsmmodem/models/Sms.py b/gsmmodem/models/Sms.py new file mode 100644 index 0000000..fc7f711 --- /dev/null +++ b/gsmmodem/models/Sms.py @@ -0,0 +1,24 @@ +import abc + + +class Sms(object): + """ Abstract SMS message base class """ + __metaclass__ = abc.ABCMeta + + # Some constants to ease handling SMS statuses + STATUS_RECEIVED_UNREAD = 0 + STATUS_RECEIVED_READ = 1 + STATUS_STORED_UNSENT = 2 + STATUS_STORED_SENT = 3 + STATUS_ALL = 4 + # ...and a handy converter for text mode statuses + TEXT_MODE_STATUS_MAP = {'REC UNREAD': STATUS_RECEIVED_UNREAD, + 'REC READ': STATUS_RECEIVED_READ, + 'STO UNSENT': STATUS_STORED_UNSENT, + 'STO SENT': STATUS_STORED_SENT, + 'ALL': STATUS_ALL} + + def __init__(self, number, text, smsc=None): + self.number = number + self.text = text + self.smsc = smsc \ No newline at end of file diff --git a/gsmmodem/models/StatusReport.py b/gsmmodem/models/StatusReport.py new file mode 100644 index 0000000..b58a2d6 --- /dev/null +++ b/gsmmodem/models/StatusReport.py @@ -0,0 +1,24 @@ +import weakref + +from gsmmodem.models.Sms import Sms + + +class StatusReport(Sms): + """ An SMS status/delivery report + + Note: the 'status' attribute of this class refers to this status report SM's status (whether + it has been read, etc). To find the status of the message that caused this status report, + use the 'deliveryStatus' attribute. + """ + + DELIVERED = 0 # SMS delivery status: delivery successful + FAILED = 68 # SMS delivery status: delivery failed + + def __init__(self, gsmModem, status, reference, number, timeSent, timeFinalized, deliveryStatus, smsc=None): + super(StatusReport, self).__init__(number, None, smsc) + self._gsmModem = weakref.proxy(gsmModem) + self.status = status + self.reference = reference + self.timeSent = timeSent + self.timeFinalized = timeFinalized + self.deliveryStatus = deliveryStatus diff --git a/gsmmodem/models/__init__.py b/gsmmodem/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gsmmodem/modem.py b/gsmmodem/modem.py new file mode 100644 index 0000000..efacd14 --- /dev/null +++ b/gsmmodem/modem.py @@ -0,0 +1,1548 @@ +#!/usr/bin/env python + +""" High-level API classes for an attached GSM modem """ + +import logging +import re +import sys +import threading +import time +import weakref + +from gsmmodem.exceptions import EncodingError +from gsmmodem.util import lineMatching +from .exceptions import CommandError, InvalidStateException, CmeError, CmsError, TimeoutException, PinRequiredError, \ + SmscNumberUnknownError +from .models.Call import Call +from .models.IncomingCall import IncomingCall +from .models.ReceivedSms import ReceivedSms +from .models.SentSms import SentSms +from .models.Sms import Sms +from .models.StatusReport import StatusReport +from .pdu import encodeSmsSubmitPdu, decodeSmsPdu, encodeGsm7, encodeTextMode +from .serial_comms import SerialComms +from .util import lineStartingWith, parseTextModeTimeStr, removeAtPrefix + +PYTHON_VERSION = sys.version_info[0] + +CTRLZ = '\x1a' +TERMINATOR = '\r' + +if PYTHON_VERSION >= 3: + xrange = range + dictValuesIter = dict.values + dictItemsIter = dict.items + +class GsmModem(SerialComms): + """ Main class for interacting with an attached GSM modem """ + + log = logging.getLogger('gsmmodem.modem.GsmModem') + + # Used for parsing AT command errors + CM_ERROR_REGEX = re.compile('^\+(CM[ES]) ERROR: (\d+)$') + # Used for parsing signal strength query responses + CSQ_REGEX = re.compile('^\+CSQ:\s*(\d+),') + # Used for parsing caller ID announcements for incoming calls. Group 1 is the number + CLIP_REGEX = re.compile('^\+CLIP:\s*"\+{0,1}(\d+)",(\d+).*$') + # Used for parsing own number. Group 1 is the number + CNUM_REGEX = re.compile('^\+CNUM:\s*".*?","(\+{0,1}\d+)",(\d+).*$') + # Used for parsing new SMS message indications + CMTI_REGEX = re.compile('^\+CMTI:\s*"([^"]+)",\s*(\d+)$') + # Used for parsing SMS message reads (text mode) + CMGR_SM_DELIVER_REGEX_TEXT = None + # Used for parsing SMS status report message reads (text mode) + CMGR_SM_REPORT_REGEXT_TEXT = None + # Used for parsing SMS message reads (PDU mode) + CMGR_REGEX_PDU = None + # Used for parsing USSD event notifications + CUSD_REGEX = re.compile('\+CUSD:\s*(\d),\s*"(.*?)",\s*(\d+)', re.DOTALL) + # Used for parsing SMS status reports + CDSI_REGEX = re.compile('\+CDSI:\s*"([^"]+)",(\d+)$') + CDS_REGEX = re.compile('\+CDS:\s*([0-9]+)"$') + + def __init__(self, port, baudrate=115200, incomingCallCallbackFunc=None, smsReceivedCallbackFunc=None, smsStatusReportCallback=None, requestDelivery=True, AT_CNMI="", *a, **kw): + super(GsmModem, self).__init__(port, baudrate, notifyCallbackFunc=self._handleModemNotification, *a, **kw) + self.incomingCallCallback = incomingCallCallbackFunc or self._placeholderCallback + self.smsReceivedCallback = smsReceivedCallbackFunc or self._placeholderCallback + self.smsStatusReportCallback = smsStatusReportCallback or self._placeholderCallback + self.requestDelivery = requestDelivery + self.AT_CNMI = AT_CNMI or "2,1,0,2" + # Flag indicating whether caller ID for incoming call notification has been set up + self._callingLineIdentification = False + # Flag indicating whether incoming call notifications have extended information + self._extendedIncomingCallIndication = False + # Current active calls (ringing and/or answered), key is the unique call ID (not the remote number) + self.activeCalls = {} + # Dict containing sent SMS messages (for auto-tracking their delivery status) + self.sentSms = weakref.WeakValueDictionary() + self._ussdSessionEvent = None # threading.Event + self._ussdResponse = None # gsmmodem.modem.Ussd + self._smsStatusReportEvent = None # threading.Event + self._dialEvent = None # threading.Event + self._dialResponse = None # gsmmodem.modem.Call + self._waitForAtdResponse = True # Flag that controls if we should wait for an immediate response to ATD, or not + self._waitForCallInitUpdate = True # Flag that controls if we should wait for a ATD "call initiated" message + self._callStatusUpdates = [] # populated during connect() - contains regexes and handlers for detecting/handling call status updates + self._mustPollCallStatus = False # whether or not the modem must be polled for outgoing call status updates + self._pollCallStatusRegex = None # Regular expression used when polling outgoing call status + self._writeWait = 0 # Time (in seconds to wait after writing a command (adjusted when 515 errors are detected) + self._smsTextMode = False # Storage variable for the smsTextMode property + self._gsmBusy = 0 # Storage variable for the GSMBUSY property + self._smscNumber = None # Default SMSC number + self._smsRef = 0 # Sent SMS reference counter + self._smsMemReadDelete = None # Preferred message storage memory for reads/deletes ( parameter used for +CPMS) + self._smsMemWrite = None # Preferred message storage memory for writes ( parameter used for +CPMS) + self._smsReadSupported = True # Whether or not reading SMS messages is supported via AT commands + self._smsEncoding = 'GSM' # Default SMS encoding + self._smsSupportedEncodingNames = None # List of available encoding names + self._commands = None # List of supported AT commands + #Pool of detected DTMF + self.dtmfpool = [] + + def connect(self, pin=None, waitingForModemToStartInSeconds=0): + """ Opens the port and initializes the modem and SIM card + + :param pin: The SIM card PIN code, if any + :type pin: str + + :raise PinRequiredError: if the SIM card requires a PIN but none was provided + :raise IncorrectPinError: if the specified PIN is incorrect + """ + self.log.info('Connecting to modem on port %s at %dbps', self.port, self.baudrate) + super(GsmModem, self).connect() + + if waitingForModemToStartInSeconds > 0: + while waitingForModemToStartInSeconds > 0: + try: + self.write('AT', waitForResponse=True, timeout=0.5) + break + except TimeoutException: + waitingForModemToStartInSeconds -= 0.5 + + # Send some initialization commands to the modem + try: + self.write('ATZ') # reset configuration + except CommandError: + # Some modems require a SIM PIN at this stage already; unlock it now + # Attempt to enable detailed error messages (to catch incorrect PIN error) + # but ignore if it fails + self.write('AT+CMEE=1', parseError=False) + self._unlockSim(pin) + pinCheckComplete = True + self.write('ATZ') # reset configuration + else: + pinCheckComplete = False + self.write('ATE0') # echo off + try: + cfun = lineStartingWith('+CFUN:', self.write('AT+CFUN?'))[7:] # example response: +CFUN: 1 or +CFUN: 1,0 + cfun = int(cfun.split(",")[0]) + if cfun != 1: + self.write('AT+CFUN=1') + except CommandError: + pass # just ignore if the +CFUN command isn't supported + + self.write('AT+CMEE=1') # enable detailed error messages (even if it has already been set - ATZ may reset this) + if not pinCheckComplete: + self._unlockSim(pin) + + # Get list of supported commands from modem + commands = self.supportedCommands + self._commands = commands + + # Device-specific settings + callUpdateTableHint = 0 # unknown modem + enableWind = False + if commands != None: + if '^CVOICE' in commands: + self.write('AT^CVOICE=0', parseError=False) # Enable voice calls + if '+VTS' in commands: # Check for DTMF sending support + Call.dtmfSupport = True + elif '^DTMF' in commands: + # Huawei modems use ^DTMF to send DTMF tones + callUpdateTableHint = 1 # Huawei + if '^USSDMODE' in commands: + # Enable Huawei text-mode USSD + self.write('AT^USSDMODE=0', parseError=False) + if '+WIND' in commands: + callUpdateTableHint = 2 # Wavecom + enableWind = True + elif '+ZPAS' in commands: + callUpdateTableHint = 3 # ZTE + else: + # Try to enable general notifications on Wavecom-like device + enableWind = True + + if enableWind: + try: + wind = lineStartingWith('+WIND:', self.write('AT+WIND?')) # Check current WIND value; example response: +WIND: 63 + except CommandError: + # Modem does not support +WIND notifications. See if we can detect other known call update notifications + pass + else: + # Enable notifications for call setup, hangup, etc + # if int(wind[7:]) != 50: + # self.write('AT+WIND=50') + callUpdateTableHint = 2 # Wavecom + + # Attempt to identify modem type directly (if not already) - for outgoing call status updates + if callUpdateTableHint == 0: + if 'simcom' in self.manufacturer.lower() : #simcom modems support DTMF and don't support AT+CLAC + Call.dtmfSupport = True + try: + self.write('AT+DDET=1') # enable detect incoming DTMF + except CommandError: + # simcom 7000E for example doesn't support the DDET command + Call.dtmfSupport = False + + if self.manufacturer.lower() == 'huawei': + callUpdateTableHint = 1 # huawei + else: + # See if this is a ZTE modem that has not yet been identified based on supported commands + try: + self.write('AT+ZPAS?') + except CommandError: + pass # Not a ZTE modem + else: + callUpdateTableHint = 3 # ZTE + # Load outgoing call status updates based on identified modem features + if callUpdateTableHint == 1: + # Use Hauwei's ^NOTIFICATIONs + self.log.info('Loading Huawei call state update table') + self._callStatusUpdates = ((re.compile('^\^ORIG:(\d),(\d)$'), self._handleCallInitiated), + (re.compile('^\^CONN:(\d),(\d)$'), self._handleCallAnswered), + (re.compile('^\^CEND:(\d),(\d+),(\d)+,(\d)+$'), self._handleCallEnded)) + self._mustPollCallStatus = False + # Huawei modems use ^DTMF to send DTMF tones; use that instead + Call.DTMF_COMMAND_BASE = '^DTMF={cid},' + Call.dtmfSupport = True + elif callUpdateTableHint == 2: + # Wavecom modem: +WIND notifications supported + self.log.info('Loading Wavecom call state update table') + self._callStatusUpdates = ((re.compile('^\+WIND: 5,(\d)$'), self._handleCallInitiated), + (re.compile('^OK$'), self._handleCallAnswered), + (re.compile('^\+WIND: 6,(\d)$'), self._handleCallEnded)) + self._waitForAtdResponse = False # Wavecom modems return OK only when the call is answered + self._mustPollCallStatus = False + if commands == None: # older modem, assume it has standard DTMF support + Call.dtmfSupport = True + elif callUpdateTableHint == 3: # ZTE + # Use ZTE notifications ("CONNECT"/"HANGUP", but no "call initiated" notification) + self.log.info('Loading ZTE call state update table') + self._callStatusUpdates = ((re.compile('^CONNECT$'), self._handleCallAnswered), + (re.compile('^HANGUP:\s*(\d+)$'), self._handleCallEnded), + (re.compile('^OK$'), self._handleCallRejected)) + self._waitForAtdResponse = False # ZTE modems do not return an immediate OK only when the call is answered + self._mustPollCallStatus = False + self._waitForCallInitUpdate = False # ZTE modems do not provide "call initiated" updates + if commands == None: # ZTE uses standard +VTS for DTMF + Call.dtmfSupport = True + else: + # Unknown modem - we do not know what its call updates look like. Use polling instead + self.log.info('Unknown/generic modem type - will use polling for call state updates') + self._mustPollCallStatus = True + self._pollCallStatusRegex = re.compile('^\+CLCC:\s+(\d+),(\d),(\d),(\d),([^,]),"([^,]*)",(\d+)$') + self._waitForAtdResponse = True # Most modems return OK immediately after issuing ATD + + # General meta-information setup + self.write('AT+COPS=3,0', parseError=False) # Use long alphanumeric name format + + # SMS setup + self.write('AT+CMGF={0}'.format(1 if self.smsTextMode else 0)) # Switch to text or PDU mode for SMS messages + self._compileSmsRegexes() + if self._smscNumber != None: + self.write('AT+CSCA="{0}"'.format(self._smscNumber)) # Set default SMSC number + currentSmscNumber = self._smscNumber + else: + currentSmscNumber = self.smsc + # Some modems delete the SMSC number when setting text-mode SMS parameters; preserve it if needed + if currentSmscNumber != None: + self._smscNumber = None # clear cache + if self.requestDelivery: + self.write('AT+CSMP=49,167,0,0', parseError=False) # Enable delivery reports + else: + self.write('AT+CSMP=17,167,0,0', parseError=False) # Not enable delivery reports + # ...check SMSC again to ensure it did not change + if currentSmscNumber != None and self.smsc != currentSmscNumber: + self.smsc = currentSmscNumber + + # Set message storage, but first check what the modem supports - example response: +CPMS: (("SM","BM","SR"),("SM")) + try: + cpmsLine = lineStartingWith('+CPMS', self.write('AT+CPMS=?')) + except CommandError: + # Modem does not support AT+CPMS; SMS reading unavailable + self._smsReadSupported = False + self.log.warning('SMS preferred message storage query not supported by modem. SMS reading unavailable.') + else: + cpmsSupport = cpmsLine.split(' ', 1)[1].split('),(') + # Do a sanity check on the memory types returned - Nokia S60 devices return empty strings, for example + for memItem in cpmsSupport: + if len(memItem) == 0: + # No support for reading stored SMS via AT commands - probably a Nokia S60 + self._smsReadSupported = False + self.log.warning('Invalid SMS message storage support returned by modem. SMS reading unavailable. Response was: "%s"', cpmsLine) + break + else: + # Suppported memory types look fine, continue + preferredMemoryTypes = ('"ME"', '"SM"', '"SR"') + cpmsItems = [''] * len(cpmsSupport) + for i in xrange(len(cpmsSupport)): + for memType in preferredMemoryTypes: + if memType in cpmsSupport[i]: + if i == 0: + self._smsMemReadDelete = memType + cpmsItems[i] = memType + break + self.write('AT+CPMS={0}'.format(','.join(cpmsItems))) # Set message storage + del cpmsSupport + del cpmsLine + + if self._smsReadSupported and (self.smsReceivedCallback or self.smsStatusReportCallback): + try: + self.write('AT+CNMI=' + self.AT_CNMI) # Set message notifications + except CommandError: + try: + self.write('AT+CNMI=2,1,0,1,0') # Set message notifications, using TE for delivery reports + except CommandError: + # Message notifications not supported + self._smsReadSupported = False + self.log.warning('Incoming SMS notifications not supported by modem. SMS receiving unavailable.') + + # Incoming call notification setup + try: + self.write('AT+CLIP=1') # Enable calling line identification presentation + except CommandError as clipError: + self._callingLineIdentification = False + self.log.warning('Incoming call calling line identification (caller ID) not supported by modem. Error: {0}'.format(clipError)) + else: + self._callingLineIdentification = True + try: + self.write('AT+CRC=1') # Enable extended format of incoming indication (optional) + except CommandError as crcError: + self._extendedIncomingCallIndication = False + self.log.warning('Extended format incoming call indication not supported by modem. Error: {0}'.format(crcError)) + else: + self._extendedIncomingCallIndication = True + + # Call control setup + self.write('AT+CVHU=0', parseError=False) # Enable call hang-up with ATH command (ignore if command not supported) + + def _unlockSim(self, pin): + """ Unlocks the SIM card using the specified PIN (if necessary, else does nothing) """ + # Unlock the SIM card if needed + try: + cpinResponse = lineStartingWith('+CPIN', self.write('AT+CPIN?', timeout=15)) + except TimeoutException as timeout: + # Wavecom modems do not end +CPIN responses with "OK" (github issue #19) - see if just the +CPIN response was returned + if timeout.data != None: + cpinResponse = lineStartingWith('+CPIN', timeout.data) + if cpinResponse == None: + # No useful response read + raise timeout + else: + # Nothing read (real timeout) + raise timeout + if cpinResponse != '+CPIN: READY': + if pin != None: + self.write('AT+CPIN="{0}"'.format(pin)) + else: + raise PinRequiredError('AT+CPIN') + + def write(self, data, waitForResponse=True, timeout:float =10, parseError=True, writeTerm=TERMINATOR, expectedResponseTermSeq=None): + """ Write data to the modem. + + This method adds the ``\\r\\n`` end-of-line sequence to the data parameter, and + writes it to the modem. + + :param data: Command/data to be written to the modem + :type data: str + :param waitForResponse: Whether this method should block and return the response from the modem or not + :type waitForResponse: bool + :param timeout: Maximum amount of time in seconds to wait for a response from the modem + :type timeout: int + :param parseError: If True, a CommandError is raised if the modem responds with an error (otherwise the response is returned as-is) + :type parseError: bool + :param writeTerm: The terminating sequence to append to the written data + :type writeTerm: str + :param expectedResponseTermSeq: The expected terminating sequence that marks the end of the modem's response (defaults to ``\\r\\n``) + :type expectedResponseTermSeq: str + + :raise CommandError: if the command returns an error (only if parseError parameter is True) + :raise TimeoutException: if no response to the command was received from the modem + + :return: A list containing the response lines from the modem, or None if waitForResponse is False + :rtype: list + """ + + self.log.debug('write: %s', data) + responseLines = super(GsmModem, self).write(data + writeTerm, waitForResponse=waitForResponse, timeout=timeout, expectedResponseTermSeq=expectedResponseTermSeq) + if self._writeWait > 0: # Sleep a bit if required (some older modems suffer under load) + time.sleep(self._writeWait) + if waitForResponse: + cmdStatusLine = responseLines[-1] + if parseError: + if 'ERROR' in cmdStatusLine: + cmErrorMatch = self.CM_ERROR_REGEX.match(cmdStatusLine) + if cmErrorMatch: + errorType = cmErrorMatch.group(1) + errorCode = int(cmErrorMatch.group(2)) + if errorCode == 515 or errorCode == 14: + # 515 means: "Please wait, init or command processing in progress." + # 14 means "SIM busy" + self._writeWait += 0.2 # Increase waiting period temporarily + # Retry the command after waiting a bit + self.log.debug('Device/SIM busy error detected; self._writeWait adjusted to %fs', self._writeWait) + time.sleep(self._writeWait) + result = self.write(data, waitForResponse, timeout, parseError, writeTerm, expectedResponseTermSeq) + self.log.debug('self_writeWait set to 0.1 because of recovering from device busy (515) error') + if errorCode == 515: + self._writeWait = 0.1 # Set this to something sane for further commands (slow modem) + else: + self._writeWait = 0 # The modem was just waiting for the SIM card + return result + if errorType == 'CME': + raise CmeError(data, int(errorCode)) + else: # CMS error + raise CmsError(data, int(errorCode)) + else: + raise CommandError(data) + elif cmdStatusLine == 'COMMAND NOT SUPPORT': # Some Huawei modems respond with this for unknown commands + raise CommandError('{} ({})'.format(data,cmdStatusLine)) + return responseLines + + @property + def signalStrength(self): + """ Checks the modem's cellular network signal strength + + :raise CommandError: if an error occurs + + :return: The network signal strength as an integer between 0 and 99, or -1 if it is unknown + :rtype: int + """ + csq = self.CSQ_REGEX.match(self.write('AT+CSQ')[0]) + if csq: + ss = int(csq.group(1)) + return ss if ss != 99 else -1 + else: + raise CommandError() + + @property + def manufacturer(self): + """ :return: The modem's manufacturer's name """ + return self.write('AT+CGMI')[0] + + @property + def model(self): + """ :return: The modem's model name """ + return self.write('AT+CGMM')[0] + + @property + def revision(self): + """ :return: The modem's software revision, or None if not known/supported """ + try: + return self.write('AT+CGMR')[0] + except CommandError: + return None + + @property + def imei(self): + """ :return: The modem's serial number (IMEI number) """ + return self.write('AT+CGSN')[0] + + @property + def imsi(self): + """ :return: The IMSI (International Mobile Subscriber Identity) of the SIM card. The PIN may need to be entered before reading the IMSI """ + return self.write('AT+CIMI')[0] + + @property + def networkName(self): + """ :return: the name of the GSM Network Operator to which the modem is connected """ + copsMatch = lineMatching('^\+COPS: (\d),(\d),"(.+)",{0,1}\d*$', self.write('AT+COPS?')) # response format: +COPS: mode,format,"operator_name",x + if copsMatch: + return copsMatch.group(3) + + @property + def supportedCommands(self): + """ :return: list of AT commands supported by this modem (without the AT prefix). Returns None if not known """ + try: + # AT+CLAC responses differ between modems. Most respond with +CLAC: and then a comma-separated list of commands + # while others simply return each command on a new line, with no +CLAC: prefix + response = self.write('AT+CLAC', timeout=10) + if len(response) == 2: # Single-line response, comma separated + commands = response[0] + if commands.startswith('+CLAC'): + commands = commands[6:] # remove the +CLAC: prefix before splitting + return commands.split(',') + elif len(response) > 2: # Multi-line response + return [removeAtPrefix(cmd.strip()) for cmd in response[:-1]] + else: + self.log.debug('Unhandled +CLAC response: {0}'.format(response)) + return None + except (TimeoutException, CommandError): + # Try interactive command recognition + commands = [] + checkable_commands = ['^CVOICE', '+VTS', '^DTMF', '^USSDMODE', '+WIND', '+ZPAS', '+CSCS', '+CNUM'] + + # Check if modem is still alive + try: + response = self.write('AT') + except: + raise TimeoutException + + # Check all commands that will by considered + for command in checkable_commands: + try: + # Compose AT command that will read values under specified function + at_command='AT'+command+'=?' + response = self.write(at_command) + # If there are values inside response - add command to the list + commands.append(command) + except: + continue + + # Return found commands + if len(commands) == 0: + return None + else: + return commands + + @property + def smsTextMode(self): + """ :return: True if the modem is set to use text mode for SMS, False if it is set to use PDU mode """ + return self._smsTextMode + @smsTextMode.setter + def smsTextMode(self, textMode): + """ Set to True for the modem to use text mode for SMS, or False for it to use PDU mode """ + if textMode != self._smsTextMode: + if self.alive: + self.write('AT+CMGF={0}'.format(1 if textMode else 0)) + self._smsTextMode = textMode + self._compileSmsRegexes() + + @property + def smsSupportedEncoding(self): + """ + :raise NotImplementedError: If an error occures during AT command response parsing. + :return: List of supported encoding names. """ + + # Check if command is available + if self._commands == None: + self._commands = self.supportedCommands + + if self._commands == None: + self._smsSupportedEncodingNames = [] + return self._smsSupportedEncodingNames + + if not '+CSCS' in self._commands: + self._smsSupportedEncodingNames = [] + return self._smsSupportedEncodingNames + + # Get available encoding names + response = self.write('AT+CSCS=?') + + # Check response length (should be 2 - list of options and command status) + if len(response) != 2: + self.log.debug('Unhandled +CSCS response: {0}'.format(response)) + self._smsSupportedEncodingNames = [] + raise NotImplementedError + + # Extract encoding names list + try: + enc_list = response[0] # Get the first line + enc_list = enc_list[6:] # Remove '+CSCS: ' prefix + # Extract AT list in format ("str", "str2", "str3") + enc_list = enc_list.split('(')[1] + enc_list = enc_list.split(')')[0] + enc_list = enc_list.split(',') + enc_list = [x.split('"')[1] for x in enc_list] + except: + self.log.debug('Unhandled +CSCS response: {0}'.format(response)) + self._smsSupportedEncodingNames = [] + raise NotImplementedError + + self._smsSupportedEncodingNames = enc_list + return self._smsSupportedEncodingNames + + @property + def smsEncoding(self): + """ :return: Encoding name if encoding command is available, else GSM. """ + if self._commands == None: + self._commands = self.supportedCommands + + if self._commands == None: + return self._smsEncoding + + if '+CSCS' in self._commands: + response = self.write('AT+CSCS?') + + if len(response) == 2: + encoding = response[0] + if encoding.startswith('+CSCS'): + encoding = encoding[6:].split('"') # remove the +CSCS: prefix before splitting + if len(encoding) == 3: + self._smsEncoding = encoding[1] + else: + self.log.debug('Unhandled +CSCS response: {0}'.format(response)) + else: + self.log.debug('Unhandled +CSCS response: {0}'.format(response)) + + return self._smsEncoding + @smsEncoding.setter + def smsEncoding(self, encoding): + """ Set encoding for SMS inside PDU mode. + + :raise CommandError: if unable to set encoding + :raise ValueError: if encoding is not supported by modem + """ + # Check if command is available + if self._commands == None: + self._commands = self.supportedCommands + + if self._commands == None: + if encoding != self._smsEncoding: + raise CommandError('Unable to set SMS encoding (no supported commands)') + else: + return + + if not '+CSCS' in self._commands: + if encoding != self._smsEncoding: + raise CommandError('Unable to set SMS encoding (+CSCS command not supported)') + else: + return + + # Check if command is available + if self._smsSupportedEncodingNames == None: + self.smsSupportedEncoding + + # Check if desired encoding is available + if encoding in self._smsSupportedEncodingNames: + # Set encoding + response = self.write('AT+CSCS="{0}"'.format(encoding)) + if len(response) == 1: + if response[0].lower() == 'ok': + self._smsEncoding = encoding + return + + if encoding != self._smsEncoding: + raise ValueError('Unable to set SMS encoding (enocoding {0} not supported)'.format(encoding)) + else: + return + + def _setSmsMemory(self, readDelete=None, write=None): + """ Set the current SMS memory to use for read/delete/write operations """ + # Switch to the correct memory type if required + if write != None and write != self._smsMemWrite: + readDel = readDelete or self._smsMemReadDelete + self.write('AT+CPMS="{0}","{1}"'.format(readDel, write)) + self._smsMemReadDelete = readDel + self._smsMemWrite = write + elif readDelete != None and readDelete != self._smsMemReadDelete: + self.write('AT+CPMS="{0}"'.format(readDelete)) + self._smsMemReadDelete = readDelete + + def _compileSmsRegexes(self): + """ Compiles regular expression used for parsing SMS messages based on current mode """ + if self.smsTextMode: + if self.CMGR_SM_DELIVER_REGEX_TEXT == None: + self.CMGR_SM_DELIVER_REGEX_TEXT = re.compile('^\+CMGR: "([^"]+)","([^"]+)",[^,]*,"([^"]+)"$') + self.CMGR_SM_REPORT_REGEXT_TEXT = re.compile('^\+CMGR: ([^,]*),\d+,(\d+),"{0,1}([^"]*)"{0,1},\d*,"([^"]+)","([^"]+)",(\d+)$') + elif self.CMGR_REGEX_PDU == None: + self.CMGR_REGEX_PDU = re.compile('^\+CMGR:\s*(\d*),\s*"{0,1}([^"]*)"{0,1},\s*(\d+)$') + + @property + def gsmBusy(self): + """ :return: Current GSMBUSY state """ + try: + response = self.write('AT+GSMBUSY?') + response = response[0] # Get the first line + response = response[10] # Remove '+GSMBUSY: ' prefix + self._gsmBusy = response + except: + pass # If error is related to ME funtionality: +CME ERROR: + return self._gsmBusy + @gsmBusy.setter + def gsmBusy(self, gsmBusy): + """ Sete GSMBUSY state """ + if gsmBusy != self._gsmBusy: + if self.alive: + self.write('AT+GSMBUSY="{0}"'.format(gsmBusy)) + self._gsmBusy = gsmBusy + + @property + def smsc(self): + """ :return: The default SMSC number stored on the SIM card """ + if self._smscNumber == None: + try: + readSmsc = self.write('AT+CSCA?') + except SmscNumberUnknownError: + pass # Some modems return a CMS 330 error if the value isn't set + else: + cscaMatch = lineMatching('\+CSCA:\s*"([^,]+)",(\d+)$', readSmsc) + if cscaMatch: + self._smscNumber = cscaMatch.group(1) + return self._smscNumber + @smsc.setter + def smsc(self, smscNumber): + """ Set the default SMSC number to use when sending SMS messages """ + if smscNumber != self._smscNumber: + if self.alive: + self.write('AT+CSCA="{0}"'.format(smscNumber)) + self._smscNumber = smscNumber + + @property + def ownNumber(self): + """ Query subscriber phone number. + + It must be stored on SIM by operator. + If is it not stored already, it usually is possible to store the number by user. + + :raise TimeoutException: if a timeout was specified and reached + + + :return: Subscriber SIM phone number. Returns None if not known + :rtype: int + """ + + try: + if "+CNUM" in self._commands: + response = self.write('AT+CNUM') + else: + response = self.retrevie_first_number() + + if len(response)==1 and response[0] == "OK": # command is supported, but no number is set + response = self.retrevie_first_number() + return None + elif len(response) == 2: # OK and phone number. Actual number is in the first line, second parameter, and is placed inside quotation marks + cnumLine = response[0] + cnumMatch = self.CNUM_REGEX.match(cnumLine) + if cnumMatch: + return cnumMatch.group(1) + else: + self.log.debug('Error parse +CNUM response: {0}'.format(response)) + return None + elif len(response) > 2: # Multi-line response + self.log.debug('Unhandled +CNUM/+CPBS response: {0}'.format(response)) + return None + + except (TimeoutException, CommandError): + raise + + def retrevie_first_number(self): + # temporarily switch to "own numbers" phonebook, read position 1 and than switch back + response = self.write('AT+CPBS?') + selected_phonebook = response[0][6:].split('"')[ + 1] # first line, remove the +CSCS: prefix, split, first parameter + if selected_phonebook is not "ON": + self.write('AT+CPBS="ON"') + response = self.write("AT+CPBR=1") + self.write('AT+CPBS="{0}"'.format(selected_phonebook)) + return response + + @ownNumber.setter + def ownNumber(self, phone_number): + actual_phonebook = self.write('AT+CPBS?') + if actual_phonebook is not "ON": + self.write('AT+CPBS="ON"') + self.write('AT+CPBW=1,"' + phone_number + '"') + + + def waitForNetworkCoverage(self, timeout=None): + """ Block until the modem has GSM network coverage. + + This method blocks until the modem is registered with the network + and the signal strength is greater than 0, optionally timing out + if a timeout was specified + + :param timeout: Maximum time to wait for network coverage, in seconds + :type timeout: int or float + + :raise TimeoutException: if a timeout was specified and reached + :raise InvalidStateException: if the modem is not going to receive network coverage (SIM blocked, etc) + + :return: the current signal strength + """ + block = [True] + if timeout != None: + # Set up a timeout mechanism + def _cancelBlock(): + block[0] = False + t = threading.Timer(timeout, _cancelBlock) + t.start() + ss = -1 + checkCreg = True + while block[0]: + if checkCreg: + cregResult = lineMatching('^\+CREG:\s*(\d),(\d)(,[^,]*,[^,]*)?$', self.write('AT+CREG?', parseError=False)) # example result: +CREG: 0,1 + if cregResult: + status = int(cregResult.group(2)) + if status in (1, 5): + # 1: registered, home network, 5: registered, roaming + # Now simply check and return network signal strength + checkCreg = False + elif status == 3: + raise InvalidStateException('Network registration denied') + elif status == 0: + raise InvalidStateException('Device not searching for network operator') + else: + # Disable network registration check; only use signal strength + self.log.info('+CREG check disabled due to invalid response or unsupported command') + checkCreg = False + else: + # Check signal strength + ss = self.signalStrength + if ss > 0: + return ss + time.sleep(1) + else: + # If this is reached, the timer task has triggered + raise TimeoutException() + + def sendSms(self, destination, text, waitForDeliveryReport=False, deliveryTimeout=15, sendFlash=False): + """ Send an SMS text message + + :param destination: the recipient's phone number + :type destination: str + :param text: the message text + :type text: str + :param waitForDeliveryReport: if True, this method blocks until a delivery report is received for the sent message + :type waitForDeliveryReport: boolean + :param deliveryTimeout: the maximum time in seconds to wait for a delivery report (if "waitForDeliveryReport" is True) + :type deliveryTimeout: int or float + + :raise CommandError: if an error occurs while attempting to send the message + :raise TimeoutException: if the operation times out + """ + + # Check input text to select appropriate mode (text or PDU) + if self.smsTextMode: + try: + encodedText = encodeTextMode(text) + except ValueError: + self.smsTextMode = False + + if self.smsTextMode: + # Send SMS via AT commands + self.write('AT+CMGS="{0}"'.format(destination), timeout=5, expectedResponseTermSeq='> ') + result = lineStartingWith('+CMGS:', self.write(text, timeout=35, writeTerm=CTRLZ)) + else: + # Check encoding + try: + encodedText = encodeGsm7(text) + except ValueError: + encodedText = None + + # Set GSM modem SMS encoding format + # Encode message text and set data coding scheme based on text contents + if encodedText == None: + # Cannot encode text using GSM-7; use UCS2 instead + self.smsEncoding = 'UCS2' + else: + self.smsEncoding = 'GSM' + + # Encode text into PDUs + pdus = encodeSmsSubmitPdu(destination, text, reference=self._smsRef, sendFlash=sendFlash) + + # Send SMS PDUs via AT commands + for pdu in pdus: + self.write('AT+CMGS={0}'.format(pdu.tpduLength), timeout=5, expectedResponseTermSeq='> ') + result = lineStartingWith('+CMGS:', self.write(str(pdu), timeout=35, writeTerm=CTRLZ)) # example: +CMGS: xx + + if result == None: + raise CommandError('Modem did not respond with +CMGS response') + + # Keep SMS reference number in order to pair delivery reports with sent message + reference = int(result[7:]) + self._smsRef = reference + 1 + if self._smsRef > 255: + self._smsRef = 0 + + # Create sent SMS object for future delivery checks + sms = SentSms(destination, text, reference) + + # Add a weak-referenced entry for this SMS (allows us to update the SMS state if a status report is received) + self.sentSms[reference] = sms + if waitForDeliveryReport: + self._smsStatusReportEvent = threading.Event() + if self._smsStatusReportEvent.wait(deliveryTimeout): + self._smsStatusReportEvent = None + else: # Response timed out + self._smsStatusReportEvent = None + raise TimeoutException() + return sms + + def sendUssd(self, ussdString, responseTimeout=60): + """ Starts a USSD session by dialing the the specified USSD string, or \ + sends the specified string in the existing USSD session (if any) + + :param ussdString: The USSD access number to dial + :param responseTimeout: Maximum time to wait a response, in seconds + + :raise TimeoutException: if no response is received in time + + :return: The USSD response message/session (as a Ussd object) + :rtype: gsmmodem.modem.Ussd + """ + self._ussdSessionEvent = threading.Event() + try: + cusdResponse = self.write('AT+CUSD=1,"{0}",15'.format(ussdString), timeout=responseTimeout) # Should respond with "OK" + except Exception: + self._ussdSessionEvent = None # Cancel the thread sync lock + raise + + # Some modems issue the +CUSD response before the acknowledgment "OK" - check for that + if len(cusdResponse) > 1: + cusdResponseFound = lineStartingWith('+CUSD', cusdResponse) != None + if cusdResponseFound: + self._ussdSessionEvent = None # Cancel thread sync lock + return self._parseCusdResponse(cusdResponse) + # Wait for the +CUSD notification message + if self._ussdSessionEvent.wait(responseTimeout): + self._ussdSessionEvent = None + return self._ussdResponse + else: # Response timed out + self._ussdSessionEvent = None + raise TimeoutException() + + + def checkForwarding(self, querytype, responseTimeout=15): + """ Check forwarding status: 0=Unconditional, 1=Busy, 2=NoReply, 3=NotReach, 4=AllFwd, 5=AllCondFwd + :param querytype: The type of forwarding to check + + :return: Status + :rtype: Boolean + """ + try: + queryResponse = self.write('AT+CCFC={0},2'.format(querytype), timeout=responseTimeout) # Should respond with "OK" + except Exception: + raise + print(queryResponse) + return True + + + def setForwarding(self, fwdType, fwdEnable, fwdNumber, responseTimeout=15): + """ Check forwarding status: 0=Unconditional, 1=Busy, 2=NoReply, 3=NotReach, 4=AllFwd, 5=AllCondFwd + :param fwdType: The type of forwarding to set + :param fwdEnable: 1 to enable, 0 to disable, 2 to query, 3 to register, 4 to erase + :param fwdNumber: Number to forward to + + :return: Success or not + :rtype: Boolean + """ + try: + queryResponse = self.write('AT+CCFC={0},{1},"{2}"'.format(fwdType, fwdEnable, fwdNumber), timeout=responseTimeout) # Should respond with "OK" + except Exception: + raise + return False + print(queryResponse) + return queryResponse + + def dial(self, number, timeout=5, callStatusUpdateCallbackFunc=None): + """ Calls the specified phone number using a voice phone call + + :param number: The phone number to dial + :param timeout: Maximum time to wait for the call to be established + :param callStatusUpdateCallbackFunc: Callback function that is executed if the call's status changes due to + remote events (i.e. when it is answered, the call is ended by the remote party) + + :return: The outgoing call + :rtype: gsmmodem.modem.Call + """ + if self._waitForCallInitUpdate: + # Wait for the "call originated" notification message + self._dialEvent = threading.Event() + try: + self.write('ATD{0};'.format(number), timeout=timeout, waitForResponse=self._waitForAtdResponse) + except Exception: + self._dialEvent = None # Cancel the thread sync lock + raise + else: + # Don't wait for a call init update - base the call ID on the number of active calls + self.write('ATD{0};'.format(number), timeout=timeout, waitForResponse=self._waitForAtdResponse) + self.log.debug("Not waiting for outgoing call init update message") + callId = len(self.activeCalls) + 1 + callType = 0 # Assume voice + call = Call(self, callId, callType, number, callStatusUpdateCallbackFunc) + self.activeCalls[callId] = call + return call + + if self._mustPollCallStatus: + # Fake a call notification by polling call status until the status indicates that the call is being dialed + threading.Thread(target=self._pollCallStatus, kwargs={'expectedState': 0, 'timeout': timeout}).start() + + if self._dialEvent.wait(timeout): + self._dialEvent = None + callId, callType = self._dialResponse + call = Call(self, callId, callType, number, callStatusUpdateCallbackFunc) + self.activeCalls[callId] = call + return call + else: # Call establishing timed out + self._dialEvent = None + raise TimeoutException() + + def processStoredSms(self, unreadOnly=False): + """ Process all SMS messages currently stored on the device/SIM card. + + Reads all (or just unread) received SMS messages currently stored on the + device/SIM card, initiates "SMS received" events for them, and removes + them from the SIM card. + This is useful if SMS messages were received during a period that + python-gsmmodem was not running but the modem was powered on. + + :param unreadOnly: If True, only process unread SMS messages + :type unreadOnly: boolean + """ + if self.smsReceivedCallback: + states = [Sms.STATUS_RECEIVED_UNREAD] + if not unreadOnly: + states.insert(0, Sms.STATUS_RECEIVED_READ) + for msgStatus in states: + messages = self.listStoredSms(status=msgStatus, delete=True) + for sms in messages: + self.smsReceivedCallback(sms) + else: + raise ValueError('GsmModem.smsReceivedCallback not set') + + def listStoredSms(self, status=Sms.STATUS_ALL, memory=None, delete=False): + """ Returns SMS messages currently stored on the device/SIM card. + + The messages are read from the memory set by the "memory" parameter. + + :param status: Filter messages based on this read status; must be 0-4 (see Sms class) + :type status: int + :param memory: The memory type to read from. If None, use the current default SMS read memory + :type memory: str or None + :param delete: If True, delete returned messages from the device/SIM card + :type delete: bool + + :return: A list of Sms objects containing the messages read + :rtype: list + """ + self._setSmsMemory(readDelete=memory) + messages = [] + delMessages = set() + if self.smsTextMode: + cmglRegex= re.compile('^\+CMGL: (\d+),"([^"]+)","([^"]+)",[^,]*,"([^"]+)"$') + for key, val in dictItemsIter(Sms.TEXT_MODE_STATUS_MAP): + if status == val: + statusStr = key + break + else: + raise ValueError('Invalid status value: {0}'.format(status)) + result = self.write('AT+CMGL="{0}"'.format(statusStr)) + msgLines = [] + msgIndex = msgStatus = number = msgTime = None + for line in result: + cmglMatch = cmglRegex.match(line) + if cmglMatch: + # New message; save old one if applicable + if msgIndex != None and len(msgLines) > 0: + msgText = '\n'.join(msgLines) + msgLines = [] + messages.append(ReceivedSms(self, Sms.TEXT_MODE_STATUS_MAP[msgStatus], number, parseTextModeTimeStr(msgTime), msgText, None, [], msgIndex)) + delMessages.add(int(msgIndex)) + msgIndex, msgStatus, number, msgTime = cmglMatch.groups() + msgLines = [] + else: + if line != 'OK': + msgLines.append(line) + if msgIndex != None and len(msgLines) > 0: + msgText = '\n'.join(msgLines) + msgLines = [] + messages.append(ReceivedSms(self, Sms.TEXT_MODE_STATUS_MAP[msgStatus], number, parseTextModeTimeStr(msgTime), msgText, None, [], msgIndex)) + delMessages.add(int(msgIndex)) + else: + cmglRegex = re.compile('^\+CMGL:\s*(\d+),\s*(\d+),.*$') + readPdu = False + result = self.write('AT+CMGL={0}'.format(status)) + for line in result: + if not readPdu: + cmglMatch = cmglRegex.match(line) + if cmglMatch: + msgIndex = int(cmglMatch.group(1)) + msgStat = int(cmglMatch.group(2)) + readPdu = True + else: + try: + smsDict = decodeSmsPdu(line) + except EncodingError: + self.log.debug('Discarding line from +CMGL response: %s', line) + except: + pass + # dirty fix warning: https://github.com/yuriykashin/python-gsmmodem/issues/1 + # todo: make better fix + else: + if smsDict['type'] == 'SMS-DELIVER': + sms = ReceivedSms(self, int(msgStat), smsDict['number'], smsDict['time'], smsDict['text'], smsDict['smsc'], smsDict.get('udh', []), msgIndex) + elif smsDict['type'] == 'SMS-STATUS-REPORT': + sms = StatusReport(self, int(msgStat), smsDict['reference'], smsDict['number'], smsDict['time'], smsDict['discharge'], smsDict['status']) + else: + raise CommandError('Invalid PDU type for readStoredSms(): {0}'.format(smsDict['type'])) + messages.append(sms) + delMessages.add(msgIndex) + readPdu = False + if delete: + if status == Sms.STATUS_ALL: + # Delete all messages + self.deleteMultipleStoredSms() + else: + for msgIndex in delMessages: + self.deleteStoredSms(msgIndex) + return messages + + def _handleModemNotification(self, lines): + """ Handler for unsolicited notifications from the modem + + This method simply spawns a separate thread to handle the actual notification + (in order to release the read thread so that the handlers are able to write back to the modem, etc) + + :param lines The lines that were read + """ + threading.Thread(target=self.__threadedHandleModemNotification, kwargs={'lines': lines}).start() + + def __threadedHandleModemNotification(self, lines): + """ Implementation of _handleModemNotification() to be run in a separate thread + + :param lines The lines that were read + """ + next_line_is_te_statusreport = False + for line in lines: + if 'RING' in line: + # Incoming call (or existing call is ringing) + self._handleIncomingCall(lines) + return + elif line.startswith('+CMTI'): + # New SMS message indication + self._handleSmsReceived(line) + return + elif line.startswith('+CUSD'): + # USSD notification - either a response or a MT-USSD ("push USSD") message + self._handleUssd(lines) + return + elif line.startswith('+CDSI'): + # SMS status report + self._handleSmsStatusReport(line) + return + elif line.startswith('+CDS'): + # SMS status report at next line + next_line_is_te_statusreport = True + cdsMatch = self.CDS_REGEX.match(line) + if cdsMatch: + next_line_is_te_statusreport_length = int(cdsMatch.group(1)) + else: + next_line_is_te_statusreport_length = -1 + elif next_line_is_te_statusreport: + self._handleSmsStatusReportTe(next_line_is_te_statusreport_length, line) + return + elif line.startswith('+DTMF'): + # New incoming DTMF + self._handleIncomingDTMF(line) + return + else: + # Check for call status updates + for updateRegex, handlerFunc in self._callStatusUpdates: + match = updateRegex.match(line) + if match: + # Handle the update + handlerFunc(match) + return + # If this is reached, the notification wasn't handled + self.log.debug('Unhandled unsolicited modem notification: %s', lines) + + #Simcom modem able detect incoming DTMF + def _handleIncomingDTMF(self,line): + self.log.debug('Handling incoming DTMF') + + try: + dtmf_num=line.split(':')[1].replace(" ","") + self.dtmfpool.append(dtmf_num) + self.log.debug('DTMF number is {0}'.format(dtmf_num)) + except: + self.log.debug('Error parse DTMF number on line {0}'.format(line)) + def GetIncomingDTMF(self): + if (len(self.dtmfpool)==0): + return None + else: + return self.dtmfpool.pop(0) + + def _handleIncomingCall(self, lines): + self.log.debug('Handling incoming call') + ringLine = lines.pop(0) + if self._extendedIncomingCallIndication: + try: + callType = ringLine.split(' ', 1)[1] + except IndexError: + # Some external 3G scripts modify incoming call indication settings (issue #18) + self.log.debug('Extended incoming call indication format changed externally; re-enabling...') + callType = None + try: + # Re-enable extended format of incoming indication (optional) + self.write('AT+CRC=1') + except CommandError: + self.log.warning('Extended incoming call indication format changed externally; unable to re-enable') + self._extendedIncomingCallIndication = False + else: + callType = None + if self._callingLineIdentification and len(lines) > 0: + clipLine = lines.pop(0) + clipMatch = self.CLIP_REGEX.match(clipLine) + if clipMatch: + callerNumber = '+' + clipMatch.group(1) + ton = clipMatch.group(2) + #TODO: re-add support for this + callerName = None + #callerName = clipMatch.group(3) + #if callerName != None and len(callerName) == 0: + # callerName = None + else: + callerNumber = ton = callerName = None + else: + callerNumber = ton = callerName = None + + call = None + for activeCall in dictValuesIter(self.activeCalls): + if activeCall.number == callerNumber: + call = activeCall + call.ringCount += 1 + if call == None: + callId = len(self.activeCalls) + 1 + call = IncomingCall(self, callerNumber, ton, callerName, callId, callType) + self.activeCalls[callId] = call + self.incomingCallCallback(call) + + def _handleCallInitiated(self, regexMatch, callId=None, callType=1): + """ Handler for "outgoing call initiated" event notification line """ + if self._dialEvent: + if regexMatch: + groups = regexMatch.groups() + # Set self._dialReponse to (callId, callType) + if len(groups) >= 2: + self._dialResponse = (int(groups[0]) , int(groups[1])) + else: + self._dialResponse = (int(groups[0]), 1) # assume call type: VOICE + else: + self._dialResponse = callId, callType + self._dialEvent.set() + + def _handleCallAnswered(self, regexMatch, callId=None): + """ Handler for "outgoing call answered" event notification line """ + if regexMatch: + groups = regexMatch.groups() + if len(groups) > 1: + callId = int(groups[0]) + self.activeCalls[callId].answered = True + else: + # Call ID not available for this notificition - check for the first outgoing call that has not been answered + for call in dictValuesIter(self.activeCalls): + if call.answered == False and type(call) == Call: + call.answered = True + return + else: + # Use supplied values + self.activeCalls[callId].answered = True + + def _handleCallEnded(self, regexMatch, callId=None, filterUnanswered=False): + if regexMatch: + groups = regexMatch.groups() + if len(groups) > 0: + callId = int(groups[0]) + else: + # Call ID not available for this notification - check for the first outgoing call that is active + for call in dictValuesIter(self.activeCalls): + if type(call) == Call: + if not filterUnanswered or (filterUnanswered == True and call.answered == False): + callId = call.id + break + if callId and callId in self.activeCalls: + self.activeCalls[callId].answered = False + self.activeCalls[callId].active = False + del self.activeCalls[callId] + + def _handleCallRejected(self, regexMatch, callId=None): + """ Handler for rejected (unanswered calls being ended) + + Most modems use _handleCallEnded for handling both call rejections and remote hangups. + This method does the same, but filters for unanswered calls only. + """ + return self._handleCallEnded(regexMatch, callId, True) + + def _handleSmsReceived(self, notificationLine): + """ Handler for "new SMS" unsolicited notification line """ + self.log.info('SMS message received:'+str(notificationLine)) + if self.smsReceivedCallback is not None: + cmtiMatch = self.CMTI_REGEX.match(notificationLine) + if cmtiMatch: + msgMemory = cmtiMatch.group(1) + msgIndex = cmtiMatch.group(2) + sms = self.readStoredSms(msgIndex, msgMemory) + try: + self.smsReceivedCallback(sms) + except Exception: + self.log.error('error in smsReceivedCallback', exc_info=True) + else: + self.deleteStoredSms(msgIndex) + + def _handleSmsStatusReport(self, notificationLine): + """ Handler for SMS status reports """ + self.log.debug('SMS status report received') + cdsiMatch = self.CDSI_REGEX.match(notificationLine) + if cdsiMatch: + msgMemory = cdsiMatch.group(1) + msgIndex = cdsiMatch.group(2) + report = self.readStoredSms(msgIndex, msgMemory) + self.deleteStoredSms(msgIndex) + # Update sent SMS status if possible + if report.reference in self.sentSms: + self.sentSms[report.reference].report = report + if self._smsStatusReportEvent: + # A sendSms() call is waiting for this response - notify waiting thread + self._smsStatusReportEvent.set() + elif self.smsStatusReportCallback: + # Nothing is waiting for this report directly - use callback + try: + self.smsStatusReportCallback(report) + except Exception: + self.log.error('error in smsStatusReportCallback', exc_info=True) + + def _handleSmsStatusReportTe(self, length, notificationLine): + """ Handler for TE SMS status reports """ + self.log.info('TE SMS status report received') + try: + smsDict = decodeSmsPdu(notificationLine) + except EncodingError: + self.log.debug('Discarding notification line from +CDS response: %s', notificationLine) + else: + if smsDict['type'] == 'SMS-STATUS-REPORT': + report = StatusReport(self, int(smsDict['status']), smsDict['reference'], smsDict['number'], smsDict['time'], smsDict['discharge'], smsDict['status']) + else: + raise CommandError('Invalid PDU type for readStoredSms(): {0}'.format(smsDict['type'])) + # Update sent SMS status if possible + if report.reference in self.sentSms: + self.sentSms[report.reference].report = report + if self._smsStatusReportEvent: + # A sendSms() call is waiting for this response - notify waiting thread + self._smsStatusReportEvent.set() + else: + # Nothing is waiting for this report directly - use callback + try: + self.smsStatusReportCallback(report) + except Exception: + self.log.error('error in smsStatusReportCallback', exc_info=True) + + def readStoredSms(self, index, memory=None): + """ Reads and returns the SMS message at the specified index + + :param index: The index of the SMS message in the specified memory + :type index: int + :param memory: The memory type to read from. If None, use the current default SMS read memory + :type memory: str or None + + :raise CommandError: if unable to read the stored message + + :return: The SMS message + :rtype: subclass of gsmmodem.modem.Sms (either ReceivedSms or StatusReport) + """ + # Switch to the correct memory type if required + self._setSmsMemory(readDelete=memory) + msgData = self.write('AT+CMGR={0}'.format(index)) + # Parse meta information + if self.smsTextMode: + cmgrMatch = self.CMGR_SM_DELIVER_REGEX_TEXT.match(msgData[0]) + if cmgrMatch: + msgStatus, number, msgTime = cmgrMatch.groups() + msgText = '\n'.join(msgData[1:-1]) + return ReceivedSms(self, Sms.TEXT_MODE_STATUS_MAP[msgStatus], number, parseTextModeTimeStr(msgTime), msgText) + else: + # Try parsing status report + cmgrMatch = self.CMGR_SM_REPORT_REGEXT_TEXT.match(msgData[0]) + if cmgrMatch: + msgStatus, reference, number, sentTime, deliverTime, deliverStatus = cmgrMatch.groups() + if msgStatus.startswith('"'): + msgStatus = msgStatus[1:-1] + if len(msgStatus) == 0: + msgStatus = "REC UNREAD" + return StatusReport(self, Sms.TEXT_MODE_STATUS_MAP[msgStatus], int(reference), number, parseTextModeTimeStr(sentTime), parseTextModeTimeStr(deliverTime), int(deliverStatus)) + else: + raise CommandError('Failed to parse text-mode SMS message +CMGR response: {0}'.format(msgData)) + else: + cmgrMatch = self.CMGR_REGEX_PDU.match(msgData[0]) + if not cmgrMatch: + raise CommandError('Failed to parse PDU-mode SMS message +CMGR response: {0}'.format(msgData)) + stat, alpha, length = cmgrMatch.groups() + try: + stat = int(stat) + except Exception: + # Some modems (ZTE) do not always read return status - default to RECEIVED UNREAD + stat = Sms.STATUS_RECEIVED_UNREAD + pdu = msgData[1] + smsDict = decodeSmsPdu(pdu) + if smsDict['type'] == 'SMS-DELIVER': + return ReceivedSms(self, int(stat), smsDict['number'], smsDict['time'], smsDict['text'], smsDict['smsc'], smsDict.get('udh', [])) + elif smsDict['type'] == 'SMS-STATUS-REPORT': + return StatusReport(self, int(stat), smsDict['reference'], smsDict['number'], smsDict['time'], smsDict['discharge'], smsDict['status']) + else: + raise CommandError('Invalid PDU type for readStoredSms(): {0}'.format(smsDict['type'])) + + def deleteStoredSms(self, index, memory=None): + """ Deletes the SMS message stored at the specified index in modem/SIM card memory + + :param index: The index of the SMS message in the specified memory + :type index: int + :param memory: The memory type to delete from. If None, use the current default SMS read/delete memory + :type memory: str or None + + :raise CommandError: if unable to delete the stored message + """ + self._setSmsMemory(readDelete=memory) + self.write('AT+CMGD={0},0'.format(index)) + # TODO: make a check how many params are supported by the modem and use the right command. For example, Siemens MC35, TC35 take only one parameter. + #self.write('AT+CMGD={0}'.format(index)) + + def deleteMultipleStoredSms(self, delFlag=4, memory=None): + """ Deletes all SMS messages that have the specified read status. + + The messages are read from the memory set by the "memory" parameter. + The value of the "delFlag" paramater is the same as the "DelFlag" parameter of the +CMGD command: + 1: Delete All READ messages + 2: Delete All READ and SENT messages + 3: Delete All READ, SENT and UNSENT messages + 4: Delete All messages (this is the default) + + :param delFlag: Controls what type of messages to delete; see description above. + :type delFlag: int + :param memory: The memory type to delete from. If None, use the current default SMS read/delete memory + :type memory: str or None + :param delete: If True, delete returned messages from the device/SIM card + :type delete: bool + + :raise ValueErrror: if "delFlag" is not in range [1,4] + :raise CommandError: if unable to delete the stored messages + """ + if 0 < delFlag <= 4: + self._setSmsMemory(readDelete=memory) + self.write('AT+CMGD=1,{0}'.format(delFlag)) + else: + raise ValueError('"delFlag" must be in range [1,4]') + + def _handleUssd(self, lines): + """ Handler for USSD event notification line(s) """ + if self._ussdSessionEvent: + # A sendUssd() call is waiting for this response - parse it + self._ussdResponse = self._parseCusdResponse(lines) + # Notify waiting thread + self._ussdSessionEvent.set() + + def _parseCusdResponse(self, lines): + """ Parses one or more +CUSD notification lines (for USSD) + :return: USSD response object + :rtype: gsmmodem.modem.Ussd + """ + if len(lines) > 1: + # Issue #20: Some modem/network combinations use \r\n as in-message EOL indicators; + # - join lines to compensate for that (thanks to davidjb for the fix) + # Also, look for more than one +CUSD response because of certain modems' strange behaviour + cusdMatches = list(self.CUSD_REGEX.finditer('\r\n'.join(lines))) + else: + # Single standard +CUSD response + cusdMatches = [self.CUSD_REGEX.match(lines[0])] + message = None + sessionActive = True + if len(cusdMatches) > 1: + self.log.debug('Multiple +CUSD responses received; filtering...') + # Some modems issue a non-standard "extra" +CUSD notification for releasing the session + for cusdMatch in cusdMatches: + if cusdMatch.group(1) == '2': + # Set the session to inactive, but ignore the message + self.log.debug('Ignoring "session release" message: %s', cusdMatch.group(2)) + sessionActive = False + else: + # Not a "session release" message + message = cusdMatch.group(2) + if sessionActive and cusdMatch.group(1) != '1': + sessionActive = False + else: + sessionActive = cusdMatches[0].group(1) == '1' + message = cusdMatches[0].group(2) + return Ussd(self, sessionActive, message) + + def _placeHolderCallback(self, *args): + """ Does nothing """ + self.log.debug('called with args: {0}'.format(args)) + + def _pollCallStatus(self, expectedState, callId=None, timeout=None): + """ Poll the status of outgoing calls. + This is used for modems that do not have a known set of call status update notifications. + + :param expectedState: The internal state we are waiting for. 0 == initiated, 1 == answered, 2 = hangup + :type expectedState: int + + :raise TimeoutException: If a timeout was specified, and has occurred + """ + callDone = False + timeLeft = timeout or 999999 + while self.alive and not callDone and timeLeft > 0: + time.sleep(0.5) + if expectedState == 0: # Only call initializing can timeout + timeLeft -= 0.5 + try: + clcc = self._pollCallStatusRegex.match(self.write('AT+CLCC')[0]) + except TimeoutException as timeout: + # Can happend if the call was ended during our time.sleep() call + clcc = None + if clcc: + direction = int(clcc.group(2)) + if direction == 0: # Outgoing call + # Determine call state + stat = int(clcc.group(3)) + if expectedState == 0: # waiting for call initiated + if stat == 2 or stat == 3: # Dialing or ringing ("alerting") + callId = int(clcc.group(1)) + callType = int(clcc.group(4)) + self._handleCallInitiated(None, callId, callType) # if self_dialEvent is None, this does nothing + expectedState = 1 # Now wait for call answer + elif expectedState == 1: # waiting for call to be answered + if stat == 0: # Call active + callId = int(clcc.group(1)) + self._handleCallAnswered(None, callId) + expectedState = 2 # Now wait for call hangup + elif expectedState == 2 : # waiting for remote hangup + # Since there was no +CLCC response, the call is no longer active + callDone = True + self._handleCallEnded(None, callId=callId) + elif expectedState == 1: # waiting for call to be answered + # Call was rejected + callDone = True + self._handleCallRejected(None, callId=callId) + if timeLeft <= 0: + raise TimeoutException() + +class Ussd(object): + """ Unstructured Supplementary Service Data (USSD) message. + + This class contains convenient methods for replying to a USSD prompt + and to cancel the USSD session + """ + + def __init__(self, gsmModem, sessionActive, message): + self._gsmModem = weakref.proxy(gsmModem) + # Indicates if the session is active (True) or has been closed (False) + self.sessionActive = sessionActive + self.message = message + + def reply(self, message): + """ Sends a reply to this USSD message in the same USSD session + + :raise InvalidStateException: if the USSD session is not active (i.e. it has ended) + + :return: The USSD response message/session (as a Ussd object) + """ + if self.sessionActive: + return self._gsmModem.sendUssd(message) + else: + raise InvalidStateException('USSD session is inactive') + + def cancel(self): + """ Terminates/cancels the USSD session (without sending a reply) + + Does nothing if the USSD session is inactive. + """ + if self.sessionActive: + self._gsmModem.write('AT+CUSD=2') diff --git a/gsmmodem/pdu.py b/gsmmodem/pdu.py new file mode 100644 index 0000000..167ce2f --- /dev/null +++ b/gsmmodem/pdu.py @@ -0,0 +1,953 @@ +# -*- coding: utf8 -*- + +""" SMS PDU encoding methods """ + +from __future__ import unicode_literals + +import sys, codecs +from datetime import datetime, timedelta, tzinfo +from copy import copy +from .exceptions import EncodingError + +# For Python 3 support +PYTHON_VERSION = sys.version_info[0] +if PYTHON_VERSION >= 3: + MAX_INT = sys.maxsize + dictItemsIter = dict.items + xrange = range + unichr = chr + toByteArray = lambda x: bytearray(codecs.decode(x, 'hex_codec')) if type(x) == bytes else bytearray(codecs.decode(bytes(x, 'ascii'), 'hex_codec')) if type(x) == str else x + rawStrToByteArray = lambda x: bytearray(bytes(x, 'latin-1')) + +TEXT_MODE = ('\n\r !\"#%&\'()*+,-./0123456789:;<=>?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz') # TODO: Check if all of them are supported inside text mode +# Tables can be found at: http://en.wikipedia.org/wiki/GSM_03.38#GSM_7_bit_default_alphabet_and_extension_table_of_3GPP_TS_23.038_.2F_GSM_03.38 +GSM7_BASIC = ('@£$¥èéùìòÇ\nØø\rÅåΔ_ΦΓΛΩΠΨΣΘΞ\x1bÆæßÉ !\"#¤%&\'()*+,-./0123456789:;<=>?¡ABCDEFGHIJKLMNOPQRSTUVWXYZÄÖÑÜ`¿abcdefghijklmnopqrstuvwxyzäöñüà') +GSM7_EXTENDED = {chr(0xFF): 0x0A, + #CR2: chr(0x0D), + '^': chr(0x14), + #SS2: chr(0x1B), + '{': chr(0x28), + '}': chr(0x29), + '\\': chr(0x2F), + '[': chr(0x3C), + '~': chr(0x3D), + ']': chr(0x3E), + '|': chr(0x40), + '€': chr(0x65)} +# Maximum message sizes for each data coding +MAX_MESSAGE_LENGTH = {0x00: 160, # GSM-7 + 0x04: 140, # 8-bit + 0x08: 70} # UCS2 + +# Maximum message sizes for each data coding for multipart messages +MAX_MULTIPART_MESSAGE_LENGTH = {0x00: 153, # GSM-7 + 0x04: 133, # 8-bit TODO: Check this value! + 0x08: 67} # UCS2 + +class SmsPduTzInfo(tzinfo): + """ Simple implementation of datetime.tzinfo for handling timestamp GMT offsets specified in SMS PDUs """ + + def __init__(self, pduOffsetStr=None): + """ + :param pduOffset: 2 semi-octet timezone offset as specified by PDU (see GSM 03.40 spec) + :type pduOffset: str + + Note: pduOffsetStr is optional in this constructor due to the special requirement for pickling + mentioned in the Python docs. It should, however, be used (or otherwise pduOffsetStr must be + manually set) + """ + self._offset = None + if pduOffsetStr != None: + self._setPduOffsetStr(pduOffsetStr) + + def _setPduOffsetStr(self, pduOffsetStr): + # See if the timezone difference is positive/negative by checking MSB of first semi-octet + tzHexVal = int(pduOffsetStr, 16) + # In order to read time zone 'minute' shift: + # - Remove MSB (sign) + # - Read HEX value as decimal + # - Multiply by 15 + # See: https://en.wikipedia.org/wiki/GSM_03.40#Time_Format + + # Possible fix for #15 - convert invalid character to BCD-value + if (tzHexVal & 0x0F) > 0x9: + tzHexVal +=0x06 + + tzOffsetMinutes = int('{0:0>2X}'.format(tzHexVal & 0x7F)) * 15 + + if tzHexVal & 0x80 == 0: # positive + self._offset = timedelta(minutes=(tzOffsetMinutes)) + else: # negative + self._offset = timedelta(minutes=(-tzOffsetMinutes)) + + def utcoffset(self, dt): + return self._offset + + def dst(self, dt): + """ We do not have enough info in the SMS PDU to implement daylight savings time """ + return timedelta(0) + + +class InformationElement(object): + """ User Data Header (UDH) Information Element (IE) implementation + + This represents a single field ("information element") in the PDU's + User Data Header. The UDH itself contains one or more of these + information elements. + + If the IEI (IE identifier) is recognized, the class will automatically + specialize into one of the subclasses of InformationElement, + e.g. Concatenation or PortAddress, allowing the user to easily + access the specific (and useful) attributes of these special cases. + """ + + def __new__(cls, *args, **kwargs): #iei, ieLen, ieData): + """ Causes a new InformationElement class, or subclass + thereof, to be created. If the IEI is recognized, a specific + subclass of InformationElement is returned """ + if len(args) > 0: + targetClass = IEI_CLASS_MAP.get(args[0], cls) + elif 'iei' in kwargs: + targetClass = IEI_CLASS_MAP.get(kwargs['iei'], cls) + else: + return super(InformationElement, cls).__new__(cls) + return super(InformationElement, targetClass).__new__(targetClass) + + def __init__(self, iei, ieLen=0, ieData=None): + self.id = iei # IEI + self.dataLength = ieLen # IE Length + self.data = ieData or [] # raw IE data + + @classmethod + def decode(cls, byteIter): + """ Decodes a single IE at the current position in the specified + byte iterator + + :return: An InformationElement (or subclass) instance for the decoded IE + :rtype: InformationElement, or subclass thereof + """ + iei = next(byteIter) + ieLen = next(byteIter) + ieData = [] + for i in xrange(ieLen): + ieData.append(next(byteIter)) + return InformationElement(iei, ieLen, ieData) + + def encode(self): + """ Encodes this IE and returns the resulting bytes """ + result = bytearray() + result.append(self.id) + result.append(self.dataLength) + result.extend(self.data) + return result + + def __len__(self): + """ Exposes the IE's total length (including the IEI and IE length octet) in octets """ + return self.dataLength + 2 + + +class Concatenation(InformationElement): + """ IE that indicates SMS concatenation. + + This implementation handles both 8-bit and 16-bit concatenation + indication, and exposes the specific useful details of this + IE as instance variables. + + Exposes: + + reference + CSMS reference number, must be same for all the SMS parts in the CSMS + parts + total number of parts. The value shall remain constant for every short + message which makes up the concatenated short message. If the value is zero then + the receiving entity shall ignore the whole information element + number + this part's number in the sequence. The value shall start at 1 and + increment for every short message which makes up the concatenated short message + """ + + def __init__(self, iei=0x00, ieLen=0, ieData=None): + super(Concatenation, self).__init__(iei, ieLen, ieData) + if ieData != None: + if iei == 0x00: # 8-bit reference + self.reference, self.parts, self.number = ieData + else: # 0x08: 16-bit reference + self.reference = ieData[0] << 8 | ieData[1] + self.parts = ieData[2] + self.number = ieData[3] + + def encode(self): + if self.reference > 0xFF: + self.id = 0x08 # 16-bit reference + self.data = [self.reference >> 8, self.reference & 0xFF, self.parts, self.number] + else: + self.id = 0x00 # 8-bit reference + self.data = [self.reference, self.parts, self.number] + self.dataLength = len(self.data) + return super(Concatenation, self).encode() + + +class PortAddress(InformationElement): + """ IE that indicates an Application Port Addressing Scheme. + + This implementation handles both 8-bit and 16-bit concatenation + indication, and exposes the specific useful details of this + IE as instance variables. + + Exposes: + destination: The destination port number + source: The source port number + """ + + def __init__(self, iei=0x04, ieLen=0, ieData=None): + super(PortAddress, self).__init__(iei, ieLen, ieData) + if ieData != None: + if iei == 0x04: # 8-bit port addressing scheme + self.destination, self.source = ieData + else: # 0x05: 16-bit port addressing scheme + self.destination = ieData[0] << 8 | ieData[1] + self.source = ieData[2] << 8 | ieData[3] + + def encode(self): + if self.destination > 0xFF or self.source > 0xFF: + self.id = 0x05 # 16-bit + self.data = [self.destination >> 8, self.destination & 0xFF, self.source >> 8, self.source & 0xFF] + else: + self.id = 0x04 # 8-bit + self.data = [self.destination, self.source] + self.dataLength = len(self.data) + return super(PortAddress, self).encode() + + +# Map of recognized IEIs +IEI_CLASS_MAP = {0x00: Concatenation, # Concatenated short messages, 8-bit reference number + 0x08: Concatenation, # Concatenated short messages, 16-bit reference number + 0x04: PortAddress, # Application port addressing scheme, 8 bit address + 0x05: PortAddress # Application port addressing scheme, 16 bit address + } + + +class Pdu(object): + """ Encoded SMS PDU. Contains raw PDU data and related meta-information """ + + def __init__(self, data, tpduLength): + """ Constructor + :param data: the raw PDU data (as bytes) + :type data: bytearray + :param tpduLength: Length (in bytes) of the TPDU + :type tpduLength: int + """ + self.data = data + self.tpduLength = tpduLength + + def __str__(self): + global PYTHON_VERSION + if PYTHON_VERSION < 3: + return str(self.data).encode('hex').upper() + else: #pragma: no cover + return str(codecs.encode(self.data, 'hex_codec'), 'ascii').upper() + + +def encodeSmsSubmitPdu(number, text, reference=0, validity=None, smsc=None, requestStatusReport=True, rejectDuplicates=False, sendFlash=False): + """ Creates an SMS-SUBMIT PDU for sending a message with the specified text to the specified number + + :param number: the destination mobile number + :type number: str + :param text: the message text + :type text: str + :param reference: message reference number (see also: rejectDuplicates parameter) + :type reference: int + :param validity: message validity period (absolute or relative) + :type validity: datetime.timedelta (relative) or datetime.datetime (absolute) + :param smsc: SMSC number to use (leave None to use default) + :type smsc: str + :param rejectDuplicates: Flag that controls the TP-RD parameter (messages with same destination and reference may be rejected if True) + :type rejectDuplicates: bool + + :return: A list of one or more tuples containing the SMS PDU (as a bytearray, and the length of the TPDU part + :rtype: list of tuples + """ + if PYTHON_VERSION < 3: + if type(text) == str: + text = text.decode('UTF-8') + + tpduFirstOctet = 0x01 # SMS-SUBMIT PDU + if validity != None: + # Validity period format (TP-VPF) is stored in bits 4,3 of the first TPDU octet + if type(validity) == timedelta: + # Relative (TP-VP is integer) + tpduFirstOctet |= 0x10 # bit4 == 1, bit3 == 0 + validityPeriod = [_encodeRelativeValidityPeriod(validity)] + elif type(validity) == datetime: + # Absolute (TP-VP is semi-octet encoded date) + tpduFirstOctet |= 0x18 # bit4 == 1, bit3 == 1 + validityPeriod = _encodeTimestamp(validity) + else: + raise TypeError('"validity" must be of type datetime.timedelta (for relative value) or datetime.datetime (for absolute value)') + else: + validityPeriod = None + if rejectDuplicates: + tpduFirstOctet |= 0x04 # bit2 == 1 + if requestStatusReport: + tpduFirstOctet |= 0x20 # bit5 == 1 + + # Encode message text and set data coding scheme based on text contents + try: + encodedTextLength = len(encodeGsm7(text)) + except ValueError: + # Cannot encode text using GSM-7; use UCS2 instead + encodedTextLength = len(text) + alphabet = 0x08 # UCS2 + else: + alphabet = 0x00 # GSM-7 + + # Check if message should be concatenated + if encodedTextLength > MAX_MESSAGE_LENGTH[alphabet]: + # Text too long for single PDU - add "concatenation" User Data Header + concatHeaderPrototype = Concatenation() + concatHeaderPrototype.reference = reference + + # Devide whole text into parts + if alphabet == 0x00: + pduTextParts = divideTextGsm7(text) + elif alphabet == 0x08: + pduTextParts = divideTextUcs2(text) + else: + raise NotImplementedError + + pduCount = len(pduTextParts) + concatHeaderPrototype.parts = pduCount + tpduFirstOctet |= 0x40 + else: + concatHeaderPrototype = None + pduCount = 1 + + # Construct required PDU(s) + pdus = [] + for i in xrange(pduCount): + pdu = bytearray() + if smsc: + pdu.extend(_encodeAddressField(smsc, smscField=True)) + else: + pdu.append(0x00) # Don't supply an SMSC number - use the one configured in the device + + udh = bytearray() + if concatHeaderPrototype != None: + concatHeader = copy(concatHeaderPrototype) + concatHeader.number = i + 1 + pduText = pduTextParts[i] + pduTextLength = len(pduText) + udh.extend(concatHeader.encode()) + else: + pduText = text + + udhLen = len(udh) + + pdu.append(tpduFirstOctet) + pdu.append(reference) # message reference + # Add destination number + pdu.extend(_encodeAddressField(number)) + pdu.append(0x00) # Protocol identifier - no higher-level protocol + + pdu.append(alphabet if not sendFlash else (0x10 if alphabet == 0x00 else 0x18)) + if validityPeriod: + pdu.extend(validityPeriod) + + if alphabet == 0x00: # GSM-7 + encodedText = encodeGsm7(pduText) + userDataLength = len(encodedText) # Payload size in septets/characters + if udhLen > 0: + shift = ((udhLen + 1) * 8) % 7 # "fill bits" needed to make the UDH end on a septet boundary + userData = packSeptets(encodedText, padBits=shift) + if shift > 0: + userDataLength += 1 # take padding bits into account + else: + userData = packSeptets(encodedText) + elif alphabet == 0x08: # UCS2 + userData = encodeUcs2(pduText) + userDataLength = len(userData) + + if udhLen > 0: + userDataLength += udhLen + 1 # +1 for the UDH length indicator byte + pdu.append(userDataLength) + pdu.append(udhLen) + pdu.extend(udh) # UDH + else: + pdu.append(userDataLength) + pdu.extend(userData) # User Data (message payload) + tpdu_length = len(pdu) - 1 + pdus.append(Pdu(pdu, tpdu_length)) + return pdus + +def decodeSmsPdu(pdu): + """ Decodes SMS pdu data and returns a tuple in format (number, text) + + :param pdu: PDU data as a hex string, or a bytearray containing PDU octects + :type pdu: str or bytearray + + :raise EncodingError: If the specified PDU data cannot be decoded + + :return: The decoded SMS data as a dictionary + :rtype: dict + """ + try: + pdu = toByteArray(pdu) + except Exception as e: + # Python 2 raises TypeError, Python 3 raises binascii.Error + raise EncodingError(e) + result = {} + pduIter = iter(pdu) + + smscNumber, smscBytesRead = _decodeAddressField(pduIter, smscField=True) + result['smsc'] = smscNumber + result['tpdu_length'] = len(pdu) - smscBytesRead + + tpduFirstOctet = next(pduIter) + + pduType = tpduFirstOctet & 0x03 # bits 1-0 + if pduType == 0x00: # SMS-DELIVER or SMS-DELIVER REPORT + result['type'] = 'SMS-DELIVER' + result['number'] = _decodeAddressField(pduIter)[0] + result['protocol_id'] = next(pduIter) + dataCoding = _decodeDataCoding(next(pduIter)) + result['time'] = _decodeTimestamp(pduIter) + userDataLen = next(pduIter) + udhPresent = (tpduFirstOctet & 0x40) != 0 + ud = _decodeUserData(pduIter, userDataLen, dataCoding, udhPresent) + result.update(ud) + elif pduType == 0x01: # SMS-SUBMIT or SMS-SUBMIT-REPORT + result['type'] = 'SMS-SUBMIT' + result['reference'] = next(pduIter) # message reference - we don't really use this + result['number'] = _decodeAddressField(pduIter)[0] + result['protocol_id'] = next(pduIter) + dataCoding = _decodeDataCoding(next(pduIter)) + validityPeriodFormat = (tpduFirstOctet & 0x18) >> 3 # bits 4,3 + if validityPeriodFormat == 0x02: # TP-VP field present and integer represented (relative) + result['validity'] = _decodeRelativeValidityPeriod(next(pduIter)) + elif validityPeriodFormat == 0x03: # TP-VP field present and semi-octet represented (absolute) + result['validity'] = _decodeTimestamp(pduIter) + userDataLen = next(pduIter) + udhPresent = (tpduFirstOctet & 0x40) != 0 + ud = _decodeUserData(pduIter, userDataLen, dataCoding, udhPresent) + result.update(ud) + elif pduType == 0x02: # SMS-STATUS-REPORT or SMS-COMMAND + result['type'] = 'SMS-STATUS-REPORT' + result['reference'] = next(pduIter) + result['number'] = _decodeAddressField(pduIter)[0] + result['time'] = _decodeTimestamp(pduIter) + result['discharge'] = _decodeTimestamp(pduIter) + result['status'] = next(pduIter) + else: + raise EncodingError('Unknown SMS message type: {0}. First TPDU octet was: {1}'.format(pduType, tpduFirstOctet)) + + return result + +def _decodeUserData(byteIter, userDataLen, dataCoding, udhPresent): + """ Decodes PDU user data (UDHI (if present) and message text) """ + result = {} + if udhPresent: + # User Data Header is present + result['udh'] = [] + udhLen = next(byteIter) + ieLenRead = 0 + # Parse and store UDH fields + while ieLenRead < udhLen: + ie = InformationElement.decode(byteIter) + ieLenRead += len(ie) + result['udh'].append(ie) + del ieLenRead + if dataCoding == 0x00: # GSM-7 + # Since we are using 7-bit data, "fill bits" may have been added to make the UDH end on a septet boundary + shift = ((udhLen + 1) * 8) % 7 # "fill bits" needed to make the UDH end on a septet boundary + # Simulate another "shift" in the unpackSeptets algorithm in order to ignore the fill bits + prevOctet = next(byteIter) + shift += 1 + + if dataCoding == 0x00: # GSM-7 + if udhPresent: + userDataSeptets = unpackSeptets(byteIter, userDataLen, prevOctet, shift) + else: + userDataSeptets = unpackSeptets(byteIter, userDataLen) + result['text'] = decodeGsm7(userDataSeptets) + elif dataCoding == 0x02: # UCS2 + result['text'] = decodeUcs2(byteIter, userDataLen) + else: # 8-bit (data) + userData = [] + for b in byteIter: + userData.append(unichr(b)) + result['text'] = ''.join(userData) + return result + +def _decodeRelativeValidityPeriod(tpVp): + """ Calculates the relative SMS validity period (based on the table in section 9.2.3.12 of GSM 03.40) + :rtype: datetime.timedelta + """ + if tpVp <= 143: + return timedelta(minutes=((tpVp + 1) * 5)) + elif 144 <= tpVp <= 167: + return timedelta(hours=12, minutes=((tpVp - 143) * 30)) + elif 168 <= tpVp <= 196: + return timedelta(days=(tpVp - 166)) + elif 197 <= tpVp <= 255: + return timedelta(weeks=(tpVp - 192)) + else: + raise ValueError('tpVp must be in range [0, 255]') + +def _encodeRelativeValidityPeriod(validityPeriod): + """ Encodes the specified relative validity period timedelta into an integer for use in an SMS PDU + (based on the table in section 9.2.3.12 of GSM 03.40) + + :param validityPeriod: The validity period to encode + :type validityPeriod: datetime.timedelta + :rtype: int + """ + # Python 2.6 does not have timedelta.total_seconds(), so compute it manually + #seconds = validityPeriod.total_seconds() + seconds = validityPeriod.seconds + (validityPeriod.days * 24 * 3600) + if seconds <= 43200: # 12 hours + tpVp = int(seconds / 300) - 1 # divide by 5 minutes, subtract 1 + elif seconds <= 86400: # 24 hours + tpVp = int((seconds - 43200) / 1800) + 143 # subtract 12 hours, divide by 30 minutes. add 143 + elif validityPeriod.days <= 30: # 30 days + tpVp = validityPeriod.days + 166 # amount of days + 166 + elif validityPeriod.days <= 441: # max value of tpVp is 255 + tpVp = int(validityPeriod.days / 7) + 192 # amount of weeks + 192 + else: + raise ValueError('Validity period too long; tpVp limited to 1 octet (max value: 255)') + return tpVp + +def _decodeTimestamp(byteIter): + """ Decodes a 7-octet timestamp """ + dateStr = decodeSemiOctets(byteIter, 7) + timeZoneStr = dateStr[-2:] + return datetime.strptime(dateStr[:-2], '%y%m%d%H%M%S').replace(tzinfo=SmsPduTzInfo(timeZoneStr)) + +def _encodeTimestamp(timestamp): + """ Encodes a 7-octet timestamp from the specified date + + Note: the specified timestamp must have a UTC offset set; you can use gsmmodem.util.SimpleOffsetTzInfo for simple cases + + :param timestamp: The timestamp to encode + :type timestamp: datetime.datetime + + :return: The encoded timestamp + :rtype: bytearray + """ + if timestamp.tzinfo == None: + raise ValueError('Please specify time zone information for the timestamp (e.g. by using gsmmodem.util.SimpleOffsetTzInfo)') + + # See if the timezone difference is positive/negative + tzDelta = timestamp.utcoffset() + if tzDelta.days >= 0: + tzValStr = '{0:0>2}'.format(int(tzDelta.seconds / 60 / 15)) + else: # negative + tzVal = int((tzDelta.days * -3600 * 24 - tzDelta.seconds) / 60 / 15) # calculate offset in 0.25 hours + # Cast as literal hex value and set MSB of first semi-octet of timezone to 1 to indicate negative value + tzVal = int('{0:0>2}'.format(tzVal), 16) | 0x80 + tzValStr = '{0:0>2X}'.format(tzVal) + + dateStr = timestamp.strftime('%y%m%d%H%M%S') + tzValStr + + return encodeSemiOctets(dateStr) + +def _decodeDataCoding(octet): + if octet & 0xC0 == 0: + #compressed = octect & 0x20 + alphabet = (octet & 0x0C) >> 2 + return alphabet # 0x00 == GSM-7, 0x01 == 8-bit data, 0x02 == UCS2 + # We ignore other coding groups + return 0 + +def nibble2octet(addressLen): + return int((addressLen + 1) / 2) + +def _decodeAddressField(byteIter, smscField=False, log=False): + """ Decodes the address field at the current position of the bytearray iterator + + :param byteIter: Iterator over bytearray + :type byteIter: iter(bytearray) + + :return: Tuple containing the address value and amount of bytes read (value is or None if it is empty (zero-length)) + :rtype: tuple + """ + addressLen = next(byteIter) + if addressLen > 0: + toa = next(byteIter) + ton = (toa & 0x70) # bits 6,5,4 of type-of-address == type-of-number + if ton == 0x50: + # Alphanumberic number + addressLen = nibble2octet(addressLen) + septets = unpackSeptets(byteIter, addressLen) + addressValue = decodeGsm7(septets) + return (addressValue, (addressLen + 2)) + else: + # ton == 0x00: Unknown (might be international, local, etc) - leave as is + # ton == 0x20: National number + if smscField: + addressValue = decodeSemiOctets(byteIter, addressLen-1) + else: + addressLen = nibble2octet(addressLen) + addressValue = decodeSemiOctets(byteIter, addressLen) + addressLen += 1 # for the return value, add the toa byte + if ton == 0x10: # International number + addressValue = '+' + addressValue + return (addressValue, (addressLen + 1)) + else: + return (None, 1) + +def _encodeAddressField(address, smscField=False): + """ Encodes the address into an address field + + :param address: The address to encode (phone number or alphanumeric) + :type byteIter: str + + :return: Encoded SMS PDU address field + :rtype: bytearray + """ + # First, see if this is a number or an alphanumeric string + toa = 0x80 | 0x00 | 0x01 # Type-of-address start | Unknown type-of-number | ISDN/tel numbering plan + alphaNumeric = False + if address.isalnum(): + # Might just be a local number + if address.isdigit(): + # Local number + toa |= 0x20 + else: + # Alphanumeric address + toa |= 0x50 + toa &= 0xFE # switch to "unknown" numbering plan + alphaNumeric = True + else: + if address[0] == '+' and address[1:].isdigit(): + # International number + toa |= 0x10 + # Remove the '+' prefix + address = address[1:] + else: + # Alphanumeric address + toa |= 0x50 + toa &= 0xFE # switch to "unknown" numbering plan + alphaNumeric = True + if alphaNumeric: + addressValue = packSeptets(encodeGsm7(address, False)) + addressLen = len(addressValue) * 2 + else: + addressValue = encodeSemiOctets(address) + if smscField: + addressLen = len(addressValue) + 1 + else: + addressLen = len(address) + result = bytearray() + result.append(addressLen) + result.append(toa) + result.extend(addressValue) + return result + +def encodeSemiOctets(number): + """ Semi-octet encoding algorithm (e.g. for phone numbers) + + :return: bytearray containing the encoded octets + :rtype: bytearray + """ + if len(number) % 2 == 1: + number = number + 'F' # append the "end" indicator + octets = [int(number[i+1] + number[i], 16) for i in xrange(0, len(number), 2)] + return bytearray(octets) + +def decodeSemiOctets(encodedNumber, numberOfOctets=None): + """ Semi-octet decoding algorithm(e.g. for phone numbers) + + :param encodedNumber: The semi-octet-encoded telephone number (in bytearray format or hex string) + :type encodedNumber: bytearray, str or iter(bytearray) + :param numberOfOctets: The expected amount of octets after decoding (i.e. when to stop) + :type numberOfOctets: int + + :return: decoded telephone number + :rtype: string + """ + number = [] + if type(encodedNumber) in (str, bytes): + encodedNumber = bytearray(codecs.decode(encodedNumber, 'hex_codec')) + i = 0 + for octet in encodedNumber: + hexVal = hex(octet)[2:].zfill(2) + number.append(hexVal[1]) + if hexVal[0] != 'f': + number.append(hexVal[0]) + else: + break + if numberOfOctets != None: + i += 1 + if i == numberOfOctets: + break + return ''.join(number) + +def encodeTextMode(plaintext): + """ Text mode checker + + Tests whther SMS could be sent in text mode + + :param text: the text string to encode + + :raise ValueError: if the text string cannot be sent in text mode + + :return: Passed string + :rtype: str + """ + if PYTHON_VERSION >= 3: + plaintext = str(plaintext) + elif type(plaintext) == str: + plaintext = plaintext.decode('UTF-8') + + for char in plaintext: + idx = TEXT_MODE.find(char) + if idx != -1: + continue + else: + raise ValueError('Cannot encode char "{0}" inside text mode'.format(char)) + + if len(plaintext) > MAX_MESSAGE_LENGTH[0x00]: + raise ValueError('Message is too long for text mode (maximum {0} characters)'.format(MAX_MESSAGE_LENGTH[0x00])) + + return plaintext + +def encodeGsm7(plaintext, discardInvalid=False): + """ GSM-7 text encoding algorithm + + Encodes the specified text string into GSM-7 octets (characters). This method does not pack + the characters into septets. + + :param text: the text string to encode + :param discardInvalid: if True, characters that cannot be encoded will be silently discarded + + :raise ValueError: if the text string cannot be encoded using GSM-7 encoding (unless discardInvalid == True) + + :return: A bytearray containing the string encoded in GSM-7 encoding + :rtype: bytearray + """ + result = bytearray() + if PYTHON_VERSION >= 3: + plaintext = str(plaintext) + elif type(plaintext) == str: + plaintext = plaintext.decode('UTF-8') + + for char in plaintext: + idx = GSM7_BASIC.find(char) + if idx != -1: + result.append(idx) + elif char in GSM7_EXTENDED: + result.append(0x1B) # ESC - switch to extended table + result.append(ord(GSM7_EXTENDED[char])) + elif not discardInvalid: + raise ValueError('Cannot encode char "{0}" using GSM-7 encoding'.format(char)) + return result + +def decodeGsm7(encodedText): + """ GSM-7 text decoding algorithm + + Decodes the specified GSM-7-encoded string into a plaintext string. + + :param encodedText: the text string to encode + :type encodedText: bytearray or str + + :return: A string containing the decoded text + :rtype: str + """ + result = [] + if type(encodedText) == str: + encodedText = rawStrToByteArray(encodedText) #bytearray(encodedText) + iterEncoded = iter(encodedText) + for b in iterEncoded: + if b == 0x1B: # ESC - switch to extended table + c = chr(next(iterEncoded)) + for char, value in dictItemsIter(GSM7_EXTENDED): + if c == value: + result.append(char) + break + else: + result.append(GSM7_BASIC[b]) + return ''.join(result) + +def divideTextGsm7(plainText): + """ GSM7 message dividing algorithm + + Divides text into list of chunks that could be stored in a single, GSM7-encoded SMS message. + + :param plainText: the text string to divide + :type plainText: str + + :return: A list of strings + :rtype: list of str + """ + result = [] + + plainStartPtr = 0 + plainStopPtr = 0 + chunkByteSize = 0 + + if PYTHON_VERSION >= 3: + plainText = str(plainText) + while plainStopPtr < len(plainText): + char = plainText[plainStopPtr] + idx = GSM7_BASIC.find(char) + if idx != -1: + chunkByteSize = chunkByteSize + 1; + elif char in GSM7_EXTENDED: + chunkByteSize = chunkByteSize + 2; + else: + raise ValueError('Cannot encode char "{0}" using GSM-7 encoding'.format(char)) + + plainStopPtr = plainStopPtr + 1 + if chunkByteSize > MAX_MULTIPART_MESSAGE_LENGTH[0x00]: + plainStopPtr = plainStopPtr - 1 + + if chunkByteSize >= MAX_MULTIPART_MESSAGE_LENGTH[0x00]: + result.append(plainText[plainStartPtr:plainStopPtr]) + plainStartPtr = plainStopPtr + chunkByteSize = 0 + + if chunkByteSize > 0: + result.append(plainText[plainStartPtr:]) + + return result + +def packSeptets(octets, padBits=0): + """ Packs the specified octets into septets + + Typically the output of encodeGsm7 would be used as input to this function. The resulting + bytearray contains the original GSM-7 characters packed into septets ready for transmission. + + :rtype: bytearray + """ + result = bytearray() + if type(octets) == str: + octets = iter(rawStrToByteArray(octets)) + elif type(octets) == bytearray: + octets = iter(octets) + shift = padBits + if padBits == 0: + try: + prevSeptet = next(octets) + except StopIteration: + return result + else: + prevSeptet = 0x00 + for octet in octets: + septet = octet & 0x7f; + if shift == 7: + # prevSeptet has already been fully added to result + shift = 0 + prevSeptet = septet + continue + b = ((septet << (7 - shift)) & 0xFF) | (prevSeptet >> shift) + prevSeptet = septet + shift += 1 + result.append(b) + if shift != 7: + # There is a bit "left over" from prevSeptet + result.append(prevSeptet >> shift) + return result + +def unpackSeptets(septets, numberOfSeptets=None, prevOctet=None, shift=7): + """ Unpacks the specified septets into octets + + :param septets: Iterator or iterable containing the septets packed into octets + :type septets: iter(bytearray), bytearray or str + :param numberOfSeptets: The amount of septets to unpack (or None for all remaining in "septets") + :type numberOfSeptets: int or None + + :return: The septets unpacked into octets + :rtype: bytearray + """ + result = bytearray() + if type(septets) == str: + septets = iter(rawStrToByteArray(septets)) + elif type(septets) == bytearray: + septets = iter(septets) + if numberOfSeptets == None: + numberOfSeptets = MAX_INT # Loop until StopIteration + if numberOfSeptets == 0: + return result + i = 0 + for octet in septets: + i += 1 + if shift == 7: + shift = 1 + if prevOctet != None: + result.append(prevOctet >> 1) + if i <= numberOfSeptets: + result.append(octet & 0x7F) + prevOctet = octet + if i == numberOfSeptets: + break + else: + continue + b = ((octet << shift) & 0x7F) | (prevOctet >> (8 - shift)) + + prevOctet = octet + result.append(b) + shift += 1 + + if i == numberOfSeptets: + break + if shift == 7 and prevOctet: + b = prevOctet >> (8 - shift) + if b: + # The final septet value still needs to be unpacked + result.append(b) + return result + +def decodeUcs2(byteIter, numBytes): + """ Decodes UCS2-encoded text from the specified byte iterator, up to a maximum of numBytes """ + userData = [] + i = 0 + try: + while i < numBytes: + userData.append(unichr((next(byteIter) << 8) | next(byteIter))) + i += 2 + except StopIteration: + # Not enough bytes in iterator to reach numBytes; return what we have + pass + return ''.join(userData) + +def encodeUcs2(text): + """ UCS2 text encoding algorithm + + Encodes the specified text string into UCS2-encoded bytes. + + :param text: the text string to encode + + :return: A bytearray containing the string encoded in UCS2 encoding + :rtype: bytearray + """ + result = bytearray() + + for b in map(ord, text): + result.append(b >> 8) + result.append(b & 0xFF) + return result + +def divideTextUcs2(plainText): + """ UCS-2 message dividing algorithm + + Divides text into list of chunks that could be stored in a single, UCS-2 -encoded SMS message. + + :param plainText: the text string to divide + :type plainText: str + + :return: A list of strings + :rtype: list of str + """ + result = [] + resultLength = 0 + + fullChunksCount = int(len(plainText) / MAX_MULTIPART_MESSAGE_LENGTH[0x08]) + for i in range(fullChunksCount): + result.append(plainText[i * MAX_MULTIPART_MESSAGE_LENGTH[0x08] : (i + 1) * MAX_MULTIPART_MESSAGE_LENGTH[0x08]]) + resultLength = resultLength + MAX_MULTIPART_MESSAGE_LENGTH[0x08] + + # Add last, not fully filled chunk + if resultLength < len(plainText): + result.append(plainText[resultLength:]) + + return result diff --git a/gsmmodem/serial_comms.py b/gsmmodem/serial_comms.py new file mode 100644 index 0000000..44544fe --- /dev/null +++ b/gsmmodem/serial_comms.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python + +""" Low-level serial communications handling """ + +import sys, threading, logging + +import re +import serial # pyserial: http://pyserial.sourceforge.net + +from .exceptions import TimeoutException +from . import compat # For Python 2.6 compatibility + + +class SerialComms(object): + """ Wraps all low-level serial communications (actual read/write operations) """ + + log = logging.getLogger('gsmmodem.serial_comms.SerialComms') + + # End-of-line read terminator + RX_EOL_SEQ = b'\r\n' + # End-of-response terminator + RESPONSE_TERM = re.compile('^OK|ERROR|(\+CM[ES] ERROR: \d+)|(COMMAND NOT SUPPORT)$') + # Default timeout for serial port reads (in seconds) + timeout = 1 + + def __init__(self, port, baudrate=115200, notifyCallbackFunc=None, fatalErrorCallbackFunc=None, *args, **kwargs): + """ Constructor + + :param fatalErrorCallbackFunc: function to call if a fatal error occurs in the serial device reading thread + :type fatalErrorCallbackFunc: func + """ + self.alive = False + self.port = port + self.baudrate = baudrate + + self._responseEvent = None # threading.Event() + self._expectResponseTermSeq = None # expected response terminator sequence + self._response = None # Buffer containing response to a written command + self._notification = [] # Buffer containing lines from an unsolicited notification from the modem + # Reentrant lock for managing concurrent write access to the underlying serial port + self._txLock = threading.RLock() + + self.notifyCallback = notifyCallbackFunc or self._placeholderCallback + self.fatalErrorCallback = fatalErrorCallbackFunc or self._placeholderCallback + + self.com_args = args + self.com_kwargs = kwargs + + def connect(self): + """ Connects to the device and starts the read thread """ + self.serial = serial.Serial(dsrdtr=True, rtscts=True, port=self.port, baudrate=self.baudrate, + timeout=self.timeout, *self.com_args, **self.com_kwargs) + # Start read thread + self.alive = True + self.rxThread = threading.Thread(target=self._readLoop) + self.rxThread.daemon = True + self.rxThread.start() + + def close(self): + """ Stops the read thread, waits for it to exit cleanly, then closes the underlying serial port """ + self.alive = False + self.rxThread.join() + self.serial.close() + + def _handleLineRead(self, line, checkForResponseTerm=True): + # print 'sc.hlineread:',line + if self._responseEvent and not self._responseEvent.is_set(): + # A response event has been set up (another thread is waiting for this response) + self._response.append(line) + if not checkForResponseTerm or self.RESPONSE_TERM.match(line): + # End of response reached; notify waiting thread + # print 'response:', self._response + self.log.debug('response: %s', self._response) + self._responseEvent.set() + else: + # Nothing was waiting for this - treat it as a notification + self._notification.append(line) + if self.serial.inWaiting() == 0: + # No more chars on the way for this notification - notify higher-level callback + # print 'notification:', self._notification + self.log.debug('notification: %s', self._notification) + self.notifyCallback(self._notification) + self._notification = [] + + def _placeholderCallback(self, *args, **kwargs): + """ Placeholder callback function (does nothing) """ + + def _readLoop(self): + """ Read thread main loop + + Reads lines from the connected device + """ + try: + readTermSeq = bytearray(self.RX_EOL_SEQ) + readTermLen = len(readTermSeq) + rxBuffer = bytearray() + while self.alive: + data = self.serial.read(1) + if len(data) != 0: # check for timeout + # print >> sys.stderr, ' RX:', data,'({0})'.format(ord(data)) + rxBuffer.append(ord(data)) + if rxBuffer[-readTermLen:] == readTermSeq: + # A line (or other logical segment) has been read + line = rxBuffer[:-readTermLen].decode() + rxBuffer = bytearray() + if len(line) > 0: + # print 'calling handler' + self._handleLineRead(line) + elif self._expectResponseTermSeq: + if rxBuffer[-len(self._expectResponseTermSeq):] == self._expectResponseTermSeq: + line = rxBuffer.decode() + rxBuffer = bytearray() + self._handleLineRead(line, checkForResponseTerm=False) + # else: + # ' ' + except serial.SerialException as e: + self.alive = False + try: + self.serial.close() + except Exception: # pragma: no cover + pass + # Notify the fatal error handler + self.fatalErrorCallback(e) + + def write(self, data, waitForResponse=True, timeout=5, expectedResponseTermSeq=None): + data = data.encode() + with self._txLock: + if waitForResponse: + if expectedResponseTermSeq: + self._expectResponseTermSeq = bytearray(expectedResponseTermSeq.encode()) + self._response = [] + self._responseEvent = threading.Event() + self.serial.write(data) + if self._responseEvent.wait(timeout): + self._responseEvent = None + self._expectResponseTermSeq = False + return self._response + else: # Response timed out + self._responseEvent = None + self._expectResponseTermSeq = False + if len(self._response) > 0: + # Add the partial response to the timeout exception + raise TimeoutException(self._response) + else: + raise TimeoutException() + else: + self.serial.write(data) diff --git a/gsmmodem/util.py b/gsmmodem/util.py new file mode 100644 index 0000000..a61ba8d --- /dev/null +++ b/gsmmodem/util.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" Some common utility classes used by tests """ + +from datetime import datetime, timedelta, tzinfo +import re + + +class SimpleOffsetTzInfo(tzinfo): + """ Very simple implementation of datetime.tzinfo offering set timezone offset for datetime instances """ + + def __init__(self, offsetInHours=None): + """ Constructs a new tzinfo instance using an amount of hours as an offset + + :param offsetInHours: The timezone offset, in hours (may be negative) + :type offsetInHours: int or float + """ + if offsetInHours != None: # pragma: no cover + self.offsetInHours = offsetInHours + + def utcoffset(self, dt): + return timedelta(hours=self.offsetInHours) + + def dst(self, dt): + return timedelta(0) + + def __repr__(self): + return 'gsmmodem.util.SimpleOffsetTzInfo({0})'.format(self.offsetInHours) + + +def parseTextModeTimeStr(timeStr): + """ Parses the specified SMS text mode time string + + The time stamp format is "yy/MM/dd,hh:mm:ss±zz" + (yy = year, MM = month, dd = day, hh = hour, mm = minute, ss = second, zz = time zone + [Note: the unit of time zone is a quarter of an hour]) + + :param timeStr: The time string to parse + :type timeStr: str + + :return: datetime object representing the specified time string + :rtype: datetime.datetime + """ + msgTime = timeStr[:-3] + tzOffsetHours = int(int(timeStr[-3:]) * 0.25) + return datetime.strptime(msgTime, '%y/%m/%d,%H:%M:%S').replace(tzinfo=SimpleOffsetTzInfo(tzOffsetHours)) + + +def lineStartingWith(string, lines): + """ Searches through the specified list of strings and returns the + first line starting with the specified search string, or None if not found + """ + for line in lines: + if line.startswith(string): + return line + else: + return None + + +def lineMatching(regexStr, lines): + """ Searches through the specified list of strings and returns the regular expression + match for the first line that matches the specified regex string, or None if no match was found + + Note: if you have a pre-compiled regex pattern, use lineMatchingPattern() instead + + :type regexStr: Regular expression string to use + :type lines: List of lines to search + + :return: the regular expression match for the first line that matches the specified regex, or None if no match was found + :rtype: re.Match + """ + regex = re.compile(regexStr) + for line in lines: + m = regex.match(line) + if m: + return m + else: + return None + + +def lineMatchingPattern(pattern, lines): + """ Searches through the specified list of strings and returns the regular expression + match for the first line that matches the specified pre-compiled regex pattern, or None if no match was found + + Note: if you are using a regex pattern string (i.e. not already compiled), use lineMatching() instead + + :type pattern: Compiled regular expression pattern to use + :type lines: List of lines to search + + :return: the regular expression match for the first line that matches the specified regex, or None if no match was found + :rtype: re.Match + """ + for line in lines: + m = pattern.match(line) + if m: + return m + else: + return None + + +def allLinesMatchingPattern(pattern, lines): + """ Like lineMatchingPattern, but returns all lines that match the specified pattern + + :type pattern: Compiled regular expression pattern to use + :type lines: List of lines to search + + :return: list of re.Match objects for each line matched, or an empty list if none matched + :rtype: list + """ + result = [] + for line in lines: + m = pattern.match(line) + if m: + result.append(m) + return result + + +def removeAtPrefix(string): + """ Remove AT prefix from a specified string. + + :param string: An original string + :type string: str + + :return: A string with AT prefix removed + :rtype: str + """ + if string.startswith('AT'): + return string[2:] + return string