@@ -33,18 +33,18 @@ def __init__(self, cb, filename):
33
33
34
34
def run (self ):
35
35
while not self ._fp .closed :
36
- l = self ._fp .readline ()
37
- if l == "" :
36
+ line = self ._fp .readline ()
37
+ if line == "" :
38
38
# wait for new events
39
39
sleep (1 )
40
40
continue
41
41
42
42
try :
43
- msg = json .loads (l )
43
+ msg = json .loads (line )
44
44
routing_key = msg .get ("type" )
45
45
# log.debug("Received message with routing key %s" % routing_key)
46
- registry .eval_callback (routing_key , l , self ._cb )
47
- except :
46
+ registry .eval_callback (routing_key , line , self ._cb )
47
+ except Exception :
48
48
pass
49
49
50
50
def stop (self ):
@@ -76,8 +76,8 @@ def __init__(self, cb, listening_address, **kwargs):
76
76
certfile = kwargs .get ("certfile" , None )
77
77
if not keyfile or not certfile :
78
78
raise CredentialError ("Need to specify 'keyfile' and 'certfile' for HTTPS" )
79
- self .httpd .socket = ssl .wrap_socket (self .httpd .socket , certfile = certfile , keyfile = keyfile ,
80
- server_side = True )
79
+ self .httpd .socket = ssl .wrap_socket (self .httpd .socket , certfile = certfile , keyfile = keyfile ,
80
+ server_side = True )
81
81
82
82
def run (self ):
83
83
self .httpd .serve_forever ()
@@ -168,7 +168,7 @@ def on_connection_closed(self, connection, reply_code, reply_text):
168
168
self ._connection .ioloop .stop ()
169
169
else :
170
170
log .warning ('Connection closed, reopening in 5 seconds: (%s) %s' ,
171
- reply_code , reply_text )
171
+ reply_code , reply_text )
172
172
self ._connection .add_timeout (5 , self .reconnect )
173
173
174
174
def reconnect (self ):
@@ -231,7 +231,7 @@ def on_channel_closed(self, channel, reply_code, reply_text):
231
231
232
232
"""
233
233
log .warning ('Channel %i was closed: (%s) %s' ,
234
- channel , reply_code , reply_text )
234
+ channel , reply_code , reply_text )
235
235
self ._connection .close ()
236
236
237
237
def setup_exchange (self , exchange_name ):
@@ -282,7 +282,7 @@ def on_queue_declareok(self, method_frame):
282
282
"""
283
283
for routing_key in self .ROUTING_KEYS :
284
284
log .debug ('Binding %s to %s with %s' ,
285
- self .EXCHANGE , self .QUEUE , routing_key )
285
+ self .EXCHANGE , self .QUEUE , routing_key )
286
286
self ._channel .queue_bind (self .on_bindok , self .QUEUE ,
287
287
self .EXCHANGE , routing_key )
288
288
@@ -331,7 +331,7 @@ def on_consumer_cancelled(self, method_frame):
331
331
332
332
"""
333
333
log .debug ('Consumer was cancelled remotely, shutting down: %r' ,
334
- method_frame )
334
+ method_frame )
335
335
if self ._channel :
336
336
self ._channel .close ()
337
337
@@ -419,7 +419,7 @@ def on_message(self, unused_channel, basic_deliver, properties, body):
419
419
420
420
"""
421
421
log .debug ('Received message # %s with properties %s' ,
422
- basic_deliver .delivery_tag , properties )
422
+ basic_deliver .delivery_tag , properties )
423
423
424
424
registry .eval_callback (basic_deliver .routing_key , body , self ._cb )
425
425
0 commit comments