week 4 project #93

Open · wants to merge 19 commits into master
Changes from all commits
2 changes: 1 addition & 1 deletion .gitpod.Dockerfile
@@ -7,4 +7,4 @@ RUN pip install "poetry==1.1.12"
COPY poetry.lock pyproject.toml /

RUN poetry config virtualenvs.create false \
&& poetry install --no-interaction --no-ansi
&& poetry install --no-interaction --no-ansi
41 changes: 32 additions & 9 deletions week_1/challenge/week_1_challenge.py
@@ -55,18 +55,37 @@ def csv_helper(file_name: str) -> Iterator[Stock]:
yield Stock.from_list(row)


@op
def get_s3_data():
    pass

@op(
    config_schema={"s3_key": String},
    out={
        "stocks": Out(List[Stock], is_required=False),
        "empty_stocks": Out(Any, is_required=False),
    },
)
def get_s3_data(context):
    s3_key = context.op_config["s3_key"]
    # Materialize the rows up front: calling any() directly on the iterator
    # would consume its first element, silently dropping one stock.
    stocks = list(csv_helper(s3_key))
    if not stocks:
        yield Output(None, "empty_stocks")
    else:
        yield Output(stocks, "stocks")

@op
def process_data():
    pass

@op(
    config_schema={"nlargest": int},
    out=DynamicOut(Aggregation),
)
def process_data(context, stocks: List[Stock]):
    # Sort descending by high so the first nlargest entries are the top ones.
    stocks.sort(key=lambda stock: stock.high, reverse=True)
    n = context.op_config["nlargest"]
    for i in range(n):
        # Index with i, not n: stocks[n] would emit the same record n times
        # (and raise IndexError when n == len(stocks)).
        agg = Aggregation(date=stocks[i].date, high=stocks[i].high)
        yield DynamicOutput(agg, mapping_key=str(i))

@op
def put_redis_data():
def put_redis_data(context, aggs: Aggregation) -> Nothing:
    pass


@@ -80,4 +99,8 @@ def empty_stock_notify(context, empty_stocks) -> Nothing:

@job
def week_1_challenge():
    pass
    stocks, no_stock = get_s3_data()
    processed = process_data(stocks)
    empty_stock_notify(no_stock)
    processed.map(put_redis_data)
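For a quick local check, the job can be run in-process with execute_in_process. A minimal sketch; both config values below are placeholders, not paths from this repo:

if __name__ == "__main__":
    result = week_1_challenge.execute_in_process(
        run_config={
            "ops": {
                "get_s3_data": {"config": {"s3_key": "week_1/data/stock.csv"}},  # placeholder path
                "process_data": {"config": {"nlargest": 3}},  # arbitrary example value
            }
        }
    )
    assert result.success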

32 changes: 21 additions & 11 deletions week_1/project/week_1.py
@@ -41,21 +41,31 @@ def csv_helper(file_name: str) -> Iterator[Stock]:
yield Stock.from_list(row)


@op
def get_s3_data():
    pass


@op
def process_data():
    pass

@op(
    config_schema={"s3_key": String},
)
def get_s3_data(context) -> List[Stock]:
    s3_key = context.op_config["s3_key"]
    return list(csv_helper(s3_key))


@op(
    ins={"stocks": In(dagster_type=List[Stock])},
    out={"agg": Out(dagster_type=Aggregation)},
)
def process_data(context, stocks: List[Stock]) -> Aggregation:
    highest_stock = max(stocks, key=lambda stock: stock.high)
    return Aggregation(date=highest_stock.date, high=highest_stock.high)

@op
def put_redis_data():
def put_redis_data(context, aggs: Aggregation) -> Nothing:
    pass


@job
def week_1_pipeline():
    pass
    s3_data = get_s3_data()
    processed = process_data(s3_data)
    put_redis_data(processed)
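The ops can also be unit-tested in isolation with dagster.build_op_context. A sketch, assuming get_s3_data and Stock are importable from this module and with the same placeholder CSV path:

from dagster import build_op_context

def test_get_s3_data():
    # op_config mirrors the op's config_schema; the path is a placeholder.
    context = build_op_context(op_config={"s3_key": "week_1/data/stock.csv"})
    stocks = get_s3_data(context)
    assert all(isinstance(stock, Stock) for stock in stocks)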
56 changes: 49 additions & 7 deletions week_2/workspaces/challenge/week_2_challenge.py
@@ -1,9 +1,21 @@
from random import randint

from dagster import In, Nothing, String, graph, op
from dagster_dbt import dbt_cli_resource, dbt_run_op, dbt_test_op
from dagster_dbt import dbt_cli_resource, dbt_run_op, dbt_test_op, DbtOutput
from workspaces.resources import postgres_resource

from dagster import (
    In,
    Out,
    Nothing,
    String,
    Any,
    graph,
    op,
    HookContext,
    failure_hook,
    success_hook,
)

DBT_PROJECT_PATH = "/opt/dagster/dagster_home/dbt_test_project/."


@@ -21,27 +33,52 @@ def create_dbt_table(context) -> String:
    context.resources.database.execute_query(sql)
    return table_name


@op(
    ins={"table_name": In(dagster_type=String)},
    required_resource_keys={"database"},
    tags={"kind": "postgres"},
)
def insert_dbt_data(context, table_name) -> Nothing:
    sql = f"INSERT INTO {table_name} (column_1, column_2, column_3) VALUES ('A', 'B', 'C');"

    number_of_rows = randint(1, 100)
    for _ in range(number_of_rows):
        context.resources.database.execute_query(sql)
        context.log.info("Inserted a row")

    context.log.info("Batch inserted")


@op(
    required_resource_keys={"dbt"},
    tags={"kind": "dbt"},
)
def dbt_run(context) -> DbtOutput:
    # Invoke dbt through the resource client; an op (dbt_run_op) cannot be
    # called from inside another op's body, and the context parameter is
    # required to reach the resource.
    return context.resources.dbt.run()


@op(
    required_resource_keys={"dbt"},
    tags={"kind": "dbt"},
)
def dbt_test(context) -> DbtOutput:
    return context.resources.dbt.test()


@success_hook
def notify_success(context: HookContext) -> Nothing:
    message = f"Op {context.op.name} finished successfully"
    context.log.info(message)


@failure_hook
def notify_failure(context: HookContext) -> Nothing:
    message = f"Op {context.op.name} failed"
    context.log.info(message)

@graph
def week_2_challenge():
    pass
    tbl = create_dbt_table()
    # Insert the seed data, then dbt run, then dbt test, with hooks on the test op.
    dbt_test_op.with_hooks({notify_success, notify_failure})(dbt_run_op(insert_dbt_data(tbl)))

docker = {
    "resources": {
@@ -68,4 +105,9 @@ def week_2_challenge():

week_2_challenge_docker = week_2_challenge.to_job(
    name="week_2_challenge_docker",
    config=docker,
    resource_defs={
        "database": postgres_resource,
        "dbt": dbt_cli_resource,
    },
)
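Here the hooks are attached to a single invocation via with_hooks. As an alternative, they could be applied job-wide through to_job's hooks argument; a sketch of that variant (the job name is hypothetical):

week_2_challenge_docker_hooked = week_2_challenge.to_job(
    name="week_2_challenge_docker_hooked",  # hypothetical variant name
    config=docker,
    resource_defs={"database": postgres_resource, "dbt": dbt_cli_resource},
    hooks={notify_success, notify_failure},  # fires for every op in the job
)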
53 changes: 40 additions & 13 deletions week_2/workspaces/project/week_2.py
@@ -5,29 +5,46 @@
from workspaces.types import Aggregation, Stock


@op
def get_s3_data():
    pass
@op(
    config_schema={"s3_key": String},
    required_resource_keys={"s3"},
)
def get_s3_data(context) -> List[Stock]:
    s3_key = context.op_config["s3_key"]
    return [Stock.from_list(item) for item in context.resources.s3.get_data(s3_key)]


@op
def process_data():
    pass
@op(
    ins={"stocks": In(dagster_type=List[Stock])},
    out={"agg": Out(dagster_type=Aggregation)},
)
def process_data(context, stocks: List[Stock]) -> Aggregation:
    highest_stock = max(stocks, key=lambda stock: stock.high)
    return Aggregation(date=highest_stock.date, high=highest_stock.high)


@op
def put_redis_data():
    pass
@op(
    required_resource_keys={"redis"},
)
def put_redis_data(context, agg: Aggregation) -> Nothing:
    context.resources.redis.put_data(name=str(agg.date), value=str(agg.high))


@op
def put_s3_data():
    pass
@op(
    required_resource_keys={"s3"},
)
def put_s3_data(context, agg: Aggregation) -> Nothing:
    context.resources.s3.put_data(key=agg.date, data=agg)


@graph
def week_2_pipeline():
    pass
    stocks = get_s3_data()
    processed = process_data(stocks)
    put_redis_data(processed)
    put_s3_data(processed)


local = {
@@ -56,8 +73,18 @@

week_2_pipeline_local = week_2_pipeline.to_job(
    name="week_2_pipeline_local",
    config=local,
    resource_defs={
        "s3": mock_s3_resource,
        "redis": ResourceDefinition.mock_resource(),
    },
)

week_2_pipeline_docker = week_2_pipeline.to_job(
    name="week_2_pipeline_docker",
    config=docker,
    resource_defs={
        "s3": s3_resource,
        "redis": redis_resource,
    },
)
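Since config=local is supplied as the local job's default run config, it can be smoke-tested in-process without extra arguments. A minimal sketch:

if __name__ == "__main__":
    # Relies on the default config supplied via to_job(config=local).
    result = week_2_pipeline_local.execute_in_process()
    assert result.success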
34 changes: 28 additions & 6 deletions week_2/workspaces/resources.py
@@ -98,13 +98,35 @@ def mock_s3_resource(context):
    return s3_mock


@resource
def s3_resource():
@resource(
    config_schema={
        "bucket": Field(String),
        "access_key": Field(String),
        "secret_key": Field(String),
        "endpoint_url": Field(String),
    },
    description="A resource that provides an S3 client.",
)
def s3_resource(context) -> S3:
    """This resource defines a S3 client"""
    pass
    return S3(
        bucket=context.resource_config["bucket"],
        access_key=context.resource_config["access_key"],
        secret_key=context.resource_config["secret_key"],
        endpoint_url=context.resource_config["endpoint_url"],
    )


@resource
def redis_resource():
@resource(
    config_schema={
        "host": Field(String),
        "port": Field(Int),
    },
    description="A resource that provides a Redis client.",
)
def redis_resource(context) -> Redis:
    """This resource defines a Redis client"""
    pass
    return Redis(
        host=context.resource_config["host"],
        port=context.resource_config["port"],
    )
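For reference, a resources section of run config matching these schemas might look like the sketch below; every value is a placeholder, not a credential or endpoint from this repo:

resource_config = {
    "resources": {
        "s3": {
            "config": {
                "bucket": "dagster",  # placeholder
                "access_key": "test",  # placeholder
                "secret_key": "test",  # placeholder
                "endpoint_url": "http://localstack:4566",  # placeholder
            }
        },
        "redis": {"config": {"host": "redis", "port": 6379}},  # placeholders
    }
}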