-
Notifications
You must be signed in to change notification settings - Fork 18
/
monitor.py
60 lines (49 loc) · 1.71 KB
/
monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from tornado.platform.asyncio import AsyncIOMainLoop
from tornado import web
import tornado
import zmq.asyncio
import json
import logging
# Root logger configuration: records at INFO and above are appended to
# monitor.log, each line prefixed with a timestamp and the level name.
formatter = logging.Formatter("%(asctime)s - %(levelname)s: %(message)s")
fh = logging.FileHandler('monitor.log', mode='a')
fh.setFormatter(formatter)

logger = logging.getLogger()
logger.addHandler(fh)
logger.setLevel(logging.INFO)
# One asyncio-aware ZeroMQ context shared by both sockets below.
ctx = zmq.asyncio.Context()

# REP socket (port 8810): listens for heartbeats from all services.
repserver = ctx.socket(zmq.REP)
# PUB socket (port 8820): broadcasts messages to all subscribed services.
pubserver = ctx.socket(zmq.PUB)

repserver.bind('tcp://*:8810')
pubserver.bind('tcp://*:8820')

# Status of all services; filled in by the heartbeat handler.
service_node_status = dict()
# Handler for heartbeat requests; it is spawned on the tornado event loop
# and is meant to keep running for the lifetime of the process.
async def on_request():
    """Serve heartbeat requests on the REP socket in an endless loop.

    Every request is acknowledged with ``'copy'``.  When the payload decodes
    to a JSON object carrying a ``'sid'`` key, the decoded message is stored
    in ``service_node_status`` under that key and the whole status table is
    logged.

    Errors never terminate the loop: they are logged and the next request is
    awaited.  The loop is iterative on purpose -- restarting by awaiting
    ``on_request()`` from an ``except`` clause would nest one more coroutine
    for every failure.
    """
    import asyncio  # local import: only needed for the error back-off below

    while True:
        try:
            # Receive the raw frame so that no decoding step can fail between
            # the receive and the reply.
            raw = await repserver.recv()
            # A REP socket must answer the current request before it is
            # allowed to receive the next one, so acknowledge *before*
            # parsing; a bad payload must not leave the socket stuck.
            await repserver.send_string('copy')
        except Exception as e:
            logger.exception(e)
            # Brief pause so that a persistent socket failure cannot spin the
            # event loop and flood the log file.
            await asyncio.sleep(1)
            continue

        try:
            msg = json.loads(raw.decode('utf-8'))
            service_node_status[msg['sid']] = msg
        except (ValueError, KeyError, TypeError) as e:
            # Undecodable bytes, invalid JSON, a missing 'sid' key or a
            # payload that is not a JSON object: log it and keep serving.
            logger.exception(e)
            continue

        logger.info(str(service_node_status))
class ControlHandler(tornado.web.RequestHandler):
    """HTTP endpoint that relays a control command to the PUB socket."""

    async def post(self):
        """Publish the posted ``sid``/``action`` pair, then confirm.

        Both request arguments are optional and default to ``''``.  The pair
        is sent on ``pubserver`` as a JSON object with the keys ``'sid'`` and
        ``'action'``; afterwards ``'Msg pub'`` is written as the response
        body.
        """
        sid = self.get_argument('sid', '')
        action = self.get_argument('action', '')
        # pubserver comes from a zmq.asyncio context, so send_string returns
        # an awaitable.  Awaiting it makes a failed send surface in this
        # request instead of being lost in a future nobody inspects.
        await pubserver.send_string(json.dumps({'sid': sid, 'action': action}))
        self.write('Msg pub')
# Tornado application exposing a single route: /control -> ControlHandler.
application = web.Application(handlers=[(r"/control", ControlHandler)])
if __name__ == "__main__":
    # Install the asyncio-backed IOLoop first: application.listen() looks up
    # the current IOLoop, so the asyncio loop has to be in place before the
    # HTTP server (or anything else) touches it.
    AsyncIOMainLoop().install()
    # provide http server at port 8888
    application.listen(8888)
    # schedule the heartbeat handler and start the tornado main event loop
    loop = tornado.ioloop.IOLoop.current()
    loop.spawn_callback(on_request)
    loop.start()