14
14
from __future__ import annotations
15
15
16
16
import abc
17
- import collections
18
17
import enum
19
18
import logging
20
- import os
21
19
import sys
22
- import threading
23
- import weakref
24
20
from os import environ , linesep
25
- from typing import IO , Callable , Deque , Optional , Sequence
21
+ from typing import IO , Callable , Optional , Sequence
26
22
27
23
from opentelemetry .context import (
28
24
_SUPPRESS_INSTRUMENTATION_KEY ,
31
27
set_value ,
32
28
)
33
29
from opentelemetry .sdk ._logs import LogData , LogRecord , LogRecordProcessor
30
+ from opentelemetry .sdk ._shared_internal import BatchProcessor
34
31
from opentelemetry .sdk .environment_variables import (
35
32
OTEL_BLRP_EXPORT_TIMEOUT ,
36
33
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE ,
37
34
OTEL_BLRP_MAX_QUEUE_SIZE ,
38
35
OTEL_BLRP_SCHEDULE_DELAY ,
39
36
)
40
- from opentelemetry .util ._once import Once
41
37
42
38
_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
43
39
_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
46
42
_ENV_VAR_INT_VALUE_ERROR_MESSAGE = (
47
43
"Unable to parse value for %s as integer. Defaulting to %s."
48
44
)
49
-
50
45
_logger = logging .getLogger (__name__ )
51
46
52
47
@@ -55,29 +50,19 @@ class LogExportResult(enum.Enum):
55
50
FAILURE = 1
56
51
57
52
58
- class BatchLogExportStrategy (enum .Enum ):
59
- EXPORT_ALL = 0
60
- EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD = 1
61
- EXPORT_AT_LEAST_ONE_BATCH = 2
62
-
63
-
64
53
class LogExporter (abc .ABC ):
65
54
"""Interface for exporting logs.
66
-
67
55
Interface to be implemented by services that want to export logs received
68
56
in their own format.
69
-
70
57
To export data this MUST be registered to the :class`opentelemetry.sdk._logs.Logger` using a
71
58
log processor.
72
59
"""
73
60
74
61
@abc .abstractmethod
75
62
def export (self , batch : Sequence [LogData ]):
76
63
"""Exports a batch of logs.
77
-
78
64
Args:
79
65
batch: The list of `LogData` objects to be exported
80
-
81
66
Returns:
82
67
The result of the export
83
68
"""
@@ -146,9 +131,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=n
146
131
return True
147
132
148
133
149
- _BSP_RESET_ONCE = Once ()
150
-
151
-
152
134
class BatchLogRecordProcessor (LogRecordProcessor ):
153
135
"""This is an implementation of LogRecordProcessor which creates batches of
154
136
received logs in the export-friendly LogData representation and
@@ -161,9 +143,9 @@ class BatchLogRecordProcessor(LogRecordProcessor):
161
143
- :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE`
162
144
- :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`
163
145
- :envvar:`OTEL_BLRP_EXPORT_TIMEOUT`
164
- """
165
146
166
- _queue : Deque [LogData ]
147
+ All the logic for emitting logs, shutting down etc. resides in the BatchProcessor class.
148
+ """
167
149
168
150
def __init__ (
169
151
self ,
@@ -194,127 +176,24 @@ def __init__(
194
176
BatchLogRecordProcessor ._validate_arguments (
195
177
max_queue_size , schedule_delay_millis , max_export_batch_size
196
178
)
197
-
198
- self ._exporter = exporter
199
- self ._max_queue_size = max_queue_size
200
- self ._schedule_delay = schedule_delay_millis / 1e3
201
- self ._max_export_batch_size = max_export_batch_size
202
- # Not used. No way currently to pass timeout to export.
203
- # TODO(https://github.com/open-telemetry/opentelemetry-python/issues/4555): figure out what this should do.
204
- self ._export_timeout_millis = export_timeout_millis
205
- # Deque is thread safe.
206
- self ._queue = collections .deque ([], max_queue_size )
207
- self ._worker_thread = threading .Thread (
208
- name = "OtelBatchLogRecordProcessor" ,
209
- target = self .worker ,
210
- daemon = True ,
179
+ # Initializes BatchProcessor
180
+ self ._batch_processor = BatchProcessor (
181
+ exporter ,
182
+ schedule_delay_millis ,
183
+ max_export_batch_size ,
184
+ export_timeout_millis ,
185
+ max_queue_size ,
186
+ "Log" ,
211
187
)
212
188
213
- self ._shutdown = False
214
- self ._export_lock = threading .Lock ()
215
- self ._worker_awaken = threading .Event ()
216
- self ._worker_thread .start ()
217
- if hasattr (os , "register_at_fork" ):
218
- weak_reinit = weakref .WeakMethod (self ._at_fork_reinit )
219
- os .register_at_fork (after_in_child = lambda : weak_reinit ()()) # pylint: disable=unnecessary-lambda
220
- self ._pid = os .getpid ()
221
-
222
- def _should_export_batch (
223
- self , batch_strategy : BatchLogExportStrategy , num_iterations : int
224
- ) -> bool :
225
- if not self ._queue :
226
- return False
227
- # Always continue to export while queue length exceeds max batch size.
228
- if len (self ._queue ) >= self ._max_export_batch_size :
229
- return True
230
- if batch_strategy is BatchLogExportStrategy .EXPORT_ALL :
231
- return True
232
- if batch_strategy is BatchLogExportStrategy .EXPORT_AT_LEAST_ONE_BATCH :
233
- return num_iterations == 0
234
- return False
235
-
236
- def _at_fork_reinit (self ):
237
- self ._export_lock = threading .Lock ()
238
- self ._worker_awaken = threading .Event ()
239
- self ._queue .clear ()
240
- self ._worker_thread = threading .Thread (
241
- name = "OtelBatchLogRecordProcessor" ,
242
- target = self .worker ,
243
- daemon = True ,
244
- )
245
- self ._worker_thread .start ()
246
- self ._pid = os .getpid ()
247
-
248
- def worker (self ):
249
- while not self ._shutdown :
250
- # Lots of strategies in the spec for setting next timeout.
251
- # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
252
- # Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger then threshold.
253
- sleep_interrupted = self ._worker_awaken .wait (self ._schedule_delay )
254
- if self ._shutdown :
255
- break
256
- self ._export (
257
- BatchLogExportStrategy .EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD
258
- if sleep_interrupted
259
- else BatchLogExportStrategy .EXPORT_AT_LEAST_ONE_BATCH
260
- )
261
- self ._worker_awaken .clear ()
262
- self ._export (BatchLogExportStrategy .EXPORT_ALL )
263
-
264
- def _export (self , batch_strategy : BatchLogExportStrategy ) -> None :
265
- with self ._export_lock :
266
- iteration = 0
267
- # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
268
- # once the lock is obtained to see if we still need to make the requested export.
269
- while self ._should_export_batch (batch_strategy , iteration ):
270
- iteration += 1
271
- token = attach (set_value (_SUPPRESS_INSTRUMENTATION_KEY , True ))
272
- try :
273
- self ._exporter .export (
274
- [
275
- # Oldest records are at the back, so pop from there.
276
- self ._queue .pop ()
277
- for _ in range (
278
- min (
279
- self ._max_export_batch_size ,
280
- len (self ._queue ),
281
- )
282
- )
283
- ]
284
- )
285
- except Exception : # pylint: disable=broad-exception-caught
286
- _logger .exception ("Exception while exporting logs." )
287
- detach (token )
288
-
289
189
def emit (self , log_data : LogData ) -> None :
290
- if self ._shutdown :
291
- _logger .info ("Shutdown called, ignoring log." )
292
- return
293
- if self ._pid != os .getpid ():
294
- _BSP_RESET_ONCE .do_once (self ._at_fork_reinit )
295
-
296
- if len (self ._queue ) == self ._max_queue_size :
297
- _logger .warning ("Queue full, dropping log." )
298
- self ._queue .appendleft (log_data )
299
- if len (self ._queue ) >= self ._max_export_batch_size :
300
- self ._worker_awaken .set ()
190
+ return self ._batch_processor .emit (log_data )
301
191
302
192
def shutdown (self ):
303
- if self ._shutdown :
304
- return
305
- # Prevents emit and force_flush from further calling export.
306
- self ._shutdown = True
307
- # Interrupts sleep in the worker, if it's sleeping.
308
- self ._worker_awaken .set ()
309
- # Main worker loop should exit after one final export call with flush all strategy.
310
- self ._worker_thread .join ()
311
- self ._exporter .shutdown ()
193
+ return self ._batch_processor .shutdown ()
312
194
313
- def force_flush (self , timeout_millis : Optional [int ] = None ) -> bool :
314
- if self ._shutdown :
315
- return
316
- # Blocking call to export.
317
- self ._export (BatchLogExportStrategy .EXPORT_ALL )
195
+ def force_flush (self , timeout_millis : Optional [int ] = None ):
196
+ return self ._batch_processor .force_flush (timeout_millis )
318
197
319
198
@staticmethod
320
199
def _default_max_queue_size ():
0 commit comments