From 94904ebcdaf92a7f29ff11a42cf7263f20d335b1 Mon Sep 17 00:00:00 2001 From: Joe Ferguson Date: Mon, 15 Aug 2022 12:58:02 -0500 Subject: [PATCH 1/5] WIP Stuck --- week_1/project/week_1.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py index ccfe963b..95880dc7 100644 --- a/week_1/project/week_1.py +++ b/week_1/project/week_1.py @@ -51,12 +51,14 @@ def get_s3_data(context): @op -def process_data(): - pass +def process_data(stocks: Stock): + id, max= max(stocks, key=lambda stock: stock[4]) + print('max:', max, "At index:",id) + return stocks @op -def put_redis_data(): +def put_redis_data(aggregate: Aggregation): pass From 667cd3cf3cc6af5bba401697acc19c8949e472b1 Mon Sep 17 00:00:00 2001 From: Joe Ferguson Date: Mon, 15 Aug 2022 14:40:53 -0500 Subject: [PATCH 2/5] Saving progress on week 1 --- week_1/project/week_1.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py index 95880dc7..3250004e 100644 --- a/week_1/project/week_1.py +++ b/week_1/project/week_1.py @@ -2,7 +2,7 @@ from datetime import datetime from typing import List -from dagster import In, Nothing, Out, job, op, usable_as_dagster_type +from dagster import In, Nothing, Out, job, op, usable_as_dagster_type, get_dagster_logger from pydantic import BaseModel @@ -50,18 +50,22 @@ def get_s3_data(context): return output -@op -def process_data(stocks: Stock): - id, max= max(stocks, key=lambda stock: stock[4]) - print('max:', max, "At index:",id) - return stocks +@op( + ins={"stocks": In(dagster_type=List, description="List from CSV")}, + out={"aggregation": Out(dagster_type=Aggregation, description="Aggregation output")}, + description="Find the highest value in the high field", +) +def process_data(stocks: List): + highest = sorted(stocks, key=lambda stock: stock.high, reverse=True).pop(0) + return Aggregation(date=highest.date, high=highest.high) -@op + +@op(ins={"aggregate": In(dagster_type=Aggregation)}) def put_redis_data(aggregate: Aggregation): pass @job def week_1_pipeline(): - pass + put_redis_data(process_data(get_s3_data())) From 9cf3ce33239ee9ef021bc5e482355c49dd125eb2 Mon Sep 17 00:00:00 2001 From: Joe Ferguson Date: Sun, 21 Aug 2022 09:42:25 -0500 Subject: [PATCH 3/5] Add logging example --- week_1/project/week_1.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py index 3250004e..d23bd46e 100644 --- a/week_1/project/week_1.py +++ b/week_1/project/week_1.py @@ -63,7 +63,8 @@ def process_data(stocks: List): @op(ins={"aggregate": In(dagster_type=Aggregation)}) def put_redis_data(aggregate: Aggregation): - pass + log = get_dagster_logger() + log.info("Put Data into Redis") @job From b214a88efeba95892a6d57e47a5ffe6444359c67 Mon Sep 17 00:00:00 2001 From: Joe Ferguson Date: Mon, 22 Aug 2022 11:02:26 -0500 Subject: [PATCH 4/5] Working on Week 2 --- week_2/dagster_ucr/project/week_2.py | 47 +++++++++++++++++++++------- week_2/dagster_ucr/resources.py | 34 ++++++++++++++++---- 2 files changed, 64 insertions(+), 17 deletions(-) diff --git a/week_2/dagster_ucr/project/week_2.py b/week_2/dagster_ucr/project/week_2.py index 8e32a715..92ac5bcf 100644 --- a/week_2/dagster_ucr/project/week_2.py +++ b/week_2/dagster_ucr/project/week_2.py @@ -5,26 +5,51 @@ from dagster_ucr.resources import mock_s3_resource, redis_resource, s3_resource -@op -def get_s3_data(): - pass +@op( + config_schema={"s3_key": str}, + out={"stocks": Out(dagster_type=List[Stock])}, + required_resource_keys={"s3"}, + tags={"kind": "s3"}, + description="Get a list of stocks from an S3 file", +) +def get_s3_data(context): + s3 = context.resources.s3 + stocks = s3.get_data(context.op_config["s3_key"]) + + return [Stock.from_list(stock) for stock in stocks] + +@op( + ins={"stocks": In(dagster_type=List[Stock])}, + out={"aggregation": Out(dagster_type=Aggregation)}, + tags={"kind": "python"}, + description="Find the highest value in the high field", +) +def process_data(stocks): + + hs = sorted(stocks, key=lambda x: x.high, reverse=True)[0] + # context.log.info(f'Stock with the greatest high value: {hs.date} -> {hs.high}') -@op -def process_data(): - # Use your op from week 1 - pass + return Aggregation(date=hs.date, high=hs.high) -@op -def put_redis_data(): - pass +@op( + ins={"aggregation": In(dagster_type=Aggregation)}, + required_resource_keys={"redis"}, + tags={"kind": "redis"}, + description="Upload aggregations to Redis", +) +def put_redis_data(context, aggregation): + redis = context.resources.redis + redis.put_data(str(aggregation.date), str(aggregation.high)) @graph def week_2_pipeline(): # Use your graph from week 1 - pass + stocks = get_s3_data() + aggregation = process_data(stocks) + put_redis_data(aggregation) local = { diff --git a/week_2/dagster_ucr/resources.py b/week_2/dagster_ucr/resources.py index adeb742f..f7aa46e4 100644 --- a/week_2/dagster_ucr/resources.py +++ b/week_2/dagster_ucr/resources.py @@ -91,13 +91,35 @@ def mock_s3_resource(): return s3_mock -@resource -def s3_resource(): +@resource( + config_schema={ + "bucket": Field(String), + "access_key": Field(String), + "secret_key": Field(String), + "endpoint_url": Field(String), + }, + description="A resource that can run S3", +) +def s3_resource(context) -> S3: """This resource defines a S3 client""" - pass + return S3( + bucket=context.resource_config["bucket"], + access_key=context.resource_config["access_key"], + secret_key=context.resource_config["secret_key"], + endpoint_url=context.resource_config["endpoint_url"], + ) -@resource -def redis_resource(): +@resource( + config_schema={ + "host": Field(String), + "port": Field(Int), + }, + description="A resource that can run Redis", +) +def redis_resource(context) -> Redis: """This resource defines a Redis client""" - pass + return Redis( + host=context.resource_config["host"], + port=context.resource_config["port"], + ) From 703fe8b41b685527dc6c86560e9df6522d9c9a5c Mon Sep 17 00:00:00 2001 From: Joe Ferguson Date: Sun, 4 Sep 2022 10:59:04 -0500 Subject: [PATCH 5/5] Week 3 project, Tests Passing --- week_3/project/week_3.py | 118 ++++++++++++++++++++++++++++++--------- 1 file changed, 93 insertions(+), 25 deletions(-) diff --git a/week_3/project/week_3.py b/week_3/project/week_3.py index f4753753..3159bff7 100644 --- a/week_3/project/week_3.py +++ b/week_3/project/week_3.py @@ -1,9 +1,11 @@ +from operator import attrgetter from typing import List from dagster import ( In, Nothing, Out, + get_dagster_logger, ResourceDefinition, RetryPolicy, RunRequest, @@ -19,28 +21,47 @@ from project.types import Aggregation, Stock -@op -def get_s3_data(): - # Use your ops from week 2 - pass - +@op( + config_schema={"s3_key": str}, + required_resource_keys={"s3"}, + out={"stocks": Out(dagster_type=List[Stock], description="Stocks List")}, + tags={"kind": "s3"}, +) +def get_s3_data(context): + output = list() + for row in context.resources.s3.get_data(context.op_config["s3_key"]): + stock = Stock.from_list(row) + output.append(stock) + return output + + +@op( + ins={"stocks": In(dagster_type=List, description="List from CSV")}, + out={"aggregation": Out(dagster_type=Aggregation, description="Aggregation output")}, + description="Find the highest value in the high field", +) +def process_data(stocks: List): + highest = sorted(stocks, key=lambda stock: stock.high, reverse=True).pop(0) -@op -def process_data(): - # Use your ops from week 2 - pass + return Aggregation(date=highest.date, high=highest.high) -@op -def put_redis_data(): - # Use your ops from week 2 - pass +@op( + required_resource_keys={"redis"}, + ins={"highest_stock": In(dagster_type=Aggregation)}, + out=Out(dagster_type=Nothing), + description="Put Data into Redis", + tags={"kind": "redis"}, +) +def put_redis_data(context, highest_stock): + log = get_dagster_logger() + log.info("Put Data into Redis") + context.resources.redis.put_data(name=f"{highest_stock.date}:%m/%d/%Y", value=str(highest_stock.high)) @graph def week_3_pipeline(): - # Use your graph from week 2 - pass + put_redis_data(process_data(get_s3_data())) local = { @@ -69,8 +90,27 @@ def week_3_pipeline(): } -def docker_config(): - pass +@static_partitioned_config(partition_keys=["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"]) +def docker_config(partition_key: int): + return { + "resources": { + "s3": { + "config": { + "bucket": "dagster", + "access_key": "test", + "secret_key": "test", + "endpoint_url": "http://host.docker.internal:4566", + } + }, + "redis": { + "config": { + "host": "redis", + "port": 6379, + } + }, + }, + "ops": {"get_s3_data": {"config": {"s3_key": f"prefix/stock_{partition_key}.csv"}}}, + } local_week_3_pipeline = week_3_pipeline.to_job( @@ -89,14 +129,42 @@ def docker_config(): "s3": s3_resource, "redis": redis_resource, }, + op_retry_policy=RetryPolicy(max_retries=10, delay=1), ) -local_week_3_schedule = None # Add your schedule - -docker_week_3_schedule = None # Add your schedule - - -@sensor -def docker_week_3_sensor(): - pass +local_week_3_schedule = ScheduleDefinition(job=local_week_3_pipeline, cron_schedule="*/15 * * * *") + +docker_week_3_schedule = ScheduleDefinition(job=docker_week_3_pipeline, cron_schedule="0 * * * *") + + +@sensor(job=docker_week_3_pipeline) +def docker_week_3_sensor(context): + new_files = get_s3_keys(bucket="dagster", prefix="prefix", endpoint_url="http://host.docker.internal:4566") + + if not new_files: + yield SkipReason("No new s3 files found in bucket.") + return + for new_file in new_files: + yield RunRequest( + run_key=new_file, + run_config={ + "resources": { + "s3": { + "config": { + "bucket": "dagster", + "access_key": "test", + "secret_key": "test", + "endpoint_url": "http://host.docker.internal:4566", + } + }, + "redis": { + "config": { + "host": "redis", + "port": 6379, + } + }, + }, + "ops": {"get_s3_data": {"config": {"s3_key": new_file}}}, + }, + )