# detect_objects.py
import os
import cv2
import imutils
import time
import datetime
import ctypes
import logging
import multiprocessing as mp
import threading
import json
from contextlib import closing
import numpy as np
from object_detection.utils import visualization_utils as vis_util
from flask import Flask, Response, make_response, send_file
import paho.mqtt.client as mqtt
from frigate.util import tonumpyarray
from frigate.mqtt import MqttMotionPublisher, MqttObjectPublisher
from frigate.objects import ObjectParser, ObjectCleaner, BestFrameOfType
from frigate.motion import detect_motion
from frigate.video import fetch_frames, FrameTracker
from frigate.object_detection import detect_objects
# Runtime configuration, read once from the process environment at import
# time. Every value is None when the corresponding variable is unset.

# Source video stream opened with cv2.VideoCapture.
RTSP_URL = os.environ.get('RTSP_URL')

# MQTT broker host, optional credentials, and the prefix prepended to every
# topic this service publishes.
MQTT_HOST = os.environ.get('MQTT_HOST')
MQTT_USER = os.environ.get('MQTT_USER')
MQTT_PASS = os.environ.get('MQTT_PASS')
MQTT_TOPIC_PREFIX = os.environ.get('MQTT_TOPIC_PREFIX')

# Regions to watch: one entry per region, entries separated by ':'.
# main() reads six comma-separated fields from each entry, in this order:
#   size,x_offset,y_offset,min_person_area,min_object_size,mask_file
# The first five are parsed as integers; mask_file is a file name that is
# loaded from the /config directory.
REGIONS = os.environ.get('REGIONS')

# Debug flag handed to the worker processes; enabled only by the exact value '1'.
DEBUG = os.environ.get('DEBUG') == '1'
def main():
    """Wire up the capture/detection pipeline and serve its output over HTTP.

    Steps, in order:
      1. Parse REGIONS into per-region settings and load each region's mask.
      2. Read one frame from RTSP_URL to learn the frame shape.
      3. Allocate the shared frame buffer and the synchronisation primitives.
      4. Create one capture process plus, per region, one object-detection
         process and one motion-detection process.
      5. Start the helper threads (frame tracker, best-frame trackers,
         object parser/cleaner, MQTT publishers) and connect to MQTT.
      6. Start the worker processes and run a Flask app exposing
         ``/best_<label>.jpg`` snapshots and an MJPEG stream at ``/``.

    Raises:
        SystemExit: with code 1 when no frame can be read from RTSP_URL.
    """
    DETECTED_OBJECTS = []
    recent_motion_frames = {}

    # object classes for which a best-scoring recent frame is tracked and
    # exposed at /best_<label>.jpg
    best_frame_labels = ("person", "car", "truck", "motorcycle", "bus", "bicycle", "dog")

    # Parse selected regions
    regions = []
    for region_string in REGIONS.split(':'):
        region_parts = region_string.split(',')
        # NOTE(review): cv2.imread returns None when the file cannot be read;
        # there is no check for that here.
        region_mask_image = cv2.imread("/config/{}".format(region_parts[5]), cv2.IMREAD_GRAYSCALE)
        # indices of the mask image pixels whose value is 0
        region_mask = np.where(region_mask_image==[0])

        regions.append({
            'size': int(region_parts[0]),
            'x_offset': int(region_parts[1]),
            'y_offset': int(region_parts[2]),
            'min_person_area': int(region_parts[3]),
            'min_object_size': int(region_parts[4]),
            'mask': region_mask,
            # Event for motion detection signaling
            'motion_detected': mp.Event(),
            # create shared array for storing 10 detected objects
            # note: this must be a double even though the value you are storing
            #       is a float. otherwise it stops updating the value in shared
            #       memory. probably something to do with the size of the memory block
            'output_array': mp.Array(ctypes.c_double, 6*10)
        })

    def motion_events():
        # fresh list of every region's motion Event; each consumer gets its
        # own list object, exactly as when the list was built inline per call
        return [region['motion_detected'] for region in regions]

    # capture a single frame and check the frame shape so the correct array
    # size can be allocated in memory
    video = cv2.VideoCapture(RTSP_URL)
    ret, frame = video.read()
    # release the probe capture on the failure path as well as on success
    video.release()
    if not ret:
        print("Unable to capture video stream")
        # SystemExit directly: the exit() builtin is only installed by the
        # site module and is meant for interactive use
        raise SystemExit(1)
    frame_shape = frame.shape

    # compute the flattened array length from the array shape
    flat_array_length = frame_shape[0] * frame_shape[1] * frame_shape[2]
    # create shared array for storing the full frame image data
    # NOTE(review): element type is c_uint16 while the placeholder frames
    # served over HTTP are uint8 - confirm against frigate.util.tonumpyarray
    shared_arr = mp.Array(ctypes.c_uint16, flat_array_length)
    # create shared value for storing the frame_time
    shared_frame_time = mp.Value('d', 0.0)
    # Lock to control access to the frame
    frame_lock = mp.Lock()
    # Condition for notifying that a new frame is ready
    frame_ready = mp.Condition()
    # Condition for notifying that motion status changed globally
    motion_changed = mp.Condition()
    # Condition for notifying that objects were parsed
    objects_parsed = mp.Condition()
    # Queue for detected objects
    object_queue = mp.Queue()

    # shape current frame so it can be treated as an image
    frame_arr = tonumpyarray(shared_arr).reshape(frame_shape)

    # create the process to capture frames from the RTSP stream and store in a shared array
    capture_process = mp.Process(target=fetch_frames, args=(shared_arr,
        shared_frame_time, frame_lock, frame_ready, frame_shape, RTSP_URL))
    capture_process.daemon = True

    # for each region, create a separate process for motion detection and object detection
    detection_processes = []
    motion_processes = []
    for region in regions:
        detection_process = mp.Process(target=detect_objects, args=(shared_arr,
            object_queue,
            shared_frame_time,
            frame_lock, frame_ready,
            region['motion_detected'],
            frame_shape,
            region['size'], region['x_offset'], region['y_offset'],
            region['min_person_area'],
            DEBUG))
        detection_process.daemon = True
        detection_processes.append(detection_process)

        motion_process = mp.Process(target=detect_motion, args=(shared_arr,
            shared_frame_time,
            frame_lock, frame_ready,
            region['motion_detected'],
            motion_changed,
            frame_shape,
            region['size'], region['x_offset'], region['y_offset'],
            region['min_object_size'], region['mask'],
            DEBUG))
        motion_process.daemon = True
        motion_processes.append(motion_process)

    # start a thread to store recent motion frames for processing
    frame_tracker = FrameTracker(frame_arr, shared_frame_time, frame_ready, frame_lock,
        recent_motion_frames, motion_changed, motion_events())
    frame_tracker.start()

    # start one thread per object class to store the highest scoring recent
    # frame of that class
    best_frames = {}
    for label in best_frame_labels:
        tracker = BestFrameOfType(label, objects_parsed, recent_motion_frames,
            DETECTED_OBJECTS, motion_changed, motion_events())
        tracker.start()
        best_frames[label] = tracker

    # start a thread to parse objects from the queue
    object_parser = ObjectParser(object_queue, objects_parsed, DETECTED_OBJECTS)
    object_parser.start()
    # start a thread to expire objects from the detected objects list
    object_cleaner = ObjectCleaner(objects_parsed, DETECTED_OBJECTS)
    object_cleaner.start()

    # connect to mqtt and setup last will
    def on_connect(client, userdata, flags, rc):
        print("On connect called")
        # publish a message to signal that the service is running
        client.publish(MQTT_TOPIC_PREFIX+'/available', 'online', retain=True)
    client = mqtt.Client()
    client.on_connect = on_connect
    client.will_set(MQTT_TOPIC_PREFIX+'/available', payload='offline', qos=1, retain=True)
    if MQTT_USER is not None:
        client.username_pw_set(MQTT_USER, password=MQTT_PASS)
    client.connect(MQTT_HOST, 1883, 60)
    client.loop_start()

    # start a thread to publish object scores
    mqtt_publisher = MqttObjectPublisher(client, MQTT_TOPIC_PREFIX, objects_parsed, DETECTED_OBJECTS)
    mqtt_publisher.start()

    # start thread to publish motion status
    mqtt_motion_publisher = MqttMotionPublisher(client, MQTT_TOPIC_PREFIX, motion_changed,
        motion_events())
    mqtt_motion_publisher.start()

    # start the process of capturing frames
    capture_process.start()
    print("capture_process pid ", capture_process.pid)

    # start the object detection processes
    for detection_process in detection_processes:
        detection_process.start()
        print("detection_process pid ", detection_process.pid)

    # start the motion detection processes
    for motion_process in motion_processes:
        motion_process.start()
        print("motion_process pid ", motion_process.pid)

    # create a flask app that encodes frames as mjpeg on demand
    app = Flask(__name__)

    def make_best_frame_view(tracker):
        # Build the view for one tracker. A factory is used so that each view
        # closes over its own tracker instead of the loop variable.
        def best_frame_view():
            # read the attribute once so the None check and the use agree
            best = tracker.best_frame
            # serve a black frame until a best frame has been recorded
            frame = np.zeros(frame_shape, np.uint8) if best is None else best
            ret, jpg = cv2.imencode('.jpg', frame)
            response = make_response(jpg.tobytes())
            # image/jpeg is the registered JPEG media type and matches the
            # type used by the stream served at '/'
            response.headers['Content-Type'] = 'image/jpeg'
            return response
        return best_frame_view

    # register /best_<label>.jpg with endpoint name best_<label>
    for label in best_frame_labels:
        app.add_url_rule('/best_{}.jpg'.format(label), 'best_{}'.format(label),
            make_best_frame_view(best_frames[label]))

    @app.route('/')
    def index():
        # return a multipart response
        return Response(imagestream(), mimetype='multipart/x-mixed-replace; boundary=frame')

    def imagestream():
        while True:
            # max out at 5 FPS
            time.sleep(0.2)
            # make a copy of the current detected objects
            detected_objects = DETECTED_OBJECTS.copy()
            # lock and make a copy of the current frame
            with frame_lock:
                frame = frame_arr.copy()
            # convert to RGB for drawing
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # draw the bounding boxes on the screen
            for obj in detected_objects:
                vis_util.draw_bounding_box_on_image_array(frame,
                    obj['ymin'],
                    obj['xmin'],
                    obj['ymax'],
                    obj['xmax'],
                    color='red',
                    thickness=2,
                    display_str_list=["{}: {}%".format(obj['name'],int(obj['score']*100))],
                    use_normalized_coordinates=False)

            # outline each region: green while its motion event is set, white otherwise
            for region in regions:
                color = (255,255,255)
                if region['motion_detected'].is_set():
                    color = (0,255,0)
                cv2.rectangle(frame, (region['x_offset'], region['y_offset']),
                    (region['x_offset']+region['size'], region['y_offset']+region['size']),
                    color, 2)

            # convert back to BGR
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            # encode the image into a jpg
            ret, jpg = cv2.imencode('.jpg', frame)
            yield (b'--frame\r\n'
                b'Content-Type: image/jpeg\r\n\r\n' + jpg.tobytes() + b'\r\n\r\n')

    app.run(host='0.0.0.0', debug=False)

    # once the web server returns, wait for every worker that was started
    capture_process.join()
    for detection_process in detection_processes:
        detection_process.join()
    for motion_process in motion_processes:
        motion_process.join()
    frame_tracker.join()
    for label in best_frame_labels:
        best_frames[label].join()
    object_parser.join()
    object_cleaner.join()
    mqtt_publisher.join()
    mqtt_motion_publisher.join()


if __name__ == '__main__':
    main()