Commit

Increase DuckDB memory from 500 MB to 5 GB
tekkisse committed Apr 7, 2024
1 parent 05a38a1 commit 28c00f8
Showing 5 changed files with 11 additions and 5 deletions.
Binary file modified dags/__pycache__/ingest_csv_to_iceberg2.cpython-310.pyc
Binary file not shown.
9 changes: 7 additions & 2 deletions dags/ingest_csv_to_iceberg2.py
@@ -7,6 +7,7 @@
 import boto3
 import json
 import s3fs
+from random import randint
 from airflow import DAG
 from airflow.operators.python import get_current_context, task
 from airflow.operators.trigger_dagrun import TriggerDagRunOperator
@@ -31,6 +32,10 @@
     validate_s3_key
 )
 
+def random_with_N_digits(n):
+    range_start = 10**(n-1)
+    range_end = (10**n)-1
+    return randint(range_start, range_end)
 
 def sha1(value):
     sha_1 = hashlib.sha1()
@@ -198,7 +203,7 @@ def ingest_csv_to_iceberg(dataset, tablename, version, ingest_bucket, ingest_key
     iceberg_bucket = dataset
     iceberg_dir = validate_s3_key(f"{version}")
 
-    iceberg_path = validate_s3_key(f"{iceberg_bucket}/{iceberg_dir}")
+    iceberg_path = validate_s3_key(f"{iceberg_bucket}/{iceberg_dir}/{tablename}")
 
     iceberg = {
         "schema": iceberg_schema,
@@ -316,7 +321,7 @@ def process_event(message):
         version="20",
         ingest_bucket=event['bucket'],
         ingest_key=event['src_file_path'],
-        dag_id=event['etag'],
+        dag_id=event['etag']+str(random_with_N_digits(4)),
         ingest_delete=False,
         debug=True)
 
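For context on the dag_id change above: appending a random four-digit suffix to the file's etag lets the same object be ingested more than once without two runs sharing an identifier. A minimal sketch of the behaviour, using a made-up etag value purely for illustration:

    from random import randint

    def random_with_N_digits(n):
        # randint is inclusive at both ends, so the result always has exactly n digits
        range_start = 10**(n-1)
        range_end = (10**n)-1
        return randint(range_start, range_end)

    etag = "abc123"  # hypothetical etag taken from the S3 event payload
    dag_id = etag + str(random_with_N_digits(4))
    print(dag_id)  # e.g. abc1237042 -- a different suffix on every trigger
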
Binary file modified dags/modules/databases/__pycache__/duckdb.cpython-310.pyc
Binary file not shown.
5 changes: 3 additions & 2 deletions dags/modules/databases/duckdb.py
@@ -9,7 +9,7 @@
 logger = logging.getLogger(__name__)
 
 
-def s3_csv_to_parquet(conn_id: str, src_bucket: str, dst_bucket: str, src_key: str, dst_key: str, memory: int = 500):
+def s3_csv_to_parquet(conn_id: str, src_bucket: str, dst_bucket: str, src_key: str, dst_key: str, memory: int = 5):
 
     assert src_key.lower().endswith(".csv")
     assert dst_key.lower().endswith(".parquet")
@@ -22,6 +22,7 @@ def s3_csv_to_parquet(conn_id: str, src_bucket: str, dst_bucket: str, src_key: s
     endpoint = s3_conn["endpoint_url"]\
         .replace("http://", "").replace("https://", "")
 
+
     con = duckdb.connect(database=':memory:')
 
     query = f"INSTALL '/opt/duckdb/httpfs.duckdb_extension';" \
@@ -31,7 +32,7 @@ def s3_csv_to_parquet(conn_id: str, src_bucket: str, dst_bucket: str, src_key: s
         f"SET s3_secret_access_key='{secret_access_key}';" \
         f"SET s3_use_ssl=False;" \
         f"SET s3_url_style='path';" \
-        f"SET memory_limit='{memory}MB'"
+        f"SET memory_limit='{memory}GB'"
     logger.info(f"query={query}")
     con.execute(query)
 
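The memory argument to s3_csv_to_parquet is now treated as gigabytes rather than megabytes, so the new default of 5 gives DuckDB a 5 GB limit instead of the previous 500 MB. A small sketch of the resulting setting, assuming a standard DuckDB in-memory connection (note that any caller still passing a megabyte-scale value such as 500 would now be requesting 500 GB):

    import duckdb

    memory = 5  # new default, now interpreted as GB
    con = duckdb.connect(database=':memory:')
    con.execute(f"SET memory_limit='{memory}GB'")
    # current_setting() echoes the active limit back, useful for verifying the change
    print(con.execute("SELECT current_setting('memory_limit')").fetchone())
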
2 changes: 1 addition & 1 deletion docker-compose/airflow/docker-compose.yml
@@ -40,7 +40,7 @@ version: '3'
 x-airflow-common:
   &airflow-common
   # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.2}
-  image: harbor.ukserp.ac.uk/dare/airflow/base:1.1.7
+  image: harbor.ukserp.ac.uk/dare/airflow/base:pr-40 #1.1.7
   environment:
     &airflow-common-env
     AIRFLOW__CORE__EXECUTOR: CeleryExecutor
