1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414
15- # pylint: disable=no-name-in-module
16-
17- from confluent_kafka import Consumer , Producer
15+ # pylint: disable=no-name-in-module,import-outside-toplevel
1816
1917from opentelemetry .instrumentation .confluent_kafka import (
18+ AutoInstrumentedConsumer ,
19+ AutoInstrumentedProducer ,
2020 ConfluentKafkaInstrumentor ,
2121 ProxiedConsumer ,
2222 ProxiedProducer ,
4141
4242class TestConfluentKafka (TestBase ):
4343 def test_instrument_api (self ) -> None :
44+ from confluent_kafka import Consumer , Producer # noqa: PLC0415
45+
4446 instrumentation = ConfluentKafkaInstrumentor ()
4547
4648 producer = Producer ({"bootstrap.servers" : "localhost:29092" })
@@ -51,16 +53,22 @@ def test_instrument_api(self) -> None:
5153 producer = instrumentation .uninstrument_producer (producer )
5254 self .assertEqual (producer .__class__ , Producer )
5355
54- producer = Producer ({"bootstrap.servers" : "localhost:29092" })
55- producer = instrumentation .instrument_producer (producer )
56+ consumer = Consumer (
57+ {
58+ "bootstrap.servers" : "localhost:29092" ,
59+ "group.id" : "mygroup" ,
60+ "auto.offset.reset" : "earliest" ,
61+ }
62+ )
5663
57- self .assertEqual (producer .__class__ , ProxiedProducer )
64+ consumer = instrumentation .instrument_consumer (consumer )
65+ self .assertEqual (consumer .__class__ , ProxiedConsumer )
5866
59- producer = instrumentation .uninstrument_producer ( producer )
60- self .assertEqual (producer .__class__ , Producer )
67+ consumer = instrumentation .uninstrument_consumer ( consumer )
68+ self .assertEqual (consumer .__class__ , Consumer )
6169
6270 consumer = Consumer (
63- {
71+ ** {
6472 "bootstrap.servers" : "localhost:29092" ,
6573 "group.id" : "mygroup" ,
6674 "auto.offset.reset" : "earliest" ,
@@ -73,7 +81,37 @@ def test_instrument_api(self) -> None:
7381 consumer = instrumentation .uninstrument_consumer (consumer )
7482 self .assertEqual (consumer .__class__ , Consumer )
7583
84+ def test_instrument_api_with_instrument (self ) -> None :
85+ ConfluentKafkaInstrumentor ().instrument ()
86+
87+ from confluent_kafka import Consumer , Producer # noqa: PLC0415
88+
89+ producer = Producer ({"bootstrap.servers" : "localhost:29092" })
90+ self .assertEqual (producer .__class__ , AutoInstrumentedProducer )
91+
92+ consumer = Consumer (
93+ {
94+ "bootstrap.servers" : "localhost:29092" ,
95+ "group.id" : "mygroup" ,
96+ "auto.offset.reset" : "earliest" ,
97+ }
98+ )
99+ self .assertEqual (consumer .__class__ , AutoInstrumentedConsumer )
100+
101+ consumer = Consumer (
102+ ** {
103+ "bootstrap.servers" : "localhost:29092" ,
104+ "group.id" : "mygroup" ,
105+ "auto.offset.reset" : "earliest" ,
106+ }
107+ )
108+ self .assertEqual (consumer .__class__ , AutoInstrumentedConsumer )
109+
110+ ConfluentKafkaInstrumentor ().uninstrument ()
111+
76112 def test_consumer_commit_method_exists (self ) -> None :
113+ from confluent_kafka import Consumer # noqa: PLC0415
114+
77115 instrumentation = ConfluentKafkaInstrumentor ()
78116
79117 consumer = Consumer (
0 commit comments