-
Notifications
You must be signed in to change notification settings - Fork 1
/
hogwatch2.py
116 lines (89 loc) · 3.3 KB
/
hogwatch2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python
import asyncio
import hashlib
import json
import logging
import os
import signal
import sys
import janus
import websockets
import pynethogs
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
USERS = set()
queue = None
def get_hash(websocket):
return hashlib.sha256(str(hash(websocket)).encode()).hexdigest()
async def consumer(websocket, message):
data = json.loads(message.replace('\'', '\"'))
if 'action' in data and 'interface' in data:
action = data['action']
interface = data['interface']
if action == 'add':
if not interface in websocket.interfaces:
websocket.interfaces.append(interface)
logging.info('%s adding interface: %s' % (websocket.id, interface))
logging.info('%s current interfaces: %s' % (websocket.id, websocket.interfaces))
elif action == 'remove':
if interface in websocket.interfaces:
websocket.interfaces.remove(interface)
logging.info('%s removing interface: %s' % (websocket.id, interface))
logging.info('%s current interfaces: %s' % (websocket.id, websocket.interfaces))
else:
logging.error('%s unsupported event: %s' % (websocket.id, action))
else:
logging.error('%s parameter missing: %s' % (websocket.id, data))
async def consumer_handler(websocket, path):
try:
while True:
message = await websocket.recv()
await consumer(websocket, message)
except websockets.exceptions.ConnectionClosed as e:
pass
async def producer_handler():
while True:
message = await queue.async_q.get()
message_dict = json.loads(message)
for websocket in USERS:
if len(websocket.interfaces) > 0:
if message_dict['device_name'] in websocket.interfaces:
await websocket.send(message)
else:
await websocket.send(message)
async def register(websocket):
USERS.add(websocket)
async def unregister(websocket):
USERS.remove(websocket)
async def handler(websocket, path):
websocket.interfaces = list()
websocket.id = get_hash(websocket)
await register(websocket)
logging.info('%s connected' % websocket.id)
try:
consumer_task = asyncio.ensure_future(consumer_handler(websocket, path))
producer_task = asyncio.ensure_future(producer_handler())
done, pending = await asyncio.wait(
[consumer_task, producer_task],
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
task.cancel()
finally:
logging.info('%s disconnected' % websocket.id)
await unregister(websocket)
def signal_handler(signal, frame):
sys.exit(0)
async def main():
global queue
queue = janus.Queue()
asyncio.get_running_loop().run_in_executor(None, pynethogs.main, queue.sync_q)
if __name__ == '__main__':
if os.getuid() != 0:
print('This has to be run as root sorry :/')
else:
signal.signal(signal.SIGINT, signal_handler)
loop = asyncio.get_event_loop()
loop.create_task(main())
start_server = websockets.serve(handler, "0.0.0.0", 8765)
loop.run_until_complete(start_server)
loop.run_forever()