From cf8314603a2efb6094f439429b0e0e68fef428f3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Wed, 23 Nov 2022 07:44:26 +0000
Subject: [PATCH 01/24] first commit.

---
 week_1/project/week_1.py | 23 +++++++++++++++--------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py
index 095b93af..c8b96661 100644
--- a/week_1/project/week_1.py
+++ b/week_1/project/week_1.py
@@ -41,21 +41,28 @@ def csv_helper(file_name: str) -> Iterator[Stock]:
         yield Stock.from_list(row)


-@op
-def get_s3_data():
-    pass
+@op (
+    config_schema= {
+        "s3_key": String
+    }
+)
+def get_s3_data(context) -> List[Stock]:
+    s3_key = context.op_config["s3_key"]
+    return list(csv_helper(s3_key))


 @op
-def process_data():
-    pass
-
+def process_data(context, stocks: List[Stock]) -> Aggregation:
+    highest_stock = max(stocks, key= lambda stock: stock.high)
+    return Aggregation(date=highest_stock.date,high=highest_stock.high)

 @op
-def put_redis_data():
+def put_redis_data(context, aggs: Aggregation):
     pass


 @job
 def week_1_pipeline():
-    pass
+    s3_data = get_s3_data()
+    processed = process_data(s3_data)
+    put_redis_data(processed)

From b2dec379f6ce50a8bdf780e4bcbe8b8533aeaa55 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Wed, 23 Nov 2022 07:53:07 +0000
Subject: [PATCH 02/24] added input and output.

---
 week_1/project/week_1.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py
index c8b96661..7b782a18 100644
--- a/week_1/project/week_1.py
+++ b/week_1/project/week_1.py
@@ -51,7 +51,10 @@ def get_s3_data(context) -> List[Stock]:
     return list(csv_helper(s3_key))


-@op
+@op (
+    ins={"stocks": In(dagster_type=List[Stock])},
+    out={"agg": Out(dagster_type=Aggregation)}
+)
 def process_data(context, stocks: List[Stock]) -> Aggregation:
     highest_stock = max(stocks, key= lambda stock: stock.high)
     return Aggregation(date=highest_stock.date,high=highest_stock.high)

From b0fc8757c27a8bc14f2d0451ff815354a09dc94a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Wed, 23 Nov 2022 08:55:22 +0000
Subject: [PATCH 03/24] finished challenge.
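
Note on the top-N selection in the diff below: sorting the whole list works,
but heapq takes the top N without a full sort. A sketch only, reusing the
Stock and Aggregation models defined in week_1_challenge.py:

    import heapq

    def top_n_aggregations(stocks, n):
        # the n stocks with the highest `high`, largest first
        top = heapq.nlargest(n, stocks, key=lambda stock: stock.high)
        return [Aggregation(date=s.date, high=s.high) for s in top]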
---
 week_1/challenge/week_1_challenge.py | 41 ++++++++++++++++++++++------
 1 file changed, 32 insertions(+), 9 deletions(-)

diff --git a/week_1/challenge/week_1_challenge.py b/week_1/challenge/week_1_challenge.py
index 0c84d98d..5b71046c 100644
--- a/week_1/challenge/week_1_challenge.py
+++ b/week_1/challenge/week_1_challenge.py
@@ -55,18 +55,37 @@ def csv_helper(file_name: str) -> Iterator[Stock]:
         yield Stock.from_list(row)


-@op
-def get_s3_data():
-    pass
-
+@op (
+    config_schema = {
+        "s3_key": String
+    },
+    out = {
+        "stocks": Out(is_required=False),
+        "empty_stocks": Out(is_required=False)
+    }
+)
+def get_s3_data(context) -> List[Stock]:
+    s3_key = context.op_config["s3_key"]
+    stocks = list(csv_helper(s3_key))
+    if not stocks:
+        yield Output(None, "empty_stocks")
+    else:
+        yield Output(stocks, "stocks")

-@op
-def process_data():
-    pass
+@op(
+    config_schema = {"nlargest": int},
+    out = DynamicOut()
+)
+def process_data(context, stocks: List[Stock])-> Aggregation:
+    stocks.sort(key= lambda stock: stock.high, reverse= True)
+    n = context.op_config["nlargest"]
+    for i in range(0,n):
+        agg = Aggregation(date=stocks[i].date, high=stocks[i].high)
+        yield DynamicOutput(agg, mapping_key=str(i))

 @op
-def put_redis_data():
+def put_redis_data(context, aggs: Aggregation):
     pass


@@ -80,4 +99,8 @@ def empty_stock_notify(context, empty_stocks) -> Nothing:

 @job
 def week_1_challenge():
-    pass
+    stocks, no_stock = get_s3_data()
+    processed = process_data(stocks)
+    empty_stock_notify(no_stock)
+    processed.map(put_redis_data)
+

From fa965c82ba128646d32acd5509b938aadcab1838 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Wed, 23 Nov 2022 09:18:27 +0000
Subject: [PATCH 04/24] fixed type hints.

---
 week_1/challenge/week_1_challenge.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/week_1/challenge/week_1_challenge.py b/week_1/challenge/week_1_challenge.py
index 5b71046c..7d97fdd6 100644
--- a/week_1/challenge/week_1_challenge.py
+++ b/week_1/challenge/week_1_challenge.py
@@ -60,8 +60,8 @@ def csv_helper(file_name: str) -> Iterator[Stock]:
         "s3_key": String
     },
     out = {
-        "stocks": Out(is_required=False),
-        "empty_stocks": Out(is_required=False)
+        "stocks": Out(List[Stock],is_required=False),
+        "empty_stocks": Out(Any,is_required=False)
     }
 )
 def get_s3_data(context) -> List[Stock]:
@@ -75,7 +75,7 @@

 @op(
     config_schema = {"nlargest": int},
-    out = DynamicOut()
+    out = DynamicOut(Aggregation)
 )
 def process_data(context, stocks: List[Stock])-> Aggregation:
     stocks.sort(key= lambda stock: stock.high, reverse= True)
@@ -85,7 +85,7 @@
         yield DynamicOutput(agg, mapping_key=str(i))

 @op
-def put_redis_data(context, aggs: Aggregation):
+def put_redis_data(context, aggs: Aggregation) -> Nothing:
     pass

From 72502ae5a6bcfa6be94726d579d319bee6359ef3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Wed, 23 Nov 2022 09:20:52 +0000
Subject: [PATCH 05/24] fix type hint.
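
The `Nothing` annotation below tells Dagster the op returns no value. The
same type also supports ordering-only dependencies between ops; a minimal
sketch using only the public dagster API:

    from dagster import In, Nothing, job, op

    @op
    def create_table() -> Nothing:
        pass  # side effect only, e.g. running DDL

    @op(ins={"start": In(Nothing)})
    def load_table() -> Nothing:
        pass  # runs after create_table; no value is passed

    @job
    def ordered():
        load_table(start=create_table())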
---
 week_1/project/week_1.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py
index 7b782a18..8405ed06 100644
--- a/week_1/project/week_1.py
+++ b/week_1/project/week_1.py
@@ -60,7 +60,7 @@ def process_data(context, stocks: List[Stock]) -> Aggregation:
     return Aggregation(date=highest_stock.date,high=highest_stock.high)

 @op
-def put_redis_data(context, aggs: Aggregation):
+def put_redis_data(context, aggs: Aggregation) -> Nothing:
     pass

From f5524352037fda46072b56c19a77361c3d527272 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Mon, 28 Nov 2022 07:00:58 +0000
Subject: [PATCH 06/24] finished week2 resources.

---
 week_2/workspaces/project/week_2.py | 53 ++++++++++++++++++++++-------
 week_2/workspaces/resources.py      | 34 ++++++++++++++----
 2 files changed, 68 insertions(+), 19 deletions(-)

diff --git a/week_2/workspaces/project/week_2.py b/week_2/workspaces/project/week_2.py
index 8b184294..da88f1e4 100644
--- a/week_2/workspaces/project/week_2.py
+++ b/week_2/workspaces/project/week_2.py
@@ -5,29 +5,46 @@
 from workspaces.types import Aggregation, Stock


-@op
-def get_s3_data():
-    pass
+@op(
+    config_schema={
+        "s3_key": String
+    },
+    required_resource_keys={"s3"}
+)
+def get_s3_data(context) -> List[Stock]:
+    s3_key = context.op_config["s3_key"]
+    return [Stock.from_list(item) for item in context.resources.s3.get_data(s3_key)]


-@op
-def process_data():
-    pass
+@op (
+    ins={"stocks": In(dagster_type=List[Stock])},
+    out={"agg": Out(dagster_type=Aggregation)}
+)
+def process_data(context, stocks: List[Stock]) -> Aggregation:
+    highest_stock = max(stocks, key= lambda stock: stock.high)
+    return Aggregation(date=highest_stock.date,high=highest_stock.high)


-@op
-def put_redis_data():
-    pass
+@op(
+    required_resource_keys={"redis"}
+)
+def put_redis_data(context, agg: Aggregation) -> Nothing:
+    context.resources.redis.put_data(name=str(agg.date), value=str(agg.high))


-@op
-def put_s3_data():
-    pass
+@op(
+    required_resource_keys={"s3"}
+)
+def put_s3_data(context, agg: Aggregation) -> Nothing:
+    context.resources.s3.put_data(key=agg.date, data=agg)


 @graph
 def week_2_pipeline():
-    pass
+    stocks = get_s3_data()
+    processed = process_data(stocks)
+    put_redis_data(processed)
+    put_s3_data(processed)


 local = {
@@ -56,8 +73,18 @@ def week_2_pipeline():

 week_2_pipeline_local = week_2_pipeline.to_job(
     name="week_2_pipeline_local",
+    config= local,
+    resource_defs = {
+        "s3": mock_s3_resource,
+        "redis": ResourceDefinition.mock_resource()
+    }
 )

 week_2_pipeline_docker = week_2_pipeline.to_job(
     name="week_2_pipeline_docker",
+    config= docker,
+    resource_defs = {
+        "s3": s3_resource,
+        "redis": redis_resource
+    }
 )
diff --git a/week_2/workspaces/resources.py b/week_2/workspaces/resources.py
index 2c2cba81..dca0945e 100644
--- a/week_2/workspaces/resources.py
+++ b/week_2/workspaces/resources.py
@@ -98,13 +98,35 @@ def mock_s3_resource(context):
     return s3_mock


-@resource
-def s3_resource():
+@resource(
+    config_schema={
+        "bucket": Field(String),
+        "access_key": Field(String),
+        "secret_key": Field(String),
+        "endpoint_url": Field(String)
+    },
+    description="A resource that runs S3."
+)
+def s3_resource(context) -> S3:
     """This resource defines a S3 client"""
-    pass
+    return S3(
+        bucket=context.resource_config["bucket"],
+        access_key=context.resource_config["access_key"],
+        secret_key=context.resource_config["secret_key"],
+        endpoint_url=context.resource_config["endpoint_url"]
+    )


-@resource
-def redis_resource():
+@resource(
+    config_schema={
+        "host": Field(String),
+        "port": Field(Int)
+    },
+    description="A resource that runs Redis."
+)
+def redis_resource(context) -> Redis:
    """This resource defines a Redis client"""
-    pass
+    return Redis(
+        host=context.resource_config["host"],
+        port=context.resource_config["port"]
+    )

From d077dd2a8d62414f93e596b5694c27207f5946f2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Tue, 29 Nov 2022 07:20:45 +0000
Subject: [PATCH 07/24] implemented challenge need to ask OH proper success
 failure implementation.

---
 .../workspaces/challenge/week_2_challenge.py | 74 +++++++++++++++++--
 1 file changed, 69 insertions(+), 5 deletions(-)

diff --git a/week_2/workspaces/challenge/week_2_challenge.py b/week_2/workspaces/challenge/week_2_challenge.py
index 10d3f677..99c817c5 100644
--- a/week_2/workspaces/challenge/week_2_challenge.py
+++ b/week_2/workspaces/challenge/week_2_challenge.py
@@ -1,9 +1,22 @@
 from random import randint

-from dagster import In, Nothing, String, graph, op
 from dagster_dbt import dbt_cli_resource, dbt_run_op, dbt_test_op
 from workspaces.resources import postgres_resource

+from dagster import (
+    In,
+    Out,
+    Output,
+    Nothing,
+    String,
+    Any,
+    graph,
+    op,
+    HookContext,
+    failure_hook,
+    success_hook
+)
+
 DBT_PROJECT_PATH = "/opt/dagster/dagster_home/dbt_test_project/."

@@ -21,13 +34,12 @@ def create_dbt_table(context) -> String:
     context.resources.database.execute_query(sql)
     return table_name

-
 @op(
     ins={"table_name": In(dagster_type=String)},
     required_resource_keys={"database"},
     tags={"kind": "postgres"},
 )
-def insert_dbt_data(context, table_name):
+def insert_dbt_data(context, table_name)-> Nothing:
     sql = f"INSERT INTO {table_name} (column_1, column_2, column_3) VALUES ('A', 'B', 'C');"

     number_of_rows = randint(1, 100)
@@ -37,11 +49,58 @@ def insert_dbt_data(context, table_name):
     context.log.info("Batch inserted")


+@op(
+    required_resource_keys={"dbt"}
+)
+def dbt_run():
+    dbt_run_op()
+
+
+@op (
+    required_resource_keys={"dbt"},
+    out = {
+        "success": Out(Any, is_required=False),
+        "failure": Out(Any, is_required=False)
+    }
+)
+def dbt_test():
+    try:
+        dbt_test_op()
+        yield Output(None, "success")
+    except:
+        yield Output(None, "failure")
+
+
+@op(
+    description = "Notify if dbt test succeeded"
+)
+def dbt_test_success(context, success) -> Nothing:
+    context.log.info("Success")
+
+
+@op(
+    description= "Notify if dbt test failed"
+)
+def dbt_test_failure(context, failure) -> Nothing:
+    context.log.info("Failure")
+
+@success_hook
+def notify_success(context: HookContext):
+    message = f"Op {context.op.name} finished successfully"
+    context.log.info(message)
+
+@failure_hook
+def notify_failure(context: HookContext):
+    message = f"Op {context.op.name} failed"
+    context.log.info(message)

 @graph
 def week_2_challenge():
-    pass
-
+    tbl = create_dbt_table()
+    # success, failure = dbt_test_op(dbt_run_op(insert_dbt_data(tbl)))
+    # dbt_test_success(success)
+    # dbt_test_failure(failure)
+    dbt_test_op.with_hooks({notify_success, notify_failure})(dbt_run_op(insert_dbt_data(tbl)))

 docker = {
     "resources": {
@@ -68,4 +127,9 @@

 week_2_challenge_docker = week_2_challenge.to_job(
name="week_2_challenge_docker", + config = docker, + resource_defs = { + "database": postgres_resource, + "dbt": dbt_cli_resource + } ) From 17f2929833abdd18f3cc531611e42b95c648c596 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frank=20L=C3=BC?= Date: Fri, 2 Dec 2022 06:11:10 +0000 Subject: [PATCH 08/24] modify dockerfile for dbt run to finish. --- Dockerfile | 1 - dbt_test_project/.user.yml | 1 + dbt_test_project/~/dbt.log.legacy | 4 ++++ 3 files changed, 5 insertions(+), 1 deletion(-) create mode 100644 dbt_test_project/.user.yml create mode 100644 dbt_test_project/~/dbt.log.legacy diff --git a/Dockerfile b/Dockerfile index 109de6ba..0ecf90cb 100644 --- a/Dockerfile +++ b/Dockerfile @@ -88,6 +88,5 @@ FROM runner AS challenge ENV DAGSTER_CURRENT_IMAGE=corise-dagster-answer-key_challenge ARG COURSE_WEEK COPY ${COURSE_WEEK}/workspaces/ ./workspaces -USER dagster:dagster EXPOSE 4002 CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4002", "-f", "workspaces/challenge/repo.py"] diff --git a/dbt_test_project/.user.yml b/dbt_test_project/.user.yml new file mode 100644 index 00000000..ea4c8c00 --- /dev/null +++ b/dbt_test_project/.user.yml @@ -0,0 +1 @@ +id: 868ae72c-700f-4fb3-9b5e-3d5409d7cb87 diff --git a/dbt_test_project/~/dbt.log.legacy b/dbt_test_project/~/dbt.log.legacy new file mode 100644 index 00000000..e81548de --- /dev/null +++ b/dbt_test_project/~/dbt.log.legacy @@ -0,0 +1,4 @@ +{"timestamp": "2022-12-02T06:10:08.548607Z", "message": "'soft_unicode' has been renamed to 'soft_str'. The old name will be removed in MarkupSafe 2.1.", "channel": "DeprecationWarning", "level": 0, "levelname": "NOTSET", "thread_name": "MainThread", "process": 63, "extra": {"from_warnings": true, "run_state": "parsing", "old_level": 10}} +{"timestamp": "2022-12-02T06:10:08.551523Z", "message": "'soft_unicode' has been renamed to 'soft_str'. The old name will be removed in MarkupSafe 2.1.", "channel": "DeprecationWarning", "level": 0, "levelname": "NOTSET", "thread_name": "MainThread", "process": 63, "extra": {"from_warnings": true, "run_state": "parsing", "old_level": 10}} +{"timestamp": "2022-12-02T06:10:08.554175Z", "message": "'soft_unicode' has been renamed to 'soft_str'. The old name will be removed in MarkupSafe 2.1.", "channel": "DeprecationWarning", "level": 0, "levelname": "NOTSET", "thread_name": "MainThread", "process": 63, "extra": {"from_warnings": true, "run_state": "parsing", "old_level": 10}} +{"timestamp": "2022-12-02T06:10:08.556976Z", "message": "'soft_unicode' has been renamed to 'soft_str'. The old name will be removed in MarkupSafe 2.1.", "channel": "DeprecationWarning", "level": 0, "levelname": "NOTSET", "thread_name": "MainThread", "process": 63, "extra": {"from_warnings": true, "run_state": "parsing", "old_level": 10}} From 5e272d3de8f6e7fc7b6f3965ba99716804ff6739 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frank=20L=C3=BC?= Date: Fri, 2 Dec 2022 07:10:02 +0000 Subject: [PATCH 09/24] get challenge working with success and failure hooks. 
---
 .gitpod.Dockerfile                           |  2 +-
 .../workspaces/challenge/week_2_challenge.py | 31 ++++---------------
 2 files changed, 7 insertions(+), 26 deletions(-)

diff --git a/.gitpod.Dockerfile b/.gitpod.Dockerfile
index 45da7ce8..68406718 100644
--- a/.gitpod.Dockerfile
+++ b/.gitpod.Dockerfile
@@ -7,4 +7,4 @@ RUN pip install "poetry==1.1.12"
 COPY poetry.lock pyproject.toml /

 RUN poetry config virtualenvs.create false \
-    && poetry install --no-interaction --no-ansi
\ No newline at end of file
+    && poetry install --no-interaction --no-ansi
diff --git a/week_2/workspaces/challenge/week_2_challenge.py b/week_2/workspaces/challenge/week_2_challenge.py
index 99c817c5..5557e9aa 100644
--- a/week_2/workspaces/challenge/week_2_challenge.py
+++ b/week_2/workspaces/challenge/week_2_challenge.py
@@ -1,6 +1,6 @@
 from random import randint

-from dagster_dbt import dbt_cli_resource, dbt_run_op, dbt_test_op
+from dagster_dbt import dbt_cli_resource, dbt_run_op, dbt_test_op, DbtOutput
 from workspaces.resources import postgres_resource

 from dagster import (
@@ -46,13 +46,13 @@ def insert_dbt_data(context, table_name)-> Nothing:
     for _ in range(number_of_rows):
         context.resources.database.execute_query(sql)
         context.log.info("Inserted a row")
-    context.log.info("Batch inserted")
+

 @op(
     required_resource_keys={"dbt"}
 )
-def dbt_run():
+def dbt_run() -> DbtOutput:
     dbt_run_op()

 @op (
     required_resource_keys={"dbt"},
     out = {
         "success": Out(Any, is_required=False),
         "failure": Out(Any, is_required=False)
     }
 )
-def dbt_test():
-    try:
-        dbt_test_op()
-        yield Output(None, "success")
-    except:
-        yield Output(None, "failure")
-
-
-@op(
-    description = "Notify if dbt test succeeded"
-)
-def dbt_test_success(context, success) -> Nothing:
-    context.log.info("Success")
+def dbt_test() -> DbtOutput:
+    dbt_test_op()


-@op(
-    description= "Notify if dbt test failed"
-)
-def dbt_test_failure(context, failure) -> Nothing:
-    context.log.info("Failure")
-
 @success_hook
 def notify_success(context: HookContext):
     message = f"Op {context.op.name} finished successfully"
     context.log.info(message)

+
 @failure_hook
 def notify_failure(context: HookContext):
     message = f"Op {context.op.name} failed"
     context.log.info(message)

 @graph
 def week_2_challenge():
     tbl = create_dbt_table()
-    # success, failure = dbt_test_op(dbt_run_op(insert_dbt_data(tbl)))
-    # dbt_test_success(success)
-    # dbt_test_failure(failure)
     dbt_test_op.with_hooks({notify_success, notify_failure})(dbt_run_op(insert_dbt_data(tbl)))

From 7989c5925c1c81d8576a9b495ec71452c4904c57 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Fri, 2 Dec 2022 07:11:11 +0000
Subject: [PATCH 10/24] added tags.

---
 week_2/workspaces/challenge/week_2_challenge.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/week_2/workspaces/challenge/week_2_challenge.py b/week_2/workspaces/challenge/week_2_challenge.py
index 5557e9aa..34368aff 100644
--- a/week_2/workspaces/challenge/week_2_challenge.py
+++ b/week_2/workspaces/challenge/week_2_challenge.py
@@ -50,7 +50,8 @@ def insert_dbt_data(context, table_name)-> Nothing:

 @op(
-    required_resource_keys={"dbt"}
+    required_resource_keys={"dbt"},
+    tags={"kind": "dbt"}
 )
 def dbt_run() -> DbtOutput:
     dbt_run_op()
@@ -61,7 +62,9 @@ def dbt_run() -> DbtOutput:
     out = {
         "success": Out(Any, is_required=False),
         "failure": Out(Any, is_required=False)
-    }
+    },
+    tags={"kind": "dbt"}
+
 )
 def dbt_test() -> DbtOutput:
     dbt_test_op()

From fb79a5057ee9cb5d846031765eba272622944144 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Fri, 2 Dec 2022 07:14:44 +0000
Subject: [PATCH 11/24] removed unused type.

---
 week_2/workspaces/challenge/week_2_challenge.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/week_2/workspaces/challenge/week_2_challenge.py b/week_2/workspaces/challenge/week_2_challenge.py
index 34368aff..d1d62c97 100644
--- a/week_2/workspaces/challenge/week_2_challenge.py
+++ b/week_2/workspaces/challenge/week_2_challenge.py
@@ -6,7 +6,6 @@
 from dagster import (
     In,
     Out,
-    Output,
     Nothing,
     String,
     Any,

From 245448b0c62496ea5c6dd547b327774b35e1de30 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Fri, 2 Dec 2022 07:17:25 +0000
Subject: [PATCH 12/24] fixed outs in dbt test ops.

---
 week_2/workspaces/challenge/week_2_challenge.py | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)

diff --git a/week_2/workspaces/challenge/week_2_challenge.py b/week_2/workspaces/challenge/week_2_challenge.py
index d1d62c97..42bc69ae 100644
--- a/week_2/workspaces/challenge/week_2_challenge.py
+++ b/week_2/workspaces/challenge/week_2_challenge.py
@@ -58,25 +58,20 @@ def dbt_run() -> DbtOutput:

 @op (
     required_resource_keys={"dbt"},
-    out = {
-        "success": Out(Any, is_required=False),
-        "failure": Out(Any, is_required=False)
-    },
     tags={"kind": "dbt"}
-
 )
 def dbt_test() -> DbtOutput:
     dbt_test_op()


 @success_hook
-def notify_success(context: HookContext):
+def notify_success(context: HookContext) -> Nothing:
     message = f"Op {context.op.name} finished successfully"
     context.log.info(message)


 @failure_hook
-def notify_failure(context: HookContext):
+def notify_failure(context: HookContext) -> Nothing:
     message = f"Op {context.op.name} failed"
     context.log.info(message)

From 17f2929833abdd18f3cc531611e42b95c648c596 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Fri, 2 Dec 2022 07:25:08 +0000
Subject: [PATCH 13/24] remove legacy log file.

---
 dbt_test_project/~/dbt.log.legacy | 4 ----
 1 file changed, 4 deletions(-)
 delete mode 100644 dbt_test_project/~/dbt.log.legacy

diff --git a/dbt_test_project/~/dbt.log.legacy b/dbt_test_project/~/dbt.log.legacy
deleted file mode 100644
index e81548de..00000000
--- a/dbt_test_project/~/dbt.log.legacy
+++ /dev/null
@@ -1,4 +0,0 @@
-{"timestamp": "2022-12-02T06:10:08.548607Z", "message": "'soft_unicode' has been renamed to 'soft_str'. The old name will be removed in MarkupSafe 2.1.", "channel": "DeprecationWarning", "level": 0, "levelname": "NOTSET", "thread_name": "MainThread", "process": 63, "extra": {"from_warnings": true, "run_state": "parsing", "old_level": 10}}
-{"timestamp": "2022-12-02T06:10:08.551523Z", "message": "'soft_unicode' has been renamed to 'soft_str'. The old name will be removed in MarkupSafe 2.1.", "channel": "DeprecationWarning", "level": 0, "levelname": "NOTSET", "thread_name": "MainThread", "process": 63, "extra": {"from_warnings": true, "run_state": "parsing", "old_level": 10}}
-{"timestamp": "2022-12-02T06:10:08.554175Z", "message": "'soft_unicode' has been renamed to 'soft_str'. The old name will be removed in MarkupSafe 2.1.", "channel": "DeprecationWarning", "level": 0, "levelname": "NOTSET", "thread_name": "MainThread", "process": 63, "extra": {"from_warnings": true, "run_state": "parsing", "old_level": 10}}
-{"timestamp": "2022-12-02T06:10:08.556976Z", "message": "'soft_unicode' has been renamed to 'soft_str'. The old name will be removed in MarkupSafe 2.1.", "channel": "DeprecationWarning", "level": 0, "levelname": "NOTSET", "thread_name": "MainThread", "process": 63, "extra": {"from_warnings": true, "run_state": "parsing", "old_level": 10}}

From 3f980fd3c0613afe09a9ff5c9bcff292c5ba18f5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Fri, 2 Dec 2022 07:26:13 +0000
Subject: [PATCH 14/24] recover dagster user.

---
 Dockerfile | 1 +
 1 file changed, 1 insertion(+)

diff --git a/Dockerfile b/Dockerfile
index 0ecf90cb..c5c21f73 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -88,5 +88,6 @@ FROM runner AS challenge
 ENV DAGSTER_CURRENT_IMAGE=corise-dagster-answer-key_challenge
 ARG COURSE_WEEK
 COPY ${COURSE_WEEK}/workspaces/ ./workspaces
+USER dagster: dagster
 EXPOSE 4002
 CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4002", "-f", "workspaces/challenge/repo.py"]

From f652265c72b897c1bcae274d3a427e59274c49a6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Fri, 2 Dec 2022 07:26:44 +0000
Subject: [PATCH 15/24] rerecover dagster user.

---
 Dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Dockerfile b/Dockerfile
index c5c21f73..109de6ba 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -88,6 +88,6 @@ FROM runner AS challenge
 ENV DAGSTER_CURRENT_IMAGE=corise-dagster-answer-key_challenge
 ARG COURSE_WEEK
 COPY ${COURSE_WEEK}/workspaces/ ./workspaces
-USER dagster: dagster
+USER dagster:dagster
 EXPOSE 4002
 CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4002", "-f", "workspaces/challenge/repo.py"]

From 01c3861a82320948646da7b36888e8b03c523a6e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Fri, 2 Dec 2022 07:27:38 +0000
Subject: [PATCH 16/24] remove committed user yml file.

---
 dbt_test_project/.user.yml | 1 -
 1 file changed, 1 deletion(-)
 delete mode 100644 dbt_test_project/.user.yml

diff --git a/dbt_test_project/.user.yml b/dbt_test_project/.user.yml
deleted file mode 100644
index ea4c8c00..00000000
--- a/dbt_test_project/.user.yml
+++ /dev/null
@@ -1 +0,0 @@
-id: 868ae72c-700f-4fb3-9b5e-3d5409d7cb87

From a5dd78610a9825467aa097eb4eed82217ecb4db6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Mon, 5 Dec 2022 05:35:05 +0000
Subject: [PATCH 17/24] need to fix sensor eval context.
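
One caveat in the partitioned config below: dict.copy() is shallow, so every
partition mutates the same nested ops dict inside the shared `docker`
template. copy.deepcopy isolates each partition's run config; a sketch, not
part of this commit:

    from copy import deepcopy

    @static_partitioned_config(partition_keys=[str(i) for i in range(1, 11)])
    def docker_config(partition_key: str):
        cfg = deepcopy(docker)  # deep copy: safe to mutate nested dicts
        cfg["ops"]["get_s3_data"]["config"]["s3_key"] = f"prefix/stock_{partition_key}.csv"
        return cfg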
---
 week_3/workspaces/project/week_3.py | 108 ++++++++++++++++++++--------
 1 file changed, 80 insertions(+), 28 deletions(-)

diff --git a/week_3/workspaces/project/week_3.py b/week_3/workspaces/project/week_3.py
index 68e1f11b..15bb3953 100644
--- a/week_3/workspaces/project/week_3.py
+++ b/week_3/workspaces/project/week_3.py
@@ -4,6 +4,7 @@
     In,
     Nothing,
     Out,
+    String,
     ResourceDefinition,
     RetryPolicy,
     RunRequest,
@@ -14,40 +15,52 @@
     schedule,
     sensor,
     static_partitioned_config,
+    build_schedule_from_partitioned_job
 )
 from workspaces.project.sensors import get_s3_keys
 from workspaces.resources import mock_s3_resource, redis_resource, s3_resource
 from workspaces.types import Aggregation, Stock


-@op
-def get_s3_data():
-    # You can reuse the logic from the previous week
-    pass
-
+@op(
+    config_schema={
+        "s3_key": String
+    },
+    required_resource_keys={"s3"}
+)
+def get_s3_data(context) -> List[Stock]:
+    s3_key = context.op_config["s3_key"]
+    return [Stock.from_list(item) for item in context.resources.s3.get_data(s3_key)]

-@op
-def process_data():
-    # You can reuse the logic from the previous week
-    pass
+@op (
+    ins={"stocks": In(dagster_type=List[Stock])},
+    out={"agg": Out(dagster_type=Aggregation)}
+)
+def process_data(context, stocks: List[Stock]) -> Aggregation:
+    highest_stock = max(stocks, key= lambda stock: stock.high)
+    return Aggregation(date=highest_stock.date,high=highest_stock.high)

-@op
-def put_redis_data():
-    # You can reuse the logic from the previous week
-    pass
+@op(
+    required_resource_keys={"redis"}
+)
+def put_redis_data(context, agg: Aggregation) -> Nothing:
+    context.resources.redis.put_data(name=str(agg.date), value=str(agg.high))

-@op
-def put_s3_data():
-    # You can reuse the logic from the previous week
-    pass
+@op(
+    required_resource_keys={"s3"}
+)
+def put_s3_data(context, agg: Aggregation) -> Nothing:
+    context.resources.s3.put_data(key=agg.date, data=agg)


 @graph
 def week_3_pipeline():
-    # You can reuse the logic from the previous week
-    pass
+    stocks = get_s3_data()
+    processed = process_data(stocks)
+    put_redis_data(processed)
+    put_s3_data(processed)


 local = {
@@ -75,28 +88,67 @@
     "ops": {"get_s3_data": {"config": {"s3_key": "prefix/stock_9.csv"}}},
 }

-
-def docker_config():
-    pass
-
+@static_partitioned_config(
+    partition_keys=[str(i) for i in range(1,11)]
+)
+def docker_config(partition_key: str):
+    partitioned_config = docker.copy()
+    partitioned_config["ops"]["get_s3_data"]["config"]["s3_key"] = f"prefix/stock_{partition_key}.csv"
+    return partitioned_config

 week_3_pipeline_local = week_3_pipeline.to_job(
     name="week_3_pipeline_local",
+    config= local,
+    resource_defs = {
+        "s3": mock_s3_resource,
+        "redis": ResourceDefinition.mock_resource()
+    },
 )

 week_3_pipeline_docker = week_3_pipeline.to_job(
     name="week_3_pipeline_docker",
+    config= docker_config,
+    resource_defs = {
+        "s3": s3_resource,
+        "redis": redis_resource
+    },
+    op_retry_policy=RetryPolicy(max_retries=10, delay=1)
 )

-week_3_schedule_local = None
+week_3_schedule_local = ScheduleDefinition(
+    job=week_3_pipeline_local,
+    cron_schedule="15 * * * *"
+)

 @schedule
 def week_3_schedule_docker():
-    pass
+    return build_schedule_from_partitioned_job(week_3_pipeline_docker)

-@sensor
-def week_3_sensor_docker():
-    pass
+@sensor(
+    job=week_3_pipeline_docker,
+    minimum_interval_seconds=30
+)
+def week_3_sensor_docker(context):
+    new_files = get_s3_keys(
+        bucket=context.resources.s3.config.bucket
+    )
+    if not new_files:
+        yield SkipReason("No new s3 files found in bucket")
+        return
+
+    for new_file in new_files:
+        yield RunRequest(
+            run_key=new_file,
+            run_config= {
+                "ops": {
+                    "get_s3_data": {
+                        "config": {
+                            "s3_key": "prefix/stock_9.csv"
+                        }
+                    }
+                }
+            }
+        )
+

From 5dfd4a57d8147ae4e9d62d43c5bd02c0a0c505db Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Mon, 5 Dec 2022 08:30:53 +0000
Subject: [PATCH 18/24] one more docker validation test.

---
 week_3/workspaces/project/week_3.py | 37 ++++++++++++++---------------
 1 file changed, 18 insertions(+), 19 deletions(-)

diff --git a/week_3/workspaces/project/week_3.py b/week_3/workspaces/project/week_3.py
index 15bb3953..c27760e3 100644
--- a/week_3/workspaces/project/week_3.py
+++ b/week_3/workspaces/project/week_3.py
@@ -15,7 +15,8 @@
     schedule,
     sensor,
     static_partitioned_config,
-    build_schedule_from_partitioned_job
+    build_schedule_from_partitioned_job,
+    ScheduleEvaluationContext
 )
 from workspaces.project.sensors import get_s3_keys
 from workspaces.resources import mock_s3_resource, redis_resource, s3_resource
@@ -93,7 +94,7 @@
 )
 def docker_config(partition_key: str):
     partitioned_config = docker.copy()
-    partitioned_config["ops"]["get_s3_data"]["config"]["s3_key"] = f"prefix/stock_{partition_key}.csv"
+    partitioned_config["ops"]["get_s3_data"]["config"]["s3_key"] = f"{partition_key}"
     return partitioned_config

 week_3_pipeline_local = week_3_pipeline.to_job(
@@ -107,7 +108,7 @@
 week_3_pipeline_docker = week_3_pipeline.to_job(
     name="week_3_pipeline_docker",
-    config= docker_config,
+    config= docker,
     resource_defs = {
         "s3": s3_resource,
         "redis": redis_resource
@@ -118,14 +119,19 @@

 week_3_schedule_local = ScheduleDefinition(
     job=week_3_pipeline_local,
-    cron_schedule="15 * * * *"
+    cron_schedule="*/15 * * * *"
 )

-@schedule
-def week_3_schedule_docker():
-    return build_schedule_from_partitioned_job(week_3_pipeline_docker)
-
+@schedule(
+    job=week_3_pipeline_docker,
+    cron_schedule="0 * * * *"
+)
+def week_3_schedule_docker(context: ScheduleEvaluationContext):
+    return RunRequest(
+        run_key=context.scheduled_execution_time.strftime("%Y-%m-%d"),
+        run_config=docker_config(new_file)
+    )

 @sensor(
     job=week_3_pipeline_docker,
@@ -133,22 +139,15 @@
 def week_3_sensor_docker(context):
     new_files = get_s3_keys(
-        bucket=context.resources.s3.config.bucket
+        bucket="dagster",
+        prefix="prefix"
     )
     if not new_files:
-        yield SkipReason("No new s3 files found in bucket")
+        yield SkipReason("No new s3 files found in bucket.")
         return
     for new_file in new_files:
         yield RunRequest(
             run_key=new_file,
-            run_config= {
-                "ops": {
-                    "get_s3_data": {
-                        "config": {
-                            "s3_key": "prefix/stock_9.csv"
-                        }
-                    }
-                }
-            }
+            run_config=docker_config(new_file)
         )

From ba6679fba1b0c41127d298d5623636be7e870438 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Wed, 7 Dec 2022 08:50:45 +0000
Subject: [PATCH 19/24] finished week3. still need to do challenge.
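
A possible refinement of the sensor below: persisting a cursor filters out
keys that were already requested, instead of leaning on run_key
de-duplication alone. Sketch only, assuming the get_s3_keys helper and the
`docker` run-config template from week_3.py:

    from copy import deepcopy

    @sensor(job=week_3_pipeline_docker, minimum_interval_seconds=30)
    def week_3_sensor_cursor(context):
        last_key = context.cursor or ""
        keys = get_s3_keys(bucket="dagster", prefix="prefix")
        new_files = [key for key in keys if key > last_key]
        if not new_files:
            yield SkipReason("No new s3 files found in bucket.")
            return
        for new_file in new_files:
            cfg = deepcopy(docker)
            cfg["ops"]["get_s3_data"]["config"]["s3_key"] = new_file
            yield RunRequest(run_key=new_file, run_config=cfg)
        context.update_cursor(new_files[-1])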
---
 week_3/workspaces/project/week_3.py | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/week_3/workspaces/project/week_3.py b/week_3/workspaces/project/week_3.py
index c27760e3..d506ae64 100644
--- a/week_3/workspaces/project/week_3.py
+++ b/week_3/workspaces/project/week_3.py
@@ -94,7 +94,7 @@
 )
 def docker_config(partition_key: str):
     partitioned_config = docker.copy()
-    partitioned_config["ops"]["get_s3_data"]["config"]["s3_key"] = f"{partition_key}"
+    partitioned_config["ops"]["get_s3_data"]["config"]["s3_key"] = f"prefix/stock_{partition_key}.csv"
     return partitioned_config

 week_3_pipeline_local = week_3_pipeline.to_job(
@@ -130,7 +130,7 @@
 def week_3_schedule_docker(context: ScheduleEvaluationContext):
     return RunRequest(
         run_key=context.scheduled_execution_time.strftime("%Y-%m-%d"),
-        run_config=docker_config(new_file)
+        run_config=docker_config()
     )

 @sensor(
@@ -140,14 +140,16 @@
 def week_3_sensor_docker(context):
     new_files = get_s3_keys(
         bucket="dagster",
-        prefix="prefix"
+        prefix="prefix",
+        endpoint_url="http://host.docker.internal:4566"
     )
+    sensor_docker_config = docker.copy()
     if not new_files:
         yield SkipReason("No new s3 files found in bucket.")
         return
     for new_file in new_files:
+        sensor_docker_config["ops"]["get_s3_data"]["config"]["s3_key"] = f"{new_file}"
         yield RunRequest(
             run_key=new_file,
-            run_config=docker_config(new_file)
+            run_config=sensor_docker_config
         )
-

From bb5b9f0a0cc269b03e68b45c94a37e7772e5a794 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Wed, 7 Dec 2022 15:59:41 +0000
Subject: [PATCH 20/24] remove unused imports.

---
 week_3/workspaces/project/week_3.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/week_3/workspaces/project/week_3.py b/week_3/workspaces/project/week_3.py
index d506ae64..6607502d 100644
--- a/week_3/workspaces/project/week_3.py
+++ b/week_3/workspaces/project/week_3.py
@@ -15,7 +15,6 @@
     schedule,
     sensor,
     static_partitioned_config,
-    build_schedule_from_partitioned_job,
     ScheduleEvaluationContext
 )
 from workspaces.project.sensors import get_s3_keys

From f7b1af2785c301cd7a281d5e11b14b08ec3a4c95 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Thu, 8 Dec 2022 06:45:10 +0000
Subject: [PATCH 21/24] change endpoint url.

---
 week_3/workspaces/project/week_3.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/week_3/workspaces/project/week_3.py b/week_3/workspaces/project/week_3.py
index 6607502d..25f295a1 100644
--- a/week_3/workspaces/project/week_3.py
+++ b/week_3/workspaces/project/week_3.py
@@ -128,7 +128,7 @@
 def week_3_schedule_docker(context: ScheduleEvaluationContext):
     return RunRequest(
-        run_key=context.scheduled_execution_time.strftime("%Y-%m-%d"),
+        run_key=context.scheduled_execution_time.strftime("%Y-%m-%d %H:%M:%S"),
         run_config=docker_config()
     )
@@ -140,7 +140,7 @@
 def week_3_sensor_docker(context):
     new_files = get_s3_keys(
         bucket="dagster",
         prefix="prefix",
-        endpoint_url="http://host.docker.internal:4566"
+        endpoint_url="http://localstack:4566"
     )
     sensor_docker_config = docker.copy()

From 758da6d3f7229d47cfd5116a426e21319b607beb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Thu, 8 Dec 2022 07:00:39 +0000
Subject: [PATCH 22/24] fixed typo.
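
A keyword mismatch like this one surfaces quickly in an op-level test. A
sketch using dagster's build_op_context with a stub S3 resource (StubS3 is
illustrative, not part of the repo):

    from datetime import datetime
    from dagster import build_op_context

    class StubS3:
        def __init__(self):
            self.written = {}

        def put_data(self, key_name, data):
            self.written[key_name] = data

    def test_put_s3_data():
        stub = StubS3()
        context = build_op_context(resources={"s3": stub})
        agg = Aggregation(date=datetime(2022, 12, 8), high=10.0)
        put_s3_data(context, agg)  # direct op invocation with a test context
        assert stub.written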
---
 week_3/workspaces/project/week_3.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/week_3/workspaces/project/week_3.py b/week_3/workspaces/project/week_3.py
index 25f295a1..86d47bd7 100644
--- a/week_3/workspaces/project/week_3.py
+++ b/week_3/workspaces/project/week_3.py
@@ -52,7 +52,7 @@ def put_redis_data(context, agg: Aggregation) -> Nothing:
     required_resource_keys={"s3"}
 )
 def put_s3_data(context, agg: Aggregation) -> Nothing:
-    context.resources.s3.put_data(key=agg.date, data=agg)
+    context.resources.s3.put_data(key_name=agg.date, data=agg)

From 17daf5a5831440ce6fdc5406f3d3cb10da3abc3a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Thu, 8 Dec 2022 08:32:41 +0000
Subject: [PATCH 23/24] typo fix.

---
 week_3/workspaces/project/week_3.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/week_3/workspaces/project/week_3.py b/week_3/workspaces/project/week_3.py
index 86d47bd7..ea48964e 100644
--- a/week_3/workspaces/project/week_3.py
+++ b/week_3/workspaces/project/week_3.py
@@ -52,8 +52,7 @@ def put_redis_data(context, agg: Aggregation) -> Nothing:
     required_resource_keys={"s3"}
 )
 def put_s3_data(context, agg: Aggregation) -> Nothing:
-    context.resources.s3.put_data(key_name=agg.date, data=agg)
-
+    context.resources.s3.put_data(key_name=str(agg.date), data=agg)

 @graph
 def week_3_pipeline():

From 1dca73bf4f729ffb6afc0c615caf8e67a459d41f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Frank=20L=C3=BC?=
Date: Thu, 8 Dec 2022 10:44:07 +0000
Subject: [PATCH 24/24] attempt at custom io manager.

---
 .../workspaces/challenge/week_3_challenge.py | 84 ++++++++++++++-----
 week_3/workspaces/project/week_3.py          |  2 +-
 week_3/workspaces/types.py                   |  5 ++
 3 files changed, 70 insertions(+), 21 deletions(-)

diff --git a/week_3/workspaces/challenge/week_3_challenge.py b/week_3/workspaces/challenge/week_3_challenge.py
index 3be63e5c..35682397 100644
--- a/week_3/workspaces/challenge/week_3_challenge.py
+++ b/week_3/workspaces/challenge/week_3_challenge.py
@@ -1,27 +1,48 @@
-from dagster import In, IOManager, Nothing, Out, String, graph, io_manager, op
+from dagster import (
+    In,
+    IOManager,
+    Out,
+    String,
+    graph,
+    io_manager,
+    op,
+    InputContext,
+    OutputContext,
+    InitResourceContext
+)
 from workspaces.resources import postgres_resource
-
+from workspaces.types import PostgresRecord
+import random as rand
+import string

 class PostgresIOManager(IOManager):
-    def __init__(self):
-        pass
+    def __init__(self,context: InitResourceContext):
+        self.pg = postgres_resource(context)

-    def handle_output(self):
-        pass
+    def handle_output(self, context: OutputContext, obj:PostgresRecord):
+        table_name = context.config['table_name']
+        v1, v2, v3 = obj.v1, obj.v2, obj.v3
+        sql = f"INSERT INTO {table_name} VALUES ({v1},{v2},{v3});"
+        self.pg.execute_query(sql)

-    def load_input(self, context):
-        pass
+    def load_input(self, context: InputContext):
+        table_name = context.upstream_output.config["table_name"]
+        sql= f"select count(*) from {table_name};"
+        return self.pg.execute_query(sql)


-@io_manager(required_resource_keys={"postgres"})
+@io_manager(
+    output_config_schema={"table_name": str},
+    required_resource_keys={"postgres"}
+)
 def postgres_io_manager(init_context):
-    return PostgresIOManager()
+    return PostgresIOManager(init_context)


 @op(
     config_schema={"table_name": String},
     required_resource_keys={"database"},
-    tags={"kind": "postgres"},
+    tags={"kind": "postgres"}
 )
 def create_table(context) -> String:
     table_name = context.op_config["table_name"]
@@ -33,20 +54,27 @@ def create_table(context) -> String:
     return table_name


-@op
-def insert_data():
-    pass
+@op(
+    out= Out(io_manager_key="postgres_io"),
+    description= "insert 3 random values into the specified table_name (schema.table)."
+)
+def insert_random_data() -> PostgresRecord:
+    v1, v2, v3 = [''.join(rand.choices(string.ascii_uppercase + string.digits, k=5)) for i in range(0,3)]
+    postgres_record = PostgresRecord(v1=v1, v2=v2, v3=v3)
+    return postgres_record


-@op
-def table_count():
+@op(
+    description= "count how many records are in the specified table_name (schema.table)."
+)
+def table_count(table_name: String) -> int:
     pass


 @graph
 def week_3_challenge():
-    pass
-
+    tbl_name = create_table()
+    insert_random_data(tbl_name)

 docker = {
     "resources": {
@@ -59,8 +87,24 @@ def week_3_challenge():
             }
         },
     },
-    "ops": {"create_table": {"config": {"table_name": "analytics.table"}}},
+    "ops": {
+        "create_table":{
+            "config": {
+                "table_name": "analytics.table"
+            }
+        },
+        "insert_random_data":{
+            "outputs": {
+                "result": {
+                    "table_name": "analytics.table"
+                }
+            }
+        }
+    },
+    "postgres_io": postgres_io_manager
 }

-week_3_challenge_docker = week_3_challenge.to_job()
+week_3_challenge_docker = week_3_challenge.to_job(
+    resource_defs= docker
+)
diff --git a/week_3/workspaces/project/week_3.py b/week_3/workspaces/project/week_3.py
index ea48964e..2985e3fc 100644
--- a/week_3/workspaces/project/week_3.py
+++ b/week_3/workspaces/project/week_3.py
@@ -1,5 +1,5 @@
 from typing import List
-
+import random
 from dagster import (
     In,
     Nothing,
diff --git a/week_3/workspaces/types.py b/week_3/workspaces/types.py
index cd1c93b6..4fc3789c 100644
--- a/week_3/workspaces/types.py
+++ b/week_3/workspaces/types.py
@@ -3,6 +3,11 @@
 from dagster import usable_as_dagster_type
 from pydantic import BaseModel

+@usable_as_dagster_type(description="Postgres Record")
+class PostgresRecord(BaseModel):
+    v1: str
+    v2: str
+    v3: str

 @usable_as_dagster_type(description="Stock data")
 class Stock(BaseModel):
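
A follow-up this series still needs (the commit above is explicitly an
attempt): to_job(resource_defs= docker) hands the run-config dict to the
resource-definitions parameter, and the graph passes create_table's output
into an op that declares no inputs. A sketch of the intended wiring; the
Nothing-typed `start` input is an illustrative addition, and it assumes the
"postgres_io" entry is dropped from the `docker` run-config dict:

    from dagster import In, Nothing

    @op(
        ins={"start": In(Nothing)},
        out=Out(io_manager_key="postgres_io"),
    )
    def insert_random_data() -> PostgresRecord:
        v1, v2, v3 = (''.join(rand.choices(string.ascii_uppercase + string.digits, k=5)) for _ in range(3))
        return PostgresRecord(v1=v1, v2=v2, v3=v3)

    @graph
    def week_3_challenge():
        insert_random_data(start=create_table())

    week_3_challenge_docker = week_3_challenge.to_job(
        config=docker,  # run config: the "resources" and "ops" sections
        resource_defs={
            "database": postgres_resource,
            "postgres": postgres_resource,  # required by the io_manager; needs matching config under docker["resources"]
            "postgres_io": postgres_io_manager,
        },
    )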