-
Notifications
You must be signed in to change notification settings - Fork 227
Open
Labels
Description
Threads and sleeps involved what can go wrong :)
_____________ test_kafka_consumer_ignore_topic_ongoing_transaction _____________
instrument = None
elasticapm_client = <tests.fixtures.TempStoreClient object at 0x7ff1d0667890>
producer = <kafka.producer.kafka.KafkaProducer object at 0x7ff1d0ae16d0>
consumer = <kafka.consumer.group.KafkaConsumer object at 0x7ff1d0ae1b50>
topics = ['test', 'foo', 'bar']
def test_kafka_consumer_ignore_topic_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics):
elasticapm_client.config.update("1", ignore_message_queues="foo*,*bar")
def delayed_send():
time.sleep(0.2)
producer.send(topic="foo", key=b"foo", value=b"bar")
producer.send("bar", key=b"foo", value=b"bar")
producer.send("test", key=b"foo", value=b"bar")
thread = threading.Thread(target=delayed_send)
thread.start()
transaction = elasticapm_client.begin_transaction("foo")
for item in consumer:
pass
thread.join()
elasticapm_client.end_transaction("foo")
transactions = elasticapm_client.events[TRANSACTION]
spans = elasticapm_client.spans_for_transaction(transactions[0])
> assert len(spans) == 1
E assert 0 == 1
E + where 0 = len([])
consumer = <kafka.consumer.group.KafkaConsumer object at 0x7ff1d0ae1b50>
delayed_send = <function test_kafka_consumer_ignore_topic_ongoing_transaction.<locals>.delayed_send at 0x7ff1d0adb380>
elasticapm_client = <tests.fixtures.TempStoreClient object at 0x7ff1d0667890>
instrument = None
producer = <kafka.producer.kafka.KafkaProducer object at 0x7ff1d0ae16d0>
spans = []
thread = <Thread(Thread-10 (delayed_send), stopped 140676526606016)>
topics = ['test', 'foo', 'bar']
transaction = <elasticapm.traces.Transaction object at 0x7ff1d0733890>
transactions = [{'context': {'tags': {}}, 'duration': 505.265, 'id': '77f0c018dadb7dd4', 'name': 'foo', ...}]