Skip to content

Commit

Permalink
Renamed certain internal variables to make usage more obvious
Browse files Browse the repository at this point in the history
  • Loading branch information
eandersson committed Dec 28, 2018
1 parent 7563b78 commit 9af934d
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 25 deletions.
17 changes: 10 additions & 7 deletions amqpstorm/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ class Channel(BaseChannel):
'_connection', '_exchange', '_inbound', '_queue', '_tx'
]

def __init__(self, channel_id, connection, rpc_timeout, on_close=None):
def __init__(self, channel_id, connection, rpc_timeout,
on_close_impl=None):
super(Channel, self).__init__(channel_id)
self.rpc = Rpc(self, timeout=rpc_timeout)
self._consumer_callbacks = {}
self._confirming_deliveries = False
self._connection = connection
self._on_close = on_close
self._on_close_impl = on_close_impl
self._inbound = []
self._basic = Basic(self, connection.max_frame_size)
self._exchange = Exchange(self)
Expand Down Expand Up @@ -160,14 +161,14 @@ def close(self, reply_code=200, reply_text=''):
self.rpc_request(specification.Channel.Close(
reply_code=reply_code,
reply_text=reply_text),
adapter=self._connection
connection_adapter=self._connection
)
finally:
if self._inbound:
del self._inbound[:]
self.set_state(self.CLOSED)
if self._on_close:
self._on_close(self.channel_id)
if self._on_close_impl:
self._on_close_impl(self.channel_id)
LOGGER.debug('Channel #%d Closed', self.channel_id)

def check_for_errors(self):
Expand Down Expand Up @@ -280,7 +281,7 @@ def process_data_events(self, to_tuple=False, auto_decode=True):
# noinspection PyCallingNonCallable
self._consumer_callbacks[consumer_tag](message)

def rpc_request(self, frame_out, adapter=None):
def rpc_request(self, frame_out, connection_adapter=None):
"""Perform a RPC Request.
:param specification.Frame frame_out: Amqp frame.
Expand All @@ -289,7 +290,9 @@ def rpc_request(self, frame_out, adapter=None):
with self.rpc.lock:
uuid = self.rpc.register_request(frame_out.valid_responses)
self._connection.write_frame(self.channel_id, frame_out)
return self.rpc.get_request(uuid, adapter=adapter)
return self.rpc.get_request(
uuid, connection_adapter=connection_adapter
)

def start_consuming(self, to_tuple=False, auto_decode=True):
"""Start consuming messages.
Expand Down
4 changes: 2 additions & 2 deletions amqpstorm/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def __init__(self, hostname, username, password, port=5672, **kwargs):
}
self._validate_parameters()
self._io = IO(self.parameters, exceptions=self._exceptions,
on_read=self._read_buffer)
on_read_impl=self._read_buffer)
self._channel0 = Channel0(self)
self._channels = {}
self._last_channel_id = None
Expand Down Expand Up @@ -158,7 +158,7 @@ def channel(self, rpc_timeout=60, lazy=False):
with self.lock:
channel_id = self._get_next_available_channel_id()
channel = Channel(channel_id, self, rpc_timeout,
on_close=self._cleanup_channel)
on_close_impl=self._cleanup_channel)
self._channels[channel_id] = channel
if not lazy:
channel.open()
Expand Down
6 changes: 3 additions & 3 deletions amqpstorm/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
class Heartbeat(object):
"""Internal Heartbeat handler."""

def __init__(self, interval, send_heartbeat, timer=threading.Timer):
self.send_heartbeat = send_heartbeat
def __init__(self, interval, send_heartbeat_impl, timer=threading.Timer):
self.send_heartbeat_impl = send_heartbeat_impl
self.timer_impl = timer
self._lock = threading.Lock()
self._running = threading.Event()
Expand Down Expand Up @@ -80,7 +80,7 @@ def _check_for_life_signs(self):
if not self._running.is_set():
return False
if self._writes_since_check == 0:
self.send_heartbeat()
self.send_heartbeat_impl()
self._lock.acquire()
try:
if self._reads_since_check == 0:
Expand Down
6 changes: 3 additions & 3 deletions amqpstorm/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ def is_ready(self):
class IO(object):
"""Internal Input/Output handler."""

def __init__(self, parameters, exceptions=None, on_read=None):
def __init__(self, parameters, exceptions=None, on_read_impl=None):
self._exceptions = exceptions
self._lock = threading.Lock()
self._inbound_thread = None
self._on_read = on_read
self._on_read_impl = on_read_impl
self._running = threading.Event()
self._parameters = parameters
self.data_in = EMPTY_BUFFER
Expand Down Expand Up @@ -222,7 +222,7 @@ def _process_incoming_data(self):
while self._running.is_set():
if self.poller.is_ready:
self.data_in += self._receive()
self.data_in = self._on_read(self.data_in)
self.data_in = self._on_read_impl(self.data_in)

def _receive(self):
"""Receive any incoming socket data.
Expand Down
17 changes: 10 additions & 7 deletions amqpstorm/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def __init__(self, default_adapter, timeout=360):
:param int|float timeout: Rpc timeout.
"""
self._lock = threading.Lock()
self._default_adapter = default_adapter
self._default_connection_adapter = default_adapter
self._timeout = timeout
self._response = {}
self._request = {}
Expand Down Expand Up @@ -83,19 +83,22 @@ def remove_response(self, uuid):
if uuid in self._response:
del self._response[uuid]

def get_request(self, uuid, raw=False, multiple=False, adapter=None):
def get_request(self, uuid, raw=False, multiple=False,
connection_adapter=None):
"""Get a RPC request.
:param str uuid: Rpc Identifier
:param bool raw: If enabled return the frame as is, else return
result as a dictionary.
:param bool multiple: Are we expecting multiple frames.
:param obj adapter: Provide custom adapter.
:param obj connection_adapter: Provide custom connection adapter.
:return:
"""
if uuid not in self._response:
return
self._wait_for_request(uuid, adapter or self._default_adapter)
self._wait_for_request(
uuid, connection_adapter or self._default_connection_adapter
)
frame = self._get_response_frame(uuid)
if not multiple:
self.remove(uuid)
Expand All @@ -118,16 +121,16 @@ def _get_response_frame(self, uuid):
frame = frames.pop(0)
return frame

def _wait_for_request(self, uuid, adapter=None):
def _wait_for_request(self, uuid, connection_adapter=None):
"""Wait for RPC request to arrive.
:param str uuid: Rpc Identifier.
:param obj adapter: Provide custom adapter.
:param obj connection_adapter: Provide custom connection adapter.
:return:
"""
start_time = time.time()
while not self._response[uuid]:
adapter.check_for_errors()
connection_adapter.check_for_errors()
if time.time() - start_time > self._timeout:
self._raise_rpc_timeout_error(uuid)
time.sleep(IDLE_WAIT)
Expand Down
4 changes: 2 additions & 2 deletions amqpstorm/tests/unit/heartbeat_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def test_heartbeat_send_heartbeat(self):
def send_heartbeat():
self.beats += 1

heartbeat = Heartbeat(60, send_heartbeat=send_heartbeat)
heartbeat = Heartbeat(60, send_heartbeat_impl=send_heartbeat)
heartbeat._running.set()

for _ in range(8):
Expand Down Expand Up @@ -192,7 +192,7 @@ def test_heartbeat_extended_loop(self):
def send_heartbeat():
self.beats += 1

heartbeat = Heartbeat(60, send_heartbeat=send_heartbeat)
heartbeat = Heartbeat(60, send_heartbeat_impl=send_heartbeat)
heartbeat._running.set()
heartbeat.register_read()

Expand Down
4 changes: 3 additions & 1 deletion amqpstorm/tests/unit/rpc_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ def delivery_payload():
thread = threading.Thread(target=delivery_payload)
thread.start()

rpc._wait_for_request(uuid, adapter=rpc._default_adapter)
rpc._wait_for_request(
uuid, connection_adapter=rpc._default_connection_adapter
)

def test_with_for_request_with_custom_adapter(self):
class Adapter(object):
Expand Down

0 comments on commit 9af934d

Please sign in to comment.