@@ -47,6 +47,7 @@ class PowerScheduleReasons:
47
47
DISABLED = 'Power schedule is disabled'
48
48
OUTDATED = 'Power schedule is outdated'
49
49
NO_CHANGES = 'Changing state is not required'
50
+ CONFLICT = 'Conflicting triggers'
50
51
51
52
52
53
class PowerScheduleWorker (ConsumerMixin ):
@@ -123,61 +124,72 @@ def _intersect(segment_1, segment_2):
123
124
first_point = max (a [0 ])
124
125
last_point = min (a [1 ])
125
126
if first_point < last_point :
126
- result = [ first_point , last_point ]
127
+ result = ( first_point , last_point )
127
128
elif first_point == last_point :
128
- result = [ first_point ]
129
+ result = ( first_point ,)
129
130
else :
130
- result = []
131
+ result = ()
131
132
return result
132
133
133
134
def get_action (self , schedule ):
134
135
local_tz = pytz .timezone (schedule ['timezone' ])
135
136
now_dt = datetime .now (tz = pytz .utc )
136
137
last_eval_dt = datetime .fromtimestamp (
137
138
schedule ['last_eval' ], tz = pytz .utc )
138
- power_off_today = self ._local_time_to_utc (
139
- schedule ['power_off' ], local_tz )
140
- power_on_today = self ._local_time_to_utc (
141
- schedule ['power_on' ], local_tz )
142
- if power_off_today == power_on_today :
143
- self .result ['reason' ] = PowerScheduleReasons .NO_CHANGES
144
- return None
145
- power_off_tomorrow = power_off_today + timedelta (hours = 24 )
146
- power_on_tomorrow = power_on_today + timedelta (hours = 24 )
147
- power_off_segment = [power_off_today , power_off_tomorrow ]
148
- power_on_segment = [power_on_today , power_on_tomorrow ]
149
- run_dt = [last_eval_dt , now_dt ]
150
-
151
- action = None
152
- schedule_time = None
153
- # list of state changes already come
154
- action_time = [x for x in power_off_segment + power_on_segment
155
- if x < now_dt ]
156
- if action_time :
157
- # get nearest past action time
158
- last_action_time = min (action_time , key = lambda x : abs (x - now_dt ))
159
- if last_action_time in power_on_segment :
160
- action = 'start_instance'
161
- schedule_time = power_on_segment
162
- elif last_action_time in power_off_segment :
163
- action = 'stop_instance'
164
- schedule_time = power_off_segment
165
- else :
166
- # no action changes required yet
167
- self .result ['reason' ] = PowerScheduleReasons .NO_CHANGES
168
- return None
169
-
170
- cross = None
171
- # intersect power on or power off schedule with check times to check if
172
- # the last action has already been applied
173
- if schedule_time :
174
- cross = self ._intersect (schedule_time , run_dt )
175
- if cross == run_dt or cross is None :
139
+ times_today = []
140
+ time_action_map = {}
141
+ for trigger in schedule ['triggers' ]:
142
+ action = trigger ['action' ]
143
+ time = self ._local_time_to_utc (trigger ['time' ], local_tz )
144
+ if time in time_action_map :
145
+ raise PowerScheduleException (
146
+ 'Conflicting triggers for time: {}' .format (
147
+ trigger ['time' ]))
148
+ times_today .append (time )
149
+ time_action_map [time ] = action
150
+ # collect power on/off segments during day
151
+ times_today = sorted (times_today )
152
+ time_periods = zip (times_today [:- 1 ], times_today [1 :])
153
+
154
+ run_dt = (last_eval_dt , now_dt )
155
+ if len (times_today ) == 1 :
156
+ time = times_today [0 ]
157
+ if last_eval_dt <= time <= now_dt :
158
+ action = time_action_map [times_today [0 ]]
159
+ LOG .info ('Action required: %s' , action )
160
+ return action
161
+
162
+ candidate = None
163
+ for period in time_periods :
164
+ cross = self ._intersect (period , run_dt )
165
+ if cross and cross == run_dt :
166
+ # trigger's time hasn't come
167
+ # (period_start <= last_eval <= now <= period_end)
168
+ self .result ['reason' ] = PowerScheduleReasons .NO_CHANGES
169
+ return None
170
+ elif cross and cross != run_dt and cross != period :
171
+ # found nearest trigger in the past
172
+ # (last_eval <= period_start <= now <= period_end) OR
173
+ # (period_start <= last_eval <= period_end <= now)
174
+ time = max ([x for x in period if x <= now_dt ])
175
+ action = time_action_map [time ]
176
+ LOG .info ('Action required: %s' , action )
177
+ return action
178
+ elif cross and cross == period :
179
+ # too much time passed between runs, continue iterating between
180
+ # periods to find the nearest trigger
181
+ # (last_eval <= period_start <= period_end <= now)
182
+ if not candidate :
183
+ candidate = period [1 ]
184
+ else :
185
+ candidate = max (candidate , period [1 ])
186
+ if not candidate :
187
+ # triggers' times hasn't come
176
188
self .result ['reason' ] = PowerScheduleReasons .NO_CHANGES
177
189
return None
178
- else :
179
- LOG .info ('Action required: %s' , action )
180
- return action
190
+ action = time_action_map [ candidate ]
191
+ LOG .info ('Action required: %s' , action )
192
+ return action
181
193
182
194
@staticmethod
183
195
def get_resource_data (resource , cloud_type ):
@@ -279,9 +291,14 @@ def process_schedule(self, power_schedule_id):
279
291
return
280
292
required_action = self .get_action (schedule )
281
293
if required_action :
282
- self .process_resources (schedule , required_action )
294
+ action_func_map = {
295
+ 'power_on' : 'start_instance' ,
296
+ 'power_off' : 'stop_instance' ,
297
+ }
298
+ self .process_resources (schedule , action_func_map [required_action ])
283
299
284
300
def process_task (self , body , message ):
301
+ now_ts = int (datetime .now (tz = pytz .utc ).timestamp ())
285
302
self .result = self .default_result ().copy ()
286
303
error = None
287
304
power_schedule_id = body .get ('power_schedule_id' )
@@ -296,7 +313,6 @@ def process_task(self, body, message):
296
313
LOG .error ('Task failed: %s' , error )
297
314
LOG .info ('Power schedule %s results:\n %s' ,
298
315
power_schedule_id , self .result )
299
- now_ts = int (datetime .now (tz = pytz .utc ).timestamp ())
300
316
updates = {
301
317
'last_eval' : now_ts ,
302
318
}
0 commit comments