Skip to content

Commit

Permalink
🐛 memoize engine per process
Browse files Browse the repository at this point in the history
  • Loading branch information
Marigold committed Apr 17, 2024
1 parent 0d91e7c commit 05a63cf
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
12 changes: 11 additions & 1 deletion etl/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,12 +449,22 @@ def exec_graph_parallel(
# Dictionary to keep track of future tasks
future_to_task: Dict[Future, str] = {}

ready_tasks = []

while topological_sorter.is_active():
# Queue up tasks that have become ready since the last iteration
ready_tasks += topological_sorter.get_ready()

# Submit tasks that are ready to the executor
for task in topological_sorter.get_ready():
# NOTE: submit at most `workers` tasks per iteration; otherwise the executor might
# accept many tasks that are not CPU-bound and overload our DB
for task in ready_tasks[:workers]:
future = executor.submit(func, task, **kwargs)
future_to_task[future] = task

# Drop the tasks that were just submitted; the rest stay queued for the next iteration
ready_tasks = ready_tasks[workers:]

# Wait for at least one future to complete
done, _ = wait(future_to_task.keys(), return_when=FIRST_COMPLETED)

Expand Down
7 changes: 5 additions & 2 deletions etl/db.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import functools
import os
import warnings
from typing import Any, Dict, List, Optional
from urllib.parse import quote
Expand Down Expand Up @@ -44,7 +45,7 @@ def get_session(**kwargs) -> Session:


@functools.cache
def _get_engine_cached(cf: Any) -> Engine:
def _get_engine_cached(cf: Any, pid: int) -> Engine:
return create_engine(
f"mysql://{cf.DB_USER}:{quote(cf.DB_PASS)}@{cf.DB_HOST}:{cf.DB_PORT}/{cf.DB_NAME}",
pool_size=30, # Increase the pool size to allow higher GRAPHER_WORKERS
Expand All @@ -54,7 +55,9 @@ def _get_engine_cached(cf: Any) -> Engine:

def get_engine(conf: Optional[Dict[str, Any]] = None) -> Engine:
cf: Any = dict_to_object(conf) if conf else config
return _get_engine_cached(cf)
# Including the pid in the memoization key ensures every process gets its own Engine
pid = os.getpid()
return _get_engine_cached(cf, pid)


def get_dataset_id(
Expand Down

0 comments on commit 05a63cf

Please sign in to comment.