Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

week 3 project #90

Open
wants to merge 25 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
cf83146
first commit.
frankcholula Nov 23, 2022
b2dec37
added input and output.
frankcholula Nov 23, 2022
b0fc875
finihed challenge.
frankcholula Nov 23, 2022
fa965c8
fixed type hints.
frankcholula Nov 23, 2022
72502ae
fix type hint.
frankcholula Nov 23, 2022
f552435
finished week2 resources.
frankcholula Nov 28, 2022
d077dd2
implemented challenge need to ask OH proper success failure implement…
frankcholula Nov 29, 2022
17f2929
modify dockerfile for dbt run to finish.
frankcholula Dec 2, 2022
5e272d3
get challenge working with success and failure hooks.
frankcholula Dec 2, 2022
7989c59
added tags.
frankcholula Dec 2, 2022
fb79a50
removed unused type.
frankcholula Dec 2, 2022
1a678e2
fixed outs in dbt test ops.
frankcholula Dec 2, 2022
245448b
remove legacy log file.
frankcholula Dec 2, 2022
3f980fd
recover dagster user.
frankcholula Dec 2, 2022
f652265
rerecover dagster user.
frankcholula Dec 2, 2022
01c3861
remove committed user yml file.
frankcholula Dec 2, 2022
0207361
Merge branch 'dehume:master' into master
frankcholula Dec 2, 2022
a5dd786
need to fix sensor eval context.
frankcholula Dec 5, 2022
5dfd4a5
one more docker validation test.
frankcholula Dec 5, 2022
ba6679f
finished week3. still need to do challenge.
frankcholula Dec 7, 2022
bb5b9f0
remove unused imports.
frankcholula Dec 7, 2022
f7b1af2
change endpoint url.
frankcholula Dec 8, 2022
758da6d
fixed typo.
frankcholula Dec 8, 2022
17daf5a
typo fix.
frankcholula Dec 8, 2022
1dca73b
attempt at custom io manager.
frankcholula Dec 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitpod.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ RUN pip install "poetry==1.1.12"
COPY poetry.lock pyproject.toml /

RUN poetry config virtualenvs.create false \
&& poetry install --no-interaction --no-ansi
&& poetry install --no-interaction --no-ansi
41 changes: 32 additions & 9 deletions week_1/challenge/week_1_challenge.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,37 @@ def csv_helper(file_name: str) -> Iterator[Stock]:
yield Stock.from_list(row)


@op
def get_s3_data():
pass

@op (
config_schema = {
"s3_key": String
},
out = {
"stocks": Out(List[Stock],is_required=False),
"empty_stocks": Out(Any,is_required=False)
}
)
def get_s3_data(context) -> List[Stock]:
s3_key = context.op_config["s3_key"]
iter = csv_helper(s3_key)
if not any(iter):
yield Output(None, "empty_stocks")
else:
yield Output(list(iter), "stocks")

@op
def process_data():
pass

@op(
config_schema = {"nlargest": int},
out = DynamicOut(Aggregation)
)
def process_data(context, stocks: List[Stock])-> Aggregation:
stocks.sort(key= lambda stock: stock.high, reverse= True)
n = context.op_config["nlargest"]
for i in range(0,n):
agg = Aggregation(date=stocks[n].date, high=stocks[n].high)
yield DynamicOutput(agg, mapping_key=str(i))

@op
def put_redis_data():
def put_redis_data(context, aggs: Aggregation) -> Nothing:
pass


Expand All @@ -80,4 +99,8 @@ def empty_stock_notify(context, empty_stocks) -> Nothing:

@job
def week_1_challenge():
pass
stocks, no_stock = get_s3_data()
processed = process_data(stocks)
empty_stock_notify(no_stock)
processed.map(put_redis_data)

32 changes: 21 additions & 11 deletions week_1/project/week_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,31 @@ def csv_helper(file_name: str) -> Iterator[Stock]:
yield Stock.from_list(row)


@op
def get_s3_data():
pass


@op
def process_data():
pass

@op (
config_schema= {
"s3_key": String
}
)
def get_s3_data(context) -> List[Stock]:
s3_key = context.op_config["s3_key"]
return list(csv_helper(s3_key))


@op (
ins={"stocks": In(dagster_type=List[Stock])},
out={"agg": Out(dagster_type=Aggregation)}
)
def process_data(context, stocks: List[Stock]) -> Aggregation:
highest_stock = max(stocks, key= lambda stock: stock.high)
return Aggregation(date=highest_stock.date,high=highest_stock.high)

@op
def put_redis_data():
def put_redis_data(context, aggs: Aggregation) -> Nothing:
pass


@job
def week_1_pipeline():
pass
s3_data = get_s3_data()
processed = process_data(s3_data)
put_redis_data(processed)
56 changes: 49 additions & 7 deletions week_2/workspaces/challenge/week_2_challenge.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
from random import randint

from dagster import In, Nothing, String, graph, op
from dagster_dbt import dbt_cli_resource, dbt_run_op, dbt_test_op
from dagster_dbt import dbt_cli_resource, dbt_run_op, dbt_test_op, DbtOutput
from workspaces.resources import postgres_resource

from dagster import (
In,
Out,
Nothing,
String,
Any,
graph,
op,
HookContext,
failure_hook,
success_hook
)

DBT_PROJECT_PATH = "/opt/dagster/dagster_home/dbt_test_project/."


Expand All @@ -21,27 +33,52 @@ def create_dbt_table(context) -> String:
context.resources.database.execute_query(sql)
return table_name


@op(
ins={"table_name": In(dagster_type=String)},
required_resource_keys={"database"},
tags={"kind": "postgres"},
)
def insert_dbt_data(context, table_name):
def insert_dbt_data(context, table_name)-> Nothing:
sql = f"INSERT INTO {table_name} (column_1, column_2, column_3) VALUES ('A', 'B', 'C');"

number_of_rows = randint(1, 100)
for _ in range(number_of_rows):
context.resources.database.execute_query(sql)
context.log.info("Inserted a row")

context.log.info("Batch inserted")


@op(
required_resource_keys={"dbt"},
tags={"kind": "dbt"}
)
def dbt_run() -> DbtOutput:
dbt_run_op()


@op (
required_resource_keys={"dbt"},
tags={"kind": "dbt"}
)
def dbt_test() -> DbtOutput:
dbt_test_op()


@success_hook
def notify_success(context: HookContext) -> Nothing:
message = f"Op {context.op.name} finished successfully"
context.log.info(message)


@failure_hook
def notify_failure(context: HookContext) -> Nothing:
message = f"Op {context.op.name} failed"
context.log.info(message)

@graph
def week_2_challenge():
pass

tbl = create_dbt_table()
dbt_test_op.with_hooks({notify_success, notify_failure})(dbt_run_op(insert_dbt_data(tbl)))

docker = {
"resources": {
Expand All @@ -68,4 +105,9 @@ def week_2_challenge():

week_2_challenge_docker = week_2_challenge.to_job(
name="week_2_challenge_docker",
config = docker,
resource_defs = {
"database": postgres_resource,
"dbt": dbt_cli_resource
}
)
53 changes: 40 additions & 13 deletions week_2/workspaces/project/week_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,46 @@
from workspaces.types import Aggregation, Stock


@op
def get_s3_data():
pass
@op(
config_schema={
"s3_key": String
},
required_resource_keys={"s3"}
)
def get_s3_data(context) -> List[Stock]:
s3_key = context.op_config["s3_key"]
return [Stock.from_list(item) for item in context.resources.s3.get_data(s3_key)]


@op
def process_data():
pass
@op (
ins={"stocks": In(dagster_type=List[Stock])},
out={"agg": Out(dagster_type=Aggregation)}
)
def process_data(context, stocks: List[Stock]) -> Aggregation:
highest_stock = max(stocks, key= lambda stock: stock.high)
return Aggregation(date=highest_stock.date,high=highest_stock.high)


@op
def put_redis_data():
pass
@op(
required_resource_keys={"redis"}
)
def put_redis_data(context, agg: Aggregation) -> Nothing:
context.resources.redis.put_data(name=str(agg.date), value=str(agg.high))


@op
def put_s3_data():
pass
@op(
required_resource_keys={"s3"}
)
def put_s3_data(context, agg: Aggregation) -> Nothing:
context.resources.s3.put_data(key=agg.date, data=agg)


@graph
def week_2_pipeline():
pass
stocks = get_s3_data()
processed = process_data(stocks)
put_redis_data(processed)
put_s3_data(processed)


local = {
Expand Down Expand Up @@ -56,8 +73,18 @@ def week_2_pipeline():

week_2_pipeline_local = week_2_pipeline.to_job(
name="week_2_pipeline_local",
config= local,
resource_defs = {
"s3": mock_s3_resource,
"redis": ResourceDefinition.mock_resource()
}
)

week_2_pipeline_docker = week_2_pipeline.to_job(
name="week_2_pipeline_docker",
config= docker,
resource_defs = {
"s3": s3_resource,
"redis": redis_resource
}
)
34 changes: 28 additions & 6 deletions week_2/workspaces/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,35 @@ def mock_s3_resource(context):
return s3_mock


@resource
def s3_resource():
@resource(
config_schema={
"bucket": Field(String),
"access_key": Field(String),
"secret_key": Field(String),
"endpoint_url": Field(String)
},
description="A resource that runs S3."
)
def s3_resource(context) -> S3:
"""This resource defines a S3 client"""
pass
return S3(
bucket=context.resource_config["bucket"],
access_key=context.resource_config["access_key"],
secret_key=context.resource_config["secret_key"],
endpoint_url=context.resource_config["endpoint_url"]
)


@resource
def redis_resource():
@resource(
config_schema={
"host": Field(String),
"port": Field(Int)
},
description="A resource that runs Redis."
)
def redis_resource(context) -> Redis:
"""This resource defines a Redis client"""
pass
return Redis(
host=context.resource_config["host"],
port=context.resource_config["port"]
)
Loading