DH-4703 Set table-description status (#183)
* DH-4703 Set table-description status

* DH-4703 Added script to update table_descriptions rows
jcjc712 authored Sep 25, 2023
1 parent c31ddfb commit 9e2d119
Showing 6 changed files with 111 additions and 18 deletions.
31 changes: 29 additions & 2 deletions dataherald/api/fastapi.py
@@ -6,14 +6,15 @@
from bson import json_util
from fastapi import BackgroundTasks, HTTPException
from overrides import override
from sqlalchemy import MetaData, inspect

from dataherald.api import API
from dataherald.api.types import Query
from dataherald.config import System
from dataherald.context_store import ContextStore
from dataherald.db import DB
from dataherald.db_scanner import Scanner
from dataherald.db_scanner.models.types import TableSchemaDetail
from dataherald.db_scanner.models.types import TableDescriptionStatus, TableSchemaDetail
from dataherald.db_scanner.repository.base import DBScannerRepository
from dataherald.eval import Evaluator
from dataherald.repositories.base import NLQueryResponseRepository
@@ -221,10 +222,36 @@ def list_table_descriptions(
self, db_connection_id: str | None = None, table_name: str | None = None
) -> list[TableSchemaDetail]:
scanner_repository = DBScannerRepository(self.storage)
return scanner_repository.find_by(
table_descriptions = scanner_repository.find_by(
{"db_connection_id": db_connection_id, "table_name": table_name}
)

if db_connection_id:
db_connection_repository = DatabaseConnectionRepository(self.storage)
db_connection = db_connection_repository.find_by_id(db_connection_id)
database = SQLDatabase.get_sql_engine(db_connection)
inspector = inspect(database.engine)
meta = MetaData(bind=database.engine)
MetaData.reflect(meta, views=True)
all_tables = inspector.get_table_names() + inspector.get_view_names()

for table_description in table_descriptions:
if table_description.table_name not in all_tables:
table_description.status = TableDescriptionStatus.DEPRECATED.value
else:
all_tables.remove(table_description.table_name)
for table in all_tables:
table_descriptions.append(
TableSchemaDetail(
table_name=table,
status=TableDescriptionStatus.NOT_SYNCHRONIZED.value,
db_connection_id=db_connection_id,
columns=[],
)
)

return table_descriptions

@override
def add_golden_records(
self, golden_records: List[GoldenRecordRequest]
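
As an aside, a minimal standalone sketch of the merge performed in list_table_descriptions above: stored descriptions whose tables no longer exist are marked DEPRECATED, and live tables with no stored row get a NOT_SYNCHRONIZED placeholder. The TableInfo dataclass and merge_table_statuses helper are illustrative stand-ins, not part of the codebase:

    from dataclasses import dataclass, field

    DEPRECATED = "DEPRECATED"
    NOT_SYNCHRONIZED = "NOT_SYNCHRONIZED"

    @dataclass
    class TableInfo:
        table_name: str
        status: str = "SYNCHRONIZED"
        columns: list = field(default_factory=list)

    def merge_table_statuses(stored: list, live_tables: list) -> list:
        """Mirror of the logic above: flag missing tables, add placeholders for unscanned ones."""
        remaining = list(live_tables)
        for row in stored:
            if row.table_name not in remaining:
                row.status = DEPRECATED           # table/view was dropped or renamed
            else:
                remaining.remove(row.table_name)  # already described in storage
        for name in remaining:                    # in the database but never scanned
            stored.append(TableInfo(table_name=name, status=NOT_SYNCHRONIZED))
        return stored

    # "orders" was dropped from the database, "customers" has never been scanned.
    result = merge_table_statuses(
        [TableInfo("users"), TableInfo("orders")],
        live_tables=["users", "customers"],
    )
    print([(t.table_name, t.status) for t in result])
    # [('users', 'SYNCHRONIZED'), ('orders', 'DEPRECATED'), ('customers', 'NOT_SYNCHRONIZED')]
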
14 changes: 12 additions & 2 deletions dataherald/db_scanner/models/types.py
@@ -1,4 +1,5 @@
from datetime import datetime
from enum import Enum
from typing import Any

from pydantic import BaseModel
@@ -19,13 +20,22 @@ class ColumnDetail(BaseModel):
foreign_key: ForeignKeyDetail | None


class TableDescriptionStatus(Enum):
NOT_SYNCHRONIZED = "NOT_SYNCHRONIZED"
SYNCHRONIZING = "SYNCHRONIZING"
DEPRECATED = "DEPRECATED"
SYNCHRONIZED = "SYNCHRONIZED"
FAILED = "FAILED"


class TableSchemaDetail(BaseModel):
id: Any
db_connection_id: str
table_name: str
description: str | None
table_schema: str | None
columns: list[ColumnDetail]
columns: list[ColumnDetail] = []
examples: list = []
last_schema_sync: datetime | None
status: str = "synchrinozed"
status: str = TableDescriptionStatus.SYNCHRONIZED.value
error_message: str | None
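
A quick sanity check of the new default (a sketch assuming pydantic is installed, with the enum and model trimmed to the fields used here; storing the enum's .value as a plain string presumably keeps the field easy to persist and query in MongoDB):

    from enum import Enum
    from pydantic import BaseModel

    class TableDescriptionStatus(Enum):
        NOT_SYNCHRONIZED = "NOT_SYNCHRONIZED"
        SYNCHRONIZING = "SYNCHRONIZING"
        DEPRECATED = "DEPRECATED"
        SYNCHRONIZED = "SYNCHRONIZED"
        FAILED = "FAILED"

    class TableSchemaDetail(BaseModel):
        db_connection_id: str
        table_name: str
        columns: list = []
        status: str = TableDescriptionStatus.SYNCHRONIZED.value

    row = TableSchemaDetail(db_connection_id="conn-1", table_name="users")
    print(row.status)  # "SYNCHRONIZED" -- the default when no status is given

    failed = TableSchemaDetail(
        db_connection_id="conn-1",
        table_name="orders",
        status=TableDescriptionStatus.FAILED.value,
    )
    print(failed.status)  # "FAILED"
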
43 changes: 33 additions & 10 deletions dataherald/db_scanner/sqlalchemy.py
@@ -8,7 +8,11 @@
from sqlalchemy.sql import func

from dataherald.db_scanner import Scanner
from dataherald.db_scanner.models.types import ColumnDetail, TableSchemaDetail
from dataherald.db_scanner.models.types import (
ColumnDetail,
TableDescriptionStatus,
TableSchemaDetail,
)
from dataherald.db_scanner.repository.base import DBScannerRepository
from dataherald.sql_database.base import SQLDatabase

@@ -146,7 +150,7 @@ def scan_single_table(
meta=meta, db_engine=db_engine, table=table, rows_number=3
),
last_schema_sync=datetime.now(),
status="syncronized",
status="SYNCHRONIZED",
)

repository.save_table_info(object)
@@ -171,13 +175,32 @@ def scan(
]
if len(tables) == 0:
raise ValueError("No table found")
result = []

# persist tables to be scanned
for table in tables:
obj = self.scan_single_table(
meta=meta,
table=table,
db_engine=db_engine,
db_connection_id=db_connection_id,
repository=repository,
repository.save_table_info(
TableSchemaDetail(
db_connection_id=db_connection_id,
table_name=table,
status=TableDescriptionStatus.SYNCHRONIZING.value,
)
)
result.append(obj)

for table in tables:
try:
self.scan_single_table(
meta=meta,
table=table,
db_engine=db_engine,
db_connection_id=db_connection_id,
repository=repository,
)
except Exception as e:
repository.save_table_info(
TableSchemaDetail(
db_connection_id=db_connection_id,
table_name=table,
status=TableDescriptionStatus.FAILED.value,
error_message=f"{e}",
)
)
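
The scan above now runs in two passes: every requested table is first persisted with a SYNCHRONIZING status, then each table is scanned individually, and a failure downgrades only that table to FAILED instead of aborting the whole run. A toy sketch of the pattern with a stubbed repository (FakeRepository and scan_tables are illustrative names, not the Scanner API):

    class FakeRepository:
        """Stub standing in for DBScannerRepository; records the latest status per table."""
        def __init__(self):
            self.statuses = {}

        def save_table_info(self, table_name, status, error_message=None):
            self.statuses[table_name] = status

    def scan_tables(tables, scan_one, repository):
        # Pass 1: persist every table as SYNCHRONIZING so the API can report progress.
        for table in tables:
            repository.save_table_info(table, "SYNCHRONIZING")
        # Pass 2: scan each table; a failure marks that table FAILED instead of aborting the loop.
        for table in tables:
            try:
                scan_one(table)  # on success this would record SYNCHRONIZED
            except Exception as e:
                repository.save_table_info(table, "FAILED", error_message=str(e))

    repo = FakeRepository()

    def scan_one(table):
        if table == "broken_view":
            raise RuntimeError("view references a dropped column")
        repo.save_table_info(table, "SYNCHRONIZED")

    scan_tables(["users", "broken_view"], scan_one, repo)
    print(repo.statuses)  # {'users': 'SYNCHRONIZED', 'broken_view': 'FAILED'}
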
17 changes: 17 additions & 0 deletions dataherald/scripts/migrate_v002_to_v003.py
@@ -0,0 +1,17 @@
import dataherald.config
from dataherald.config import System
from dataherald.db import DB

if __name__ == "__main__":
settings = dataherald.config.Settings()
system = System(settings)
system.start()
storage = system.instance(DB)
# Update table_descriptions status
collection_rows = storage.find_all("table_descriptions")
for collection_row in collection_rows:
collection_row["status"] = "SYNCHRONIZED"
# update object
storage.update_or_create(
"table_descriptions", {"_id": collection_row["_id"]}, collection_row
)
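
If the migration ever needs to be re-run, a hedged variant that skips rows already carrying a status would keep it idempotent; it relies only on the find_all and update_or_create helpers used above (whether reruns are actually needed is an assumption):

    import dataherald.config
    from dataherald.config import System
    from dataherald.db import DB

    if __name__ == "__main__":
        settings = dataherald.config.Settings()
        system = System(settings)
        system.start()
        storage = system.instance(DB)
        # Only touch rows that do not have a status yet, so re-running is a no-op.
        for collection_row in storage.find_all("table_descriptions"):
            if collection_row.get("status"):
                continue  # already migrated
            collection_row["status"] = "SYNCHRONIZED"
            storage.update_or_create(
                "table_descriptions", {"_id": collection_row["_id"]}, collection_row
            )
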
5 changes: 3 additions & 2 deletions dataherald/sql_generator/dataherald_sqlagent.py
@@ -30,7 +30,7 @@

from dataherald.context_store import ContextStore
from dataherald.db import DB
from dataherald.db_scanner.models.types import TableSchemaDetail
from dataherald.db_scanner.models.types import TableDescriptionStatus, TableSchemaDetail
from dataherald.db_scanner.repository.base import DBScannerRepository
from dataherald.sql_database.base import SQLDatabase, SQLInjectionError
from dataherald.sql_database.models.types import (
@@ -581,7 +581,8 @@ def generate_response(
)
repository = DBScannerRepository(storage)
db_scan = repository.get_all_tables_by_db(
db_connection_id=database_connection.id
db_connection_id=database_connection.id,
status=TableDescriptionStatus.SYNCHRONIZED.value,
)
if not db_scan:
raise ValueError("No scanned tables found for database")
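
The repository query behind get_all_tables_by_db is not part of this diff; the effect of the new status argument is simply that the SQL agent only reasons over tables that finished scanning, never over deprecated, failed, or still-synchronizing ones. Illustrative filtering only (the sample rows are made up):

    db_scan = [
        {"table_name": "users", "status": "SYNCHRONIZED"},
        {"table_name": "orders", "status": "DEPRECATED"},
        {"table_name": "events", "status": "SYNCHRONIZING"},
        {"table_name": "logs", "status": "FAILED"},
    ]

    # Only SYNCHRONIZED tables should reach the agent's context.
    usable = [t for t in db_scan if t["status"] == "SYNCHRONIZED"]
    print([t["table_name"] for t in usable])  # ['users']
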
19 changes: 17 additions & 2 deletions docs/api.list_table_description.rst
@@ -3,7 +3,7 @@
List table descriptions
=======================

Once you have scanned a db connection you can list the table descriptions by requesting this endpoint.
This endpoint returns the tables for a database connection, including a status field that indicates whether each table has been scanned.

Request this ``GET`` endpoint::

@@ -15,7 +15,7 @@ Request this ``GET`` endpoint::
:header: "Name", "Type", "Description"
:widths: 20, 20, 60

"db_connection_id", "string", "Filter by connection id, ``Optional``"
"db_connection_id", "string", "Filter by connection id, ``Optional``. By configuring this field, it establishes a connection with the database to fetch table names and subsequently merges this data with the pre-existing rows in our MongoDB."
"table_name", "string", "Filter by table name, ``Optional``"

**Responses**
@@ -31,6 +31,8 @@ HTTP 200 code response
"table_name": "string",
"description": "string",
"table_schema": "string",
"status": "NOT_SYNCHRONIZED | SYNCHRONIZING | DEPRECATED | SYNCHRONIZED | FAILED"
"error_message": "string",
"columns": [
{
"name": "string",
@@ -51,6 +53,19 @@ HTTP 200 code response
}
]
.. csv-table::
:header: "Name", "Type", "Description"
:widths: 20, 20, 60

"status", "string", "It can be one of the next options:
- `NOT_SYNCHRONIZED` if the table has not been scanned
- `SYNCHRONIZING` while the sync schema process is running
- `DEPRECATED` if there is a row in our `table-descriptions` collection that is no longer in the database, probably because the table/view was deleted or renamed
- `SYNCHRONIZED` once the table has been scanned
- `FAILED` if anything failed during the sync schema process, and the `error_message` field stores the error."
"error_message", "string", "This field is set only if the async schema process fails"


**Request example**

.. code-block:: rst
