From 9c767be9ad045744defdd4263ff1d44b74c4ff69 Mon Sep 17 00:00:00 2001 From: Juan Carlos Jose Camacho Date: Mon, 25 Sep 2023 13:33:07 -0600 Subject: [PATCH 1/2] DH-4703 Set table-desription status --- dataherald/api/fastapi.py | 29 ++++++++++++++++++++- dataherald/db_scanner/models/types.py | 5 ++-- dataherald/db_scanner/sqlalchemy.py | 37 ++++++++++++++++++++------- 3 files changed, 59 insertions(+), 12 deletions(-) diff --git a/dataherald/api/fastapi.py b/dataherald/api/fastapi.py index 21bfb84e..23e519e5 100644 --- a/dataherald/api/fastapi.py +++ b/dataherald/api/fastapi.py @@ -6,6 +6,7 @@ from bson import json_util from fastapi import BackgroundTasks, HTTPException from overrides import override +from sqlalchemy import MetaData, inspect from dataherald.api import API from dataherald.api.types import Query @@ -221,10 +222,36 @@ def list_table_descriptions( self, db_connection_id: str | None = None, table_name: str | None = None ) -> list[TableSchemaDetail]: scanner_repository = DBScannerRepository(self.storage) - return scanner_repository.find_by( + table_descriptions = scanner_repository.find_by( {"db_connection_id": db_connection_id, "table_name": table_name} ) + if db_connection_id: + db_connection_repository = DatabaseConnectionRepository(self.storage) + db_connection = db_connection_repository.find_by_id(db_connection_id) + database = SQLDatabase.get_sql_engine(db_connection) + inspector = inspect(database.engine) + meta = MetaData(bind=database.engine) + MetaData.reflect(meta, views=True) + all_tables = inspector.get_table_names() + inspector.get_view_names() + + for table_description in table_descriptions: + if table_description.table_name not in all_tables: + table_description.status = "DEPRECATED" + else: + all_tables.remove(table_description.table_name) + for table in all_tables: + table_descriptions.append( + TableSchemaDetail( + table_name=table, + status="NOT_SYNCHRONIZED", + db_connection_id=db_connection_id, + columns=[], + ) + ) + + return table_descriptions + @override def add_golden_records( self, golden_records: List[GoldenRecordRequest] diff --git a/dataherald/db_scanner/models/types.py b/dataherald/db_scanner/models/types.py index 21d53259..be711fd8 100644 --- a/dataherald/db_scanner/models/types.py +++ b/dataherald/db_scanner/models/types.py @@ -25,7 +25,8 @@ class TableSchemaDetail(BaseModel): table_name: str description: str | None table_schema: str | None - columns: list[ColumnDetail] + columns: list[ColumnDetail] = [] examples: list = [] last_schema_sync: datetime | None - status: str = "synchrinozed" + status: str = "SYNCHRONIZED" + error_message: str | None diff --git a/dataherald/db_scanner/sqlalchemy.py b/dataherald/db_scanner/sqlalchemy.py index 093b6abd..090cb2a5 100644 --- a/dataherald/db_scanner/sqlalchemy.py +++ b/dataherald/db_scanner/sqlalchemy.py @@ -146,7 +146,7 @@ def scan_single_table( meta=meta, db_engine=db_engine, table=table, rows_number=3 ), last_schema_sync=datetime.now(), - status="syncronized", + status="SYNCHRONIZED", ) repository.save_table_info(object) @@ -171,13 +171,32 @@ def scan( ] if len(tables) == 0: raise ValueError("No table found") - result = [] + + # persist tables to be scanned for table in tables: - obj = self.scan_single_table( - meta=meta, - table=table, - db_engine=db_engine, - db_connection_id=db_connection_id, - repository=repository, + repository.save_table_info( + TableSchemaDetail( + db_connection_id=db_connection_id, + table_name=table, + status="SYNCHRONIZING", + ) ) - result.append(obj) + + for table in tables: + try: + self.scan_single_table( + meta=meta, + table=table, + db_engine=db_engine, + db_connection_id=db_connection_id, + repository=repository, + ) + except Exception as e: + repository.save_table_info( + TableSchemaDetail( + db_connection_id=db_connection_id, + table_name=table, + status="FAILED", + error_message=f"{e}", + ) + ) From 855afed69eb97ccd20881e86e3e033296db0015e Mon Sep 17 00:00:00 2001 From: Juan Carlos Jose Camacho Date: Mon, 25 Sep 2023 14:59:21 -0600 Subject: [PATCH 2/2] DH-4703 Added script to update table_descriptions rows --- dataherald/api/fastapi.py | 6 +++--- dataherald/db_scanner/models/types.py | 11 ++++++++++- dataherald/db_scanner/sqlalchemy.py | 10 +++++++--- dataherald/scripts/migrate_v002_to_v003.py | 17 +++++++++++++++++ .../sql_generator/dataherald_sqlagent.py | 5 +++-- docs/api.list_table_description.rst | 19 +++++++++++++++++-- 6 files changed, 57 insertions(+), 11 deletions(-) create mode 100644 dataherald/scripts/migrate_v002_to_v003.py diff --git a/dataherald/api/fastapi.py b/dataherald/api/fastapi.py index 23e519e5..187dfa7b 100644 --- a/dataherald/api/fastapi.py +++ b/dataherald/api/fastapi.py @@ -14,7 +14,7 @@ from dataherald.context_store import ContextStore from dataherald.db import DB from dataherald.db_scanner import Scanner -from dataherald.db_scanner.models.types import TableSchemaDetail +from dataherald.db_scanner.models.types import TableDescriptionStatus, TableSchemaDetail from dataherald.db_scanner.repository.base import DBScannerRepository from dataherald.eval import Evaluator from dataherald.repositories.base import NLQueryResponseRepository @@ -237,14 +237,14 @@ def list_table_descriptions( for table_description in table_descriptions: if table_description.table_name not in all_tables: - table_description.status = "DEPRECATED" + table_description.status = TableDescriptionStatus.DEPRECATED.value else: all_tables.remove(table_description.table_name) for table in all_tables: table_descriptions.append( TableSchemaDetail( table_name=table, - status="NOT_SYNCHRONIZED", + status=TableDescriptionStatus.NOT_SYNCHRONIZED.value, db_connection_id=db_connection_id, columns=[], ) diff --git a/dataherald/db_scanner/models/types.py b/dataherald/db_scanner/models/types.py index be711fd8..af46865c 100644 --- a/dataherald/db_scanner/models/types.py +++ b/dataherald/db_scanner/models/types.py @@ -1,4 +1,5 @@ from datetime import datetime +from enum import Enum from typing import Any from pydantic import BaseModel @@ -19,6 +20,14 @@ class ColumnDetail(BaseModel): foreign_key: ForeignKeyDetail | None +class TableDescriptionStatus(Enum): + NOT_SYNCHRONIZED = "NOT_SYNCHRONIZED" + SYNCHRONIZING = "SYNCHRONIZING" + DEPRECATED = "DEPRECATED" + SYNCHRONIZED = "SYNCHRONIZED" + FAILED = "FAILED" + + class TableSchemaDetail(BaseModel): id: Any db_connection_id: str @@ -28,5 +37,5 @@ class TableSchemaDetail(BaseModel): columns: list[ColumnDetail] = [] examples: list = [] last_schema_sync: datetime | None - status: str = "SYNCHRONIZED" + status: str = TableDescriptionStatus.SYNCHRONIZED.value error_message: str | None diff --git a/dataherald/db_scanner/sqlalchemy.py b/dataherald/db_scanner/sqlalchemy.py index 090cb2a5..0d90388b 100644 --- a/dataherald/db_scanner/sqlalchemy.py +++ b/dataherald/db_scanner/sqlalchemy.py @@ -8,7 +8,11 @@ from sqlalchemy.sql import func from dataherald.db_scanner import Scanner -from dataherald.db_scanner.models.types import ColumnDetail, TableSchemaDetail +from dataherald.db_scanner.models.types import ( + ColumnDetail, + TableDescriptionStatus, + TableSchemaDetail, +) from dataherald.db_scanner.repository.base import DBScannerRepository from dataherald.sql_database.base import SQLDatabase @@ -178,7 +182,7 @@ def scan( TableSchemaDetail( db_connection_id=db_connection_id, table_name=table, - status="SYNCHRONIZING", + status=TableDescriptionStatus.SYNCHRONIZING.value, ) ) @@ -196,7 +200,7 @@ def scan( TableSchemaDetail( db_connection_id=db_connection_id, table_name=table, - status="FAILED", + status=TableDescriptionStatus.FAILED.value, error_message=f"{e}", ) ) diff --git a/dataherald/scripts/migrate_v002_to_v003.py b/dataherald/scripts/migrate_v002_to_v003.py new file mode 100644 index 00000000..4f1195d9 --- /dev/null +++ b/dataherald/scripts/migrate_v002_to_v003.py @@ -0,0 +1,17 @@ +import dataherald.config +from dataherald.config import System +from dataherald.db import DB + +if __name__ == "__main__": + settings = dataherald.config.Settings() + system = System(settings) + system.start() + storage = system.instance(DB) + # Update table_descriptions status + collection_rows = storage.find_all("table_descriptions") + for collection_row in collection_rows: + collection_row["status"] = "SYNCHRONIZED" + # update object + storage.update_or_create( + "table_descriptions", {"_id": collection_row["_id"]}, collection_row + ) diff --git a/dataherald/sql_generator/dataherald_sqlagent.py b/dataherald/sql_generator/dataherald_sqlagent.py index 8009e31a..f4a3355c 100644 --- a/dataherald/sql_generator/dataherald_sqlagent.py +++ b/dataherald/sql_generator/dataherald_sqlagent.py @@ -30,7 +30,7 @@ from dataherald.context_store import ContextStore from dataherald.db import DB -from dataherald.db_scanner.models.types import TableSchemaDetail +from dataherald.db_scanner.models.types import TableDescriptionStatus, TableSchemaDetail from dataherald.db_scanner.repository.base import DBScannerRepository from dataherald.sql_database.base import SQLDatabase, SQLInjectionError from dataherald.sql_database.models.types import ( @@ -581,7 +581,8 @@ def generate_response( ) repository = DBScannerRepository(storage) db_scan = repository.get_all_tables_by_db( - db_connection_id=database_connection.id + db_connection_id=database_connection.id, + status=TableDescriptionStatus.SYNCHRONIZED.value, ) if not db_scan: raise ValueError("No scanned tables found for database") diff --git a/docs/api.list_table_description.rst b/docs/api.list_table_description.rst index 39551007..cca84a0f 100644 --- a/docs/api.list_table_description.rst +++ b/docs/api.list_table_description.rst @@ -3,7 +3,7 @@ List table descriptions ======================= -Once you have scanned a db connection you can list the table descriptions by requesting this endpoint. +This endpoint returns the database connection tables and includes a status field that indicates whether the tables have been scanned or not. Request this ``GET`` endpoint:: @@ -15,7 +15,7 @@ Request this ``GET`` endpoint:: :header: "Name", "Type", "Description" :widths: 20, 20, 60 - "db_connection_id", "string", "Filter by connection id, ``Optional``" + "db_connection_id", "string", "Filter by connection id, ``Optional``. By configuring this field, it establishes a connection with the database to fetch table names and subsequently merges this data with the pre-existing rows in our MongoDB." "table_name", "string", "Filter by table name, ``Optional``" **Responses** @@ -31,6 +31,8 @@ HTTP 200 code response "table_name": "string", "description": "string", "table_schema": "string", + "status": "NOT_SYNCHRONIZED | SYNCHRONIZING | DEPRECATED | SYNCHRONIZED | FAILED" + "error_message": "string", "columns": [ { "name": "string", @@ -51,6 +53,19 @@ HTTP 200 code response } ] +.. csv-table:: + :header: "Name", "Type", "Description" + :widths: 20, 20, 60 + + "status", "string", "It can be one of the next options: + - `NOT_SYNCHRONIZED` if the table has not been scanned + - `SYNCHRONIZING` while the sync schema process is running + - `DEPRECATED` if there is a row in our `table-descriptions` collection that is no longer in the database, probably because the table/view was deleted or renamed + - `SYNCHRONIZED` when we have scanned the table + - `FAILED` if anything failed during the sync schema process, and the `error_message` field stores the error." + "error_message", "string", "This field is set only if the async schema process fails" + + **Request example** .. code-block:: rst