Open
Description
It's a little unclear how basic_qos should be used. I currently have the following code:
def connect_and_receive(sub_to_process_queue, amqp_user, amqp_password, amqp_host, amqp_port, amqp_exchange_sub, amqp_queue_sub):
# Connect to an AMQP broker through puka, declare and bind a durable fanout
# exchange/queue pair, then consume messages in an endless loop, handing each
# one to handle_message() together with sub_to_process_queue and then
# acknowledging it.
# NOTE(review): indentation was lost when this snippet was pasted. Everything
# below is presumably the function body, and the statements after `while True:`
# are presumably the loop body — confirm against the original subscribe.py.
#
# Build the connection DSN. Only one client (the receiving one) is created here.
subscribe_dsn = "amqp://{}:{}@{}:{}/".format(
amqp_user, amqp_password, amqp_host, amqp_port
)
# NOTE(review): this prints the full DSN, which includes amqp_password.
print("Subscribing to server:", subscribe_dsn)
consumer = puka.Client(subscribe_dsn)
# Connect the receiving client and block until the connection promise resolves.
receive_promise = consumer.connect()
consumer.wait(receive_promise)
# Declare the durable fanout exchange, the durable (non-exclusive,
# non-auto-delete) queue, and bind the queue to the exchange. Each step is
# waited on before the next one is issued.
receive_promise = consumer.exchange_declare(exchange=amqp_exchange_sub, type='fanout', durable=True)
consumer.wait(receive_promise)
receive_promise = consumer.queue_declare(queue=amqp_queue_sub, durable=True, exclusive=False, auto_delete=False)
consumer.wait(receive_promise)
receive_promise = consumer.queue_bind(exchange=amqp_exchange_sub, queue=amqp_queue_sub)
consumer.wait(receive_promise)
# Start consuming with no_ack=False; messages are acknowledged explicitly via
# basic_ack() at the end of each loop iteration below.
receive_promise = consumer.basic_consume(queue=amqp_queue_sub, no_ack=False)
# NOTE(review): basic_qos() is issued against the consume promise right after
# basic_consume(), and its own result is neither stored nor waited on. The
# KeyError 3932171 in the reported traceback is 0x003C000B, i.e. AMQP class 60
# (basic) / method 11 (qos-ok): per the traceback, the promise's method table
# had no entry for that reply. puka's basic_consume() appears to accept
# prefetch_count directly — TODO confirm against puka's docs and prefer that.
consumer.basic_qos(receive_promise, prefetch_count=10)
while True:
# Block until the next delivery arrives on the consume promise.
message = consumer.wait(receive_promise)
# print("Resc", end=" ")
handle_message(message, sub_to_process_queue)
# Acknowledge only after handle_message() has returned.
consumer.basic_ack(message)
But when I run it I get an Exception:
transform_1 | Exception in thread Thread-1:
transform_1 | Traceback (most recent call last):
transform_1 | File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
transform_1 | self.run()
transform_1 | File "/usr/local/lib/python3.8/threading.py", line 870, in run
transform_1 | self._target(*self._args, **self._kwargs)
transform_1 | File "/app/subscribe.py", line 29, in connect_and_receive
transform_1 | message = consumer.wait(receive_promise)
transform_1 | File "/app/src/puka/puka/connection.py", line 377, in wait
transform_1 | self.on_read()
transform_1 | File "/app/src/puka/puka/connection.py", line 191, in on_read_nohandshake
transform_1 | self._handle_read(data, offset)
transform_1 | File "/app/src/puka/puka/connection.py", line 224, in _handle_frame_read
transform_1 | self.channels.channels[channel].inbound_method(frame)
transform_1 | File "/app/src/puka/puka/channel.py", line 80, in inbound_method
transform_1 | self._handle_inbound(frame)
transform_1 | File "/app/src/puka/puka/channel.py", line 106, in _handle_inbound
transform_1 | self.promise.recv_method(result)
transform_1 | File "/app/src/puka/puka/promise.py", line 87, in recv_method
transform_1 | callback = self.methods[result.method_id]
transform_1 | KeyError: 3932171
transform_1 |
transform_1 | Thread stopped
How exactly should I be using the basic_qos function?
Metadata
Metadata
Assignees
Labels
No labels