control.py
import torch
from collections import namedtuple
from torch.multiprocessing import Event
from utils.class_utils import load_class


class TerminationFlag:
  """An enum class for representing various termination states."""
  UNSET = -1
  TARGET_NUM_VIDEOS_REACHED = 0
  FILENAME_QUEUE_FULL = 1
  FRAME_QUEUE_FULL = 2


class SharedQueues:
  """Manages intermediate queues that connect steps in the benchmark.

  Args:
    queue_class: The class to use when instantiating queues
        (e.g., multiprocessing.Queue, torch.multiprocessing.Queue)
    queue_size: Maximum number of items each queue can hold
    pipeline: The whole pipeline info parsed from the input configuration file
    per_gpu_queue: True if GPU-local queues should be used,
        or False if global queues should be used
  """
  def __init__(self, queue_class, queue_size, pipeline, per_gpu_queue):
    self.per_gpu_queue = per_gpu_queue
    self.filename_queue = queue_class(queue_size)
    self.num_steps = len(pipeline)

    # self.tensor_queues is a list of dictionaries, e.g.,
    # [
    #   {0: filename_queue, 1: filename_queue}, (queue between client and step 0)
    #   {0: q_step01_gpu0, 1: q_step01_gpu1}, (set of queues between step 0 & 1)
    #   {0: q_step12_gpu0, 1: q_step12_gpu1}, (set of queues between step 1 & 2)
    #   ...
    #   {0: None, 1: None} (the last step does not need an output queue)
    # ]
    # In case we use global queues, each dictionary will hold only one queue:
    # [
    #   {0: filename_queue}, (queue between client and step 0)
    #   {0: q_step01}, (global queue between step 0 & 1)
    #   {0: q_step12}, (global queue between step 1 & 2)
    #   {0: None} (the last step does not need an output queue)
    # ]
    if per_gpu_queue:
      # we assume that all steps use the same set of gpus
      gpus = set(pipeline[0]['gpus'])
      self.tensor_queues = [{gpu: queue_class(queue_size) for gpu in gpus}
                            for step_idx in range(self.num_steps - 1)]
      # The first step will receive filenames from the client via filename_queue.
      # Unlike tensor queues, filename_queue is always a global queue, so we
      # insert the same entry (self.filename_queue) for all gpu indices.
      self.tensor_queues.insert(0, {gpu: self.filename_queue for gpu in gpus})
      # The last step does not need an output queue, so we pass None.
      self.tensor_queues.append({gpu: None for gpu in gpus})
    else:
      # There is no need to differentiate queues according to gpu index,
      # since there is only one global queue (per step) anyway.
      # Thus, instead of using the gpu index as the dictionary key, we simply
      # use the number 0 as the only key.
      # Note that we could even abandon the dictionary type and do something
      # like [queue_class(queue_size) for _ in range(...)], but we keep the
      # dictionary type to simplify the logic of later choosing prev_queue
      # and next_queue in get_tensor_queue().
      self.tensor_queues = [{0: queue_class(queue_size)}
                            for step_idx in range(self.num_steps - 1)]
      # The first step will receive filenames from the client via filename_queue.
      self.tensor_queues.insert(0, {0: self.filename_queue})
      # The last step does not need an output queue, so we pass None.
      self.tensor_queues.append({0: None})

  def get_tensor_queue(self, step_idx, gpu_idx):
    queue_idx = gpu_idx if self.per_gpu_queue else 0
    prev_queue = self.tensor_queues[step_idx][queue_idx]
    next_queue = self.tensor_queues[step_idx + 1][queue_idx]
    return prev_queue, next_queue

  def get_filename_queue(self):
    return self.filename_queue
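

# Illustrative sketch (not part of the original file): how the queue layout
# above translates into get_tensor_queue() results. The pipeline below is
# hypothetical; only the 'gpus' key is relied upon by SharedQueues, and the
# model paths are placeholders.
#
#   pipeline = [{'model': 'models.example.Loader',   'gpus': [0, 1]},
#               {'model': 'models.example.Detector', 'gpus': [0, 1]}]
#   queues = SharedQueues(torch.multiprocessing.Queue, queue_size=10,
#                         pipeline=pipeline, per_gpu_queue=True)
#
#   prev_q, next_q = queues.get_tensor_queue(step_idx=0, gpu_idx=1)
#   # prev_q is the global filename_queue; next_q is the GPU-1 queue into step 1
#   prev_q, next_q = queues.get_tensor_queue(step_idx=1, gpu_idx=1)
#   # prev_q is the GPU-1 queue written by step 0; next_q is None (last step)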


class TensorEvent:
  """Basically a tuple of several torch.Tensors and a multiprocessing.Event.

  The Tensors can be used as "shared tensors" for passing intermediate tensors
  across processes.

  The Event should be used to signal that the consumer process has finished
  reading from the Tensors. When writing values to Tensors, the producer
  process should first check if the Tensors are free, by calling event.wait().
  If the Tensors are indeed free, then event.wait() will return at once. If
  not, then event.wait() will block until the consumer process calls
  event.set(). Thus, the consumer should make sure that it calls event.set()
  AFTER the Tensors' contents have been copied to a safe area, such as the
  consumer's own local tensors.
  """
  def __init__(self, shapes, device, dtype=torch.float32):
    self.tensors = tuple(torch.empty(*shape, dtype=dtype, device=device)
                         for shape in shapes)
    self.event = Event()
    self.event.set()
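

# Illustrative sketch (not part of the original file): one handshake pattern
# that is consistent with the docstring above. The shape, device, the batch
# variable, and the exact placement of event.clear() are assumptions; the
# real call sites live in the step runner code, not in this file.
#
#   te = TensorEvent(shapes=((3, 224, 224),), device=torch.device('cuda:0'))
#
#   # producer process
#   te.event.wait()             # block until the consumer has released the tensors
#   te.event.clear()            # assumed: mark the tensors as occupied before writing
#   te.tensors[0].copy_(batch)  # write new values (batch: a local tensor of matching shape)
#
#   # consumer process
#   local = te.tensors[0].clone()  # copy out to a safe local tensor first
#   te.event.set()                 # only then signal that the tensors are free again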


class SharedTensors:
  """Manages intermediate tensors that are passed across steps in the benchmark.

  Args:
    pipeline: The whole pipeline info parsed from the input configuration file
    num_tensors_per_process: The number of shared output tensors that are given
        to each process, for writing tensor values. A big value allows
        processes to produce many tensors before having to block, but requires
        a lot of GPU memory. A small value saves memory, but results in early
        blocking. Note that if a step outputs several tensors during each
        iteration, then this class allocates separate memory for each tensor,
        but still treats them as one tensor when comparing the count with
        num_tensors_per_process.
  """
  def __init__(self, pipeline, num_tensors_per_process):
    # self.tensors is a 3-level list of TensorEvents, e.g.,
    # [
    #   None, (the first step does not need shared input tensors)
    #   [ (shared tensors between step 0 & 1)
    #     [tensorEvent000, tensorEvent001, ...] (outputs of process 0 in step 0)
    #     [tensorEvent010, tensorEvent011, ...] (outputs of process 1 in step 0)
    #     [tensorEvent020, tensorEvent021, ...] (outputs of process 2 in step 0)
    #   ],
    #   [ (shared tensors between step 1 & 2)
    #     [tensorEvent100, tensorEvent101, ...] (outputs of process 0 in step 1)
    #     [tensorEvent110, tensorEvent111, ...] (outputs of process 1 in step 1)
    #     [tensorEvent120, tensorEvent121, ...] (outputs of process 2 in step 1)
    #   ],
    #   ...,
    #   [None, None, ...] (the last step does not need shared output tensors)
    # ]
    self.tensors = [None]

    # we exclude the last step since the last step does not need output tensors
    for step in pipeline[:-1]:
      # load the model class to check the output tensor shape of this step
      model_module_path = step['model']
      model_class = load_class(model_module_path)
      shapes = model_class.output_shape()

      if shapes is None:
        # this step does not need shared output tensors
        step_output_tensors = [None for _ in step['gpus']]
      else:
        step_output_tensors = []
        for gpu in step['gpus']:
          device = torch.device('cuda:%d' % gpu)
          tensors = [TensorEvent(shapes, device)
                     for _ in range(num_tensors_per_process)]
          step_output_tensors.append(tensors)

      self.tensors.append(step_output_tensors)

    # add Nones as output placeholders for the last step
    self.tensors.append([None for _ in pipeline[-1]['gpus']])

  def get_tensors(self, step_idx, instance_idx):
    """Returns the shared input tensors and output tensors for a given process.

    The shared input tensors are returned as a 2-level list, containing the
    output tensors of all processes of the previous step. On the other hand,
    the output tensors are returned as a 1-level list, since this process does
    not need to access the output tensors of other processes from the same
    step.
    """
    return self.tensors[step_idx], self.tensors[step_idx + 1][instance_idx]


# An integer tuple for accessing tensors from SharedTensors.
Signal = namedtuple('Signal', ['instance_idx', 'tensor_idx'])
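
# Illustrative sketch (not part of the original file): how a Signal could be
# used together with SharedTensors.get_tensors(), assuming shared_tensors is a
# SharedTensors instance and the indices below are hypothetical. A producer
# step would typically push a Signal downstream so the consumer knows which
# process's shared tensor slot to read from.
#
#   in_tensors, out_tensors = shared_tensors.get_tensors(step_idx=1, instance_idx=0)
#   signal = Signal(instance_idx=0, tensor_idx=2)
#   tensor_event = in_tensors[signal.instance_idx][signal.tensor_idx]
#   frames = tensor_event.tensors[0].clone()
#   tensor_event.event.set()  # release the slot after copying (see TensorEvent)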