1111# #
1212#####################################################################
1313from __future__ import print_function , division , absolute_import , unicode_literals
14+
1415try :
1516 from time import monotonic
1617except ImportError :
1718 from time import time as monotonic
1819
1920from bisect import insort
21+ import copy
2022
2123# This module contains a minimalistic task queue. It is used by the zlock and zlog
2224# servers (zprocess.locking and zprocess.logging respectively) in order to wait for
4244class Task (object ):
4345 def __init__ (self , due_in , func , * args , ** kwargs ):
4446 """Wrapper for a function call to be executed after a specified time interval.
45- due_in is how long in the future, in seconds, the function should be called,
47+ ` due_in` is how long in the future, in seconds, the function should be called,
4648 func is the function to call. All subsequent arguments and keyword arguments
47- will be passed to the function."""
49+ will be passed to the function. If added to a TaskQueue with `repeat=True`, or
50+ if Task.repeat is set to True, then the `due_in` argument is used as the
51+ repetition interval."""
4852 self .due_at = monotonic () + due_in
53+ self .interval = due_in
4954 self .func = func
5055 self .args = args
5156 self .kwargs = kwargs
52- self .called = False
57+ self ._called = False
58+ self .repeat = False
5359
5460 def due_in (self ):
5561 """The time interval in seconds until the task is due"""
5662 return self .due_at - monotonic ()
5763
5864 def __call__ (self ):
59- if self .called :
65+ if self ._called :
6066 raise RuntimeError ('Task has already been called' )
61- self .called = True
67+ self ._called = True
6268 return self .func (* self .args , ** self .kwargs )
6369
6470 def __gt__ (self , other ):
@@ -71,13 +77,51 @@ class TaskQueue(list):
7177 """A list of pending tasks due at certain times. Tasks are stored with the soonest
7278 due at the end of the list, to be removed with pop()"""
7379
74- def add (self , task ):
75- """Insert the task into the queue, maintaining sort order"""
80+ def pop (self ):
81+ """Returns the next due task. If Task.repeat is True, re-adds a copy of the Task
82+ to the queue with a due time `Task.interval` after its previous due time."""
83+ task = super ().pop ()
84+ if task .repeat :
85+ next_rep = copy .copy (task )
86+ # If next due time would be in the past, add a task due now instead. We
87+ # don't want to a large number of tasks if e.g. system suspension led to a
88+ # discontinuous jump forward in the monotonic clock
89+ next_rep .due_at = max (monotonic (), task .due_at + task .interval )
90+ self .add (next_rep )
91+ return task
92+
93+ def add (self , task , repeat = None ):
94+ """Insert the task into the queue, maintaining sort order. If repeat is given,
95+ this sets `Task.repeat`, which controls whether or not a copy of the task will
96+ be re-added to the queue after it is removed via a call to `TaskQueue.pop()`."""
97+ if repeat is not None :
98+ task .repeat = repeat
7699 insort (self , task )
77100
78101 def next (self ):
79102 """Return the next due task, without removing it from the queue"""
80103 return self [- 1 ]
81104
82105 def cancel (self , task ):
83- self .remove (task )
106+ self .remove (task )
107+
108+
109+ if __name__ == '__main__' :
110+ # Intended use. It's up the caller to sleep() or otherwise wait until the next task
111+ # is due. It is assumed that the queue is only accessed from a single thread such
112+ # that it's not possible for any tasks to be added to the queue during such a sleep.
113+ # More usefully, instead of calling sleep(), an application might be calling
114+ # select.select() or select.poll() with the time to the next task as the timeout.
115+ # This way one can implement a simple mainloop for an event-driven application that
116+ # can respond to network and file events as well as executing time-based tasks. This
117+ # is how the mainloops of the zlock and zlog servers are implemented.
118+ import time
119+
120+ tasks = TaskQueue ()
121+ task = Task (1 , print , 'hello' , 'world' )
122+
123+ tasks .add (task , repeat = True )
124+ while tasks :
125+ time .sleep (max (0 , tasks .next ().due_in ())) # don't pass sleep() a negative value
126+ next_task = tasks .pop ()
127+ next_task ()
0 commit comments