-
Notifications
You must be signed in to change notification settings - Fork 4
/
c8yagent.py
150 lines (119 loc) · 4.88 KB
/
c8yagent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import logging
from logging.handlers import RotatingFileHandler
import paho.mqtt.client as mqtt
from c8ydp.device_proxy import DeviceProxy, WebSocketFailureException
import sys
import time
device_id = "1234567890"
baseurl = "mqtt.cumulocity.com"
tenant = '<tenantId>'
user = '<user>'
tenantuser = tenant + '/' + user
password = '<password>'
token = None
remote_access_op_template = 'da600'
remote_access_default_template = '530'
fragment = 'c8y_RemoteAccessConnect'
template_id = 'remoteConnect'
tcp_buffer_size = 1024
logger = logging.getLogger('C8YAgent')
loglevel = 'INFO'
logger.setLevel(loglevel)
logHandler = RotatingFileHandler('C8YAgent.log', maxBytes=1 * 1024 * 1024, backupCount=5)
log_formatter = logging.Formatter('%(asctime)s %(threadName)s %(levelname)s %(name)s %(message)s')
logHandler.setFormatter(log_formatter)
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_formatter)
logger.addHandler(console_handler)
logger.addHandler(logHandler)
is_close = False
receivedMessages = []
def on_log(client, userdata, level, buf):
logger.debug(f'MQTT Debug log: {buf}')
def connect(device_id):
logger.info('MQTT Client connecting to C8Y...')
mqttClient = mqtt.Client(client_id=device_id)
mqttClient.username_pw_set(f'{tenant}/{user}', password)
mqttClient.on_message = on_message
mqttClient.on_publish = on_publish
mqttClient.on_connect = on_connect
mqttClient.connect(baseurl, 1883)
mqttClient.loop_start()
mqttClient.on_log = on_log
return mqttClient
def stop():
is_close = True
disconnect(mqttClient, device_id)
def disconnect(mqttClient, device_id):
mqttClient.loop_stop() # stop the loop
mqttClient.disconnect()
logger.info("Disconnecting MQTT Client")
def set_executing(mqttClient, fragment):
publish(mqttClient, "s/us", f'501,{fragment}', False)
def set_failed(mqttClient, fragment, failureReason):
publish(mqttClient, "s/us", f'502,{fragment},{failureReason}', False)
def set_success(mqttClient, fragment):
publish(mqttClient, "s/us", f'503,{fragment}', False)
def proxy_connect(message):
"""
Creates the Device Proxy and connects to WebSocket and TCP Port
"""
tcp_host = message[2]
tcp_port = int(message[3])
connection_key = message[4]
if token is None and tenant is None and user is None and password is None:
raise WebSocketFailureException(
'OAuth Token or tenantuser and password must be provided!')
device_proxy = DeviceProxy(
tcp_host, tcp_port, tcp_buffer_size, connection_key, baseurl, tenantuser, password, token, on_close_handler)
device_proxy.connect()
set_success(mqttClient, fragment)
def on_close_handler(close_status, close_reason):
logger.info(f'Device Proxy has been closed with status {close_status}, reason {close_reason}')
stop()
def on_message(client, userdata, message):
try:
logger.info("Received operation '{0}'".format(str(message.payload)))
payload = message.payload.decode("utf-8")
payload_array = payload.split(',')
if payload_array[0] == remote_access_op_template or payload_array[0] == remote_access_default_template:
set_executing(client, fragment)
proxy_connect(payload_array)
except Exception as ex:
logging.error(f'Handling operation error. exception={ex}')
set_failed(mqttClient, fragment, str(ex))
def on_publish(client, userdata, mid):
# receivedMessages.append(mid)
logger.debug("mid: '{0}".format(mid))
def on_connect(client, userdata, flag, rc):
if rc==0:
logger.info('MQTT Client succesfully connected!')
subscribe(mqttClient, 's/e',0)
subscribe(mqttClient, 's/ds',0)
subscribe(mqttClient, f's/dc/{template_id}',0)
publish(mqttClient, "s/us", "100,Remote Access Demo Device " + device_id + ",c8y_RemoteAccessDemoDevice", False)
publish(mqttClient, "s/us", f'114,{fragment}', False)
else:
disconnect(mqttClient, device_id)
def publish(mqttClient, topic, message, waitForAck=False):
logger.debug("Publishing: '{0}' | '{1}' | '{2}'".format(topic, message, str(waitForAck)))
mid = mqttClient.publish(topic, message, 2)[1]
if (waitForAck):
while mid not in receivedMessages:
time.sleep(0.25)
def subscribe(mqttClient, topic, qos):
mqttClient.subscribe(topic, qos)
def mqtt_loop():
while not is_close:
try:
logger.debug('Looping Cycle')
time.sleep(10)
except Exception as e:
disconnect(mqttClient, device_id)
if isinstance(e, KeyboardInterrupt):
sys.exit()
logger.error("Error on processing payload '{0}'".format(e))
if __name__ == "__main__":
mqttClient = connect(device_id)
mqtt_loop()
# ----------Register device(s)----------------------------------