feat: Fetch logs of enqueue_runs.py script #171

Open · wants to merge 32 commits into base: main

Commits
0e80c92  add script and docs (daniel-ji, Jul 17, 2024)
1bb6678  enqueue runs log fetching fixes / add docs (daniel-ji, Jul 17, 2024)
b36c304  Merge branch 'main' into fetch-enqueue-logs (daniel-ji, Jul 25, 2024)
584180c  fix fetch logs script (daniel-ji, Jul 25, 2024)
07268a6  update docs for new fetch log changes (daniel-ji, Jul 25, 2024)
d47944c  cleanup code (daniel-ji, Jul 25, 2024)
7cd35c6  allow to partially pull logs (daniel-ji, Jul 25, 2024)
9df7a24  output dirs fix (daniel-ji, Jul 25, 2024)
2255b07  fix logging (daniel-ji, Jul 25, 2024)
6787c40  add total number of arns to log fetching script (daniel-ji, Jul 26, 2024)
436e897  Merge branch 'main' into daniel-ji/fetch-enqueue-logs (daniel-ji, Aug 2, 2024)
3b07be0  fix enqueue runs log fetching bug (daniel-ji, Aug 2, 2024)
2f98ab7  improve log fetching, add new flags and speed it up (daniel-ji, Aug 3, 2024)
0463b54  clean up enqueue_runs.py (daniel-ji, Aug 3, 2024)
cacb5cb  small fetch logs fix (daniel-ji, Aug 5, 2024)
bd7425f  chore: minor refactors (#184) (manasaV3, Aug 5, 2024)
426599f  fix seed db data (#186) (daniel-ji, Aug 5, 2024)
dd7bb5f  Adding deposition support for annotations (#178) (manasaV3, Aug 6, 2024)
bb9130f  fix config validation intellisense bug (#183) (daniel-ji, Aug 6, 2024)
aed0a2a  feat: Updating workflow to support deposition ingestion (#185) (manasaV3, Aug 6, 2024)
5d9d2f3  Renaming deposition migration to more recent unix ts (#190) (manasaV3, Aug 7, 2024)
0db13f2  Revert "Renaming deposition migration to more recent unix ts" (#191) (manasaV3, Aug 7, 2024)
f25f0e3  Updating Jensen configs to include deposition metadata (#187) (manasaV3, Aug 7, 2024)
9309b4d  Update table and column descriptions (#188) (andy-sweet, Aug 8, 2024)
a816fe3  Setup AWS S3 Data Validation (#180) (daniel-ji, Aug 8, 2024)
b3bbb48  full config files for 10.1101/2024.07.13.603322 (uermel, Jul 26, 2024)
274bf28  exclusions for validation. (daniel-ji, Aug 12, 2024)
1da3f0a  add deposition metadata. (uermel, Jul 26, 2024)
56ec7d6  don't remove existing logs folder for fetch_enqueue_runs_logs.py (daniel-ji, Aug 13, 2024)
b340c85  Merge branch 'main' into daniel-ji/fetch-enqueue-logs (daniel-ji, Aug 13, 2024)
3f1566c  fix docs (daniel-ji, Aug 15, 2024)
578bdbf  Merge remote-tracking branch 'origin/main' into daniel-ji/fetch-enque… (jgadling, Sep 25, 2024)

2 changes: 2 additions & 0 deletions ingestion_tools/docs/enqueue_runs.md
@@ -152,6 +152,8 @@ python3 enqueue_runs.py sync --exclude '*' --include 'Annotations/*.json' --s3-p
| --include-deposition | null | Look for deposition metadata with the deposition IDs passed here. This helps sync the deposition data. |
| --no-sync-dataset | False | Skip syncing datasets. This is useful when we only want to update deposition data. |

## Retrieving logs from job runs
Use the `fetch_enqueue_runs_logs.py` script to retrieve logs for the jobs run with `enqueue_runs.py`; it sorts them into `success` and `failed` directories based on each execution's status. To provide the execution ARNs the script needs, run `enqueue_runs.py` with an output log file specified via the `--execution-machine-log` flag. See `fetch_enqueue_runs_logs.md` for more details.
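
For example, to record the ARNs while enqueueing (a sketch; the elided arguments stand in for whatever the run actually requires):

`python3 enqueue_runs.py queue ... --execution-machine-log execution_arns.txt`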

## Building and pushing up a dev/test image:

47 changes: 47 additions & 0 deletions ingestion_tools/docs/fetch_enqueue_runs_logs.md
@@ -0,0 +1,47 @@
# fetch_enqueue_runs_logs.py

This script fetches logs for jobs run with `enqueue_runs.py`. It inspects the AWS Step Functions execution for each job, retrieves the corresponding logs from AWS CloudWatch Logs, and sorts them into `success` and `failed` directories based on the execution status.

## One-time Setup
Make sure you have at least python 3.11 installed. If you need to work with multiple versions of python, [pyenv](https://github.com/pyenv/pyenv) can help with that.

Before running the script, ensure all the required packages are installed in a virtualenv:
```bash
cd ingestion_tools
python3 -m venv .venv # create a virtualenv
source .venv/bin/activate # activate the virtualenv
python3 -m pip install poetry # Install the poetry package manager
poetry install # Use poetry to install this package's dependencies
```

## Setting Up AWS Profile
Before running the script, set the `AWS_PROFILE` environment variable to the profile you want to use (alternatively, pass the profile via the `--profile` flag):

`export AWS_PROFILE=your-profile-name`

## Script Usage
To capture the execution ARNs of the jobs started by `enqueue_runs.py`, run it with an output log file specified via the `--execution-machine-log` flag. The resulting file contains one execution ARN per line and can be passed to `fetch_enqueue_runs_logs.py` as input to fetch the logs for those jobs.
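
A typical end-to-end flow looks like the following sketch, where the `queue` arguments are placeholders for whatever the run actually requires:

```bash
# Enqueue jobs and record their Step Functions execution ARNs
python3 enqueue_runs.py queue ... --execution-machine-log execution_arns.txt

# Fetch the logs for those executions and sort them into success/ and failed/
python3 fetch_enqueue_runs_logs.py --input-file execution_arns.txt
```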

### Command-Line Arguments

- `execution-arn`: One or more AWS Step Functions execution ARNs. Multiple ARNs should be separated by spaces.
- `--input-file`: Path to a file containing a list of execution ARNs, one per line.
- `--output-dir`: Directory to save the fetched logs. Defaults to `./fetch-logs`.
- `--profile`: AWS profile to use. If not provided, your default profile is used.
- `--failed-only`: Fetch logs only for failed executions.
- `--links-only`: Only retrieve links to the CloudWatch logs; don't fetch the logs themselves.

## Examples

Fetch logs for specific execution ARNs:

`python fetch_enqueue_runs_logs.py arn:aws:states:us-west-2:123456789012:execution:StateMachineName:execution1 arn:aws:states:us-west-2:123456789012:execution:StateMachineName:execution2`

Fetch logs using an input file containing execution ARNs:

`python fetch_enqueue_runs_logs.py --input-file execution_arns.txt --output-dir /tmp/fetch-logs`
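
Fetch logs only for failed executions, using a non-default AWS profile (the profile name here is illustrative):

`python fetch_enqueue_runs_logs.py --input-file execution_arns.txt --failed-only --profile my-aws-profile`
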
39 changes: 37 additions & 2 deletions ingestion_tools/scripts/enqueue_runs.py
@@ -1,5 +1,6 @@
import json
import logging
import os
import os.path
import re
import time
@@ -58,23 +59,41 @@ def enqueue_common_options(func):
            help="Specify docker image tag, defaults to 'main'",
        ),
    )
    options.append(
        click.option(
            "--execution-machine-log",
            type=str,
            default=None,
            help="Log execution machine ARNs to this file",
        ),
    )
    options.append(click.option("--memory", type=int, default=None, help="Specify memory allocation override"))
    options.append(click.option("--parallelism", required=True, type=int, default=20))
    for option in options:
        func = option(func)
    return func


def create_execution_machine_log_file(filename):
    abs_path = os.path.abspath(filename)
    if os.path.exists(abs_path):
        os.remove(abs_path)
        logger.warning("Removing existing %s file.", filename)

    os.makedirs(os.path.dirname(abs_path), exist_ok=True)


def handle_common_options(ctx, kwargs):
    ctx.obj = {
        "environment": kwargs["environment"],
        "ecr_repo": kwargs["ecr_repo"],
        "ecr_tag": kwargs["ecr_tag"],
        "execution_machine_log": kwargs["execution_machine_log"],
        "memory": kwargs["memory"],
        "parallelism": kwargs["parallelism"],
        **get_aws_env(kwargs["environment"]),
    }
    enqueue_common_keys = ["environment", "ecr_repo", "ecr_tag", "memory", "parallelism"]
    enqueue_common_keys = ["environment", "ecr_repo", "ecr_tag", "execution_machine_log", "memory", "parallelism"]
    # Make sure to remove these common options from the list of args processed by commands.
    for opt in enqueue_common_keys:
        del kwargs[opt]
@@ -98,6 +117,7 @@ def run_job(
    ecr_repo: str,
    ecr_tag: str,
    memory: int | None = None,
    execution_machine_log: str | None = None,
    **kwargs,  # Ignore any extra vars this method doesn't need.
):
    if not memory:
@@ -128,12 +148,18 @@
        service_name="stepfunctions",
    )

    return client.start_execution(
    execution = client.start_execution(
        stateMachineArn=state_machine_arn,
        name=execution_name,
        input=json.dumps(sfn_input_json),
    )

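    # Record this run's execution ARN so fetch_enqueue_runs_logs.py can locate its logs later.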
    if execution_machine_log is not None:
        with open(execution_machine_log, "a") as f:
            f.write(execution["executionArn"] + "\n")

    return execution


def get_aws_env(environment):
    # Learn more about our AWS environment
@@ -278,6 +304,9 @@ def db_import(
    if not ctx.obj.get("memory"):
        ctx.obj["memory"] = 4000

    if ctx.obj.get("execution_machine_log") is not None:
        create_execution_machine_log_file(ctx.obj.get("execution_machine_log"))

    futures = []
    with ProcessPoolExecutor(max_workers=ctx.obj["parallelism"]) as workerpool:
        for dataset_id, _ in get_datasets(
@@ -388,6 +417,9 @@ def queue(
    filter_datasets = [re.compile(pattern) for pattern in kwargs.get("filter_dataset_name", [])]
    exclude_datasets = [re.compile(pattern) for pattern in kwargs.get("exclude_dataset_name", [])]

    if ctx.obj.get("execution_machine_log") is not None:
        create_execution_machine_log_file(ctx.obj.get("execution_machine_log"))

    # Always iterate over depositions, datasets and runs.
    for deposition in DepositionImporter.finder(config):
        print(f"Processing deposition: {deposition.name}")
@@ -565,6 +597,9 @@ def sync(
    for param, value in OrderedSyncFilters._options:
        new_args.append(f"--{param.name} '{value}'")

    if ctx.obj.get("execution_machine_log") is not None:
        create_execution_machine_log_file(ctx.obj.get("execution_machine_log"))

    if sync_dataset:
        entities = get_datasets(
            input_bucket,
144 changes: 144 additions & 0 deletions ingestion_tools/scripts/fetch_enqueue_runs_logs.py
@@ -0,0 +1,144 @@
import logging
import os
import re
from concurrent.futures import ThreadPoolExecutor
from typing import Tuple

import click
from boto3 import Session

logger = logging.getLogger("db_import")
logging.basicConfig(level=logging.INFO)

LOG_GROUP_NAME = "/aws/batch/job"


def get_log_stream(session: Session, execution_arn: str) -> Tuple[bool, str]:
    client = session.client("stepfunctions")
    response = client.get_execution_history(
        executionArn=execution_arn,
    )

    # get the last task scheduled with a log stream
    history = response["events"]
    history.sort(key=lambda x: x["timestamp"])
    history = [
        event
        for event in history
        if event["type"] == "TaskScheduled" and "LogStreamName" in event["taskScheduledEventDetails"]["parameters"]
    ]
    if len(history) == 0:
        logger.error("Skipping, no log stream found for %s", execution_arn)
        return False, None
    last_task_submitted = history[-1]
    parameters = last_task_submitted["taskScheduledEventDetails"]["parameters"]
    failed = re.search(r'"Status":"FAILED"', parameters, re.IGNORECASE)
    return failed, re.search(r'"LogStreamName":"([a-zA-Z-/0-9]*)"', parameters).group(1)


def get_log_events(session: Session, log_group_name, log_stream_name):
    client = session.client("logs")
    response = client.get_log_events(
        logGroupName=log_group_name,
        logStreamName=log_stream_name,
        startFromHead=True,
    )
    events = response["events"]
    return [event["message"] for event in events]


def process_arn(arn: str, session: Session, output_dir: str, failed_only: bool, links_only: bool) -> str:
    log_stream_failed, log_stream_name = get_log_stream(session, arn)
    if not log_stream_name:
        logger.warning("No log stream found for %s, possibly still running", arn)
        return

    result = "FAILED" if log_stream_failed else "SUCCESS"
    logger.info("%s: %s", result, arn)
    output_file = (
        output_dir + ("failed/" if log_stream_failed else "success/") + arn.replace("/", "_").replace(":", "_") + ".log"
    )

    if links_only:
        link = f"https://console.aws.amazon.com/cloudwatch/home?region={session.region_name}#logEventViewer:group={LOG_GROUP_NAME};stream={log_stream_name}"
        logger.info("Link: %s", link)
        return result

    if failed_only and not log_stream_failed:
        return result

    logs = get_log_events(session, LOG_GROUP_NAME, log_stream_name)
    if os.path.exists(output_file):
        logger.warning("Removing existing %s", output_file)
        os.remove(output_file)
    logger.info("Writing to %s", output_file)
    with open(output_file, "w") as f:
        f.write("\n".join(logs))

    return result


@click.command()
@click.argument("execution-arn", type=str, nargs=-1)
@click.option("--input-file", type=str, help="A file containing a list of execution ARNs.")
@click.option("--output-dir", type=str, default="./fetch-logs", help="The directory to save the logs to.")
@click.option("--profile", type=str, default=None, help="The AWS profile to use.")
@click.option("--failed-only", is_flag=True, help="Only fetch logs for failed executions.")
@click.option("--links-only", is_flag=True, help="Only get CloudWatch log links, not the logs themselves.")
def main(execution_arn: list[str], input_file: str, output_dir: str, profile: str, failed_only: bool, links_only: bool):
    input_execution_arn = execution_arn

    if not execution_arn and not input_file:
        logger.error("Please provide at least one execution ARN.")
        return

    if input_file and execution_arn:
        logger.error("Please provide either execution ARNs or an execution ARN file, not both.")
        return

    if input_file:
        if not os.path.exists(input_file):
            logger.error("The provided execution ARN file does not exist.")
            return

        with open(input_file, "r") as f:
            input_execution_arn = f.read().splitlines()

    if output_dir[-1] != "/":
        output_dir += "/"

    # setup output directory
    if not links_only:
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        if not os.path.exists(f"{output_dir}failed"):
            os.makedirs(f"{output_dir}failed")
        if not os.path.exists(f"{output_dir}success"):
            os.makedirs(f"{output_dir}success")

    input_execution_arn = list(set(input_execution_arn))
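    # The AWS region is parsed from the first ARN: arn:aws:states:<region>:<account>:execution:...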
    session = Session(region_name=input_execution_arn[0].split(":")[3], profile_name=profile)

    failed_count = 0
    successful_count = 0

    # fetch logs, multithreaded
    with ThreadPoolExecutor() as executor:
        results = executor.map(
            lambda arn: process_arn(arn, session, output_dir, failed_only, links_only),
            input_execution_arn,
        )
        for result in results:
            if result == "FAILED":
                failed_count += 1
            elif result == "SUCCESS":
                successful_count += 1

    logger.info("====================================")
    logger.info("TOTAL FAILED: %d/%d", failed_count, len(input_execution_arn))
    logger.info("TOTAL SUCCEEDED: %d/%d", successful_count, len(input_execution_arn))
    logger.info("TOTAL SKIPPED: %d", len(input_execution_arn) - failed_count - successful_count)


if __name__ == "__main__":
    main()