Commit d4a574e
🐛 cache engine to avoid creating too many connections
Marigold committed Apr 17, 2024
1 parent 97b9d2b commit d4a574e
Showing 4 changed files with 17 additions and 24 deletions.
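
For context on the commit title: every sqlalchemy create_engine() call builds a new Engine with its own connection pool, so a factory invoked once per operation keeps opening fresh connections. A minimal sketch of that failure mode, assuming only SQLAlchemy is installed and using an in-memory SQLite URL as a stand-in for the project's MySQL DSN:

    from sqlalchemy import create_engine

    # Every call builds a brand-new Engine with its own connection pool;
    # invoke a factory like this once per operation and open connections
    # accumulate -- the "too many connections" symptom in the title.
    def get_engine_uncached():
        return create_engine("sqlite://")  # stand-in for the mysql:// DSN

    a = get_engine_uncached()
    b = get_engine_uncached()
    assert a is not b  # two engines, two pools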
etl/db.py (12 additions, 10 deletions)
@@ -1,5 +1,6 @@
+import functools
 import warnings
-from typing import Any, Dict, List, Optional, cast
+from typing import Any, Dict, List, Optional
 from urllib.parse import quote
 
 import MySQLdb
@@ -42,17 +43,18 @@ def get_session(**kwargs) -> Session:
     return Session(get_engine(**kwargs))
 
 
+@functools.cache
+def _get_engine_cached(cf: Any) -> Engine:
+    return create_engine(
+        f"mysql://{cf.DB_USER}:{quote(cf.DB_PASS)}@{cf.DB_HOST}:{cf.DB_PORT}/{cf.DB_NAME}",
+        pool_size=30,  # Increase the pool size to allow higher GRAPHER_WORKERS
+        max_overflow=30,  # Increase the max overflow limit to allow higher GRAPHER_WORKERS
+    )
+
+
 def get_engine(conf: Optional[Dict[str, Any]] = None) -> Engine:
     cf: Any = dict_to_object(conf) if conf else config
 
-    return cast(
-        Engine,
-        create_engine(
-            f"mysql://{cf.DB_USER}:{quote(cf.DB_PASS)}@{cf.DB_HOST}:{cf.DB_PORT}/{cf.DB_NAME}",
-            pool_size=30,  # Increase the pool size to allow higher GRAPHER_WORKERS
-            max_overflow=30,  # Increase the max overflow limit to allow higher GRAPHER_WORKERS
-        ),
-    )
+    return _get_engine_cached(cf)
 
 
 def get_dataset_id(
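
A standalone sketch of the fix above, with a plain URL string in place of the project's config object. functools.cache memoizes by argument, so the argument must be hashable; that is why get_engine() converts a plain dict through dict_to_object() before calling the cached helper. Note that identity-based hashing gives each distinct dict_to_object() result its own cache slot, while the common path (conf is None, so the config module itself is passed) always hits the cache:

    import functools

    from sqlalchemy import create_engine
    from sqlalchemy.engine import Engine

    @functools.cache
    def _cached_engine(url: str) -> Engine:
        # Runs once per distinct URL; later calls return the same Engine,
        # leaving exactly one connection pool per database configuration.
        return create_engine(url)

    a = _cached_engine("sqlite://")  # in-memory stand-in for the MySQL DSN
    b = _cached_engine("sqlite://")
    assert a is b  # cache hit: one Engine, one pool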
etl/grapher_import.py (3 additions, 2 deletions)
@@ -30,6 +30,7 @@
 )
 from apps.backport.datasync.datasync import upload_gzip_dict
 from etl import config
+from etl.db import get_engine
 
 from . import grapher_helpers as gh
 from . import grapher_model as gm
@@ -333,7 +334,7 @@ def fetch_db_checksum(dataset: catalog.Dataset) -> Optional[str]:
     assert dataset.metadata.version, "Dataset must have a version"
     assert dataset.metadata.namespace, "Dataset must have a namespace"
 
-    with Session(gm.get_engine()) as session:
+    with Session(get_engine()) as session:
         q = select(gm.Dataset).where(
             gm.Dataset.shortName == dataset.metadata.short_name,
             gm.Dataset.version == dataset.metadata.version,
@@ -344,7 +345,7 @@ def fetch_db_checksum(dataset: catalog.Dataset) -> Optional[str]:
 
 
 def set_dataset_checksum_and_editedAt(dataset_id: int, checksum: str) -> None:
-    with Session(gm.get_engine()) as session:
+    with Session(get_engine()) as session:
         q = (
             update(gm.Dataset)
             .where(gm.Dataset.id == dataset_id)
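
With get_engine() cached, each Session in grapher_import.py now borrows connections from one shared pool; only the sessions are short-lived. A sketch of that call pattern, assuming sqlmodel is installed and substituting an in-memory SQLite engine for the cached MySQL one:

    from sqlalchemy import text
    from sqlmodel import Session, create_engine

    engine = create_engine("sqlite://")  # stand-in for get_engine()

    def fetch_one() -> int:
        # Each Session checks a connection out of the engine's pool and
        # returns it on exit; no new Engine or pool is built per call.
        with Session(engine) as session:
            return session.execute(text("SELECT 1")).scalar_one()

    assert fetch_one() == 1
    assert fetch_one() == 1  # second call reuses the pooled connection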
etl/grapher_model.py (1 addition, 9 deletions)
@@ -4,6 +4,7 @@
 ```
 It has been slightly modified since then.
 """
+import functools
 import json
 from datetime import date, datetime
 from pathlib import Path
@@ -35,15 +36,13 @@
     VARCHAR,
 )
 from sqlalchemy.exc import NoResultFound
-from sqlalchemy.future import Engine as _FutureEngine
 from sqlmodel import JSON as _JSON
 from sqlmodel import (
     Column,
     Field,
     Relationship,
     Session,
     SQLModel,
-    create_engine,
     or_,
     select,
 )
@@ -68,13 +67,6 @@
 JSON = _JSON(none_as_null=True)
 
 
-def get_engine() -> _FutureEngine:
-    return create_engine(
-        f"mysql://{config.DB_USER}:{quote(config.DB_PASS)}@{config.DB_HOST}:{config.DB_PORT}/{config.DB_NAME}",
-        future=False,
-    )
-
-
 t_active_datasets = Table(
     "active_datasets",
     metadata,
etl/steps/__init__.py (1 addition, 3 deletions)
@@ -797,11 +797,9 @@ def run(self) -> None:
 
         dataset.metadata = gh._adapt_dataset_metadata_for_grapher(dataset.metadata)
 
-        engine = get_engine()
-
         assert dataset.metadata.namespace
         dataset_upsert_results = gi.upsert_dataset(
-            engine,
+            self.engine,
            dataset,
             dataset.metadata.namespace,
             dataset.metadata.sources,
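
The final hunk reuses an engine the step object already carries instead of constructing one inside run(). A hedged sketch of that shape; GrapherStep here is an illustrative name, not necessarily the project's actual step class:

    from sqlalchemy import create_engine, text
    from sqlalchemy.engine import Engine

    class GrapherStep:  # illustrative name
        def __init__(self, engine: Engine) -> None:
            self.engine = engine  # injected once, shared for the step's lifetime

        def run(self) -> None:
            # Every DB call inside the step draws on self.engine's pool.
            with self.engine.connect() as conn:
                conn.execute(text("SELECT 1"))

    step = GrapherStep(create_engine("sqlite://"))  # SQLite as a stand-in
    step.run()
    step.run()  # same Engine both times; no extra pools created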
