fix(ingest): queries not completing (#6)
JossWhittle authored Apr 3, 2024
1 parent 69448f6 commit 7147436
Showing 9 changed files with 413 additions and 28 deletions.
194 changes: 194 additions & 0 deletions dags/ingest_csv_to_iceberg.py
@@ -0,0 +1,194 @@
import os.path
import logging
import pendulum
from airflow import DAG
from airflow.operators.python import get_current_context, task

from modules.databases.duckdb import s3_csv_to_parquet
from modules.utils.s3 import s3_delete
from modules.utils.sql import escape_dataset
from modules.utils.sha1 import sha1
from modules.databases.trino import (
create_schema,
drop_table,
get_trino_conn_details,
get_trino_engine,
hive_create_table_from_parquet,
iceberg_create_table_from_hive,
validate_identifier,
validate_s3_key
)


with DAG(
dag_id="ingest_csv_to_iceberg",
schedule=None,
start_date=pendulum.datetime(1900, 1, 1, tz="UTC"),
catchup=True,
max_active_runs=1,
concurrency=1,
tags=["ingest", "csv", "iceberg", "s3"],
) as dag:

# Makes this logging namespace appear immediately in airflow
logging.info("DAG parsing...")

@task
def ingest_csv_to_iceberg():

########################################################################
logging.info("Starting task ingest_csv_to_iceberg...")

# Extract the Airflow context object for this run of the DAG
context = get_current_context()
logging.info(f"context={context}")

# Extract the task instance object for handling XCOM variables
ti = context['ti']
logging.info(f"ti={ti}")

# Extract the JSON dict of params for this run of the DAG
conf = context['dag_run'].conf
logging.info(f"conf={conf}")

# Unique hashed name for this run of the DAG
dag_hash = sha1(
f"dag_id={ti.dag_id}/"
f"run_id={ti.run_id}/"
f"task_id={ti.task_id}"
)
logging.info(f"dag_hash={dag_hash}")

dag_id = f"{dag_hash}_{ti.try_number}"
logging.info(f"dag_id={dag_id}")

########################################################################
logging.info("Validate inputs...")

debug = conf.get("debug", False)
logging.info(f"debug={debug}")
assert isinstance(debug, bool)

# Path to the data file within the ingest bucket excluding the bucket name
ingest_key = conf.get("ingest_key", None)
logging.info(f"ingest_key={ingest_key}")
assert (ingest_key is not None) and \
isinstance(ingest_key, str) and \
ingest_key.endswith(".csv")

ingest_path = os.path.dirname(ingest_key)
ingest_file = os.path.basename(ingest_key)
ingest_bucket = "ingest"

ingest_delete = conf.get("ingest_delete", False)
logging.info(f"ingest_delete={ingest_delete}")
assert isinstance(ingest_delete, bool)

# Base name of the dataset to provision, defaults to an escaped version of the path
dataset = escape_dataset(conf.get("dataset", ingest_path))

ingest = {
"bucket": ingest_bucket,
"key": ingest_key,
"path": ingest_path,
"file": ingest_file,
"dataset": dataset,
"delete": ingest_delete,
}
logging.info(f"ingest={ingest}")
ti.xcom_push("ingest", ingest)

hive_schema = "minio.csv"
hive_table = validate_identifier(f"{hive_schema}.{dataset}_{dag_id}")
hive_bucket = "loading"
hive_dir = validate_s3_key(f"ingest/{dataset}/{dag_id}")
hive_file, _ = os.path.splitext(ingest_file)
hive_key = f"{hive_dir}/{hive_file}.parquet"
hive_path = validate_s3_key(f"{hive_bucket}/{hive_dir}")
hive = {
"schema": hive_schema,
"table": hive_table,
"bucket": hive_bucket,
"dir": hive_dir,
"file": hive_file,
"key": hive_key,
"path": hive_path,
}
logging.info(f"hive={hive}")
ti.xcom_push("hive", hive)

iceberg_schema = "iceberg.ingest"
iceberg_table = validate_identifier(f"{iceberg_schema}.{dataset}_{dag_id}")
iceberg_bucket = "working"
iceberg_dir = validate_s3_key(f"ingest/{dataset}/{dag_id}")
iceberg_path = validate_s3_key(f"{iceberg_bucket}/{iceberg_dir}")
iceberg = {
"schema": iceberg_schema,
"table": iceberg_table,
"bucket": iceberg_bucket,
"dir": iceberg_dir,
"path": iceberg_path,
}
logging.info(f"iceberg={iceberg}")
ti.xcom_push("iceberg", iceberg)

########################################################################
logging.info("Convert from ingest bucket CSV to loading bucket Parquet using DuckDB...")
s3_csv_to_parquet(
conn_id="s3_conn",
src_bucket=ingest_bucket,
src_key=ingest_key,
dst_bucket=hive_bucket,
dst_key=hive_key
)

if ingest_delete:
s3_delete(conn_id="s3_conn", bucket=ingest_bucket, key=ingest_key)

########################################################################
logging.info("Mounting CSV on s3 into Hive connector and copy to Iceberg...")

# Create a connection to Trino
trino_conn = get_trino_conn_details()
trino = get_trino_engine(trino_conn)

logging.info("Create schema in Hive connector...")
create_schema(trino, schema=hive_schema, location=hive_bucket)

logging.info("Create schema in Iceberg connector...")
create_schema(trino, schema=iceberg_schema, location=iceberg_bucket)

try:
logging.info("Create table in Hive connector...")
hive_create_table_from_parquet(
trino,
table=hive_table,
location=hive_path
)

logging.info("Create table in Iceberg connector...")
iceberg_create_table_from_hive(
trino,
table=iceberg_table,
hive_table=hive_table,
location=iceberg_path
)

finally:
if debug:
logging.info("Debug mode, not cleaning up table in Hive connector...")
else:
logging.info("Cleanup table in Hive connector...")
drop_table(trino, table=hive_table)

logging.info("Cleanup data from Hive connector in s3...")
# External location data is not cascade deleted on drop table
s3_delete(
conn_id="s3_conn",
bucket=hive_bucket,
key=hive_key
)

########################################################################

ingest_csv_to_iceberg()
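
For reference, a minimal sketch of the dag_run conf that this task expects when the DAG is triggered manually (for example via airflow dags trigger ingest_csv_to_iceberg --conf '<json>'). The source bucket is fixed to "ingest" inside the task; the key and dataset values below are hypothetical.

# Hypothetical trigger payload; only "ingest_key" is required and it must end in ".csv".
# The other fields fall back to the defaults read via conf.get(...) above.
example_conf = {
    "ingest_key": "uploads/customers/2024-04-01.csv",  # path inside the "ingest" bucket
    "dataset": "customers",       # optional; defaults to an escaped form of the key's directory
    "ingest_delete": False,       # optional; delete the source CSV after conversion
    "debug": True,                # optional; keep the intermediate Hive table for inspection
}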
11 changes: 8 additions & 3 deletions dags/ingest_csv_to_parquet.py
@@ -38,10 +38,11 @@ def unpack_minio_event(message):
bucket = records["s3"]["bucket"]["name"]
etag = s3_object["eTag"]

- src_file_path: str = s3_object["key"]
+ src_file_path: str = s3_object["key"].replace('%2F', '/')
assert src_file_path.endswith(".csv")

- file_name = src_file_path.replace(".csv", "").replace('%2F', '_')
+ file_name = src_file_path.replace(".csv", "")
+ dir_name = src_file_path.split("/")[0]

full_file_path = message_json['Key']
head_path = '/'.join(full_file_path.split('/')[:-1])
@@ -52,6 +53,7 @@ def unpack_minio_event(message):
src_file_path=src_file_path,
etag=etag,
file_name=file_name,
+ dir_name=dir_name,
full_file_path=full_file_path,
head_path=head_path
)
@@ -73,6 +75,9 @@ def sha1(value):
tags=["ingest", "csv", "parquet", "s3"],
) as dag:

+ # Makes this logging namespace appear immediately in airflow
+ logger.info("DAG parsing...")

def process_event(message):
logger.info("Processing message!")
logger.info(f"message={message}")
@@ -125,7 +130,7 @@ def process_event(message):
)
logger.info(f"hive table schema={hive_schema_str}")

- table_name = re.sub(r"[^a-zA-Z0-9]", '_', event['file_name']).strip().strip('_').strip()
+ table_name = re.sub(r"[^a-zA-Z0-9]", '_', event['dir_name']).strip().strip('_').strip()
logger.info(f"table name={table_name}")

hive_table_name = f"{table_name}_{dag_hash}_{ti.try_number}"
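
For context, a sketch of the MinIO bucket-notification payload that unpack_minio_event reads, limited to the fields accessed in the hunks above; the bucket and object names are hypothetical. MinIO URL-encodes '/' in the object key as '%2F', which is what the added replace('%2F', '/') undoes.

# Hypothetical MinIO event; only the fields used in the hunks above are shown.
example_message = {
    "Key": "ingest/customers/2024-04-01.csv",  # full "bucket/object" path
    "Records": [{
        "s3": {
            "bucket": {"name": "ingest"},
            "object": {
                "key": "customers%2F2024-04-01.csv",  # object key with '/' encoded as '%2F'
                "eTag": "d41d8cd98f00b204e9800998ecf8427e",
            },
        },
    }],
}
# With the fix, src_file_path becomes "customers/2024-04-01.csv" and
# dir_name "customers", so the Trino table is named after the directory
# rather than the URL-encoded file path.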
42 changes: 42 additions & 0 deletions dags/modules/databases/duckdb.py
@@ -0,0 +1,42 @@
import os
import json
import logging
import duckdb
from airflow.hooks.base import BaseHook

from ..utils.s3 import validate_s3_key

logger = logging.getLogger(__name__)


def s3_csv_to_parquet(conn_id: str, src_bucket: str, dst_bucket: str, src_key: str, dst_key: str, memory: int = 500):

assert src_key.lower().endswith(".csv")
assert dst_key.lower().endswith(".parquet")
assert validate_s3_key(os.path.dirname(src_key))
assert validate_s3_key(os.path.dirname(dst_key))

s3_conn = json.loads(BaseHook.get_connection(conn_id).get_extra())
access_key_id = s3_conn['aws_access_key_id']
secret_access_key = s3_conn['aws_secret_access_key']
endpoint = s3_conn["endpoint_url"]\
.replace("http://", "").replace("https://", "")

con = duckdb.connect(database=':memory:')

query = f"INSTALL '/opt/duckdb/httpfs.duckdb_extension';" \
f"LOAD httpfs;" \
f"SET s3_endpoint='{endpoint}';" \
f"SET s3_access_key_id='{access_key_id}';" \
f"SET s3_secret_access_key='{secret_access_key}';" \
f"SET s3_use_ssl=False;" \
f"SET s3_url_style='path';" \
f"SET memory_limit='{memory}MB'"
logger.info(f"query={query}")
con.execute(query)

query = f"COPY (SELECT * FROM 's3://{src_bucket}/{src_key}')" \
f"TO 's3://{dst_bucket}/{dst_key}'" \
f"(FORMAT PARQUET, CODEC 'SNAPPY', ROW_GROUP_SIZE 100000);"
logger.info(f"query={query}")
con.execute(query)
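
s3_csv_to_parquet can also be exercised outside the DAG, for example from an ad-hoc script or test using the same s3_conn Airflow connection; a sketch with hypothetical bucket and key names. The memory argument feeds DuckDB's memory_limit setting in MB.

# Hypothetical standalone call; bucket and key names are examples only.
s3_csv_to_parquet(
    conn_id="s3_conn",               # Airflow connection holding the MinIO credentials
    src_bucket="ingest",
    src_key="uploads/customers/2024-04-01.csv",
    dst_bucket="loading",
    dst_key="ingest/customers/2024-04-01.parquet",
    memory=2000,                     # raise DuckDB's memory_limit to 2000 MB for larger CSVs
)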
