1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414
15+ from __future__ import annotations
16+
1517import base64
1618import threading
1719import time
1820from queue import Queue
19- from typing import Optional
21+ from typing import TYPE_CHECKING
2022
21- from opentelemetry import context as context_api
2223from opentelemetry ._logs .severity import SeverityNumber
2324from opentelemetry .exporter .otlp .proto .common .trace_encoder import encode_spans
2425from opentelemetry .proto .trace .v1 import trace_pb2
2526from opentelemetry .sdk ._logs import LogData , LogRecord
26- from opentelemetry .sdk ._logs .export import LogExporter
27- from opentelemetry .sdk .trace import (
28- SpanProcessor ,
29- Span ,
30- ReadableSpan
31- )
27+ from opentelemetry .sdk .trace import ReadableSpan , Span , SpanProcessor
3228from opentelemetry .trace import TraceFlags
3329
30+ if TYPE_CHECKING :
31+ from opentelemetry import context as context_api
32+ from opentelemetry .sdk ._logs .export import LogExporter
33+ from opentelemetry .sdk .resources import Resource
34+
3435WORKER_THREAD_NAME = "OtelPartialSpanProcessor"
3536
3637
@@ -39,12 +40,15 @@ class PartialSpanProcessor(SpanProcessor):
3940 def __init__ (
4041 self ,
4142 log_exporter : LogExporter ,
42- heartbeat_interval_millis : int
43- ):
43+ heartbeat_interval_millis : int ,
44+ resource : Resource | None = None ,
45+ ) -> None :
4446 if heartbeat_interval_millis <= 0 :
45- raise ValueError ("heartbeat_interval_ms must be greater than 0" )
47+ msg = "heartbeat_interval_ms must be greater than 0"
48+ raise ValueError (msg )
4649 self .log_exporter = log_exporter
4750 self .heartbeat_interval_millis = heartbeat_interval_millis
51+ self .resource = resource
4852
4953 self .active_spans = {}
5054 self .ended_spans = Queue ()
@@ -53,11 +57,11 @@ def __init__(
5357 self .done = False
5458 self .condition = threading .Condition (threading .Lock ())
5559 self .worker_thread = threading .Thread (
56- name = WORKER_THREAD_NAME , target = self .worker , daemon = True
60+ name = WORKER_THREAD_NAME , target = self .worker , daemon = True ,
5761 )
5862 self .worker_thread .start ()
5963
60- def worker (self ):
64+ def worker (self ) -> None :
6165 while not self .done :
6266 with self .condition :
6367 self .condition .wait (self .heartbeat_interval_millis / 1000 )
@@ -73,17 +77,17 @@ def worker(self):
7377
7478 self .heartbeat ()
7579
76- def heartbeat (self ):
80+ def heartbeat (self ) -> None :
7781 with self .lock :
78- for span_key , span in list (self .active_spans .items ()):
82+ for span in list (self .active_spans .values ()):
7983 attributes = self .get_heartbeat_attributes ()
80- log_data = get_log_data (span , attributes )
84+ log_data = self . get_log_data (span , attributes )
8185 self .log_exporter .export ([log_data ])
8286
83- def on_start (self , span : " Span" ,
84- parent_context : Optional [ context_api .Context ] = None ) -> None :
87+ def on_start (self , span : Span ,
88+ parent_context : context_api .Context | None = None ) -> None :
8589 attributes = self .get_heartbeat_attributes ()
86- log_data = get_log_data (span , attributes )
90+ log_data = self . get_log_data (span , attributes )
8791 self .log_exporter .export ([log_data ])
8892
8993 span_key = (span .context .trace_id , span .context .span_id )
@@ -92,7 +96,7 @@ def on_start(self, span: "Span",
9296
9397 def on_end (self , span : ReadableSpan ) -> None :
9498 attributes = get_stop_attributes ()
95- log_data = get_log_data (span , attributes )
99+ log_data = self . get_log_data (span , attributes )
96100 self .log_exporter .export ([log_data ])
97101
98102 span_key = (span .context .trace_id , span .context .span_id )
@@ -105,39 +109,38 @@ def shutdown(self) -> None:
105109 self .condition .notify_all ()
106110 self .worker_thread .join ()
107111
108- def get_heartbeat_attributes (self ):
112+ def get_heartbeat_attributes (self ) -> dict [ str , str ] :
109113 return {
110114 "partial.event" : "heartbeat" ,
111115 "partial.frequency" : str (self .heartbeat_interval_millis ) + "ms" ,
112116 }
113117
118+ def get_log_data (self , span : Span , attributes : dict [str , str ]) -> LogData :
119+ span_context = Span .get_span_context (span )
120+
121+ enc_spans = encode_spans ([span ]).resource_spans
122+ traces_data = trace_pb2 .TracesData ()
123+ traces_data .resource_spans .extend (enc_spans )
124+ serialized_traces_data = traces_data .SerializeToString ()
125+
126+ log_record = LogRecord (
127+ timestamp = time .time_ns (),
128+ observed_timestamp = time .time_ns (),
129+ trace_id = span_context .trace_id ,
130+ span_id = span_context .span_id ,
131+ trace_flags = TraceFlags ().get_default (),
132+ severity_text = "INFO" ,
133+ severity_number = SeverityNumber .INFO ,
134+ body = base64 .b64encode (serialized_traces_data ).decode ("utf-8" ),
135+ resource = self .resource ,
136+ attributes = attributes ,
137+ )
138+ return LogData (
139+ log_record = log_record , instrumentation_scope = span .instrumentation_scope ,
140+ )
114141
115- def get_stop_attributes ():
142+
143+ def get_stop_attributes () -> dict [str , str ]:
116144 return {
117145 "partial.event" : "stop" ,
118146 }
119-
120-
121- def get_log_data (span , attributes ):
122- span_context = Span .get_span_context (span )
123-
124- enc_spans = encode_spans ([span ]).resource_spans
125- traces_data = trace_pb2 .TracesData ()
126- traces_data .resource_spans .extend (enc_spans )
127- serialized_traces_data = traces_data .SerializeToString ()
128-
129- log_record = LogRecord (
130- timestamp = time .time_ns (),
131- observed_timestamp = time .time_ns (),
132- trace_id = span_context .trace_id ,
133- span_id = span_context .span_id ,
134- trace_flags = TraceFlags ().get_default (),
135- severity_text = "INFO" ,
136- severity_number = SeverityNumber .INFO ,
137- body = base64 .b64encode (serialized_traces_data ).decode ('utf-8' ),
138- attributes = attributes ,
139- )
140- log_data = LogData (
141- log_record = log_record , instrumentation_scope = span .instrumentation_scope
142- )
143- return log_data
0 commit comments