Merge branch 'main' into feature/disable-filter-pushdown
goldmedal committed Nov 14, 2024
2 parents 2ec1a9c + 6afbe24 commit 15620e0
Showing 12 changed files with 288 additions and 56 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-image.yml
@@ -24,7 +24,7 @@ jobs:
        run: |
          if [ "${{ github.event_name }}" = "push" ]; then
-           echo "type=sha" > tags.txt
+           echo "type=raw,value=nightly" > tags.txt
            echo "type=schedule" >> tags.txt
          fi
          if [ "${{ github.event_name }}" = "workflow_dispatch" ]; then
            if [ -n "${{ github.event.inputs.docker_image_tag_name }}" ]; then
2 changes: 2 additions & 0 deletions ibis-server/app/model/metadata/factory.py
@@ -6,6 +6,7 @@
 from app.model.metadata.mssql import MSSQLMetadata
 from app.model.metadata.mysql import MySQLMetadata
 from app.model.metadata.postgres import PostgresMetadata
+from app.model.metadata.snowflake import SnowflakeMetadata
 from app.model.metadata.trino import TrinoMetadata

 mapping = {
@@ -16,6 +17,7 @@
     DataSource.mysql: MySQLMetadata,
     DataSource.postgres: PostgresMetadata,
     DataSource.trino: TrinoMetadata,
+    DataSource.snowflake: SnowflakeMetadata,
 }


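For context, a minimal sketch of how a registry like `mapping` is typically consumed. The `get_metadata` helper below is a hypothetical illustration, not the repository's actual factory API:

from app.model.data_source import DataSource

def get_metadata(data_source: DataSource, connection_info):
    # Hypothetical consumer of the `mapping` registry above; the real
    # entry point in factory.py may be named and shaped differently.
    metadata_cls = mapping.get(data_source)
    if metadata_cls is None:
        raise NotImplementedError(f"Unsupported data source: {data_source}")
    # Each Metadata subclass opens its own connection from the connection
    # info, as SnowflakeMetadata.__init__ does below.
    return metadata_cls(connection_info)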
168 changes: 168 additions & 0 deletions ibis-server/app/model/metadata/snowflake.py
@@ -0,0 +1,168 @@
from contextlib import closing

import ibis

from app.model import SnowflakeConnectionInfo
from app.model.data_source import DataSource
from app.model.metadata.dto import (
    Column,
    Constraint,
    ConstraintType,
    Table,
    TableProperties,
    WrenEngineColumnType,
)
from app.model.metadata.metadata import Metadata


class SnowflakeMetadata(Metadata):
    def __init__(self, connection_info: SnowflakeConnectionInfo):
        super().__init__(connection_info)
        self.connection = DataSource.snowflake.get_connection(connection_info)

    def get_table_list(self) -> list[Table]:
        schema = self._get_schema_name()
        sql = f"""
            SELECT
                c.TABLE_CATALOG AS TABLE_CATALOG,
                c.TABLE_SCHEMA AS TABLE_SCHEMA,
                c.TABLE_NAME AS TABLE_NAME,
                c.COLUMN_NAME AS COLUMN_NAME,
                c.DATA_TYPE AS DATA_TYPE,
                c.IS_NULLABLE AS IS_NULLABLE,
                c.COMMENT AS COLUMN_COMMENT,
                t.COMMENT AS TABLE_COMMENT
            FROM
                INFORMATION_SCHEMA.COLUMNS c
            JOIN
                INFORMATION_SCHEMA.TABLES t
                ON c.TABLE_SCHEMA = t.TABLE_SCHEMA
                AND c.TABLE_NAME = t.TABLE_NAME
            WHERE
                c.TABLE_SCHEMA = '{schema}';
            """
        response = self.connection.sql(sql).to_pandas().to_dict(orient="records")

        unique_tables = {}
        for row in response:
            # generate a unique table name
            schema_table = self._format_compact_table_name(
                row["TABLE_SCHEMA"], row["TABLE_NAME"]
            )
            # init the table if it does not exist yet
            if schema_table not in unique_tables:
                unique_tables[schema_table] = Table(
                    name=schema_table,
                    description=row["TABLE_COMMENT"],
                    columns=[],
                    properties=TableProperties(
                        schema=row["TABLE_SCHEMA"],
                        catalog=row["TABLE_CATALOG"],
                        table=row["TABLE_NAME"],
                    ),
                    primaryKey="",
                )

            # the table now exists; append the current column to it
            unique_tables[schema_table].columns.append(
                Column(
                    name=row["COLUMN_NAME"],
                    type=self._transform_column_type(row["DATA_TYPE"]),
                    notNull=row["IS_NULLABLE"].lower() == "no",
                    description=row["COLUMN_COMMENT"],
                    properties=None,
                )
            )
        return list(unique_tables.values())

    def get_constraints(self) -> list[Constraint]:
        database = self._get_database_name()
        schema = self._get_schema_name()
        # SHOW IMPORTED KEYS lists the foreign-key constraints defined in the schema
        sql = f"""
            SHOW IMPORTED KEYS IN SCHEMA {database}.{schema};
            """
        with closing(self.connection.raw_sql(sql)) as cur:
            fields = [field[0] for field in cur.description]
            result = [dict(zip(fields, row)) for row in cur.fetchall()]
        res = (
            ibis.memtable(result).to_pandas().to_dict(orient="records")
            if len(result) > 0
            else []
        )
        constraints = []
        for row in res:
            constraints.append(
                Constraint(
                    constraintName=self._format_constraint_name(
                        row["pk_table_name"],
                        row["pk_column_name"],
                        row["fk_table_name"],
                        row["fk_column_name"],
                    ),
                    constraintTable=self._format_compact_table_name(
                        row["pk_schema_name"], row["pk_table_name"]
                    ),
                    constraintColumn=row["pk_column_name"],
                    constraintedTable=self._format_compact_table_name(
                        row["fk_schema_name"], row["fk_table_name"]
                    ),
                    constraintedColumn=row["fk_column_name"],
                    constraintType=ConstraintType.FOREIGN_KEY,
                )
            )
        return constraints

    def get_version(self) -> str:
        return self.connection.sql("SELECT CURRENT_VERSION()").to_pandas().iloc[0, 0]

    def _get_database_name(self):
        return self.connection_info.database.get_secret_value()

    def _get_schema_name(self):
        return self.connection_info.sf_schema.get_secret_value()

    def _format_compact_table_name(self, schema: str, table: str):
        return f"{schema}.{table}"

    def _format_constraint_name(
        self, table_name, column_name, referenced_table_name, referenced_column_name
    ):
        return f"{table_name}_{column_name}_{referenced_table_name}_{referenced_column_name}"

    def _transform_column_type(self, data_type):
        # all possible types listed here: https://docs.snowflake.com/en/sql-reference/intro-summary-data-types
        switcher = {
            # Numeric Types
            "number": WrenEngineColumnType.NUMERIC,
            "decimal": WrenEngineColumnType.NUMERIC,
            "numeric": WrenEngineColumnType.NUMERIC,
            "int": WrenEngineColumnType.INTEGER,
            "integer": WrenEngineColumnType.INTEGER,
            "bigint": WrenEngineColumnType.BIGINT,
            "smallint": WrenEngineColumnType.SMALLINT,
            "tinyint": WrenEngineColumnType.TINYINT,
            "byteint": WrenEngineColumnType.TINYINT,
            # Float
            "float4": WrenEngineColumnType.FLOAT4,
            "float": WrenEngineColumnType.FLOAT8,
            "float8": WrenEngineColumnType.FLOAT8,
            "double": WrenEngineColumnType.DOUBLE,
            "double precision": WrenEngineColumnType.DOUBLE,
            "real": WrenEngineColumnType.REAL,
            # String Types
            "varchar": WrenEngineColumnType.VARCHAR,
            "char": WrenEngineColumnType.CHAR,
            "character": WrenEngineColumnType.CHAR,
            "string": WrenEngineColumnType.STRING,
            "text": WrenEngineColumnType.TEXT,
            # Boolean
            "boolean": WrenEngineColumnType.BOOLEAN,
            # Date and Time Types
            "date": WrenEngineColumnType.DATE,
            "datetime": WrenEngineColumnType.TIMESTAMP,
            "timestamp": WrenEngineColumnType.TIMESTAMP,
            "timestamp_ntz": WrenEngineColumnType.TIMESTAMP,
            "timestamp_tz": WrenEngineColumnType.TIMESTAMPTZ,
        }

        return switcher.get(data_type.lower(), WrenEngineColumnType.UNKNOWN)
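A hedged usage sketch of the new class follows. Only the `database` and `sf_schema` fields of SnowflakeConnectionInfo are confirmed by the accessors above; the remaining constructor fields are assumptions for illustration.

from app.model import SnowflakeConnectionInfo
from app.model.metadata.snowflake import SnowflakeMetadata

# Assumed field names except `database` and `sf_schema`, which the
# accessors in SnowflakeMetadata actually read.
info = SnowflakeConnectionInfo(
    user="...",
    password="...",
    account="...",
    database="SNOWFLAKE_SAMPLE_DATA",
    sf_schema="TPCH_SF1",
)

metadata = SnowflakeMetadata(info)
# _transform_column_type lower-cases DATA_TYPE before the lookup, so
# "TEXT" maps to WrenEngineColumnType.TEXT, while unlisted types such
# as VARIANT or ARRAY fall back to WrenEngineColumnType.UNKNOWN.
for table in metadata.get_table_list():
    print(table.name, [column.type for column in table.columns])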
46 changes: 23 additions & 23 deletions ibis-server/poetry.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions ibis-server/pyproject.toml
@@ -19,7 +19,7 @@ ibis-framework = { version = "9.5.0", extras = [
     "snowflake",
     "trino",
 ] }
-google-auth = "2.35.0"
+google-auth = "2.36.0"
 httpx = "0.27.2"
 python-dotenv = "1.0.1"
 orjson = "3.10.11"
@@ -40,7 +40,7 @@ testcontainers = { version = "4.8.2", extras = [
 ] }
 sqlalchemy = "2.0.36"
 pre-commit = "4.0.1"
-ruff = "0.7.2"
+ruff = "0.7.3"
 trino = ">=0.321,<1"
 psycopg2 = ">=2.8.4,<3"
 clickhouse-connect = "0.8.6"
6 changes: 3 additions & 3 deletions ibis-server/resources/function_list/bigquery.csv
@@ -82,9 +82,9 @@ scalar,timestamp_add,TIMESTAMP,"Adds a specified interval to a timestamp."
 scalar,timestamp_sub,TIMESTAMP,"Subtracts a specified interval from a timestamp."
 scalar,timestamp_diff,INT64,"Returns the difference between two timestamps."
 scalar,timestamp_trunc,TIMESTAMP,"Truncates a timestamp to a specified granularity."
-scalar,timestamp_micros,TIMESTAMP,"Converts the number of microseconds since 1970-01-01 00:00:00 UTC to a TIMESTAMP.",
-scalar,timestamp_millis,TIMESTAMP,"Converts the number of milliseconds since 1970-01-01 00:00:00 UTC to a TIMESTAMP.",
-scalar,timestamp_seconds,TIMESTAMP,"Converts the number of seconds since 1970-01-01 00:00:00 UTC to a TIMESTAMP.",
+scalar,timestamp_micros,TIMESTAMP,"Converts the number of microseconds since 1970-01-01 00:00:00 UTC to a TIMESTAMP."
+scalar,timestamp_millis,TIMESTAMP,"Converts the number of milliseconds since 1970-01-01 00:00:00 UTC to a TIMESTAMP."
+scalar,timestamp_seconds,TIMESTAMP,"Converts the number of seconds since 1970-01-01 00:00:00 UTC to a TIMESTAMP."
scalar,format_date,STRING,"Formats a date according to the specified format string."
scalar,format_timestamp,STRING,"Formats a timestamp according to the specified format string."
scalar,parse_date,DATE,"Parses a date from a string."
48 changes: 41 additions & 7 deletions ibis-server/tests/routers/v2/connector/test_snowflake.py
@@ -94,7 +94,7 @@ def test_query(manifest_str):
             36901,
             "O",
             "173665.47",
-            "1996-01-02 00:00:00.000000",
+            "1996-01-02",
             "1_36901",
             "2024-01-01 23:59:59.000000",
             "2024-01-01 23:59:59.000000 UTC",
@@ -261,14 +261,48 @@ def test_validate_rule_column_is_valid_without_one_parameter(manifest_str):
     assert response.status_code == 422
     assert response.text == "Missing required parameter: `modelName`"

-@pytest.mark.skip(reason="Not implemented")
 def test_metadata_list_tables():
-    pass
+    response = client.post(
+        url=f"{base_url}/metadata/tables",
+        json={"connectionInfo": connection_info},
+    )
+    assert response.status_code == 200
+    tables = response.json()
+    assert len(tables) == 8
+    table = next(filter(lambda t: t["name"] == "TPCH_SF1.ORDERS", tables))
+    assert table["name"] == "TPCH_SF1.ORDERS"
+    assert table["primaryKey"] is not None
+    assert table["description"] == "Orders data as defined by TPC-H"
+    assert table["properties"] == {
+        "catalog": "SNOWFLAKE_SAMPLE_DATA",
+        "schema": "TPCH_SF1",
+        "table": "ORDERS",
+    }
+    assert len(table["columns"]) == 9
+    column = next(filter(lambda c: c["name"] == "O_COMMENT", table["columns"]))
+    assert column == {
+        "name": "O_COMMENT",
+        "nestedColumns": None,
+        "type": "TEXT",
+        "notNull": True,
+        "description": None,
+        "properties": None,
+    }

-@pytest.mark.skip(reason="Not implemented")
 def test_metadata_list_constraints():
-    pass
+    response = client.post(
+        url=f"{base_url}/metadata/constraints",
+        json={"connectionInfo": connection_info},
+    )
+    assert response.status_code == 200
+
+    result = response.json()
+    assert len(result) == 0

-@pytest.mark.skip(reason="Not implemented")
 def test_metadata_get_version():
-    pass
+    response = client.post(
+        url=f"{base_url}/metadata/version",
+        json={"connectionInfo": connection_info},
+    )
+    assert response.status_code == 200
+    assert response.text is not None
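The tests above also document the request shape of the metadata endpoints. A hedged standalone equivalent follows; the host and base path are inferred from the test module's location under tests/routers/v2/connector and are assumptions, as are the connection fields:

import httpx

# Assumed local host and v2 base path; connection fields are placeholders.
connection_info = {"user": "...", "password": "...", "account": "..."}
response = httpx.post(
    "http://localhost:8000/v2/connector/snowflake/metadata/tables",
    json={"connectionInfo": connection_info},
)
response.raise_for_status()
print([t["name"] for t in response.json()])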
12 changes: 12 additions & 0 deletions ibis-server/tests/routers/v3/connector/bigquery/conftest.py
@@ -3,9 +3,13 @@

 import pytest

+from app.config import get_config
+from tests.conftest import file_path
+
 pytestmark = pytest.mark.bigquery

 base_url = "/v3/connector/bigquery"
+function_list_path = file_path("../resources/function_list")


 def pytest_collection_modifyitems(items):
@@ -22,3 +26,11 @@ def connection_info():
         "dataset_id": "tpch_tiny",
         "credentials": os.getenv("TEST_BIG_QUERY_CREDENTIALS_BASE64_JSON"),
     }
+
+
+@pytest.fixture(autouse=True)
+def set_remote_function_list_path():
+    config = get_config()
+    config.set_remote_function_list_path(function_list_path)
+    yield
+    config.set_remote_function_list_path(None)