diff --git a/etl/db.py b/etl/db.py
index cf07e1edc056..aa0d21da2b53 100644
--- a/etl/db.py
+++ b/etl/db.py
@@ -1,5 +1,6 @@
+import functools
 import warnings
-from typing import Any, Dict, List, Optional, cast
+from typing import Any, Dict, List, Optional
 from urllib.parse import quote
 
 import MySQLdb
@@ -42,17 +43,18 @@ def get_session(**kwargs) -> Session:
     return Session(get_engine(**kwargs))
 
 
+@functools.cache
+def _get_engine_cached(cf: Any) -> Engine:
+    return create_engine(
+        f"mysql://{cf.DB_USER}:{quote(cf.DB_PASS)}@{cf.DB_HOST}:{cf.DB_PORT}/{cf.DB_NAME}",
+        pool_size=30,  # Increase the pool size to allow higher GRAPHER_WORKERS
+        max_overflow=30,  # Increase the max overflow limit to allow higher GRAPHER_WORKERS
+    )
+
+
 def get_engine(conf: Optional[Dict[str, Any]] = None) -> Engine:
     cf: Any = dict_to_object(conf) if conf else config
-
-    return cast(
-        Engine,
-        create_engine(
-            f"mysql://{cf.DB_USER}:{quote(cf.DB_PASS)}@{cf.DB_HOST}:{cf.DB_PORT}/{cf.DB_NAME}",
-            pool_size=30,  # Increase the pool size to allow higher GRAPHER_WORKERS
-            max_overflow=30,  # Increase the max overflow limit to allow higher GRAPHER_WORKERS
-        ),
-    )
+    return _get_engine_cached(cf)
 
 
 def get_dataset_id(
diff --git a/etl/grapher_import.py b/etl/grapher_import.py
index fe4c4fa82a18..5062b30c6526 100644
--- a/etl/grapher_import.py
+++ b/etl/grapher_import.py
@@ -30,6 +30,7 @@
 )
 from apps.backport.datasync.datasync import upload_gzip_dict
 from etl import config
+from etl.db import get_engine
 
 from . import grapher_helpers as gh
 from . import grapher_model as gm
@@ -333,7 +334,7 @@ def fetch_db_checksum(dataset: catalog.Dataset) -> Optional[str]:
     assert dataset.metadata.version, "Dataset must have a version"
     assert dataset.metadata.namespace, "Dataset must have a namespace"
 
-    with Session(gm.get_engine()) as session:
+    with Session(get_engine()) as session:
         q = select(gm.Dataset).where(
             gm.Dataset.shortName == dataset.metadata.short_name,
             gm.Dataset.version == dataset.metadata.version,
@@ -344,7 +345,7 @@
 
 
 def set_dataset_checksum_and_editedAt(dataset_id: int, checksum: str) -> None:
-    with Session(gm.get_engine()) as session:
+    with Session(get_engine()) as session:
         q = (
             update(gm.Dataset)
             .where(gm.Dataset.id == dataset_id)
diff --git a/etl/grapher_model.py b/etl/grapher_model.py
index c6df22cebbf8..8f74a52d6c0b 100644
--- a/etl/grapher_model.py
+++ b/etl/grapher_model.py
@@ -4,6 +4,7 @@
 ```
 It has been slightly modified since then.
 """
+import functools
 import json
 from datetime import date, datetime
 from pathlib import Path
@@ -35,7 +36,6 @@
     VARCHAR,
 )
 from sqlalchemy.exc import NoResultFound
-from sqlalchemy.future import Engine as _FutureEngine
 from sqlmodel import JSON as _JSON
 from sqlmodel import (
     Column,
@@ -43,7 +43,6 @@
     Relationship,
     Session,
     SQLModel,
-    create_engine,
     or_,
     select,
 )
@@ -68,13 +67,6 @@
 
 JSON = _JSON(none_as_null=True)
 
-def get_engine() -> _FutureEngine:
-    return create_engine(
-        f"mysql://{config.DB_USER}:{quote(config.DB_PASS)}@{config.DB_HOST}:{config.DB_PORT}/{config.DB_NAME}",
-        future=False,
-    )
-
-
 t_active_datasets = Table(
     "active_datasets",
     metadata,
diff --git a/etl/steps/__init__.py b/etl/steps/__init__.py
index af9ccda8d60c..d44e897b01e1 100644
--- a/etl/steps/__init__.py
+++ b/etl/steps/__init__.py
@@ -797,11 +797,9 @@ def run(self) -> None:
 
         dataset.metadata = gh._adapt_dataset_metadata_for_grapher(dataset.metadata)
 
-        engine = get_engine()
-
         assert dataset.metadata.namespace
         dataset_upsert_results = gi.upsert_dataset(
-            engine,
+            self.engine,
             dataset,
             dataset.metadata.namespace,
             dataset.metadata.sources,