-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapplication.py
88 lines (67 loc) · 2.4 KB
/
application.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from flask_socketio import SocketIO, emit
from flask import Flask, render_template, url_for, copy_current_request_context
from random import random
from time import sleep
from threading import Thread, Event
import serial
import struct
import time
from datetime import datetime
# Flask application object and its settings.
# NOTE(review): the secret key is hard-coded and debug mode is enabled;
# confirm neither is used as-is outside of local development.
app = Flask(__name__)
app.config.update(SECRET_KEY='secret!', DEBUG=True)

# Serial link to the Arduino on /dev/ttyS0: 9600 baud, 8 data bits, no parity,
# one stop bit. The port is opened with a read timeout of 1.
arduino = serial.Serial(
    port='/dev/ttyS0',
    baudrate=9600,
    bytesize=serial.EIGHTBITS,
    parity=serial.PARITY_NONE,
    stopbits=serial.STOPBITS_ONE,
    timeout=1,
)

# Wrap the Flask app in a Socket.IO server with both logging layers enabled.
socketio = SocketIO(
    app,
    async_mode=None,
    logger=True,
    engineio_logger=True,
)

# Placeholder (never started) thread object that the connect handler replaces
# with the real background task, plus the event used to ask that task to stop.
thread = Thread()
thread_stop_event = Event()
# Pre-compiled layout of one payload: a single native-format float.
_FLOAT = struct.Struct('f')


def _read_frame(port, stop_event):
    """Read one ``;``-terminated frame from *port* and return its floats.

    A frame is a sequence of values, each sent as the marker byte ``b'f'``
    followed by the raw bytes of one float, and closed by ``b';'``. Any other
    byte is ignored.

    Returns:
        The list of decoded floats, or ``None`` if *stop_event* was set before
        the frame terminator arrived.
    """
    values = []
    while not stop_event.is_set():
        marker = port.read()
        if not marker:
            # Read timed out with no data; loop so the stop event is re-checked.
            continue
        if marker == b';':
            return values
        if marker == b'f':
            payload = port.read(_FLOAT.size)
            # A timed-out read returns fewer bytes; unpacking it would raise
            # struct.error, so the truncated value is dropped instead.
            if len(payload) == _FLOAT.size:
                values.append(_FLOAT.unpack(payload)[0])
    return None


def NumberGenerator():
    """
    Forward float frames read from the serial port to Socket.IO clients.

    Runs until ``thread_stop_event`` is set. Each complete frame is printed,
    and when it holds at least three values the first three are emitted as
    ``sin``, ``cos`` and ``tan`` together with an ``HH:MM:SS`` timestamp in a
    ``newnumber`` event on the ``/test`` namespace. Frames with fewer than
    three values are printed but not emitted.

    Intended to be run as a background task.
    """
    print("Starting to receive data")
    while not thread_stop_event.is_set():
        lista = _read_frame(arduino, thread_stop_event)
        if lista is None:
            # Stop was requested while waiting for the end of a frame.
            break
        print(lista)
        # Indexing a short frame would raise IndexError and end the task,
        # so incomplete frames are skipped rather than emitted.
        if len(lista) >= 3:
            timestamp = datetime.now().strftime("%H:%M:%S")
            socketio.emit(
                'newnumber',
                {'sin': lista[0], 'cos': lista[1], 'tan': lista[2], 'timestamp': timestamp},
                namespace='/test',
            )
        socketio.sleep(0.1)
@app.route('/')
def index():
    """Serve the page rendered from the ``index.html`` template."""
    # Per the original author's note, the client only gets connected to the
    # Socket.IO instance after this page has been sent (template not shown here).
    page = render_template('index.html')
    return page
@socketio.on('connect', namespace='/test')
def test_connect():
    """Handle a client connecting to the ``/test`` namespace.

    Logs the connection and starts the ``NumberGenerator`` background task
    if the current ``thread`` object does not report itself as alive.
    """
    # The handler rebinds the module-level thread object.
    global thread
    print('Client connected')

    # Thread.isAlive() was removed in Python 3.9; is_alive() is the supported name.
    # NOTE(review): this relies on the object returned by start_background_task
    # also exposing is_alive() -- confirm for the async mode in use.
    if not thread.is_alive():
        print("Starting Thread")
        thread = socketio.start_background_task(NumberGenerator)
@socketio.on('disconnect', namespace='/test')
def test_disconnect():
    """Log that a client has disconnected from the ``/test`` namespace."""
    message = 'Client disconnected'
    print(message)
# Start the Socket.IO server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    socketio.run(app)