diff --git a/.gitpod.Dockerfile b/.gitpod.Dockerfile
index 45da7ce8..68406718 100644
--- a/.gitpod.Dockerfile
+++ b/.gitpod.Dockerfile
@@ -7,4 +7,4 @@ RUN pip install "poetry==1.1.12"
 COPY poetry.lock pyproject.toml /
 RUN poetry config virtualenvs.create false \
-    && poetry install --no-interaction --no-ansi
\ No newline at end of file
+    && poetry install --no-interaction --no-ansi
diff --git a/week_1/challenge/week_1_challenge.py b/week_1/challenge/week_1_challenge.py
index 0c84d98d..7d97fdd6 100644
--- a/week_1/challenge/week_1_challenge.py
+++ b/week_1/challenge/week_1_challenge.py
@@ -55,18 +55,37 @@ def csv_helper(file_name: str) -> Iterator[Stock]:
             yield Stock.from_list(row)
 
 
-@op
-def get_s3_data():
-    pass
-
+@op(
+    config_schema={"s3_key": String},
+    out={
+        "stocks": Out(List[Stock], is_required=False),
+        "empty_stocks": Out(Any, is_required=False),
+    },
+)
+def get_s3_data(context):
+    s3_key = context.op_config["s3_key"]
+    # Materialize the rows once so the emptiness check does not consume the iterator.
+    stocks = list(csv_helper(s3_key))
+    if not stocks:
+        yield Output(None, "empty_stocks")
+    else:
+        yield Output(stocks, "stocks")
 
 
-@op
-def process_data():
-    pass
+@op(
+    config_schema={"nlargest": int},
+    out=DynamicOut(Aggregation),
+)
+def process_data(context, stocks: List[Stock]):
+    stocks.sort(key=lambda stock: stock.high, reverse=True)
+    n = context.op_config["nlargest"]
+    for i in range(n):
+        agg = Aggregation(date=stocks[i].date, high=stocks[i].high)
+        yield DynamicOutput(agg, mapping_key=str(i))
 
 
 @op
-def put_redis_data():
+def put_redis_data(context, aggs: Aggregation) -> Nothing:
     pass
 
 
@@ -80,4 +99,8 @@ def empty_stock_notify(context, empty_stocks) -> Nothing:
 
 @job
 def week_1_challenge():
-    pass
+    stocks, no_stock = get_s3_data()
+    processed = process_data(stocks)
+    empty_stock_notify(no_stock)
+    processed.map(put_redis_data)
diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py
index 095b93af..8405ed06 100644
--- a/week_1/project/week_1.py
+++ b/week_1/project/week_1.py
@@ -41,21 +41,31 @@ def csv_helper(file_name: str) -> Iterator[Stock]:
             yield Stock.from_list(row)
 
 
-@op
-def get_s3_data():
-    pass
-
-
-@op
-def process_data():
-    pass
-
+@op(config_schema={"s3_key": String})
+def get_s3_data(context) -> List[Stock]:
+    s3_key = context.op_config["s3_key"]
+    return list(csv_helper(s3_key))
+
+
+@op(
+    ins={"stocks": In(dagster_type=List[Stock])},
+    out={"agg": Out(dagster_type=Aggregation)},
+)
+def process_data(context, stocks: List[Stock]) -> Aggregation:
+    highest_stock = max(stocks, key=lambda stock: stock.high)
+    return Aggregation(date=highest_stock.date, high=highest_stock.high)
 
 
 @op
-def put_redis_data():
+def put_redis_data(context, aggs: Aggregation) -> Nothing:
     pass
 
 
 @job
 def week_1_pipeline():
-    pass
+    s3_data = get_s3_data()
+    processed = process_data(s3_data)
+    put_redis_data(processed)
diff --git a/week_2/workspaces/challenge/week_2_challenge.py b/week_2/workspaces/challenge/week_2_challenge.py
index 10d3f677..42bc69ae 100644
--- a/week_2/workspaces/challenge/week_2_challenge.py
+++ b/week_2/workspaces/challenge/week_2_challenge.py
@@ -1,9 +1,21 @@
 from random import randint
 
-from dagster import In, Nothing, String, graph, op
-from dagster_dbt import dbt_cli_resource, dbt_run_op, dbt_test_op
+from dagster_dbt import dbt_cli_resource, dbt_run_op, dbt_test_op, DbtOutput
 
 from workspaces.resources import postgres_resource
 
+from dagster import (
+    In,
+    Out,
+    Nothing,
+    String,
+    Any,
+    graph,
+    op,
+    HookContext,
+    failure_hook,
+    success_hook,
+)
+
 DBT_PROJECT_PATH = "/opt/dagster/dagster_home/dbt_test_project/."
"/opt/dagster/dagster_home/dbt_test_project/." @@ -21,27 +33,52 @@ def create_dbt_table(context) -> String: context.resources.database.execute_query(sql) return table_name - @op( ins={"table_name": In(dagster_type=String)}, required_resource_keys={"database"}, tags={"kind": "postgres"}, ) -def insert_dbt_data(context, table_name): +def insert_dbt_data(context, table_name)-> Nothing: sql = f"INSERT INTO {table_name} (column_1, column_2, column_3) VALUES ('A', 'B', 'C');" number_of_rows = randint(1, 100) for _ in range(number_of_rows): context.resources.database.execute_query(sql) context.log.info("Inserted a row") - context.log.info("Batch inserted") +@op( + required_resource_keys={"dbt"}, + tags={"kind": "dbt"} +) +def dbt_run() -> DbtOutput: + dbt_run_op() + + +@op ( + required_resource_keys={"dbt"}, + tags={"kind": "dbt"} +) +def dbt_test() -> DbtOutput: + dbt_test_op() + + +@success_hook +def notify_success(context: HookContext) -> Nothing: + message = f"Op {context.op.name} finished successfully" + context.log.info(message) + + +@failure_hook +def notify_failure(context: HookContext) -> Nothing: + message = f"Op {context.op.name} failed" + context.log.info(message) + @graph def week_2_challenge(): - pass - + tbl = create_dbt_table() + dbt_test_op.with_hooks({notify_success, notify_failure})(dbt_run_op(insert_dbt_data(tbl))) docker = { "resources": { @@ -68,4 +105,9 @@ def week_2_challenge(): week_2_challenge_docker = week_2_challenge.to_job( name="week_2_challenge_docker", + config = docker, + resource_defs = { + "database": postgres_resource, + "dbt": dbt_cli_resource + } ) diff --git a/week_2/workspaces/project/week_2.py b/week_2/workspaces/project/week_2.py index 8b184294..da88f1e4 100644 --- a/week_2/workspaces/project/week_2.py +++ b/week_2/workspaces/project/week_2.py @@ -5,29 +5,46 @@ from workspaces.types import Aggregation, Stock -@op -def get_s3_data(): - pass +@op( + config_schema={ + "s3_key": String + }, + required_resource_keys={"s3"} +) +def get_s3_data(context) -> List[Stock]: + s3_key = context.op_config["s3_key"] + return [Stock.from_list(item) for item in context.resources.s3.get_data(s3_key)] -@op -def process_data(): - pass +@op ( + ins={"stocks": In(dagster_type=List[Stock])}, + out={"agg": Out(dagster_type=Aggregation)} +) +def process_data(context, stocks: List[Stock]) -> Aggregation: + highest_stock = max(stocks, key= lambda stock: stock.high) + return Aggregation(date=highest_stock.date,high=highest_stock.high) -@op -def put_redis_data(): - pass +@op( + required_resource_keys={"redis"} +) +def put_redis_data(context, agg: Aggregation) -> Nothing: + context.resources.redis.put_data(name=str(agg.date), value=str(agg.high)) -@op -def put_s3_data(): - pass +@op( + required_resource_keys={"s3"} +) +def put_s3_data(context, agg: Aggregation) -> Nothing: + context.resources.s3.put_data(key=agg.date, data=agg) @graph def week_2_pipeline(): - pass + stocks = get_s3_data() + processed = process_data(stocks) + put_redis_data(processed) + put_s3_data(processed) local = { @@ -56,8 +73,18 @@ def week_2_pipeline(): week_2_pipeline_local = week_2_pipeline.to_job( name="week_2_pipeline_local", + config= local, + resource_defs = { + "s3": mock_s3_resource, + "redis": ResourceDefinition.mock_resource() + } ) week_2_pipeline_docker = week_2_pipeline.to_job( name="week_2_pipeline_docker", + config= docker, + resource_defs = { + "s3": s3_resource, + "redis": redis_resource + } ) diff --git a/week_2/workspaces/resources.py b/week_2/workspaces/resources.py index 2c2cba81..dca0945e 
--- a/week_2/workspaces/resources.py
+++ b/week_2/workspaces/resources.py
@@ -98,13 +98,35 @@ def mock_s3_resource(context):
     return s3_mock
 
 
-@resource
-def s3_resource():
+@resource(
+    config_schema={
+        "bucket": Field(String),
+        "access_key": Field(String),
+        "secret_key": Field(String),
+        "endpoint_url": Field(String),
+    },
+    description="A resource that connects to S3.",
+)
+def s3_resource(context) -> S3:
     """This resource defines a S3 client"""
-    pass
+    return S3(
+        bucket=context.resource_config["bucket"],
+        access_key=context.resource_config["access_key"],
+        secret_key=context.resource_config["secret_key"],
+        endpoint_url=context.resource_config["endpoint_url"],
+    )
 
 
-@resource
-def redis_resource():
+@resource(
+    config_schema={
+        "host": Field(String),
+        "port": Field(Int),
+    },
+    description="A resource that connects to Redis.",
+)
+def redis_resource(context) -> Redis:
     """This resource defines a Redis client"""
-    pass
+    return Redis(
+        host=context.resource_config["host"],
+        port=context.resource_config["port"],
+    )
diff --git a/week_3/workspaces/challenge/week_3_challenge.py b/week_3/workspaces/challenge/week_3_challenge.py
index 3be63e5c..35682397 100644
--- a/week_3/workspaces/challenge/week_3_challenge.py
+++ b/week_3/workspaces/challenge/week_3_challenge.py
@@ -1,27 +1,48 @@
-from dagster import In, IOManager, Nothing, Out, String, graph, io_manager, op
+from dagster import (
+    In,
+    IOManager,
+    Nothing,
+    Out,
+    String,
+    graph,
+    io_manager,
+    op,
+    InputContext,
+    OutputContext,
+    InitResourceContext,
+)
 from workspaces.resources import postgres_resource
-
+from workspaces.types import PostgresRecord
+
+import random as rand
+import string
 
 class PostgresIOManager(IOManager):
-    def __init__(self):
-        pass
+    def __init__(self, context: InitResourceContext):
+        # Reuse the database resource declared on the io_manager instead of re-initializing it.
+        self.pg = context.resources.database
 
-    def handle_output(self):
-        pass
+    def handle_output(self, context: OutputContext, obj: PostgresRecord):
+        table_name = context.config["table_name"]
+        v1, v2, v3 = obj.v1, obj.v2, obj.v3
+        sql = f"INSERT INTO {table_name} VALUES ('{v1}', '{v2}', '{v3}');"
+        self.pg.execute_query(sql)
 
-    def load_input(self, context):
-        pass
+    def load_input(self, context: InputContext):
+        table_name = context.upstream_output.config["table_name"]
+        sql = f"SELECT count(*) FROM {table_name};"
+        return self.pg.execute_query(sql)
 
 
-@io_manager(required_resource_keys={"postgres"})
+@io_manager(
+    output_config_schema={"table_name": str},
+    required_resource_keys={"database"},
+)
 def postgres_io_manager(init_context):
-    return PostgresIOManager()
+    return PostgresIOManager(init_context)
 
 
 @op(
     config_schema={"table_name": String},
     required_resource_keys={"database"},
-    tags={"kind": "postgres"},
+    tags={"kind": "postgres"}
 )
 def create_table(context) -> String:
     table_name = context.op_config["table_name"]
@@ -33,20 +54,27 @@ def create_table(context) -> String:
     return table_name
 
 
-@op
-def insert_data():
-    pass
+@op(
+    ins={"start_after": In(Nothing)},
+    out=Out(io_manager_key="postgres_io"),
+    description="Insert 3 random values into the configured table (schema.table).",
+)
+def insert_random_data() -> PostgresRecord:
+    v1, v2, v3 = ["".join(rand.choices(string.ascii_uppercase + string.digits, k=5)) for _ in range(3)]
+    return PostgresRecord(v1=v1, v2=v2, v3=v3)
 
 
-@op
-def table_count():
+@op(
+    description="Count how many records are in the specified table_name (schema.table).",
+)
+def table_count(table_name: String) -> int:
     pass
 
 
 @graph
 def week_3_challenge():
-    pass
-
+    tbl_name = create_table()
+    insert_random_data(start_after=tbl_name)
 
 docker = {
     "resources": {
@@ -59,8 +87,24 @@
             }
         },
     },
-    "ops": {"create_table": {"config": {"table_name": "analytics.table"}}},
+    "ops": {
+        "create_table": {"config": {"table_name": "analytics.table"}},
+        "insert_random_data": {"outputs": {"result": {"table_name": "analytics.table"}}},
+    },
 }
 
-week_3_challenge_docker = week_3_challenge.to_job()
+week_3_challenge_docker = week_3_challenge.to_job(
+    config=docker,
+    resource_defs={
+        "database": postgres_resource,
+        "postgres_io": postgres_io_manager,
+    },
+)
diff --git a/week_3/workspaces/project/week_3.py b/week_3/workspaces/project/week_3.py
index 68e1f11b..2985e3fc 100644
--- a/week_3/workspaces/project/week_3.py
+++ b/week_3/workspaces/project/week_3.py
@@ -1,9 +1,10 @@
 from typing import List
-
+from copy import deepcopy
 from dagster import (
     In,
     Nothing,
     Out,
+    String,
     ResourceDefinition,
     RetryPolicy,
     RunRequest,
@@ -14,40 +15,51 @@
     schedule,
     sensor,
     static_partitioned_config,
+    ScheduleEvaluationContext,
 )
 from workspaces.project.sensors import get_s3_keys
 from workspaces.resources import mock_s3_resource, redis_resource, s3_resource
 from workspaces.types import Aggregation, Stock
 
 
-@op
-def get_s3_data():
-    # You can reuse the logic from the previous week
-    pass
-
-
-@op
-def process_data():
-    # You can reuse the logic from the previous week
-    pass
+@op(
+    config_schema={"s3_key": String},
+    required_resource_keys={"s3"},
+)
+def get_s3_data(context) -> List[Stock]:
+    s3_key = context.op_config["s3_key"]
+    return [Stock.from_list(item) for item in context.resources.s3.get_data(s3_key)]
+
+
+@op(
+    ins={"stocks": In(dagster_type=List[Stock])},
+    out={"agg": Out(dagster_type=Aggregation)},
+)
+def process_data(context, stocks: List[Stock]) -> Aggregation:
+    highest_stock = max(stocks, key=lambda stock: stock.high)
+    return Aggregation(date=highest_stock.date, high=highest_stock.high)
 
 
-@op
-def put_redis_data():
-    # You can reuse the logic from the previous week
-    pass
+@op(required_resource_keys={"redis"})
+def put_redis_data(context, agg: Aggregation) -> Nothing:
+    context.resources.redis.put_data(name=str(agg.date), value=str(agg.high))
 
 
-@op
-def put_s3_data():
-    # You can reuse the logic from the previous week
-    pass
+@op(required_resource_keys={"s3"})
+def put_s3_data(context, agg: Aggregation) -> Nothing:
+    context.resources.s3.put_data(key_name=str(agg.date), data=agg)
 
 
 @graph
 def week_3_pipeline():
-    # You can reuse the logic from the previous week
-    pass
+    stocks = get_s3_data()
+    processed = process_data(stocks)
+    put_redis_data(processed)
+    put_s3_data(processed)
 
 
 local = {
@@ -75,28 +87,67 @@
     "ops": {"get_s3_data": {"config": {"s3_key": "prefix/stock_9.csv"}}},
 }
 
-
-def docker_config():
-    pass
-
+@static_partitioned_config(
+    partition_keys=[str(i) for i in range(1, 11)]
+)
+def docker_config(partition_key: str):
+    # Deep-copy so editing the nested ops config does not mutate the shared docker dict.
+    partitioned_config = deepcopy(docker)
+    partitioned_config["ops"]["get_s3_data"]["config"]["s3_key"] = f"prefix/stock_{partition_key}.csv"
+    return partitioned_config
 
 
 week_3_pipeline_local = week_3_pipeline.to_job(
     name="week_3_pipeline_local",
+    config=local,
+    resource_defs={
+        "s3": mock_s3_resource,
+        "redis": ResourceDefinition.mock_resource(),
+    },
 )
 
 week_3_pipeline_docker = week_3_pipeline.to_job(
     name="week_3_pipeline_docker",
+    config=docker,
+    resource_defs={
+        "s3": s3_resource,
+        "redis": redis_resource,
+    },
+    op_retry_policy=RetryPolicy(max_retries=10, delay=1),
 )
 
-week_3_schedule_local = None
-
-
-@schedule
-def week_3_schedule_docker():
-    pass
+week_3_schedule_local = ScheduleDefinition(
+    job=week_3_pipeline_local,
+    cron_schedule="*/15 * * * *",
+)
 
 
-@sensor
-def week_3_sensor_docker():
-    pass
+@schedule(
+    job=week_3_pipeline_docker,
+    cron_schedule="0 * * * *",
+)
+def week_3_schedule_docker(context: ScheduleEvaluationContext):
+    return RunRequest(
+        run_key=context.scheduled_execution_time.strftime("%Y-%m-%d %H:%M:%S"),
+        # Reuse the static docker run config; docker_config is a partitioned config object, not a callable.
+        run_config=docker,
+    )
+
+
+@sensor(
+    job=week_3_pipeline_docker,
+    minimum_interval_seconds=30,
+)
+def week_3_sensor_docker(context):
+    new_files = get_s3_keys(
+        bucket="dagster",
+        prefix="prefix",
+        endpoint_url="http://localstack:4566",
+    )
+    if not new_files:
+        yield SkipReason("No new s3 files found in bucket.")
+        return
+    for new_file in new_files:
+        # Deep-copy the config per run so each RunRequest keeps its own s3_key.
+        sensor_docker_config = deepcopy(docker)
+        sensor_docker_config["ops"]["get_s3_data"]["config"]["s3_key"] = new_file
+        yield RunRequest(
+            run_key=new_file,
+            run_config=sensor_docker_config,
+        )
diff --git a/week_3/workspaces/types.py b/week_3/workspaces/types.py
index cd1c93b6..4fc3789c 100644
--- a/week_3/workspaces/types.py
+++ b/week_3/workspaces/types.py
@@ -3,6 +3,11 @@
 from dagster import usable_as_dagster_type
 from pydantic import BaseModel
 
+@usable_as_dagster_type(description="Postgres Record")
+class PostgresRecord(BaseModel):
+    v1: str
+    v2: str
+    v3: str
 
 @usable_as_dagster_type(description="Stock data")
 class Stock(BaseModel):