fix(ingest): queries not completing #6

Merged
merged 33 commits into main from fix/debug-ingest
Apr 3, 2024

Changes from all commits (33 commits)

5a6254c  feat: add logging (JossWhittle, Oct 26, 2023)
3f0e7f1  fix: dir name (JossWhittle, Oct 26, 2023)
03b8f3a  feat: add csv to iceberg dag (JossWhittle, Oct 26, 2023)
10c75eb  fix: wrong decorator (JossWhittle, Oct 26, 2023)
daee753  test: git sync (JossWhittle, Oct 27, 2023)
8b2478c  fix: call task (JossWhittle, Oct 27, 2023)
4a0ee83  fix: s3 copy conn (JossWhittle, Oct 27, 2023)
b407e8a  fix: adjust schemas and table names (JossWhittle, Oct 27, 2023)
b464550  fix: iceberg target dir (JossWhittle, Oct 27, 2023)
13dc749  fix: adjust control flow (JossWhittle, Oct 27, 2023)
2e3ca27  test: sleep before iceberg create (JossWhittle, Oct 27, 2023)
1c9cce2  fix: need to use different unique schema.table names even between dif… (JossWhittle, Oct 27, 2023)
cd55e9d  style: flake8 (JossWhittle, Oct 27, 2023)
a209207  feat: add s3 delete to clean up external hive table (JossWhittle, Oct 27, 2023)
2757300  feat: add debug flag (JossWhittle, Oct 27, 2023)
2d0bd34  fix: split debug from ingest_delete flag (JossWhittle, Oct 27, 2023)
f9a8c77  feat: add pyarrow schema inference (JossWhittle, Oct 27, 2023)
339fec6  fix: logic on dtypes (JossWhittle, Oct 27, 2023)
9470576  style: flake8 (JossWhittle, Oct 27, 2023)
4efd293  feat: add duckdb and httpfs extension to base image (JossWhittle, Oct 27, 2023)
83a7c4a  stash (JossWhittle, Oct 27, 2023)
1a6466d  Merge branch 'main' into fix/debug-ingest (JossWhittle, Oct 27, 2023)
20f2272  Merge remote-tracking branch 'origin/fix/debug-ingest' into fix/debug… (JossWhittle, Oct 27, 2023)
e1b6ccc  Revert "stash" (JossWhittle, Oct 27, 2023)
8e523e7  refactor: split into modules (JossWhittle, Oct 27, 2023)
d728be3  fix: missing import (JossWhittle, Oct 27, 2023)
c4dd069  feat: use duckdb for csv to parquet conversion (JossWhittle, Oct 27, 2023)
1834971  fix: adjust plumbing (JossWhittle, Oct 27, 2023)
88c6c60  style: formatting (JossWhittle, Oct 27, 2023)
9a24c6d  fix: force install of extension, shouldn't be needed... (JossWhittle, Oct 27, 2023)
644b7c2  feat: boost all debug messages to info so avoid the airflow log spam (JossWhittle, Oct 27, 2023)
c5ff964  style: flake8 (JossWhittle, Oct 27, 2023)
a23900d  Merge branch 'main' into fix/debug-ingest (JossWhittle, Nov 1, 2023)

194 changes: 194 additions & 0 deletions dags/ingest_csv_to_iceberg.py
@@ -0,0 +1,194 @@
import os.path
import logging
import pendulum
from airflow import DAG
from airflow.operators.python import get_current_context, task

from modules.databases.duckdb import s3_csv_to_parquet
from modules.utils.s3 import s3_delete
from modules.utils.sql import escape_dataset
from modules.utils.sha1 import sha1
from modules.databases.trino import (
    create_schema,
    drop_table,
    get_trino_conn_details,
    get_trino_engine,
    hive_create_table_from_parquet,
    iceberg_create_table_from_hive,
    validate_identifier,
    validate_s3_key
)


with DAG(
    dag_id="ingest_csv_to_iceberg",
    schedule=None,
    start_date=pendulum.datetime(1900, 1, 1, tz="UTC"),
    catchup=True,
    max_active_runs=1,
    concurrency=1,
    tags=["ingest", "csv", "iceberg", "s3"],
) as dag:

    # Makes this logging namespace appear immediately in airflow
    logging.info("DAG parsing...")

    @task
    def ingest_csv_to_iceberg():

        ########################################################################
        logging.info("Starting task ingest_csv_to_iceberg...")

        # Extract the Airflow context object for this run of the DAG
        context = get_current_context()
        logging.info(f"context={context}")

        # Extract the task instance object for handling XCOM variables
        ti = context['ti']
        logging.info(f"ti={ti}")

        # Extract the JSON dict of params for this run of the DAG
        conf = context['dag_run'].conf
        logging.info(f"conf={conf}")

        # Unique hashed name for this run of the DAG
        dag_hash = sha1(
            f"dag_id={ti.dag_id}/"
            f"run_id={ti.run_id}/"
            f"task_id={ti.task_id}"
        )
        logging.info(f"dag_hash={dag_hash}")

        dag_id = f"{dag_hash}_{ti.try_number}"
        logging.info(f"dag_id={dag_id}")

        ########################################################################
        logging.info("Validate inputs...")

        debug = conf.get("debug", False)
        logging.info(f"debug={debug}")
        assert isinstance(debug, bool)

        # Path to the data file within the ingest bucket excluding the bucket name
        ingest_key = conf.get("ingest_key", None)
        logging.info(f"ingest_key={ingest_key}")
        assert (ingest_key is not None) and \
            isinstance(ingest_key, str) and \
            ingest_key.endswith(".csv")

        ingest_path = os.path.dirname(ingest_key)
        ingest_file = os.path.basename(ingest_key)
        ingest_bucket = "ingest"

        ingest_delete = conf.get("ingest_delete", False)
        logging.info(f"ingest_delete={ingest_delete}")
        assert isinstance(ingest_delete, bool)

        # Base name of the dataset to provision, defaults to an escaped version of the path
        dataset = escape_dataset(conf.get("dataset", ingest_path))

        ingest = {
            "bucket": ingest_bucket,
            "key": ingest_key,
            "path": ingest_path,
            "file": ingest_file,
            "dataset": dataset,
            "delete": ingest_delete,
        }
        logging.info(f"ingest={ingest}")
        ti.xcom_push("ingest", ingest)

        hive_schema = "minio.csv"
        hive_table = validate_identifier(f"{hive_schema}.{dataset}_{dag_id}")
        hive_bucket = "loading"
        hive_dir = validate_s3_key(f"ingest/{dataset}/{dag_id}")
        hive_file, _ = os.path.splitext(ingest_file)
        hive_key = f"{hive_dir}/{hive_file}.parquet"
        hive_path = validate_s3_key(f"{hive_bucket}/{hive_dir}")
        hive = {
            "schema": hive_schema,
            "table": hive_table,
            "bucket": hive_bucket,
            "dir": hive_dir,
            "file": hive_file,
            "key": hive_key,
            "path": hive_path,
        }
        logging.info(f"hive={hive}")
        ti.xcom_push("hive", hive)

        iceberg_schema = "iceberg.ingest"
        iceberg_table = validate_identifier(f"{iceberg_schema}.{dataset}_{dag_id}")
        iceberg_bucket = "working"
        iceberg_dir = validate_s3_key(f"ingest/{dataset}/{dag_id}")
        iceberg_path = validate_s3_key(f"{iceberg_bucket}/{iceberg_dir}")
        iceberg = {
            "schema": iceberg_schema,
            "table": iceberg_table,
            "bucket": iceberg_bucket,
            "dir": iceberg_dir,
            "path": iceberg_path,
        }
        logging.info(f"iceberg={iceberg}")
        ti.xcom_push("iceberg", iceberg)

        ########################################################################
        logging.info("Convert from ingest bucket CSV to loading bucket Parquet using DuckDB...")
        s3_csv_to_parquet(
            conn_id="s3_conn",
            src_bucket=ingest_bucket,
            src_key=ingest_key,
            dst_bucket=hive_bucket,
            dst_key=hive_key
        )

        if ingest_delete:
            s3_delete(conn_id="s3_conn", bucket=ingest_bucket, key=ingest_key)

        ########################################################################
        logging.info("Mounting CSV on s3 into Hive connector and copy to Iceberg...")

        # Create a connection to Trino
        trino_conn = get_trino_conn_details()
        trino = get_trino_engine(trino_conn)

        logging.info("Create schema in Hive connector...")
        create_schema(trino, schema=hive_schema, location=hive_bucket)

        logging.info("Create schema in Iceberg connector...")
        create_schema(trino, schema=iceberg_schema, location=iceberg_bucket)

        try:
            logging.info("Create table in Hive connector...")
            hive_create_table_from_parquet(
                trino,
                table=hive_table,
                location=hive_path
            )

            logging.info("Create table in Iceberg connector...")
            iceberg_create_table_from_hive(
                trino,
                table=iceberg_table,
                hive_table=hive_table,
                location=iceberg_path
            )

        finally:
            if debug:
                logging.info("Debug mode, not cleaning up table in Hive connector...")
            else:
                logging.info("Cleanup table in Hive connector...")
                drop_table(trino, table=hive_table)

                logging.info("Cleanup data from Hive connector in s3...")
                # External location data is not cascade deleted on drop table
                s3_delete(
                    conn_id="s3_conn",
                    bucket=hive_bucket,
                    key=hive_key
                )

        ########################################################################

    ingest_csv_to_iceberg()
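
For reviewers trying the new DAG out: it is parameterised entirely through dag_run.conf. A minimal sketch of the expected payload, assuming a CSV already sits in the ingest bucket (key names are taken from the validation block above; the values are illustrative only):

# Illustrative dag_run.conf for a manual trigger of ingest_csv_to_iceberg.
# Only "ingest_key" is required; the other keys fall back to the defaults shown.
conf = {
    "ingest_key": "some/path/data.csv",  # bucket-relative path, must end with ".csv"
    "dataset": "some_path",              # optional; defaults to an escaped form of the key's directory
    "ingest_delete": False,              # optional; delete the source CSV after conversion
    "debug": False,                      # optional; keep the intermediate Hive table for inspection
}

Passing that dict through the Airflow UI's "Trigger DAG w/ config" form, or via airflow dags trigger ingest_csv_to_iceberg --conf '<json>', should exercise the whole CSV to Parquet to Iceberg path.
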
11 changes: 8 additions & 3 deletions dags/ingest_csv_to_parquet.py
@@ -38,10 +38,11 @@ def unpack_minio_event(message):
     bucket = records["s3"]["bucket"]["name"]
     etag = s3_object["eTag"]

-    src_file_path: str = s3_object["key"]
+    src_file_path: str = s3_object["key"].replace('%2F', '/')
     assert src_file_path.endswith(".csv")

-    file_name = src_file_path.replace(".csv", "").replace('%2F', '_')
+    file_name = src_file_path.replace(".csv", "")
+    dir_name = src_file_path.split("/")[0]

     full_file_path = message_json['Key']
     head_path = '/'.join(full_file_path.split('/')[:-1])
@@ -52,6 +53,7 @@
         src_file_path=src_file_path,
         etag=etag,
         file_name=file_name,
+        dir_name=dir_name,
         full_file_path=full_file_path,
         head_path=head_path
     )
@@ -73,6 +75,9 @@ def sha1(value):
     tags=["ingest", "csv", "parquet", "s3"],
 ) as dag:

+    # Makes this logging namespace appear immediately in airflow
+    logger.info("DAG parsing...")
+
     def process_event(message):
         logger.info("Processing message!")
         logger.info(f"message={message}")
@@ -125,7 +130,7 @@ def process_event(message):
         )
         logger.info(f"hive table schema={hive_schema_str}")

-        table_name = re.sub(r"[^a-zA-Z0-9]", '_', event['file_name']).strip().strip('_').strip()
+        table_name = re.sub(r"[^a-zA-Z0-9]", '_', event['dir_name']).strip().strip('_').strip()
         logger.info(f"table name={table_name}")

         hive_table_name = f"{table_name}_{dag_hash}_{ti.try_number}"
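
The net effect of this change: MinIO event keys are URL-decoded before use, and the table name is now derived from the top-level directory (dir_name) rather than the full file name, so every CSV dropped under one directory maps to the same table name prefix. A minimal standalone sketch of the new key handling (the key value below is made up for illustration):

# Hypothetical MinIO event key, URL-encoded as delivered in the bucket notification
raw_key = "mydataset%2Fbatch1%2Fdata.csv"

src_file_path = raw_key.replace('%2F', '/')    # "mydataset/batch1/data.csv"
assert src_file_path.endswith(".csv")

file_name = src_file_path.replace(".csv", "")  # "mydataset/batch1/data"
dir_name = src_file_path.split("/")[0]         # "mydataset", basis for the table name
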
42 changes: 42 additions & 0 deletions dags/modules/databases/duckdb.py
@@ -0,0 +1,42 @@
import os
import json
import logging
import duckdb
from airflow.hooks.base import BaseHook

from ..utils.s3 import validate_s3_key

logger = logging.getLogger(__name__)


def s3_csv_to_parquet(conn_id: str, src_bucket: str, dst_bucket: str, src_key: str, dst_key: str, memory: int = 500):

    assert src_key.lower().endswith(".csv")
    assert dst_key.lower().endswith(".parquet")
    assert validate_s3_key(os.path.dirname(src_key))
    assert validate_s3_key(os.path.dirname(dst_key))

    s3_conn = json.loads(BaseHook.get_connection(conn_id).get_extra())
    access_key_id = s3_conn['aws_access_key_id']
    secret_access_key = s3_conn['aws_secret_access_key']
    endpoint = s3_conn["endpoint_url"]\
        .replace("http://", "").replace("https://", "")

    con = duckdb.connect(database=':memory:')

    query = f"INSTALL '/opt/duckdb/httpfs.duckdb_extension';" \
            f"LOAD httpfs;" \
            f"SET s3_endpoint='{endpoint}';" \
            f"SET s3_access_key_id='{access_key_id}';" \
            f"SET s3_secret_access_key='{secret_access_key}';" \
            f"SET s3_use_ssl=False;" \
            f"SET s3_url_style='path';" \
            f"SET memory_limit='{memory}MB'"
    logger.info(f"query={query}")
    con.execute(query)

    query = f"COPY (SELECT * FROM 's3://{src_bucket}/{src_key}')" \
            f"TO 's3://{dst_bucket}/{dst_key}'" \
            f"(FORMAT PARQUET, CODEC 'SNAPPY', ROW_GROUP_SIZE 100000);"
    logger.info(f"query={query}")
    con.execute(query)
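
A usage sketch of this helper in isolation, mirroring what the ingest DAG does: the s3_conn Airflow connection is assumed to carry aws_access_key_id, aws_secret_access_key and endpoint_url in its extras, since that is what the function reads; the bucket and key values here are illustrative only.

# Example call outside the DAG; memory is the DuckDB memory limit in MB.
s3_csv_to_parquet(
    conn_id="s3_conn",
    src_bucket="ingest",
    src_key="mydataset/batch1/data.csv",
    dst_bucket="loading",
    dst_key="ingest/mydataset/batch1/data.parquet",
    memory=500,
)
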