complete the check sim function

This commit is contained in:
2022-03-24 23:20:33 +01:00
parent 97f258a1b3
commit 94a602c791
19 changed files with 61 additions and 3312 deletions
-17
View File
@@ -1,17 +0,0 @@
""" Package that allows easy control of an attached GSM modem
The main class for controlling a modem is GsmModem, which can be imported
directly from this module.
Other important and useful classes are:
gsmmodem.modem.IncomingCall: wraps an incoming call and passed to the incoming call hanndler callback function
gsmmodem.modem.ReceivedSms: wraps a received SMS message and passed to the sms received hanndler callback function
gsmmodem.modem.SentSms: returned when sending SMS messages; used for tracking the status of the SMS message
All python-gsmmodem-specific exceptions are defined in the gsmmodem.modem.exceptions package.
@author: Francois Aucamp <francois.aucamp@gmail.com>
@license: LGPLv3+
"""
from .modem import GsmModem
-22
View File
@@ -1,22 +0,0 @@
""" Contains monkey-patched equivalents for a few commonly-used Python 2.7-and-higher functions.
Used to provide backwards-compatibility with Python 2.6
"""
import sys
if sys.version_info[0] == 2 and sys.version_info[1] < 7:
import threading
# threading.Event.wait() always returns None in Python < 2.7 so we need to patch it
if hasattr(threading, '_Event'): # threading.Event is a function that return threading._Event
# This is heavily Python-implementation-specific, so patch where we can, otherwise leave it
def wrapWait(func):
def newWait(self, timeout=None):
func(self, timeout)
return self.is_set()
return newWait
threading._Event.wait = wrapWait(threading._Event.wait)
else:
raise ImportError('Could not patch this version of Python 2.{0} for compatibility with python-gsmmodem.'.format(sys.version_info[1]))
if sys.version_info[0] == 2:
str = str
else:
str = lambda x: x
-134
View File
@@ -1,134 +0,0 @@
""" Module defines exceptions used by gsmmodem """
class GsmModemException(Exception):
""" Base exception raised for error conditions when interacting with the GSM modem """
class TimeoutException(GsmModemException):
""" Raised when a write command times out """
def __init__(self, data=None):
""" @param data: Any data that was read was read before timeout occurred (if applicable) """
super(TimeoutException, self).__init__(data)
self.data = data
class InvalidStateException(GsmModemException):
""" Raised when an API method call is invoked on an object that is in an incorrect state """
class InterruptedException(InvalidStateException):
""" Raised when execution of an AT command is interrupt by a state change.
May contain another exception that was the cause of the interruption """
def __init__(self, message, cause=None):
""" @param cause: the exception that caused this interruption (usually a CmeError) """
super(InterruptedException, self).__init__(message)
self.cause = cause
class CommandError(GsmModemException):
""" Raised if the modem returns an error in response to an AT command
May optionally include an error type (CME or CMS) and -code (error-specific).
"""
_description = ''
def __init__(self, command=None, type=None, code=None):
self.command = command
self.type = type
self.code = code
if type != None and code != None:
super(CommandError, self).__init__('{0} {1}{2}'.format(type, code, ' ({0})'.format(self._description) if len(self._description) > 0 else ''))
elif command != None:
super(CommandError, self).__init__(command)
else:
super(CommandError, self).__init__()
class CmeError(CommandError):
""" ME error result code : +CME ERROR: <error>
Issued in response to an AT command
"""
def __new__(cls, *args, **kwargs):
# Return a specialized version of this class if possible
if len(args) >= 2:
code = args[1]
if code == 11:
return PinRequiredError(args[0])
elif code == 16:
return IncorrectPinError(args[0])
elif code == 12:
return PukRequiredError(args[0])
return super(CmeError, cls).__new__(cls, *args, **kwargs)
def __init__(self, command, code):
super(CmeError, self).__init__(command, 'CME', code)
class SecurityException(CmeError):
""" Security-related CME error """
def __init__(self, command, code):
super(SecurityException, self).__init__(command, code)
class PinRequiredError(SecurityException):
""" Raised if an operation failed because the SIM card's PIN has not been entered """
_description = 'SIM card PIN is required'
def __init__(self, command, code=11):
super(PinRequiredError, self).__init__(command, code)
class IncorrectPinError(SecurityException):
""" Raised if an incorrect PIN is entered """
_description = 'Incorrect PIN entered'
def __init__(self, command, code=16):
super(IncorrectPinError, self).__init__(command, code)
class PukRequiredError(SecurityException):
""" Raised an operation failed because the SIM card's PUK is required (SIM locked) """
_description = "PUK required (SIM locked)"
def __init__(self, command, code=12):
super(PukRequiredError, self).__init__(command, code)
class CmsError(CommandError):
""" Message service failure result code: +CMS ERROR : <er>
Issued in response to an AT command
"""
def __new__(cls, *args, **kwargs):
# Return a specialized version of this class if possible
if len(args) >= 2:
code = args[1]
if code == 330:
return SmscNumberUnknownError(args[0])
return super(CmsError, cls).__new__(cls, *args, **kwargs)
def __init__(self, command, code):
super(CmsError, self).__init__(command, 'CMS', code)
class SmscNumberUnknownError(CmsError):
""" Raised if the SMSC (service centre) address is missing when trying to send an SMS message """
_description = 'SMSC number not set'
def __init__(self, command, code=330):
super(SmscNumberUnknownError, self).__init__(command, code)
class EncodingError(GsmModemException):
""" Raised if a decoding- or encoding operation failed """
-96
View File
@@ -1,96 +0,0 @@
# -*- coding: utf8 -*-
""" GPRS/Data-specific classes
BRANCH: mms
PLEASE NOTE: *Everything* in this file (PdpContext, GprsModem class, etc) is experimental.
This is NOT meant to be used in production in any way; the API is completely unstable,
no unit tests will be written for this in the forseeable future, and stuff may generally
break and cause riots. Please do not file bug reports against this branch unless you
have a patch to go along with it, but even then: remember that this entire "mms" branch
is exploratory; I simply want to see what the possibilities are with it.
Use the "main" branch, and the GsmModem class if you want to build normal applications.
"""
import re
from .util import allLinesMatchingPattern
from .modem import GsmModem
class PdpContext(object):
""" Packet Data Protocol (PDP) context parameter values """
def __init__(self, cid, pdpType, apn, pdpAddress=None, dataCompression=0, headerCompression=0):
""" Construct a new Packet Data Protocol context
@param cid: PDP Context Identifier - specifies a particular PDP context definition
@type cid: int
@param pdpType: the type of packet data protocol (IP, PPP, IPV6, etc)
@type pdpType: str
@param apn: Access Point Name; logical name used to select the GGSN or external packet data network
@type apn: str
@param pdpAddress: identifies the MT in the address space applicable to the PDP. If None, a dynamic address may be requested.
@type pdpAddress: str
@param dataCompression: PDP data compression; 0 == off, 1 == on
@type dataCompression: int
@param headerCompression: PDP header compression; 0 == off, 1 == on
@type headerCompression: int
"""
self.cid = cid
self.pdpType = pdpType
self.apn = apn
self.pdpAddress = pdpAddress
self.dataCompression = dataCompression
self.headerCompression = headerCompression
class GprsModem(GsmModem):
""" EXPERIMENTAL: Specialized version of GsmModem that includes GPRS/data-specific commands """
@property
def pdpContexts(self):
""" Currently-defined Packet Data Protocol (PDP) context list
PDP paramter values returned include PDP type (IP, IPV6, PPP, X.25 etc), APN,
data compression, header compression, etc.
@return: a list of currently-defined PDP contexts
"""
result = []
cgdContResult = self.write('AT+CGDCONT?')
matches = allLinesMatchingPattern(re.compile(r'^\+CGDCONT:\s*(\d+),"([^"]+)","([^"]+)","([^"]+)",(\d+),(\d+)'), cgdContResult)
for cgdContMatch in matches:
cid, pdpType, apn, pdpAddress, dataCompression, headerCompression = cgdContMatch.groups()
pdpContext = PdpContext(cid, pdpType, apn, pdpAddress, dataCompression, headerCompression)
result.append(pdpContext)
return result
@property
def defaultPdpContext(self):
""" @return: the default PDP context, or None if not defined """
pdpContexts = self.pdpContexts
return pdpContexts[0] if len(pdpContexts) > 0 else None
@defaultPdpContext.setter
def defaultPdpContext(self, pdpContext):
""" Set the default PDP context (or clear it by setting it to None) """
self.write('AT+CGDCONT=,"{0}","{1}","{2}",{3},{4}'.format(pdpContext.pdpType, pdpContext.apn, pdpContext.pdpAddress or '', pdpContext.dataCompression, pdpContext.headerCompression))
def definePdpContext(self, pdpContext):
""" Define a new Packet Data Protocol context, or overwrite an existing one
@param pdpContext: The PDP context to define
@type pdpContext: gsmmodem.gprs.PdpContext
"""
self.write('AT+CGDCONT={0},"{1}","{2}","{3}",{4},{5}'.format(pdpContext.cid or '', pdpContext.pdpType, pdpContext.apn, pdpContext.pdpAddress or '', pdpContext.dataCompression, pdpContext.headerCompression))
def initDataConnection(self, pdpCid=1):
""" Initializes a packet data (GPRS) connection using the specified PDP Context ID """
# From this point on, we don't want the read thread interfering
#self.log.debug('Stopping read thread')
#self.alive = False
#self.rxThread.join()
self.log.debug('Init data connection')
self.write('ATD*99#', expectedResponseTermSeq="CONNECT\r")
self.log.debug('Data connection open; ready for PPP comms')
# From here on we use PPP to communicate with the network
-80
View File
@@ -1,80 +0,0 @@
import weakref
from gsmmodem.exceptions import CmeError, InterruptedException, InvalidStateException
class Call(object):
""" A voice call """
DTMF_COMMAND_BASE = '+VTS='
dtmfSupport = False # Indicates whether or not DTMF tones can be sent in calls
def __init__(self, gsmModem, callId, callType, number, callStatusUpdateCallbackFunc=None):
"""
:param gsmModem: GsmModem instance that created this object
:param number: The number that is being called
"""
self._gsmModem = weakref.proxy(gsmModem)
self._callStatusUpdateCallbackFunc = callStatusUpdateCallbackFunc
# Unique ID of this call
self.id = callId
# Call type (VOICE == 0, etc)
self.type = callType
# The remote number of this call (destination or origin)
self.number = number
# Flag indicating whether the call has been answered or not (backing field for "answered" property)
self._answered = False
# Flag indicating whether or not the call is active
# (meaning it may be ringing or answered, but not ended because of a hangup event)
self.active = True
@property
def answered(self):
return self._answered
@answered.setter
def answered(self, answered):
self._answered = answered
if self._callStatusUpdateCallbackFunc:
self._callStatusUpdateCallbackFunc(self)
def sendDtmfTone(self, tones):
""" Send one or more DTMF tones to the remote party (only allowed for an answered call)
Note: this is highly device-dependent, and might not work
:param digits: A str containining one or more DTMF tones to play, e.g. "3" or "\*123#"
:raise CommandError: if the command failed/is not supported
:raise InvalidStateException: if the call has not been answered, or is ended while the command is still executing
"""
if self.answered:
dtmfCommandBase = self.DTMF_COMMAND_BASE.format(cid=self.id)
toneLen = len(tones)
for tone in list(tones):
try:
self._gsmModem.write('AT{0}{1}'.format(dtmfCommandBase, tone), timeout=(5 + toneLen))
except CmeError as e:
if e.code == 30:
# No network service - can happen if call is ended during DTMF transmission (but also if DTMF is sent immediately after call is answered)
raise InterruptedException('No network service', e)
elif e.code == 3:
# Operation not allowed - can happen if call is ended during DTMF transmission
raise InterruptedException('Operation not allowed', e)
else:
raise e
else:
raise InvalidStateException('Call is not active (it has not yet been answered, or it has ended).')
def hangup(self):
""" End the phone call.
Does nothing if the call is already inactive.
"""
if self.active:
self._gsmModem.write('ATH')
self.answered = False
self.active = False
if self.id in self._gsmModem.activeCalls:
del self._gsmModem.activeCalls[self.id]
-40
View File
@@ -1,40 +0,0 @@
from gsmmodem.modem import Call
class IncomingCall(Call):
CALL_TYPE_MAP = {'VOICE': 0}
""" Represents an incoming call, conveniently allowing access to call meta information and -control """
def __init__(self, gsmModem, number, ton, callerName, callId, callType):
"""
:param gsmModem: GsmModem instance that created this object
:param number: Caller number
:param ton: TON (type of number/address) in integer format
:param callType: Type of the incoming call (VOICE, FAX, DATA, etc)
"""
if callType in self.CALL_TYPE_MAP:
callType = self.CALL_TYPE_MAP[callType]
super(IncomingCall, self).__init__(gsmModem, callId, callType, number)
# Type attribute of the incoming call
self.ton = ton
self.callerName = callerName
# Flag indicating whether the call is ringing or not
self.ringing = True
# Amount of times this call has rung (before answer/hangup)
self.ringCount = 1
def answer(self):
""" Answer the phone call.
:return: self (for chaining method calls)
"""
if self.ringing:
self._gsmModem.write('ATA')
self.ringing = False
self.answered = True
return self
def hangup(self):
""" End the phone call. """
self.ringing = False
super(IncomingCall, self).hangup()
-27
View File
@@ -1,27 +0,0 @@
import weakref
from gsmmodem.models.Sms import Sms
class ReceivedSms(Sms):
""" An SMS message that has been received (MT) """
def __init__(self, gsmModem, status, number, time, text, smsc=None, udh=[], index=None):
super(ReceivedSms, self).__init__(number, text, smsc)
self._gsmModem = weakref.proxy(gsmModem)
self.status = status
self.time = time
self.udh = udh
self.index = index
def reply(self, message):
""" Convenience method that sends a reply SMS to the sender of this message """
return self._gsmModem.sendSms(self.number, message)
def sendSms(self, dnumber, message):
""" Convenience method that sends a SMS to someone else """
return self._gsmModem.sendSms(dnumber, message)
def getModem(self):
""" Convenience method that returns the gsm modem instance """
return self._gsmModem
-27
View File
@@ -1,27 +0,0 @@
from gsmmodem.models.Sms import Sms
from gsmmodem.models.StatusReport import StatusReport
class SentSms(Sms):
""" An SMS message that has been sent (MO) """
ENROUTE = 0 # Status indicating message is still enroute to destination
DELIVERED = 1 # Status indicating message has been received by destination handset
FAILED = 2 # Status indicating message delivery has failed
def __init__(self, number, text, reference, smsc=None):
super(SentSms, self).__init__(number, text, smsc)
self.report = None # Status report for this SMS (StatusReport object)
self.reference = reference
@property
def status(self):
""" Status of this SMS. Can be ENROUTE, DELIVERED or FAILED
The actual status report object may be accessed via the 'report' attribute
if status is 'DELIVERED' or 'FAILED'
"""
if self.report == None:
return SentSms.ENROUTE
else:
return SentSms.DELIVERED if self.report.deliveryStatus == StatusReport.DELIVERED else SentSms.FAILED
-24
View File
@@ -1,24 +0,0 @@
import abc
class Sms(object):
""" Abstract SMS message base class """
__metaclass__ = abc.ABCMeta
# Some constants to ease handling SMS statuses
STATUS_RECEIVED_UNREAD = 0
STATUS_RECEIVED_READ = 1
STATUS_STORED_UNSENT = 2
STATUS_STORED_SENT = 3
STATUS_ALL = 4
# ...and a handy converter for text mode statuses
TEXT_MODE_STATUS_MAP = {'REC UNREAD': STATUS_RECEIVED_UNREAD,
'REC READ': STATUS_RECEIVED_READ,
'STO UNSENT': STATUS_STORED_UNSENT,
'STO SENT': STATUS_STORED_SENT,
'ALL': STATUS_ALL}
def __init__(self, number, text, smsc=None):
self.number = number
self.text = text
self.smsc = smsc
-24
View File
@@ -1,24 +0,0 @@
import weakref
from gsmmodem.models.Sms import Sms
class StatusReport(Sms):
""" An SMS status/delivery report
Note: the 'status' attribute of this class refers to this status report SM's status (whether
it has been read, etc). To find the status of the message that caused this status report,
use the 'deliveryStatus' attribute.
"""
DELIVERED = 0 # SMS delivery status: delivery successful
FAILED = 68 # SMS delivery status: delivery failed
def __init__(self, gsmModem, status, reference, number, timeSent, timeFinalized, deliveryStatus, smsc=None):
super(StatusReport, self).__init__(number, None, smsc)
self._gsmModem = weakref.proxy(gsmModem)
self.status = status
self.reference = reference
self.timeSent = timeSent
self.timeFinalized = timeFinalized
self.deliveryStatus = deliveryStatus
View File
-1548
View File
File diff suppressed because it is too large Load Diff
-953
View File
@@ -1,953 +0,0 @@
# -*- coding: utf8 -*-
""" SMS PDU encoding methods """
from __future__ import unicode_literals
import sys, codecs
from datetime import datetime, timedelta, tzinfo
from copy import copy
from .exceptions import EncodingError
# For Python 3 support
PYTHON_VERSION = sys.version_info[0]
if PYTHON_VERSION >= 3:
MAX_INT = sys.maxsize
dictItemsIter = dict.items
xrange = range
unichr = chr
toByteArray = lambda x: bytearray(codecs.decode(x, 'hex_codec')) if type(x) == bytes else bytearray(codecs.decode(bytes(x, 'ascii'), 'hex_codec')) if type(x) == str else x
rawStrToByteArray = lambda x: bytearray(bytes(x, 'latin-1'))
TEXT_MODE = ('\n\r !\"#%&\'()*+,-./0123456789:;<=>?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz') # TODO: Check if all of them are supported inside text mode
# Tables can be found at: http://en.wikipedia.org/wiki/GSM_03.38#GSM_7_bit_default_alphabet_and_extension_table_of_3GPP_TS_23.038_.2F_GSM_03.38
GSM7_BASIC = ('@£$¥èéùìòÇ\nØø\rÅåΔ_ΦΓΛΩΠΨΣΘΞ\x1bÆæßÉ !\"%&\'()*+,-./0123456789:;<=>?¡ABCDEFGHIJKLMNOPQRSTUVWXYZÄÖÑÜ`¿abcdefghijklmnopqrstuvwxyzäöñüà')
GSM7_EXTENDED = {chr(0xFF): 0x0A,
#CR2: chr(0x0D),
'^': chr(0x14),
#SS2: chr(0x1B),
'{': chr(0x28),
'}': chr(0x29),
'\\': chr(0x2F),
'[': chr(0x3C),
'~': chr(0x3D),
']': chr(0x3E),
'|': chr(0x40),
'': chr(0x65)}
# Maximum message sizes for each data coding
MAX_MESSAGE_LENGTH = {0x00: 160, # GSM-7
0x04: 140, # 8-bit
0x08: 70} # UCS2
# Maximum message sizes for each data coding for multipart messages
MAX_MULTIPART_MESSAGE_LENGTH = {0x00: 153, # GSM-7
0x04: 133, # 8-bit TODO: Check this value!
0x08: 67} # UCS2
class SmsPduTzInfo(tzinfo):
""" Simple implementation of datetime.tzinfo for handling timestamp GMT offsets specified in SMS PDUs """
def __init__(self, pduOffsetStr=None):
"""
:param pduOffset: 2 semi-octet timezone offset as specified by PDU (see GSM 03.40 spec)
:type pduOffset: str
Note: pduOffsetStr is optional in this constructor due to the special requirement for pickling
mentioned in the Python docs. It should, however, be used (or otherwise pduOffsetStr must be
manually set)
"""
self._offset = None
if pduOffsetStr != None:
self._setPduOffsetStr(pduOffsetStr)
def _setPduOffsetStr(self, pduOffsetStr):
# See if the timezone difference is positive/negative by checking MSB of first semi-octet
tzHexVal = int(pduOffsetStr, 16)
# In order to read time zone 'minute' shift:
# - Remove MSB (sign)
# - Read HEX value as decimal
# - Multiply by 15
# See: https://en.wikipedia.org/wiki/GSM_03.40#Time_Format
# Possible fix for #15 - convert invalid character to BCD-value
if (tzHexVal & 0x0F) > 0x9:
tzHexVal +=0x06
tzOffsetMinutes = int('{0:0>2X}'.format(tzHexVal & 0x7F)) * 15
if tzHexVal & 0x80 == 0: # positive
self._offset = timedelta(minutes=(tzOffsetMinutes))
else: # negative
self._offset = timedelta(minutes=(-tzOffsetMinutes))
def utcoffset(self, dt):
return self._offset
def dst(self, dt):
""" We do not have enough info in the SMS PDU to implement daylight savings time """
return timedelta(0)
class InformationElement(object):
""" User Data Header (UDH) Information Element (IE) implementation
This represents a single field ("information element") in the PDU's
User Data Header. The UDH itself contains one or more of these
information elements.
If the IEI (IE identifier) is recognized, the class will automatically
specialize into one of the subclasses of InformationElement,
e.g. Concatenation or PortAddress, allowing the user to easily
access the specific (and useful) attributes of these special cases.
"""
def __new__(cls, *args, **kwargs): #iei, ieLen, ieData):
""" Causes a new InformationElement class, or subclass
thereof, to be created. If the IEI is recognized, a specific
subclass of InformationElement is returned """
if len(args) > 0:
targetClass = IEI_CLASS_MAP.get(args[0], cls)
elif 'iei' in kwargs:
targetClass = IEI_CLASS_MAP.get(kwargs['iei'], cls)
else:
return super(InformationElement, cls).__new__(cls)
return super(InformationElement, targetClass).__new__(targetClass)
def __init__(self, iei, ieLen=0, ieData=None):
self.id = iei # IEI
self.dataLength = ieLen # IE Length
self.data = ieData or [] # raw IE data
@classmethod
def decode(cls, byteIter):
""" Decodes a single IE at the current position in the specified
byte iterator
:return: An InformationElement (or subclass) instance for the decoded IE
:rtype: InformationElement, or subclass thereof
"""
iei = next(byteIter)
ieLen = next(byteIter)
ieData = []
for i in xrange(ieLen):
ieData.append(next(byteIter))
return InformationElement(iei, ieLen, ieData)
def encode(self):
""" Encodes this IE and returns the resulting bytes """
result = bytearray()
result.append(self.id)
result.append(self.dataLength)
result.extend(self.data)
return result
def __len__(self):
""" Exposes the IE's total length (including the IEI and IE length octet) in octets """
return self.dataLength + 2
class Concatenation(InformationElement):
""" IE that indicates SMS concatenation.
This implementation handles both 8-bit and 16-bit concatenation
indication, and exposes the specific useful details of this
IE as instance variables.
Exposes:
reference
CSMS reference number, must be same for all the SMS parts in the CSMS
parts
total number of parts. The value shall remain constant for every short
message which makes up the concatenated short message. If the value is zero then
the receiving entity shall ignore the whole information element
number
this part's number in the sequence. The value shall start at 1 and
increment for every short message which makes up the concatenated short message
"""
def __init__(self, iei=0x00, ieLen=0, ieData=None):
super(Concatenation, self).__init__(iei, ieLen, ieData)
if ieData != None:
if iei == 0x00: # 8-bit reference
self.reference, self.parts, self.number = ieData
else: # 0x08: 16-bit reference
self.reference = ieData[0] << 8 | ieData[1]
self.parts = ieData[2]
self.number = ieData[3]
def encode(self):
if self.reference > 0xFF:
self.id = 0x08 # 16-bit reference
self.data = [self.reference >> 8, self.reference & 0xFF, self.parts, self.number]
else:
self.id = 0x00 # 8-bit reference
self.data = [self.reference, self.parts, self.number]
self.dataLength = len(self.data)
return super(Concatenation, self).encode()
class PortAddress(InformationElement):
""" IE that indicates an Application Port Addressing Scheme.
This implementation handles both 8-bit and 16-bit concatenation
indication, and exposes the specific useful details of this
IE as instance variables.
Exposes:
destination: The destination port number
source: The source port number
"""
def __init__(self, iei=0x04, ieLen=0, ieData=None):
super(PortAddress, self).__init__(iei, ieLen, ieData)
if ieData != None:
if iei == 0x04: # 8-bit port addressing scheme
self.destination, self.source = ieData
else: # 0x05: 16-bit port addressing scheme
self.destination = ieData[0] << 8 | ieData[1]
self.source = ieData[2] << 8 | ieData[3]
def encode(self):
if self.destination > 0xFF or self.source > 0xFF:
self.id = 0x05 # 16-bit
self.data = [self.destination >> 8, self.destination & 0xFF, self.source >> 8, self.source & 0xFF]
else:
self.id = 0x04 # 8-bit
self.data = [self.destination, self.source]
self.dataLength = len(self.data)
return super(PortAddress, self).encode()
# Map of recognized IEIs
IEI_CLASS_MAP = {0x00: Concatenation, # Concatenated short messages, 8-bit reference number
0x08: Concatenation, # Concatenated short messages, 16-bit reference number
0x04: PortAddress, # Application port addressing scheme, 8 bit address
0x05: PortAddress # Application port addressing scheme, 16 bit address
}
class Pdu(object):
""" Encoded SMS PDU. Contains raw PDU data and related meta-information """
def __init__(self, data, tpduLength):
""" Constructor
:param data: the raw PDU data (as bytes)
:type data: bytearray
:param tpduLength: Length (in bytes) of the TPDU
:type tpduLength: int
"""
self.data = data
self.tpduLength = tpduLength
def __str__(self):
global PYTHON_VERSION
if PYTHON_VERSION < 3:
return str(self.data).encode('hex').upper()
else: #pragma: no cover
return str(codecs.encode(self.data, 'hex_codec'), 'ascii').upper()
def encodeSmsSubmitPdu(number, text, reference=0, validity=None, smsc=None, requestStatusReport=True, rejectDuplicates=False, sendFlash=False):
""" Creates an SMS-SUBMIT PDU for sending a message with the specified text to the specified number
:param number: the destination mobile number
:type number: str
:param text: the message text
:type text: str
:param reference: message reference number (see also: rejectDuplicates parameter)
:type reference: int
:param validity: message validity period (absolute or relative)
:type validity: datetime.timedelta (relative) or datetime.datetime (absolute)
:param smsc: SMSC number to use (leave None to use default)
:type smsc: str
:param rejectDuplicates: Flag that controls the TP-RD parameter (messages with same destination and reference may be rejected if True)
:type rejectDuplicates: bool
:return: A list of one or more tuples containing the SMS PDU (as a bytearray, and the length of the TPDU part
:rtype: list of tuples
"""
if PYTHON_VERSION < 3:
if type(text) == str:
text = text.decode('UTF-8')
tpduFirstOctet = 0x01 # SMS-SUBMIT PDU
if validity != None:
# Validity period format (TP-VPF) is stored in bits 4,3 of the first TPDU octet
if type(validity) == timedelta:
# Relative (TP-VP is integer)
tpduFirstOctet |= 0x10 # bit4 == 1, bit3 == 0
validityPeriod = [_encodeRelativeValidityPeriod(validity)]
elif type(validity) == datetime:
# Absolute (TP-VP is semi-octet encoded date)
tpduFirstOctet |= 0x18 # bit4 == 1, bit3 == 1
validityPeriod = _encodeTimestamp(validity)
else:
raise TypeError('"validity" must be of type datetime.timedelta (for relative value) or datetime.datetime (for absolute value)')
else:
validityPeriod = None
if rejectDuplicates:
tpduFirstOctet |= 0x04 # bit2 == 1
if requestStatusReport:
tpduFirstOctet |= 0x20 # bit5 == 1
# Encode message text and set data coding scheme based on text contents
try:
encodedTextLength = len(encodeGsm7(text))
except ValueError:
# Cannot encode text using GSM-7; use UCS2 instead
encodedTextLength = len(text)
alphabet = 0x08 # UCS2
else:
alphabet = 0x00 # GSM-7
# Check if message should be concatenated
if encodedTextLength > MAX_MESSAGE_LENGTH[alphabet]:
# Text too long for single PDU - add "concatenation" User Data Header
concatHeaderPrototype = Concatenation()
concatHeaderPrototype.reference = reference
# Devide whole text into parts
if alphabet == 0x00:
pduTextParts = divideTextGsm7(text)
elif alphabet == 0x08:
pduTextParts = divideTextUcs2(text)
else:
raise NotImplementedError
pduCount = len(pduTextParts)
concatHeaderPrototype.parts = pduCount
tpduFirstOctet |= 0x40
else:
concatHeaderPrototype = None
pduCount = 1
# Construct required PDU(s)
pdus = []
for i in xrange(pduCount):
pdu = bytearray()
if smsc:
pdu.extend(_encodeAddressField(smsc, smscField=True))
else:
pdu.append(0x00) # Don't supply an SMSC number - use the one configured in the device
udh = bytearray()
if concatHeaderPrototype != None:
concatHeader = copy(concatHeaderPrototype)
concatHeader.number = i + 1
pduText = pduTextParts[i]
pduTextLength = len(pduText)
udh.extend(concatHeader.encode())
else:
pduText = text
udhLen = len(udh)
pdu.append(tpduFirstOctet)
pdu.append(reference) # message reference
# Add destination number
pdu.extend(_encodeAddressField(number))
pdu.append(0x00) # Protocol identifier - no higher-level protocol
pdu.append(alphabet if not sendFlash else (0x10 if alphabet == 0x00 else 0x18))
if validityPeriod:
pdu.extend(validityPeriod)
if alphabet == 0x00: # GSM-7
encodedText = encodeGsm7(pduText)
userDataLength = len(encodedText) # Payload size in septets/characters
if udhLen > 0:
shift = ((udhLen + 1) * 8) % 7 # "fill bits" needed to make the UDH end on a septet boundary
userData = packSeptets(encodedText, padBits=shift)
if shift > 0:
userDataLength += 1 # take padding bits into account
else:
userData = packSeptets(encodedText)
elif alphabet == 0x08: # UCS2
userData = encodeUcs2(pduText)
userDataLength = len(userData)
if udhLen > 0:
userDataLength += udhLen + 1 # +1 for the UDH length indicator byte
pdu.append(userDataLength)
pdu.append(udhLen)
pdu.extend(udh) # UDH
else:
pdu.append(userDataLength)
pdu.extend(userData) # User Data (message payload)
tpdu_length = len(pdu) - 1
pdus.append(Pdu(pdu, tpdu_length))
return pdus
def decodeSmsPdu(pdu):
""" Decodes SMS pdu data and returns a tuple in format (number, text)
:param pdu: PDU data as a hex string, or a bytearray containing PDU octects
:type pdu: str or bytearray
:raise EncodingError: If the specified PDU data cannot be decoded
:return: The decoded SMS data as a dictionary
:rtype: dict
"""
try:
pdu = toByteArray(pdu)
except Exception as e:
# Python 2 raises TypeError, Python 3 raises binascii.Error
raise EncodingError(e)
result = {}
pduIter = iter(pdu)
smscNumber, smscBytesRead = _decodeAddressField(pduIter, smscField=True)
result['smsc'] = smscNumber
result['tpdu_length'] = len(pdu) - smscBytesRead
tpduFirstOctet = next(pduIter)
pduType = tpduFirstOctet & 0x03 # bits 1-0
if pduType == 0x00: # SMS-DELIVER or SMS-DELIVER REPORT
result['type'] = 'SMS-DELIVER'
result['number'] = _decodeAddressField(pduIter)[0]
result['protocol_id'] = next(pduIter)
dataCoding = _decodeDataCoding(next(pduIter))
result['time'] = _decodeTimestamp(pduIter)
userDataLen = next(pduIter)
udhPresent = (tpduFirstOctet & 0x40) != 0
ud = _decodeUserData(pduIter, userDataLen, dataCoding, udhPresent)
result.update(ud)
elif pduType == 0x01: # SMS-SUBMIT or SMS-SUBMIT-REPORT
result['type'] = 'SMS-SUBMIT'
result['reference'] = next(pduIter) # message reference - we don't really use this
result['number'] = _decodeAddressField(pduIter)[0]
result['protocol_id'] = next(pduIter)
dataCoding = _decodeDataCoding(next(pduIter))
validityPeriodFormat = (tpduFirstOctet & 0x18) >> 3 # bits 4,3
if validityPeriodFormat == 0x02: # TP-VP field present and integer represented (relative)
result['validity'] = _decodeRelativeValidityPeriod(next(pduIter))
elif validityPeriodFormat == 0x03: # TP-VP field present and semi-octet represented (absolute)
result['validity'] = _decodeTimestamp(pduIter)
userDataLen = next(pduIter)
udhPresent = (tpduFirstOctet & 0x40) != 0
ud = _decodeUserData(pduIter, userDataLen, dataCoding, udhPresent)
result.update(ud)
elif pduType == 0x02: # SMS-STATUS-REPORT or SMS-COMMAND
result['type'] = 'SMS-STATUS-REPORT'
result['reference'] = next(pduIter)
result['number'] = _decodeAddressField(pduIter)[0]
result['time'] = _decodeTimestamp(pduIter)
result['discharge'] = _decodeTimestamp(pduIter)
result['status'] = next(pduIter)
else:
raise EncodingError('Unknown SMS message type: {0}. First TPDU octet was: {1}'.format(pduType, tpduFirstOctet))
return result
def _decodeUserData(byteIter, userDataLen, dataCoding, udhPresent):
""" Decodes PDU user data (UDHI (if present) and message text) """
result = {}
if udhPresent:
# User Data Header is present
result['udh'] = []
udhLen = next(byteIter)
ieLenRead = 0
# Parse and store UDH fields
while ieLenRead < udhLen:
ie = InformationElement.decode(byteIter)
ieLenRead += len(ie)
result['udh'].append(ie)
del ieLenRead
if dataCoding == 0x00: # GSM-7
# Since we are using 7-bit data, "fill bits" may have been added to make the UDH end on a septet boundary
shift = ((udhLen + 1) * 8) % 7 # "fill bits" needed to make the UDH end on a septet boundary
# Simulate another "shift" in the unpackSeptets algorithm in order to ignore the fill bits
prevOctet = next(byteIter)
shift += 1
if dataCoding == 0x00: # GSM-7
if udhPresent:
userDataSeptets = unpackSeptets(byteIter, userDataLen, prevOctet, shift)
else:
userDataSeptets = unpackSeptets(byteIter, userDataLen)
result['text'] = decodeGsm7(userDataSeptets)
elif dataCoding == 0x02: # UCS2
result['text'] = decodeUcs2(byteIter, userDataLen)
else: # 8-bit (data)
userData = []
for b in byteIter:
userData.append(unichr(b))
result['text'] = ''.join(userData)
return result
def _decodeRelativeValidityPeriod(tpVp):
""" Calculates the relative SMS validity period (based on the table in section 9.2.3.12 of GSM 03.40)
:rtype: datetime.timedelta
"""
if tpVp <= 143:
return timedelta(minutes=((tpVp + 1) * 5))
elif 144 <= tpVp <= 167:
return timedelta(hours=12, minutes=((tpVp - 143) * 30))
elif 168 <= tpVp <= 196:
return timedelta(days=(tpVp - 166))
elif 197 <= tpVp <= 255:
return timedelta(weeks=(tpVp - 192))
else:
raise ValueError('tpVp must be in range [0, 255]')
def _encodeRelativeValidityPeriod(validityPeriod):
""" Encodes the specified relative validity period timedelta into an integer for use in an SMS PDU
(based on the table in section 9.2.3.12 of GSM 03.40)
:param validityPeriod: The validity period to encode
:type validityPeriod: datetime.timedelta
:rtype: int
"""
# Python 2.6 does not have timedelta.total_seconds(), so compute it manually
#seconds = validityPeriod.total_seconds()
seconds = validityPeriod.seconds + (validityPeriod.days * 24 * 3600)
if seconds <= 43200: # 12 hours
tpVp = int(seconds / 300) - 1 # divide by 5 minutes, subtract 1
elif seconds <= 86400: # 24 hours
tpVp = int((seconds - 43200) / 1800) + 143 # subtract 12 hours, divide by 30 minutes. add 143
elif validityPeriod.days <= 30: # 30 days
tpVp = validityPeriod.days + 166 # amount of days + 166
elif validityPeriod.days <= 441: # max value of tpVp is 255
tpVp = int(validityPeriod.days / 7) + 192 # amount of weeks + 192
else:
raise ValueError('Validity period too long; tpVp limited to 1 octet (max value: 255)')
return tpVp
def _decodeTimestamp(byteIter):
""" Decodes a 7-octet timestamp """
dateStr = decodeSemiOctets(byteIter, 7)
timeZoneStr = dateStr[-2:]
return datetime.strptime(dateStr[:-2], '%y%m%d%H%M%S').replace(tzinfo=SmsPduTzInfo(timeZoneStr))
def _encodeTimestamp(timestamp):
""" Encodes a 7-octet timestamp from the specified date
Note: the specified timestamp must have a UTC offset set; you can use gsmmodem.util.SimpleOffsetTzInfo for simple cases
:param timestamp: The timestamp to encode
:type timestamp: datetime.datetime
:return: The encoded timestamp
:rtype: bytearray
"""
if timestamp.tzinfo == None:
raise ValueError('Please specify time zone information for the timestamp (e.g. by using gsmmodem.util.SimpleOffsetTzInfo)')
# See if the timezone difference is positive/negative
tzDelta = timestamp.utcoffset()
if tzDelta.days >= 0:
tzValStr = '{0:0>2}'.format(int(tzDelta.seconds / 60 / 15))
else: # negative
tzVal = int((tzDelta.days * -3600 * 24 - tzDelta.seconds) / 60 / 15) # calculate offset in 0.25 hours
# Cast as literal hex value and set MSB of first semi-octet of timezone to 1 to indicate negative value
tzVal = int('{0:0>2}'.format(tzVal), 16) | 0x80
tzValStr = '{0:0>2X}'.format(tzVal)
dateStr = timestamp.strftime('%y%m%d%H%M%S') + tzValStr
return encodeSemiOctets(dateStr)
def _decodeDataCoding(octet):
if octet & 0xC0 == 0:
#compressed = octect & 0x20
alphabet = (octet & 0x0C) >> 2
return alphabet # 0x00 == GSM-7, 0x01 == 8-bit data, 0x02 == UCS2
# We ignore other coding groups
return 0
def nibble2octet(addressLen):
return int((addressLen + 1) / 2)
def _decodeAddressField(byteIter, smscField=False, log=False):
""" Decodes the address field at the current position of the bytearray iterator
:param byteIter: Iterator over bytearray
:type byteIter: iter(bytearray)
:return: Tuple containing the address value and amount of bytes read (value is or None if it is empty (zero-length))
:rtype: tuple
"""
addressLen = next(byteIter)
if addressLen > 0:
toa = next(byteIter)
ton = (toa & 0x70) # bits 6,5,4 of type-of-address == type-of-number
if ton == 0x50:
# Alphanumberic number
addressLen = nibble2octet(addressLen)
septets = unpackSeptets(byteIter, addressLen)
addressValue = decodeGsm7(septets)
return (addressValue, (addressLen + 2))
else:
# ton == 0x00: Unknown (might be international, local, etc) - leave as is
# ton == 0x20: National number
if smscField:
addressValue = decodeSemiOctets(byteIter, addressLen-1)
else:
addressLen = nibble2octet(addressLen)
addressValue = decodeSemiOctets(byteIter, addressLen)
addressLen += 1 # for the return value, add the toa byte
if ton == 0x10: # International number
addressValue = '+' + addressValue
return (addressValue, (addressLen + 1))
else:
return (None, 1)
def _encodeAddressField(address, smscField=False):
""" Encodes the address into an address field
:param address: The address to encode (phone number or alphanumeric)
:type byteIter: str
:return: Encoded SMS PDU address field
:rtype: bytearray
"""
# First, see if this is a number or an alphanumeric string
toa = 0x80 | 0x00 | 0x01 # Type-of-address start | Unknown type-of-number | ISDN/tel numbering plan
alphaNumeric = False
if address.isalnum():
# Might just be a local number
if address.isdigit():
# Local number
toa |= 0x20
else:
# Alphanumeric address
toa |= 0x50
toa &= 0xFE # switch to "unknown" numbering plan
alphaNumeric = True
else:
if address[0] == '+' and address[1:].isdigit():
# International number
toa |= 0x10
# Remove the '+' prefix
address = address[1:]
else:
# Alphanumeric address
toa |= 0x50
toa &= 0xFE # switch to "unknown" numbering plan
alphaNumeric = True
if alphaNumeric:
addressValue = packSeptets(encodeGsm7(address, False))
addressLen = len(addressValue) * 2
else:
addressValue = encodeSemiOctets(address)
if smscField:
addressLen = len(addressValue) + 1
else:
addressLen = len(address)
result = bytearray()
result.append(addressLen)
result.append(toa)
result.extend(addressValue)
return result
def encodeSemiOctets(number):
""" Semi-octet encoding algorithm (e.g. for phone numbers)
:return: bytearray containing the encoded octets
:rtype: bytearray
"""
if len(number) % 2 == 1:
number = number + 'F' # append the "end" indicator
octets = [int(number[i+1] + number[i], 16) for i in xrange(0, len(number), 2)]
return bytearray(octets)
def decodeSemiOctets(encodedNumber, numberOfOctets=None):
""" Semi-octet decoding algorithm(e.g. for phone numbers)
:param encodedNumber: The semi-octet-encoded telephone number (in bytearray format or hex string)
:type encodedNumber: bytearray, str or iter(bytearray)
:param numberOfOctets: The expected amount of octets after decoding (i.e. when to stop)
:type numberOfOctets: int
:return: decoded telephone number
:rtype: string
"""
number = []
if type(encodedNumber) in (str, bytes):
encodedNumber = bytearray(codecs.decode(encodedNumber, 'hex_codec'))
i = 0
for octet in encodedNumber:
hexVal = hex(octet)[2:].zfill(2)
number.append(hexVal[1])
if hexVal[0] != 'f':
number.append(hexVal[0])
else:
break
if numberOfOctets != None:
i += 1
if i == numberOfOctets:
break
return ''.join(number)
def encodeTextMode(plaintext):
""" Text mode checker
Tests whther SMS could be sent in text mode
:param text: the text string to encode
:raise ValueError: if the text string cannot be sent in text mode
:return: Passed string
:rtype: str
"""
if PYTHON_VERSION >= 3:
plaintext = str(plaintext)
elif type(plaintext) == str:
plaintext = plaintext.decode('UTF-8')
for char in plaintext:
idx = TEXT_MODE.find(char)
if idx != -1:
continue
else:
raise ValueError('Cannot encode char "{0}" inside text mode'.format(char))
if len(plaintext) > MAX_MESSAGE_LENGTH[0x00]:
raise ValueError('Message is too long for text mode (maximum {0} characters)'.format(MAX_MESSAGE_LENGTH[0x00]))
return plaintext
def encodeGsm7(plaintext, discardInvalid=False):
""" GSM-7 text encoding algorithm
Encodes the specified text string into GSM-7 octets (characters). This method does not pack
the characters into septets.
:param text: the text string to encode
:param discardInvalid: if True, characters that cannot be encoded will be silently discarded
:raise ValueError: if the text string cannot be encoded using GSM-7 encoding (unless discardInvalid == True)
:return: A bytearray containing the string encoded in GSM-7 encoding
:rtype: bytearray
"""
result = bytearray()
if PYTHON_VERSION >= 3:
plaintext = str(plaintext)
elif type(plaintext) == str:
plaintext = plaintext.decode('UTF-8')
for char in plaintext:
idx = GSM7_BASIC.find(char)
if idx != -1:
result.append(idx)
elif char in GSM7_EXTENDED:
result.append(0x1B) # ESC - switch to extended table
result.append(ord(GSM7_EXTENDED[char]))
elif not discardInvalid:
raise ValueError('Cannot encode char "{0}" using GSM-7 encoding'.format(char))
return result
def decodeGsm7(encodedText):
""" GSM-7 text decoding algorithm
Decodes the specified GSM-7-encoded string into a plaintext string.
:param encodedText: the text string to encode
:type encodedText: bytearray or str
:return: A string containing the decoded text
:rtype: str
"""
result = []
if type(encodedText) == str:
encodedText = rawStrToByteArray(encodedText) #bytearray(encodedText)
iterEncoded = iter(encodedText)
for b in iterEncoded:
if b == 0x1B: # ESC - switch to extended table
c = chr(next(iterEncoded))
for char, value in dictItemsIter(GSM7_EXTENDED):
if c == value:
result.append(char)
break
else:
result.append(GSM7_BASIC[b])
return ''.join(result)
def divideTextGsm7(plainText):
""" GSM7 message dividing algorithm
Divides text into list of chunks that could be stored in a single, GSM7-encoded SMS message.
:param plainText: the text string to divide
:type plainText: str
:return: A list of strings
:rtype: list of str
"""
result = []
plainStartPtr = 0
plainStopPtr = 0
chunkByteSize = 0
if PYTHON_VERSION >= 3:
plainText = str(plainText)
while plainStopPtr < len(plainText):
char = plainText[plainStopPtr]
idx = GSM7_BASIC.find(char)
if idx != -1:
chunkByteSize = chunkByteSize + 1;
elif char in GSM7_EXTENDED:
chunkByteSize = chunkByteSize + 2;
else:
raise ValueError('Cannot encode char "{0}" using GSM-7 encoding'.format(char))
plainStopPtr = plainStopPtr + 1
if chunkByteSize > MAX_MULTIPART_MESSAGE_LENGTH[0x00]:
plainStopPtr = plainStopPtr - 1
if chunkByteSize >= MAX_MULTIPART_MESSAGE_LENGTH[0x00]:
result.append(plainText[plainStartPtr:plainStopPtr])
plainStartPtr = plainStopPtr
chunkByteSize = 0
if chunkByteSize > 0:
result.append(plainText[plainStartPtr:])
return result
def packSeptets(octets, padBits=0):
""" Packs the specified octets into septets
Typically the output of encodeGsm7 would be used as input to this function. The resulting
bytearray contains the original GSM-7 characters packed into septets ready for transmission.
:rtype: bytearray
"""
result = bytearray()
if type(octets) == str:
octets = iter(rawStrToByteArray(octets))
elif type(octets) == bytearray:
octets = iter(octets)
shift = padBits
if padBits == 0:
try:
prevSeptet = next(octets)
except StopIteration:
return result
else:
prevSeptet = 0x00
for octet in octets:
septet = octet & 0x7f;
if shift == 7:
# prevSeptet has already been fully added to result
shift = 0
prevSeptet = septet
continue
b = ((septet << (7 - shift)) & 0xFF) | (prevSeptet >> shift)
prevSeptet = septet
shift += 1
result.append(b)
if shift != 7:
# There is a bit "left over" from prevSeptet
result.append(prevSeptet >> shift)
return result
def unpackSeptets(septets, numberOfSeptets=None, prevOctet=None, shift=7):
""" Unpacks the specified septets into octets
:param septets: Iterator or iterable containing the septets packed into octets
:type septets: iter(bytearray), bytearray or str
:param numberOfSeptets: The amount of septets to unpack (or None for all remaining in "septets")
:type numberOfSeptets: int or None
:return: The septets unpacked into octets
:rtype: bytearray
"""
result = bytearray()
if type(septets) == str:
septets = iter(rawStrToByteArray(septets))
elif type(septets) == bytearray:
septets = iter(septets)
if numberOfSeptets == None:
numberOfSeptets = MAX_INT # Loop until StopIteration
if numberOfSeptets == 0:
return result
i = 0
for octet in septets:
i += 1
if shift == 7:
shift = 1
if prevOctet != None:
result.append(prevOctet >> 1)
if i <= numberOfSeptets:
result.append(octet & 0x7F)
prevOctet = octet
if i == numberOfSeptets:
break
else:
continue
b = ((octet << shift) & 0x7F) | (prevOctet >> (8 - shift))
prevOctet = octet
result.append(b)
shift += 1
if i == numberOfSeptets:
break
if shift == 7 and prevOctet:
b = prevOctet >> (8 - shift)
if b:
# The final septet value still needs to be unpacked
result.append(b)
return result
def decodeUcs2(byteIter, numBytes):
""" Decodes UCS2-encoded text from the specified byte iterator, up to a maximum of numBytes """
userData = []
i = 0
try:
while i < numBytes:
userData.append(unichr((next(byteIter) << 8) | next(byteIter)))
i += 2
except StopIteration:
# Not enough bytes in iterator to reach numBytes; return what we have
pass
return ''.join(userData)
def encodeUcs2(text):
""" UCS2 text encoding algorithm
Encodes the specified text string into UCS2-encoded bytes.
:param text: the text string to encode
:return: A bytearray containing the string encoded in UCS2 encoding
:rtype: bytearray
"""
result = bytearray()
for b in map(ord, text):
result.append(b >> 8)
result.append(b & 0xFF)
return result
def divideTextUcs2(plainText):
""" UCS-2 message dividing algorithm
Divides text into list of chunks that could be stored in a single, UCS-2 -encoded SMS message.
:param plainText: the text string to divide
:type plainText: str
:return: A list of strings
:rtype: list of str
"""
result = []
resultLength = 0
fullChunksCount = int(len(plainText) / MAX_MULTIPART_MESSAGE_LENGTH[0x08])
for i in range(fullChunksCount):
result.append(plainText[i * MAX_MULTIPART_MESSAGE_LENGTH[0x08] : (i + 1) * MAX_MULTIPART_MESSAGE_LENGTH[0x08]])
resultLength = resultLength + MAX_MULTIPART_MESSAGE_LENGTH[0x08]
# Add last, not fully filled chunk
if resultLength < len(plainText):
result.append(plainText[resultLength:])
return result
-147
View File
@@ -1,147 +0,0 @@
#!/usr/bin/env python
""" Low-level serial communications handling """
import sys, threading, logging
import re
import serial # pyserial: http://pyserial.sourceforge.net
from .exceptions import TimeoutException
from . import compat # For Python 2.6 compatibility
class SerialComms(object):
""" Wraps all low-level serial communications (actual read/write operations) """
log = logging.getLogger('gsmmodem.serial_comms.SerialComms')
# End-of-line read terminator
RX_EOL_SEQ = b'\r\n'
# End-of-response terminator
RESPONSE_TERM = re.compile('^OK|ERROR|(\+CM[ES] ERROR: \d+)|(COMMAND NOT SUPPORT)$')
# Default timeout for serial port reads (in seconds)
timeout = 1
def __init__(self, port, baudrate=115200, notifyCallbackFunc=None, fatalErrorCallbackFunc=None, *args, **kwargs):
""" Constructor
:param fatalErrorCallbackFunc: function to call if a fatal error occurs in the serial device reading thread
:type fatalErrorCallbackFunc: func
"""
self.alive = False
self.port = port
self.baudrate = baudrate
self._responseEvent = None # threading.Event()
self._expectResponseTermSeq = None # expected response terminator sequence
self._response = None # Buffer containing response to a written command
self._notification = [] # Buffer containing lines from an unsolicited notification from the modem
# Reentrant lock for managing concurrent write access to the underlying serial port
self._txLock = threading.RLock()
self.notifyCallback = notifyCallbackFunc or self._placeholderCallback
self.fatalErrorCallback = fatalErrorCallbackFunc or self._placeholderCallback
self.com_args = args
self.com_kwargs = kwargs
def connect(self):
""" Connects to the device and starts the read thread """
self.serial = serial.Serial(dsrdtr=True, rtscts=True, port=self.port, baudrate=self.baudrate,
timeout=self.timeout, *self.com_args, **self.com_kwargs)
# Start read thread
self.alive = True
self.rxThread = threading.Thread(target=self._readLoop)
self.rxThread.daemon = True
self.rxThread.start()
def close(self):
""" Stops the read thread, waits for it to exit cleanly, then closes the underlying serial port """
self.alive = False
self.rxThread.join()
self.serial.close()
def _handleLineRead(self, line, checkForResponseTerm=True):
# print 'sc.hlineread:',line
if self._responseEvent and not self._responseEvent.is_set():
# A response event has been set up (another thread is waiting for this response)
self._response.append(line)
if not checkForResponseTerm or self.RESPONSE_TERM.match(line):
# End of response reached; notify waiting thread
# print 'response:', self._response
self.log.debug('response: %s', self._response)
self._responseEvent.set()
else:
# Nothing was waiting for this - treat it as a notification
self._notification.append(line)
if self.serial.inWaiting() == 0:
# No more chars on the way for this notification - notify higher-level callback
# print 'notification:', self._notification
self.log.debug('notification: %s', self._notification)
self.notifyCallback(self._notification)
self._notification = []
def _placeholderCallback(self, *args, **kwargs):
""" Placeholder callback function (does nothing) """
def _readLoop(self):
""" Read thread main loop
Reads lines from the connected device
"""
try:
readTermSeq = bytearray(self.RX_EOL_SEQ)
readTermLen = len(readTermSeq)
rxBuffer = bytearray()
while self.alive:
data = self.serial.read(1)
if len(data) != 0: # check for timeout
# print >> sys.stderr, ' RX:', data,'({0})'.format(ord(data))
rxBuffer.append(ord(data))
if rxBuffer[-readTermLen:] == readTermSeq:
# A line (or other logical segment) has been read
line = rxBuffer[:-readTermLen].decode()
rxBuffer = bytearray()
if len(line) > 0:
# print 'calling handler'
self._handleLineRead(line)
elif self._expectResponseTermSeq:
if rxBuffer[-len(self._expectResponseTermSeq):] == self._expectResponseTermSeq:
line = rxBuffer.decode()
rxBuffer = bytearray()
self._handleLineRead(line, checkForResponseTerm=False)
# else:
# ' <RX timeout>'
except serial.SerialException as e:
self.alive = False
try:
self.serial.close()
except Exception: # pragma: no cover
pass
# Notify the fatal error handler
self.fatalErrorCallback(e)
def write(self, data, waitForResponse=True, timeout=5, expectedResponseTermSeq=None):
data = data.encode()
with self._txLock:
if waitForResponse:
if expectedResponseTermSeq:
self._expectResponseTermSeq = bytearray(expectedResponseTermSeq.encode())
self._response = []
self._responseEvent = threading.Event()
self.serial.write(data)
if self._responseEvent.wait(timeout):
self._responseEvent = None
self._expectResponseTermSeq = False
return self._response
else: # Response timed out
self._responseEvent = None
self._expectResponseTermSeq = False
if len(self._response) > 0:
# Add the partial response to the timeout exception
raise TimeoutException(self._response)
else:
raise TimeoutException()
else:
self.serial.write(data)
-130
View File
@@ -1,130 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" Some common utility classes used by tests """
from datetime import datetime, timedelta, tzinfo
import re
class SimpleOffsetTzInfo(tzinfo):
""" Very simple implementation of datetime.tzinfo offering set timezone offset for datetime instances """
def __init__(self, offsetInHours=None):
""" Constructs a new tzinfo instance using an amount of hours as an offset
:param offsetInHours: The timezone offset, in hours (may be negative)
:type offsetInHours: int or float
"""
if offsetInHours != None: # pragma: no cover
self.offsetInHours = offsetInHours
def utcoffset(self, dt):
return timedelta(hours=self.offsetInHours)
def dst(self, dt):
return timedelta(0)
def __repr__(self):
return 'gsmmodem.util.SimpleOffsetTzInfo({0})'.format(self.offsetInHours)
def parseTextModeTimeStr(timeStr):
""" Parses the specified SMS text mode time string
The time stamp format is "yy/MM/dd,hh:mm:ss±zz"
(yy = year, MM = month, dd = day, hh = hour, mm = minute, ss = second, zz = time zone
[Note: the unit of time zone is a quarter of an hour])
:param timeStr: The time string to parse
:type timeStr: str
:return: datetime object representing the specified time string
:rtype: datetime.datetime
"""
msgTime = timeStr[:-3]
tzOffsetHours = int(int(timeStr[-3:]) * 0.25)
return datetime.strptime(msgTime, '%y/%m/%d,%H:%M:%S').replace(tzinfo=SimpleOffsetTzInfo(tzOffsetHours))
def lineStartingWith(string, lines):
""" Searches through the specified list of strings and returns the
first line starting with the specified search string, or None if not found
"""
for line in lines:
if line.startswith(string):
return line
else:
return None
def lineMatching(regexStr, lines):
""" Searches through the specified list of strings and returns the regular expression
match for the first line that matches the specified regex string, or None if no match was found
Note: if you have a pre-compiled regex pattern, use lineMatchingPattern() instead
:type regexStr: Regular expression string to use
:type lines: List of lines to search
:return: the regular expression match for the first line that matches the specified regex, or None if no match was found
:rtype: re.Match
"""
regex = re.compile(regexStr)
for line in lines:
m = regex.match(line)
if m:
return m
else:
return None
def lineMatchingPattern(pattern, lines):
""" Searches through the specified list of strings and returns the regular expression
match for the first line that matches the specified pre-compiled regex pattern, or None if no match was found
Note: if you are using a regex pattern string (i.e. not already compiled), use lineMatching() instead
:type pattern: Compiled regular expression pattern to use
:type lines: List of lines to search
:return: the regular expression match for the first line that matches the specified regex, or None if no match was found
:rtype: re.Match
"""
for line in lines:
m = pattern.match(line)
if m:
return m
else:
return None
def allLinesMatchingPattern(pattern, lines):
""" Like lineMatchingPattern, but returns all lines that match the specified pattern
:type pattern: Compiled regular expression pattern to use
:type lines: List of lines to search
:return: list of re.Match objects for each line matched, or an empty list if none matched
:rtype: list
"""
result = []
for line in lines:
m = pattern.match(line)
if m:
result.append(m)
return result
def removeAtPrefix(string):
""" Remove AT prefix from a specified string.
:param string: An original string
:type string: str
:return: A string with AT prefix removed
:rtype: str
"""
if string.startswith('AT'):
return string[2:]
return string
+1 -1
View File
@@ -8,4 +8,4 @@ def init_logger():
filemode='a',
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
datefmt='%D:%H:%M:%S',
level=logging.INFO)
level=logging.DEBUG)
+46 -30
View File
@@ -2,6 +2,7 @@ import re
import time
import serial
from gsmmodem import GsmModem
from serial import Serial
from definitions import BAUDRATE
@@ -9,7 +10,9 @@ from error.SIMError import SIMError
from logs.LogSender import LOG_APPOINTMENT_SUCCESS, SUBJECT_SIM_INFO
from params import firebase_store_manager, oracle_log_sender
from pojo.SimInfoPojo import SimInfoPojo
from pojo.serial_modem import SerialModem
from utils.excel_reader import ExcelHelper
from utils.operator import check_operator, Operator
class ModemPool:
@@ -43,42 +46,55 @@ class ModemPool:
return msg
def get_raw_phone_number(self, slot_position):
for index, ser in enumerate(self._serial_list):
print("will get phone number for slot({}) SIM({}), port:{}".format(slot_position, index + 1, ser.port))
if not self._select_sim_storage(ser):
print(self._generate_error_msg(slot_position, index, SIMError.STORAGE_ERROR))
continue
msg = self._execut_USSD_cmd("AT+CUSD=1, *132#\r", ser)
if "Unfortunately" in str(msg):
print(self._generate_error_msg(slot_position, index, SIMError.SIM_DISABLED))
continue
elif "CME ERROR" in str(msg):
print(self._generate_error_msg(slot_position, index, SIMError.CME_ERROR))
continue
elif len(msg) == 0:
print(self._generate_error_msg(slot_position, index, SIMError.TIMEOUT))
continue
# find phone number
match = re.search(r'33\d{9}', str(msg))
phone_number = match.group(0)
print("phone is " + phone_number)
sim_position = index + 1
position = (slot_position - 1) * len(self._port_list) + sim_position
# unlock sim
unlock_cmd = 'AT+CPIN="{0}\r"'.format("0000")
self._send_command(unlock_cmd, ser, 10)
cmd = "AT+CCID\r"
response = str(self._send_command(cmd, ser))
ccid_group = re.search("[0-9F]+", response)
ccid = ccid_group.group(0)
sim_position = index + 1
position = (slot_position - 1) * len(self._port_list) + sim_position
if phone_number:
self._db_manager.save_sim_info(SimInfoPojo(phone=phone_number, ccid=ccid, position=position))
self._log_sender.send_log(phone_number, source=self.TAG, subject=SUBJECT_SIM_INFO,
type=LOG_APPOINTMENT_SUCCESS)
# write the number to sim card's phonebook
cmd = f'AT+CPBW={self.phone_number_position},\"{phone_number}\"\r'
self._send_command(cmd, ser, wait_time_in_s=2)
self.get_own_number(ser)
operator = check_operator(ccid)
if operator == Operator.SFR or operator == Operator.CHINA_TELECOM:
contacts = self._excel_helper.read_contacts()
contact = [contact for contact in contacts if
contact.ccid.replace("F", "") == ccid.replace("F", "")]
if len(contact) > 0:
phone_number = contact[0].phone
self._db_manager.save_sim_info(SimInfoPojo(phone=str(phone_number), ccid=ccid, position=position))
else:
error_msg = "slot({}),sim({})".format(slot_position, sim_position)
oracle_log_sender.send_contact_not_found(error_msg)
else:
print("will get phone number for slot({}) SIM({}), port:{}".format(slot_position, index + 1, ser.port))
if not self._select_sim_storage(ser):
print(self._generate_error_msg(slot_position, index, SIMError.STORAGE_ERROR))
continue
msg = self._execut_USSD_cmd("AT+CUSD=1, *132#\r", ser)
if "Unfortunately" in str(msg):
print(self._generate_error_msg(slot_position, index, SIMError.SIM_DISABLED))
continue
elif "CME ERROR" in str(msg):
print(self._generate_error_msg(slot_position, index, SIMError.CME_ERROR))
continue
elif len(msg) == 0:
print(self._generate_error_msg(slot_position, index, SIMError.TIMEOUT))
continue
# find phone number
match = re.search(r'33\d{9}', str(msg))
phone_number = match.group(0)
print("phone is " + phone_number)
def get_own_number(self, ser: Serial):
print("saved phone number: " + str(self._send_command(f'AT+CPBR={self.phone_number_position}\r', ser)))
if phone_number:
self._db_manager.save_sim_info(SimInfoPojo(phone=phone_number, ccid=ccid, position=position))
self._log_sender.send_log(phone_number, source=self.TAG, subject=SUBJECT_SIM_INFO,
type=LOG_APPOINTMENT_SUCCESS)
# write the number to sim card's phonebook
cmd = f'AT+CPBW={self.phone_number_position},\"{phone_number}\"\r'
self._send_command(cmd, ser, wait_time_in_s=2)
def _select_sim_storage(self, ser) -> bool:
# use SIM Card storage
+4 -1
View File
@@ -1,15 +1,17 @@
import logging
import sys
import params
from modems.ModemPool import ModemPool
from logs.AppLogging import init_logger
from logs.LogSender import LOG_SUBJECT_EVENT, TYPE_EVENT_RESET_ALL_SIM_CARDS
from main import card_pool, get_devices_ports
from wait_for_sms import start_waiting_sms
def read_all_the_phone_number():
params.oracle_log_sender.send_log(msg="SIM卡自检开始", subject=LOG_SUBJECT_EVENT, type=TYPE_EVENT_RESET_ALL_SIM_CARDS)
slot_number = 16
slot_number = 1
slot_sum = 27
params.firebase_store_manager.clear_all_sim_info()
for i in range(slot_number, slot_sum + 1):
@@ -24,4 +26,5 @@ def read_all_the_phone_number():
if __name__ == '__main__':
init_logger()
logger = logging.getLogger()
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
read_all_the_phone_number()
+10 -11
View File
@@ -20,9 +20,6 @@ commandor = CommandorPage()
thread_event = None
current_gsm_modem = None
card_pool = CardPool(CARD_POOL_PORT)
# used to save the current slot position
current_card_pool_slot = 1
current_sim_position = 1
def get_devices_ports() -> list:
@@ -49,9 +46,13 @@ def timeout_occurred(serial_modem: SerialModem):
def start_to_handle_sms(serial_modem: SerialModem):
global current_gsm_modem
current_gsm_modem = serial_modem.modem
if check_operator(serial_modem.ccid) == Operator.LYCAMOBILE:
# if check_operator(serial_modem.ccid) == Operator.LYCAMOBILE:
# lycamobile
try:
current_gsm_modem.deleteMultipleStoredSms(memory="SM")
except Exception as error:
print(error)
serial_modem.modem.smsReceivedCallback = handle_sms
serial_modem.modem.smsTextMode = False
logger.info('Waiting for SMS message, for phone number ' + str(serial_modem.phone_number))
@@ -84,9 +85,10 @@ def init_modems() -> list:
return modems
def start_book():
slot_number = 2
slot_sum = 2
def start_waiting_sms():
# logger = logging.getLogger()
slot_number = 1
slot_sum = 28
for i in range(slot_number, slot_sum + 1):
card_pool.reset()
logger.info("will switch to " + str(i))
@@ -97,10 +99,7 @@ def start_book():
# read the contact, and merge the 2 objects together
excel_reader = ExcelHelper()
contacts = excel_reader.read_contacts()
global current_sim_position
current_sim_position = 1
for modem in modem_list:
current_sim_position = current_sim_position + 1
try:
# get contact for current modem
modem.get_ccid()
@@ -119,4 +118,4 @@ if __name__ == '__main__':
init_logger()
logger = logging.getLogger()
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
start_book()
start_waiting_sms()