-
Notifications
You must be signed in to change notification settings - Fork 1
/
app.py
84 lines (66 loc) · 2.61 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# @author mhashim6 on 1/23/20
import asyncio
from concurrent.futures import ThreadPoolExecutor
import queue
from bluetooth import BluetoothConnection
from camera_feed import run_video_cam, capture
from io_types import *
from object_detection.tflite_objectDetection import detect as detect_objects
from facial_recognition.face_reco import recognize as face_reco
from ocr.ocr import ocr
import time
from utils import pretty_objects, json_of_result, log
tasks = queue.Queue(0)
current_frame = None
def fetch_new_tasks(remote: BluetoothConnection):
while True:
line = remote.read().decode()
tasks.put(line)
time.sleep(0.5)
def process_tasks(remote: BluetoothConnection):
while True:
task = tasks.get()
# $type_$value
task_segments = task.split('_')
task_type = int(task_segments[0])
if task_type == INPUT_MODE_CHANGE:
change_type = int(task_segments[1])
if change_type == MODE_CHANGE_OCR:
log('MODE_CHANGE_OCR')
transcript = ocr(capture(), 'ara+eng')
log('ocr: ', transcript)
remote.send(json_of_result(RESULT_OCR, transcript))
elif change_type == MODE_CHANGE_SCENE:
log('MODE_CHANGE_SCENE')
elif change_type == MODE_CHANGE_FACE_RECOGNITION:
log('MODE_CHANGE_FACE_RECOGNITION')
face = face_reco(capture())
log('face: ', face)
remote.send(json_of_result(RESULT_FACE_RECOGNITION, face))
elif change_type == MODE_CHANGE_EMOTIONS:
log('MODE_CHANGE_EMOTIONS')
elif change_type == MODE_CHANGE_OBJECT_DETECTION:
log('MODE_CHANGE_OBJECT_DETECTION')
objs = pretty_objects(detect_objects(capture()))
log('object detection results: ', objs)
remote.send(json_of_result(RESULT_OBJECT_DETECTION, objs))
log("process: ", task)
def detected_new_scene(frame):
global current_frame
current_frame = frame
log("a new scene")
# TODO: ease up a little.
# time.sleep(0.5)
# TODO: tasks.put(what?)
def new_scenes():
run_video_cam(detected_new_scene)
def start_app():
remote = BluetoothConnection()
executor = ThreadPoolExecutor(3)
loop = asyncio.get_event_loop()
asyncio.ensure_future(loop.run_in_executor(executor, fetch_new_tasks, remote))
asyncio.ensure_future(loop.run_in_executor(executor, new_scenes))
asyncio.ensure_future(loop.run_in_executor(executor, process_tasks, remote))
loop.run_forever()
if __name__ == '__main__':
start_app()