diff --git a/dataherald/api/fastapi.py b/dataherald/api/fastapi.py index 21bfb84e..9c28ea9a 100644 --- a/dataherald/api/fastapi.py +++ b/dataherald/api/fastapi.py @@ -6,6 +6,7 @@ from bson import json_util from fastapi import BackgroundTasks, HTTPException from overrides import override +from sqlalchemy import MetaData, inspect from dataherald.api import API from dataherald.api.types import Query @@ -221,10 +222,36 @@ def list_table_descriptions( self, db_connection_id: str | None = None, table_name: str | None = None ) -> list[TableSchemaDetail]: scanner_repository = DBScannerRepository(self.storage) - return scanner_repository.find_by( + table_descriptions = scanner_repository.find_by( {"db_connection_id": db_connection_id, "table_name": table_name} ) + if db_connection_id: + db_connection_repository = DatabaseConnectionRepository(self.storage) + db_connection = db_connection_repository.find_by_id(db_connection_id) + database = SQLDatabase.get_sql_engine(db_connection) + inspector = inspect(database.engine) + meta = MetaData(bind=database.engine) + MetaData.reflect(meta, views=True) + all_tables = inspector.get_table_names() + inspector.get_view_names() + + for table_description in table_descriptions: + if table_description.table_name not in all_tables: + table_description.status = "deprecated" + else: + all_tables.remove(table_description.table_name) + for table in all_tables: + table_descriptions.append( + TableSchemaDetail( + table_name=table, + status="NOT_SYNCHRONIZED", + db_connection_id=db_connection_id, + columns=[], + ) + ) + + return table_descriptions + @override def add_golden_records( self, golden_records: List[GoldenRecordRequest] diff --git a/dataherald/db_scanner/models/types.py b/dataherald/db_scanner/models/types.py index 21d53259..277ad66c 100644 --- a/dataherald/db_scanner/models/types.py +++ b/dataherald/db_scanner/models/types.py @@ -25,7 +25,8 @@ class TableSchemaDetail(BaseModel): table_name: str description: str | None table_schema: str | None - columns: list[ColumnDetail] + columns: list[ColumnDetail] = [] examples: list = [] last_schema_sync: datetime | None - status: str = "synchrinozed" + status: str = "SYNCHRONIZED" + error_mesage: str | None diff --git a/dataherald/db_scanner/sqlalchemy.py b/dataherald/db_scanner/sqlalchemy.py index 093b6abd..090cb2a5 100644 --- a/dataherald/db_scanner/sqlalchemy.py +++ b/dataherald/db_scanner/sqlalchemy.py @@ -146,7 +146,7 @@ def scan_single_table( meta=meta, db_engine=db_engine, table=table, rows_number=3 ), last_schema_sync=datetime.now(), - status="syncronized", + status="SYNCHRONIZED", ) repository.save_table_info(object) @@ -171,13 +171,32 @@ def scan( ] if len(tables) == 0: raise ValueError("No table found") - result = [] + + # persist tables to be scanned for table in tables: - obj = self.scan_single_table( - meta=meta, - table=table, - db_engine=db_engine, - db_connection_id=db_connection_id, - repository=repository, + repository.save_table_info( + TableSchemaDetail( + db_connection_id=db_connection_id, + table_name=table, + status="SYNCHRONIZING", + ) ) - result.append(obj) + + for table in tables: + try: + self.scan_single_table( + meta=meta, + table=table, + db_engine=db_engine, + db_connection_id=db_connection_id, + repository=repository, + ) + except Exception as e: + repository.save_table_info( + TableSchemaDetail( + db_connection_id=db_connection_id, + table_name=table, + status="FAILED", + error_message=f"{e}", + ) + )