Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions sebs.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,130 @@ def process(**kwargs):
sebs_client.logging.info("Save results to {}".format(output_file))


@benchmark.command("logs")
@click.argument(
    "target",
    type=str,
    required=True,
)
@click.option(
    "--request-id",
    type=str,
    default=None,
    help="Request/invocation ID to query (optional, for specific invocation).",
)
@click.option(
    "--minutes",
    type=int,
    default=30,
    help="Number of minutes to look back for logs (default: 30, only for function name mode).",
)
@common_params
def query_logs(target, request_id, minutes, **kwargs):
    """
    Query and display logs for benchmark invocations.

    TARGET can be either:
    1. Path to experiments.json file - queries logs for all invocations in that file
    2. Function name - queries logs for that function (requires --request-id)

    Examples:
    ./sebs.py benchmark logs experiments.json --config config.json
    ./sebs.py benchmark logs my-function --request-id abc123 --config config.json
    ./sebs.py benchmark logs my-function --request-id abc123 --minutes 60 --config config.json
    """
    (
        config,
        output_dir,
        logging_filename,
        sebs_client,
        deployment_client,
    ) = parse_common_params(**kwargs)

    def _emit_logs(logs):
        # Route retrieved log lines through the SeBS logger (instead of bare
        # print) so output respects the configured logging handlers/files.
        for log_line in logs:
            if log_line.strip():  # Skip empty lines
                sebs_client.logging.info(log_line)

    # Detect if target is a file or function name
    is_file = os.path.isfile(target)

    # Case 1: Load from experiments file
    if is_file:
        sebs_client.logging.info(f"Loading invocations from {target}")
        with open(target, "r") as in_f:
            exp_config = json.load(in_f)
            experiments = sebs.experiments.ExperimentResult.deserialize(
                exp_config,
                sebs_client.cache_client,
                sebs_client.generate_logging_handlers(logging_filename),
            )

        # Get time bounds from experiments
        exp_start_time, exp_end_time = experiments.times()
        start_time = int(exp_start_time)
        end_time = int(exp_end_time)

        # Query logs for all functions and invocations
        for func_name in experiments.functions():
            invocations = experiments.invocations(func_name)
            sebs_client.logging.info(
                f"\n{'=' * 80}\nFunction: {func_name} ({len(invocations)} invocations)\n{'=' * 80}"
            )

            for req_id, execution_result in invocations.items():
                sebs_client.logging.info(f"\n{'-' * 80}\nRequest ID: {req_id}\n{'-' * 80}")

                try:
                    logs = deployment_client.get_invocation_logs(
                        func_name, req_id, start_time, end_time
                    )

                    if logs:
                        _emit_logs(logs)
                    else:
                        sebs_client.logging.warning(f"No logs found for request {req_id}")

                except Exception as e:
                    # Best-effort: report the failure and continue with the
                    # remaining invocations rather than aborting the whole run.
                    sebs_client.logging.error(f"Error retrieving logs for {req_id}: {e}")

    # Case 2: Query specific function
    else:
        function_name = target

        # Validate that request_id is provided
        if not request_id:
            sebs_client.logging.error(
                "When querying by function name, please provide --request-id. "
                "Alternatively, use an experiments.json file to query all invocations."
            )
            return

        import time

        # Calculate time window - look back X minutes from now
        end_time = int(time.time())
        start_time = end_time - (minutes * 60)

        sebs_client.logging.info(
            f"Querying logs for function '{function_name}', request ID '{request_id}' "
            f"(last {minutes} minutes)"
        )

        try:
            logs = deployment_client.get_invocation_logs(
                function_name, request_id, start_time, end_time
            )

            if logs:
                sebs_client.logging.info(f"\n{'-' * 80}\nLogs:\n{'-' * 80}")
                _emit_logs(logs)
            else:
                sebs_client.logging.warning("No logs found")

        except Exception as e:
            sebs_client.logging.error(f"Error retrieving logs: {e}")


@benchmark.command()
@click.argument(
"benchmark-input-size", type=click.Choice(["test", "small", "large"])
Expand Down
2 changes: 1 addition & 1 deletion sebs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
SeBS
SeBS
"""

from .version import __version__ # noqa
Expand Down
80 changes: 80 additions & 0 deletions sebs/aws/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,86 @@ def get_invocation_error(self, function_name: str, start_time: int, end_time: in
if value["field"] == "@message":
self.logging.error(value["value"])

def get_invocation_logs(
    self,
    function_name: str,
    request_id: str,
    start_time: int,
    end_time: int,
    max_retries: int = 3,
    retry_delay: int = 10,
) -> List[str]:
    """
    Retrieve full logs (stdout and stderr) for a specific invocation.

    CloudWatch log delivery is asynchronous, so the Insights query is
    re-issued up to ``max_retries`` times when it returns no results.

    Args:
        function_name: Name of the Lambda function
        request_id: AWS request ID for the invocation
        start_time: Start time as Unix timestamp
        end_time: End time as Unix timestamp
        max_retries: Maximum number of query attempts before giving up
            (default 3; exposed as a parameter so callers, e.g. the CLI,
            can tune it)
        retry_delay: Seconds to sleep between attempts (default 10)

    Returns:
        List of log messages for the invocation, oldest first, each
        prefixed with its timestamp when available
    """
    # Lazily create the CloudWatch Logs client - not every workflow needs it.
    if not self.logs_client:
        self.logs_client = boto3.client(
            service_name="logs",
            aws_access_key_id=self.config.credentials.access_key,
            aws_secret_access_key=self.config.credentials.secret_key,
            region_name=self.config.region,
        )

    # CloudWatch Logs Insights query filtered to a single request ID.
    # NOTE(review): request_id is interpolated directly into the query
    # string; AWS request IDs are UUIDs, so no quoting issue is expected,
    # but confirm callers never pass user-controlled values here.
    query_string = (
        f'filter @requestId = "{request_id}" | '
        f"fields @timestamp, @message | sort @timestamp asc"
    )

    response = None
    retries = 0

    while retries < max_retries:
        query = self.logs_client.start_query(
            logGroupName="/aws/lambda/{}".format(function_name),
            queryString=query_string,
            startTime=math.floor(start_time),
            endTime=math.ceil(end_time + 60),  # Add buffer for log delivery
        )
        query_id = query["queryId"]

        # Poll until the query leaves the "Running" state.
        while response is None or response["status"] == "Running":
            time.sleep(1)
            response = self.logs_client.get_query_results(queryId=query_id)

        if len(response["results"]) > 0:
            break

        # Logs might not be available yet
        retries += 1
        if retries < max_retries:
            self.logging.info(
                f"AWS logs not yet available for request {request_id}, "
                f"retrying in {retry_delay}s... ({retries}/{max_retries})"
            )
            time.sleep(retry_delay)
            response = None  # Forces re-polling of the next query.

    # Flatten the Insights result rows into "[timestamp] message" strings.
    log_messages = []
    if response and "results" in response:
        for log_entry in response["results"]:
            message = None
            timestamp = None
            for field in log_entry:
                if field["field"] == "@message":
                    message = field["value"]
                elif field["field"] == "@timestamp":
                    timestamp = field["value"]

            if message:
                if timestamp:
                    log_messages.append(f"[{timestamp}] {message}")
                else:
                    log_messages.append(message)

    return log_messages

def download_metrics(
self,
function_name: str,
Expand Down
101 changes: 100 additions & 1 deletion sebs/azure/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ def shutdown(self):
super().shutdown()

def find_deployments(self) -> List[str]:

"""
Look for duplicated resource groups.
"""
Expand Down Expand Up @@ -584,6 +583,106 @@ def download_metrics(

# TODO: query performance counters for mem

def get_invocation_logs(
    self,
    function_name: str,
    invocation_id: str,
    start_time: int,
    end_time: int,
    max_retries: int = 3,
    retry_delay: int = 10,
) -> List[str]:
    """
    Retrieve full logs (stdout and stderr) for a specific invocation.

    Application Insights ingests telemetry with a delay, so the query is
    re-issued up to ``max_retries`` times while it returns no rows.

    Args:
        function_name: Name of the Azure function
        invocation_id: Azure invocation ID
        start_time: Start time as Unix timestamp
        end_time: End time as Unix timestamp
        max_retries: Maximum number of query attempts before giving up
            (default 3; parameterized to match the AWS implementation)
        retry_delay: Seconds to sleep between attempts (default 10)

    Returns:
        List of log messages for the invocation, oldest first, each
        prefixed with timestamp and severity level when available
    """
    # The app-insights CLI extension is required for log queries.
    self.cli_instance.install_insights()

    resource_group = self.config.resources.resource_group(self.cli_instance)

    # Resolve the Application Insights application ID for this function app.
    app_id_query = self.cli_instance.execute(
        ("az monitor app-insights component show " "--app {} --resource-group {}").format(
            function_name, resource_group
        )
    ).decode("utf-8")
    application_id = json.loads(app_id_query)["appId"]

    # Azure CLI requires date in the following format
    # Format: date (yyyy-mm-dd) time (hh:mm:ss.xxxxx) timezone (+/-hh:mm)
    start_time_str = datetime.datetime.fromtimestamp(start_time).strftime(
        "%Y-%m-%d %H:%M:%S.%f"
    )
    # +60s buffer to account for delayed telemetry ingestion.
    end_time_str = datetime.datetime.fromtimestamp(end_time + 60).strftime("%Y-%m-%d %H:%M:%S")
    from tzlocal import get_localzone

    timezone_str = datetime.datetime.now(get_localzone()).strftime("%z")

    # Kusto query for traces (logs) and requests tied to the invocation ID.
    query = (
        f"union traces, requests | "
        f"where customDimensions['InvocationId'] == '{invocation_id}' | "
        f"project timestamp, message, severityLevel, itemType | "
        f"order by timestamp asc"
    )

    log_messages = []
    retries = 0

    while retries < max_retries:
        ret = self.cli_instance.execute(
            (
                'az monitor app-insights query --app {} --analytics-query "{}" '
                "--start-time {} {} --end-time {} {}"
            ).format(
                application_id,
                query,
                start_time_str,
                timezone_str,
                end_time_str,
                timezone_str,
            )
        ).decode("utf-8")

        result = json.loads(ret)

        if "tables" in result and len(result["tables"]) > 0:
            table = result["tables"][0]
            if len(table["rows"]) > 0:
                # Rows follow the projection order: timestamp, message,
                # severityLevel, itemType.
                for row in table["rows"]:
                    timestamp = row[0]
                    message = row[1]
                    severity = row[2] if len(row) > 2 else None

                    if message:
                        prefix = f"[{timestamp}]"
                        if severity is not None:
                            # Map Application Insights numeric severity
                            # levels to readable names.
                            severity_map = {
                                0: "VERBOSE",
                                1: "INFO",
                                2: "WARNING",
                                3: "ERROR",
                                4: "CRITICAL",
                            }
                            severity_str = severity_map.get(severity, str(severity))
                            prefix += f" [{severity_str}]"
                        log_messages.append(f"{prefix} {message}")
                break

        # No rows yet - telemetry may still be ingesting; retry after a delay.
        retries += 1
        if retries < max_retries:
            self.logging.info(
                f"Azure logs not yet available for invocation {invocation_id}, "
                f"retrying in {retry_delay}s... ({retries}/{max_retries})"
            )
            time.sleep(retry_delay)

    return log_messages

def _enforce_cold_start(self, function: Function, code_package: Benchmark):

self.update_envs(function, code_package, {"ForceColdStart": str(self.cold_start_counter)})
Expand Down
1 change: 0 additions & 1 deletion sebs/azure/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ def login(self, appId: str, tenant: str, password: str) -> bytes:
return result

def upload_package(self, directory: str, dest: str):

"""
This is not an efficient and memory-intensive implementation.
So far, we didn't have very large functions that require many gigabytes.
Expand Down
1 change: 0 additions & 1 deletion sebs/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,6 @@ def build(
def prepare_input(
self, system_resources: SystemResources, size: str, replace_existing: bool = False
):

"""
Handle object storage buckets.
"""
Expand Down
1 change: 0 additions & 1 deletion sebs/faas/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ def remove_bucket(self, bucket: str):
def benchmark_data(
self, benchmark: str, requested_buckets: Tuple[int, int]
) -> Tuple[List[str], List[str]]:

"""
Add an input path inside benchmarks bucket.
Bucket name format: name-idx-input
Expand Down
Loading