-
Notifications
You must be signed in to change notification settings - Fork 0
/
ev.pyx
153 lines (115 loc) · 4.18 KB
/
ev.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
cimport cpython
cimport libev
cdef void _ev_callback(libev.ev_loop_t *loop,
libev.ev_watcher *watcher,
int revents) except *:
cdef Watcher w = <Watcher> watcher.data
try:
if w._ccb != NULL:
w._ccb(w._cpriv, w, revents)
elif w._cb is not None:
w._cb(w, revents)
else:
w.event_handler(revents)
except:
libev.ev_unloop(loop, libev.EVUNLOOP_ONE)
raise
class Error(Exception):
pass
cdef class Watcher:
def __cinit__(self, *args, **kwargs):
libev.ev_init(&self._w.watcher, _ev_callback)
self._w.watcher.data = <void *> self
self._ccb = NULL
self._cpriv = NULL
cpdef set_callback(self, cb):
self._cb = cb
cdef set_ccallback(self, watcher_cb ccb, void *_cpriv):
self._ccb = ccb
self._cpriv = _cpriv
cdef event_handler(self, int revents):
raise NotImplementedError
cpdef bint is_active(self):
return libev.ev_is_active(&self._w.watcher)
cpdef bint is_pending(self):
return libev.ev_is_pending(&self._w.watcher)
cdef class IO(Watcher):
def __init__(self, fp, int events=EV_READ, cb=None):
fd = cpython.PyObject_AsFileDescriptor(fp)
libev.ev_io_set(&self._w.io, fd, events)
self._cb = cb
def __dealloc__(self):
self.stop()
cpdef set(self, fp, int events=EV_READ):
fd = cpython.PyObject_AsFileDescriptor(fp)
if libev.ev_is_active(<libev.ev_watcher *>&self._w.io):
self.stop()
libev.ev_io_set(&self._w.io, fd, events)
self.start()
else:
libev.ev_io_set(&self._w.io, fd, events)
cpdef int fileno(self):
return self._w.io.fd
cpdef start(self):
libev.ev_io_start(libev.EV_DEFAULT, &self._w.io)
cpdef stop(self):
libev.ev_io_stop(libev.EV_DEFAULT, &self._w.io)
cdef class Timer(Watcher):
def __init__(self, float timeout=0, float periodic=0, cb=None):
libev.ev_timer_set(&self._w.timer, timeout, periodic)
self._cb = cb
def __dealloc__(self):
self.stop()
cpdef start(self):
libev.ev_timer_start(libev.EV_DEFAULT, &self._w.timer)
cpdef stop(self):
libev.ev_timer_stop(libev.EV_DEFAULT, &self._w.timer)
cpdef set_timeout(self, float timeout, float periodic=0):
if libev.ev_is_active(<libev.ev_watcher *>&self._w.io):
self.stop()
libev.ev_timer_set(&self._w.timer, timeout, periodic)
self.start()
else:
libev.ev_timer_set(&self._w.timer, timeout, periodic)
cpdef set_periodic(self, float timeout):
if libev.ev_is_active(<libev.ev_watcher *>&self._w.io):
self.stop()
libev.ev_timer_set(&self._w.timer, timeout, timeout)
self.start()
else:
libev.ev_timer_set(&self._w.timer, timeout, timeout)
cpdef set_oneshot(self, float timeout):
if libev.ev_is_active(<libev.ev_watcher *>&self._w.io):
self.stop()
libev.ev_timer_set(&self._w.timer, timeout, 0)
self.start()
else:
libev.ev_timer_set(&self._w.timer, timeout, 0)
cdef class Signal(Watcher):
def __init__(self, int signum, cb=None):
libev.ev_signal_set(&self._w.signal, signum)
self._cb = cb
def __dealloc__(self):
self.stop()
cpdef start(self):
libev.ev_signal_start(libev.EV_DEFAULT, &self._w.signal)
cpdef stop(self):
libev.ev_signal_stop(libev.EV_DEFAULT, &self._w.signal)
cpdef set(self, int signum):
if libev.ev_is_active(<libev.ev_watcher *>&self._w.io):
self.stop()
libev.ev_signal_set(&self._w.signal, signum)
self.start()
else:
libev.ev_signal_set(&self._w.signal, signum)
cpdef double get_clocks():
return libev.ev_time()
cpdef sleep(double delay):
libev.ev_sleep(delay)
cpdef main(bint once=False):
if once:
libev.ev_loop(libev.EV_DEFAULT, libev.EVLOOP_ONESHOT)
else:
libev.ev_loop(libev.EV_DEFAULT, libev.EVLOOP_NORMAL)
cpdef quit():
libev.ev_unloop(libev.EV_DEFAULT, libev.EVUNLOOP_ONE)