Skip to content

Commit 7c185c5

Browse files
authored
ULS v1.5 (#28)
Version 1.5.0
1 parent c968695 commit 7c185c5

File tree

14 files changed

+260
-139
lines changed

14 files changed

+260
-139
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
.DS_Store
2+
__pycache__
3+
/ext/*

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM python:3.10.4-slim-bullseye
1+
FROM python:3.10.5-slim-bullseye
22
LABEL MAINTAINER="Mike Schiessl - [email protected]"
33
LABEL APP_LONG="Akamai Universal Log Streamer"
44
LABEL APP_SHORT="ULS"
@@ -11,7 +11,7 @@ ARG ULS_DIR="$HOMEDIR/uls"
1111
ARG EXT_DIR="$ULS_DIR/ext"
1212

1313
ARG ETP_CLI_VERSION="0.3.8"
14-
ARG EAA_CLI_VERSION="0.5.0"
14+
ARG EAA_CLI_VERSION="0.5.0.1"
1515
ARG MFA_CLI_VERSION="0.0.9"
1616

1717
# ENV VARS

bin/config/global_config.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env python3
22

33
# Common global variables / constants
4-
__version__ = "1.4.0"
4+
__version__ = "1.5.0"
55
__tool_name_long__ = "Akamai Unified Log Streamer"
66
__tool_name_short__ = "ULS"
77

@@ -52,6 +52,7 @@
5252
input_rerun_delay = 1 # Time in seconds between rerun attempts
5353
input_disable_stderr = True # Enable STDERR output disabling (see value below to specify when this should happen)
5454
input_disable_stderr_after = 25 # Disable stderr output after x input_cli cycles --> to prevent buffer overflow
55+
input_queue_size = 10000 # Maximum number of events we want to store in-memory, default is 10000
5556

5657
# OUTPUT Configuration
5758
output_reconnect_retries = 10 # Number of reconnect attempts before giving up
@@ -64,11 +65,13 @@
6465
## HTTP
6566
output_http_header = {'User-Agent': f'{__tool_name_long__}/{__version__}'} # HTTP Additional Headers to send (requests module KV pairs)
6667
output_http_timeout = 10 # Timeout after which a request will be considered as failed
68+
output_http_aggregate_count = 50 # Number of events to aggregate in POST request to HTTP Collector. 1 mean no aggregation
69+
output_http_aggregate_idle = 5 # Aggregate will send the data regardless of the count if the previous event was x secs ago
6770
## FILE
6871
output_file_encoding = "utf-8" # FILE Encoding setting
6972
output_file_handler_choices = ['SIZE', 'TIME'] # Available Choices for the file handler
7073
output_file_default_backup_count = 3 # Default number of backup files (after rotation)
71-
output_file_default_maxbytes = 50 * 1024 * 1024 # Default maximum size of a file when rotated by the FILE - handler
74+
output_file_default_maxbytes = 50 * 1024 * 1024 # Default maximum size of a file when rotated by the FILE - handler
7275
output_file_default_time_use_utc = False # Use UTC instead of local system time (Default: False)
7376
output_file_time_choices = ['S','M','H','D','W0','W1','W2','W3','W4','W5','W6','MIDNIGHT'] # Available choices for the time unit
7477
output_file_time_default = 'M' # Default value for the time unit (Minutes)
@@ -83,10 +86,10 @@
8386
edgerc_eaa_legacy = ["eaa_api_host", "eaa_api_key", "eaa_api_secret"] # required for EAA - Legacy
8487
edgerc_mfa = ["mfa_integration_id", "mfa_signing_key"] # Required for MFA
8588
edgerc_documentation_url = "https://github.com/akamai/uls/blob/main/docs/AKAMAI_API_CREDENTIALS.md"
86-
edgerc_mock_file = "ext/edgerc" # Required for display the version if no edgercfile was given
89+
edgerc_mock_file = "ext/edgerc" # Required for display the version if no edgercfile was given
8790

8891
# Autoresume Configuration
8992
autoresume_checkpoint_path = "var/" # (Default) Path, where the checkpointfiles should be stored to
90-
autoresume_supported_inputs = ['ETP', 'EAA'] # Internal Var only, to adjust supported inputs
93+
autoresume_supported_inputs = ['ETP', 'EAA'] # Internal Var only, to adjust supported inputs
9194
autoresume_write_after = 1000 # Write checkpoint only every ${autoresume_write_every} loglines
9295

bin/modules/UlsInputCli.py

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import time
1818
import shlex
1919
import os
20+
import threading
21+
import queue
2022

2123
# ULS modules
2224
import config.global_config as uls_config
@@ -272,9 +274,10 @@ def proc_create(self):
272274
self.proc = self.cli_proc
273275
self.proc_output = self.cli_proc.stdout
274276

275-
# Unblocking on windows causes trouble so we're avoiding it
276-
if not os.name == 'nt':
277-
os.set_blocking(self.proc_output.fileno(), False)
277+
# Non-blocking on windows causes trouble so we're avoiding it
278+
# 2022-07-08: Disabled completely see EME-588
279+
# if not os.name == 'nt':
280+
# os.set_blocking(self.proc_output.fileno(), False)
278281

279282
time.sleep(1)
280283

@@ -338,4 +341,49 @@ def check_proc(self):
338341
self.proc_create()
339342
return False
340343

344+
def ingest(self, stopEvent, event_q, monitor):
345+
"""
346+
Ingest CLI incoming data asynchronously
347+
The function will return immediately
348+
Args:
349+
stopEvent (_type_): Stop Event that will stop the current thread
350+
event_q (_type_): The queue where to put the message
351+
monitor (): Monitor
352+
"""
353+
self.stopEvent = stopEvent
354+
self.event_queue = event_q
355+
self.monitor = monitor
356+
self.ingest_thread = threading.Thread(target=self.ingest_loop)
357+
self.ingest_thread.setName("ingest_loop")
358+
self.ingest_thread.start()
359+
360+
def ingest_loop(self):
361+
362+
# When reading CLI output, if no data, pause 10ms before retrying
363+
# if still no data, then it will backoff exponentially till 60s
364+
wait_default = uls_config.main_wait_default
365+
wait_max = uls_config.main_wait_max
366+
wait = wait_default
367+
368+
while not self.stopEvent.is_set():
369+
try:
370+
371+
# Ensure the Input handler is still running (rebump if stale)
372+
self.check_proc()
373+
374+
input_data = self.proc_output.readline()
375+
if input_data:
376+
self.event_queue.put(input_data, timeout=0.05)
377+
self.monitor.increase_message_ingested()
378+
wait = wait_default # back to 10ms wait in case of bursty content
379+
else:
380+
aka_log.log.debug(f"ingest_loop, wait {wait} seconds [{self.monitor.get_stats()}]")
381+
self.stopEvent.wait(wait)
382+
wait = min(wait * 2, wait_max) # double the wait till a max of 60s
383+
except queue.Full:
384+
aka_log.log.fatal("Capacity exceeded, too many incoming data vs. slow output")
385+
self.stopEvent.set()
386+
except Exception:
387+
aka_log.log.exception("Error in ingest_loop")
388+
341389
# EOF

bin/modules/UlsMonitoring.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ def __init__(self, stopEvent, product, feed, output):
5151
self.name = "UlsMonitoring" # Class Human readable name
5252
self.overall_messages_handled = 0 # Define overall number of messages handled
5353
self.window_messages_handled = 0 # Define mon_window number of messages handled
54+
self.window_messages_bytes = 0 # Total bytes processed during the window
55+
self.window_messages_ingested = 0 # Message ingested from UlsInputCli module
5456
self.init_time = time.time() # Define the init time
5557

5658
# Define the working thread, daemon allows us to offload
@@ -84,6 +86,8 @@ def display(self):
8486
'uls_runtime': self._runtime(),
8587
'event_count': self.overall_messages_handled,
8688
'event_count_interval': self.window_messages_handled,
89+
'event_ingested_interval': self.window_messages_ingested,
90+
'event_bytes_interval': self.window_messages_bytes,
8791
'event_rate': round(self.window_messages_handled / self.monitoring_interval, 2),
8892
'mon_interval': self.monitoring_interval
8993
}
@@ -94,13 +98,21 @@ def display(self):
9498
# Reset window based vars
9599
with self._metricLock:
96100
self.window_messages_handled = 0
101+
self.window_messages_bytes = 0
102+
self.window_messages_ingested = 0
97103
except Exception as e:
98104
aka_log.log.exception(e)
99105

100-
def increase_message_count(self):
106+
def increase_message_count(self, bytes=0):
101107
with self._metricLock:
102108
self.overall_messages_handled = self.overall_messages_handled + 1
103109
self.window_messages_handled = self.window_messages_handled + 1
110+
self.window_messages_bytes += bytes
111+
112+
def increase_message_ingested(self):
113+
with self._metricLock:
114+
self.window_messages_ingested += 1
115+
104116

105117
def get_message_count(self):
106118
return self.overall_messages_handled

bin/modules/UlsOutput.py

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import requests
2222
import logging
2323
import logging.handlers
24+
import random
2425

2526
# ULS specific modules
2627
import config.global_config as uls_config
@@ -93,6 +94,12 @@ def __init__(self, output_type: str,
9394

9495
# HTTP parameters
9596
elif self.output_type in ['HTTP'] and http_url:
97+
98+
# ---- Begin change for EME-588 ----
99+
self.aggregateList = list()
100+
self.aggregateListTick = None # Last time we added items in the list
101+
# ---- End change for EME-588 ----
102+
96103
self.http_url = http_url
97104
# apply other variables if SET
98105

@@ -375,7 +382,7 @@ def send_data(self, data):
375382
:return: True on successful send, False on error
376383
"""
377384
try:
378-
aka_log.log.info(f"{self.name} Trying to send data via {self.output_type}")
385+
aka_log.log.debug(f"{self.name} Trying to send data via {self.output_type}")
379386

380387
if self.output_type == "TCP":
381388
self.clientSocket.sendall(data)
@@ -384,30 +391,41 @@ def send_data(self, data):
384391
self.clientSocket.sendto(data, (self.host, self.port))
385392

386393
elif self.output_type == "HTTP":
387-
response = self.httpSession.post(url=self.http_url,
388-
data=self.http_out_format % (data.decode()),
389-
verify=self.http_verify_tls,
390-
timeout=self.http_timeout)
391-
aka_log.log.debug(f"{self.name} DATA Send response {response.status_code},"
392-
f" {response.text} ")
394+
self.aggregateList.append(data)
395+
if len(self.aggregateList) == uls_config.output_http_aggregate_count or (
396+
self.aggregateListTick is not None and
397+
self.aggregateListTick < time.time() - uls_config.output_http_aggregate_idle
398+
):
399+
data = uls_config.output_line_breaker.join(
400+
self.http_out_format % (event.decode()) for event in self.aggregateList)
401+
request = requests.Request('POST', url=self.http_url, data=data)
402+
prepped = self.httpSession.prepare_request(request)
403+
payload_length = prepped.headers["Content-Length"]
404+
response = self.httpSession.send(prepped, verify=self.http_verify_tls, timeout=self.http_timeout)
405+
response.close() # Free up the underlying TCP connection in the connection pool
406+
aka_log.log.info(f"{self.name} HTTP POST of {len(self.aggregateList)} event(s) "
407+
f"completed in {(response.elapsed.total_seconds()*1000):.3f} ms, "
408+
f"payload={payload_length} bytes, HTTP response {response.status_code}, "
409+
f"response={response.text} ")
410+
self.aggregateList.clear()
411+
self.aggregateListTick = time.time()
393412

394413
elif self.output_type == "RAW":
395414
sys.stdout.write(data.decode())
396415
sys.stdout.flush()
397416

398-
399417
elif self.output_type == "FILE":
400418
self.my_file_writer.info(f"{data.decode().rstrip()}")
401419

402420
else:
403421
aka_log.log.critical(f"{self.name} target was not defined {self.output_type} ")
404422
sys.exit(1)
405423

406-
aka_log.log.info(f"{self.name} Data successfully sent via {self.output_type}")
424+
aka_log.log.debug(f"{self.name} Data successfully sent via {self.output_type}")
407425
return True
408426

409427
except Exception as my_error:
410-
aka_log.log.error(f"{self.name} Issue sending data {my_error}")
428+
aka_log.log.exception(f"{self.name} Issue sending data {my_error}")
411429
self.connected = False
412430
self.connect()
413431
return False

0 commit comments

Comments
 (0)