import time
import serial
import sys
import serial.tools.list_ports
# global variables
TIMEOUT = 3 # seconds
####################################
# Main program
# Shared serial-port handle. It is created unopened here; the NBIoT class
# configures it in __init__ and opens it lazily on first use.
ser = serial.Serial()

# Show every serial port the host can see so the user can check that the
# dongle is attached.
port_list = list(serial.tools.list_ports.comports())
print(port_list)
if not port_list:
    print("NB-IoT Dongle 未安裝!")
else:
    for port in port_list:
        print(port)

# NOTE(review): NBIoT (and the helpers it uses) are defined further down in
# this file, so running the file top to bottom reaches this line before the
# class exists and raises NameError. This section has to execute after those
# definitions (move it to the end of the file, or import the class).
nb = NBIoT()  # class defaults: port, baud rate and board name

# (topic, payload) pairs published in order by the demo run below.
MQTT_SAMPLES = (
    ("NB/BC26/USER99/LED", "1"),
    ("NB/BC26/USER99/TEMP", "21.20"),
    ("NB/BC26/USER99/HUMI", "52.20"),
    ("NB/BC26/USER99/LED", "0"),
    ("NB/BC26/USER99/TEMP", "22.30"),
    ("NB/BC26/USER99/HUMI", "53.40"),
    ("NB/BC26/USER99/LED", "1"),
    ("NB/BC26/USER99/TEMP", "23.40"),
    ("NB/BC26/USER99/HUMI", "54.50"),
    ("NB/BC26/USER99/LED", "0"),
    ("NB/BC26/USER99/TEMP", "24.50"),
    ("NB/BC26/USER99/HUMI", "55.60"),
)

try:
    # Module identification and status queries.
    nb.sendATComm("ATE1", "OK\r\n")
    nb.getATI()
    nb.getIMEI()
    time.sleep(0.5)
    nb.getFirmwareInfo()
    time.sleep(0.5)
    nb.getHardwareInfo()
    time.sleep(0.5)
    nb.getIPAddress()
    time.sleep(0.5)
    nb.getSignalQuality()
    time.sleep(0.5)

    # Close any previous MQTT session, then open and connect a new one.
    nb.closeallMQTT()
    nb.openMQTT("broker.hivemq.com", 1883)
    time.sleep(0.5)
    nb.connectMQTT("MQTTclient1")
    time.sleep(0.5)

    # Publish the sample readings, pausing half a second after each one.
    for topic, payload in MQTT_SAMPLES:
        nb.publishMQTT(topic, payload)
        time.sleep(0.5)

    nb.closeMQTT()
except KeyboardInterrupt:
    # Ctrl-C: send the MQTT close command before leaving.
    print('AT+QMTCLOSE=0')
    nb.closeallMQTT()
    time.sleep(3)  # was a bare sleep(3), which is undefined in this file
finally:
    # Release the serial port on every exit path.
    ser.close()
print('結束 MQTT 測試!')
def debug_print(message):
    """Print a debug message to standard output.

    Args:
        message: Any object; it is passed unchanged to ``print``.
    """
    print(message)
def millis():
    """Return the current Unix time truncated to whole seconds.

    Despite its name, this function does NOT return milliseconds: the value
    is ``int(time.time())``, i.e. seconds. Callers that subtract two readings
    get an elapsed time in seconds, which is the unit the module-level
    TIMEOUT constant is expressed in.

    Returns:
        int: Seconds since the epoch.
    """
    return int(time.time())
def delay(ms):
    """Block the caller for ``ms`` milliseconds.

    Args:
        ms: Duration in milliseconds (int or float).
    """
    # Dividing by a float literal already yields a float, so no extra cast
    # is needed before handing the value (in seconds) to time.sleep.
    time.sleep(ms / 1000.0)
####################################
### NB IoT Shield Class ################
####################################
class NBIoT:
    """AT-command helper for an NB-IoT module (default board name "BC-26").

    All traffic goes through the module-level ``ser`` serial object, which
    this class configures in ``__init__`` and opens on first use. Commands
    are written as text terminated by a carriage return; replies are read
    back and searched for an expected substring.
    """

    board = ""  # shield / board name
    ip_address = ""  # IP address (no method in this class assigns it)
    domain_name = ""  # domain name (no method in this class assigns it)
    port_number = ""  # port number (no method in this class assigns it)
    # Default resend interval for sendATComm, in the unit millis() returns.
    timeout = TIMEOUT
    compose = ""  # most recently composed command string
    response = ""  # text collected by the most recent read pass

    def __init__(self, serial_port="/dev/ttyAMA0", serial_baudrate=115200, board="BC-26"):
        """Store the board name and configure the shared serial port (8N1).

        The port is only configured here, not opened.

        Args:
            serial_port: Device name assigned to ``ser.port``.
            serial_baudrate: Baud rate assigned to ``ser.baudrate``.
            board: Name used in the start-up debug message.
        """
        self.board = board
        ser.port = serial_port
        ser.baudrate = serial_baudrate
        ser.parity = serial.PARITY_NONE
        ser.stopbits = serial.STOPBITS_ONE
        ser.bytesize = serial.EIGHTBITS
        debug_print(self.board + " Class initialized!")

    def _ensure_open(self):
        """Open the shared serial port if it is not open yet."""
        if not ser.is_open:
            ser.open()

    def clear_compose(self):
        """Reset the composed-command buffer to an empty string."""
        self.compose = ""

    def getResponse(self, desired_response):
        """Read from the modem until a pass contains ``desired_response``.

        Each pass starts from an empty buffer, drains whatever bytes are
        waiting, and checks for the substring. There is no timeout: the call
        loops until a match is seen.

        Args:
            desired_response: Substring to look for in the decoded reply.
        """
        self._ensure_open()
        while True:
            self.response = ""
            while ser.in_waiting:
                chunk = ser.read(ser.in_waiting)
                self.response += chunk.decode('utf-8', errors='ignore')
            if desired_response in self.response:
                debug_print(self.response)
                break

    def sendATCommOnce(self, command):
        """Write one command to the modem without waiting for a reply.

        The input buffer is cleared first so that later reads only see data
        that arrived after this command was sent.

        Args:
            command: Command text; a carriage return is appended.
        """
        self._ensure_open()
        self.compose = str(command) + "\r"
        ser.reset_input_buffer()
        ser.write(self.compose.encode())
        debug_print(self.compose)

    def sendATComm(self, command, desired_response, timeout=None):
        """Send a command and wait until the reply contains a given substring.

        The command is re-sent whenever more than ``timeout`` has elapsed
        (measured with ``millis()``) without a match; the loop itself never
        gives up. An empty ``desired_response`` matches on the first pass,
        so the call returns right after sending.

        Args:
            command: Command text to send.
            desired_response: Substring that marks a successful reply.
            timeout: Resend interval; defaults to ``self.timeout``.

        Returns:
            str: The text of the read pass that contained the match.
        """
        if timeout is None:
            timeout = self.timeout
        self.sendATCommOnce(command)
        timer = millis()
        while True:
            if millis() - timer > timeout:
                # No match in time: send the command again and restart timing.
                self.sendATCommOnce(command)
                timer = millis()
            self.response = ""
            while ser.in_waiting:
                try:
                    chunk = ser.read(ser.in_waiting)
                    self.response += chunk.decode('utf-8', errors='ignore')
                    # Give the modem time to deliver the rest of the reply
                    # before checking the buffer again.
                    delay(100)
                except Exception as e:
                    # Exceptions have no ``Message`` attribute; print the
                    # exception itself instead.
                    debug_print(e)
            if desired_response in self.response:
                debug_print(self.response)
                return self.response

    def resetModule(self):
        """Save the configuration, then send the reset command."""
        self.saveConfigurations()
        delay(200)
        # Empty expected reply: do not wait for an answer to the reset.
        self.sendATComm("AT+QRST=1", "")

    def saveConfigurations(self):
        """Send AT&W and wait for OK."""
        self.sendATComm("AT&W", "OK\r\n")

    def getATI(self):
        """Send ATI and return the reply once OK is seen."""
        return self.sendATComm("ATI", "OK\r\n")

    def getIMEI(self):
        """Send AT+CGSN=1 (IMEI query) and return the reply once OK is seen."""
        return self.sendATComm("AT+CGSN=1", "OK\r\n")

    def getFirmwareInfo(self):
        """Send AT+CGMR (firmware info) and return the reply once OK is seen."""
        return self.sendATComm("AT+CGMR", "OK\r\n")

    def getHardwareInfo(self):
        """Send AT+CGMM (hardware info) and return the reply once OK is seen."""
        return self.sendATComm("AT+CGMM", "OK\r\n")

    def getIPAddress(self):
        """Send AT+CGPADDR (IP address) and return the reply once OK is seen."""
        return self.sendATComm("AT+CGPADDR", "OK\r\n")

    # ------------------------------------------------------------------
    # Network service functions
    # ------------------------------------------------------------------

    def getSignalQuality(self):
        """Send AT+CSQ (signal quality) and return the reply once OK is seen."""
        return self.sendATComm("AT+CSQ", "OK\r\n")

    # Band query, disabled in the original listing:
    # def getBand(self):
    #     return self.sendATComm("AT+QBAND?", "OK\r\n")

    # ------------------------------------------------------------------
    # MQTT functions (BC-26 / BC-66)
    # ------------------------------------------------------------------

    def openMQTT(self, mqttserver="broker.hivemq.com", port_number=1883):
        """Open a network connection to an MQTT broker.

        Sends e.g. ``AT+QMTOPEN=0,"broker.hivemq.com",1883`` and waits for
        ``+QMTOPEN: 0,0``.

        Args:
            mqttserver: Broker host name.
            port_number: Broker TCP port.
        """
        self.compose = 'AT+QMTOPEN=0,"{}",{}'.format(mqttserver, port_number)
        self.sendATComm(self.compose, "+QMTOPEN: 0,0")
        self.clear_compose()

    def connectMQTT(self, clientName):
        """Connect to the opened broker under the given client name.

        Sends e.g. ``AT+QMTCONN=0,"LASSclient"`` and waits for
        ``+QMTCONN: 0,0,0``.

        Args:
            clientName: MQTT client identifier.
        """
        self.compose = 'AT+QMTCONN=0,"{}"'.format(clientName)
        self.sendATComm(self.compose, "+QMTCONN: 0,0,0")
        self.clear_compose()

    def publishMQTT(self, topic, payload):
        """Publish ``payload`` on ``topic``.

        Sends e.g. ``AT+QMTPUB=0,0,0,0,"NB/BC26/USER99/TEMP","23.10"`` and
        waits for ``+QMTPUB: 0,0,0``.

        Args:
            topic: Topic name.
            payload: Message body; converted to text.
        """
        self.compose = 'AT+QMTPUB=0,0,0,0,"{}","{}"'.format(topic, payload)
        self.sendATComm(self.compose, "+QMTPUB: 0,0,0")
        self.clear_compose()

    def closeMQTT(self):
        """Send AT+QMTCLOSE=0 and wait for ``+QMTCLOSE: 0,0``."""
        self.sendATComm("AT+QMTCLOSE=0", "+QMTCLOSE: 0,0")

    def closeallMQTT(self):
        """Send AT+QMTCLOSE=0 without waiting for any particular reply."""
        self.sendATComm("AT+QMTCLOSE=0", "")
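# End of listing. The two lines that followed here ("No comments:" /
# "Post a comment") were footer text from the blog page, not program code.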