Skip to content

Commit

Permalink
Add data source ClickHouse (#646)
Browse files Browse the repository at this point in the history
* Add data source ClickHouse

* Make mysql support connection url

* Fix marker
  • Loading branch information
grieve54706 authored Jul 3, 2024
1 parent fc262ce commit a137bb5
Show file tree
Hide file tree
Showing 9 changed files with 865 additions and 62 deletions.
12 changes: 12 additions & 0 deletions ibis-server/app/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class QueryBigQueryDTO(QueryDTO):
connection_info: BigQueryConnectionInfo = connection_info_field


class QueryClickHouseDTO(QueryDTO):
    """Query request payload for the ClickHouse data source.

    `connection_info` accepts either a full connection url (`ConnectionUrl`)
    or discrete credentials (`ClickHouseConnectionInfo`); pydantic picks the
    first member of the union that validates.
    """

    connection_info: ConnectionUrl | ClickHouseConnectionInfo = connection_info_field

class QueryMSSqlDTO(QueryDTO):
    """Query request payload for the MSSQL data source.

    Unlike the MySQL/ClickHouse DTOs, only discrete credentials are accepted
    (no `ConnectionUrl` alternative is offered here).
    """

    connection_info: MSSqlConnectionInfo = connection_info_field

Expand All @@ -45,6 +49,14 @@ class BigQueryConnectionInfo(BaseModel):
credentials: str = Field(description="Base64 encode `credentials.json`")


class ClickHouseConnectionInfo(BaseModel):
    """Discrete connection parameters for a ClickHouse server.

    Used to assemble a `clickhouse://user:password@host:port/database` url
    (see `DataSourceExtension.get_clickhouse_connection`).
    """

    host: str
    port: int
    database: str
    user: str
    # NOTE(review): stored as plain str; consider pydantic SecretStr so the
    # password is not echoed in reprs/logs — confirm against other models.
    password: str


class MSSqlConnectionInfo(BaseModel):
host: str
port: int
Expand Down
30 changes: 0 additions & 30 deletions ibis-server/app/model/connector.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from json import loads

import pandas as pd

from app.mdl.rewriter import rewrite
Expand Down Expand Up @@ -32,33 +30,5 @@ def dry_run(self, sql: str) -> None:
raise QueryDryRunError(f"Exception: {type(e)}, message: {str(e)}")


def to_json(df, column_dtypes):
    """Serialize *df* to a dict in pandas "split" orientation.

    If *column_dtypes* is truthy the frame is first cast in place via
    `_to_specific_types` (mutates the caller's DataFrame). The returned dict
    has "columns", "data" and a "dtypes" name->dtype-string mapping; the
    "index" entry produced by pandas is dropped.
    """
    if column_dtypes:
        _to_specific_types(df, column_dtypes)
    result = loads(df.to_json(orient="split"))
    result.pop("index")
    result["dtypes"] = {column: dtype.name for column, dtype in df.dtypes.items()}
    return result


def _to_specific_types(df: pd.DataFrame, column_dtypes: dict[str, str]):
for column, dtype in column_dtypes.items():
if dtype == "datetime64":
df[column] = _to_datetime_and_format(df[column])
else:
df[column] = df[column].astype(dtype)


def _to_datetime_and_format(series: pd.Series) -> pd.Series:
series = pd.to_datetime(series, errors="coerce")
return series.apply(
lambda d: d.strftime(
"%Y-%m-%d %H:%M:%S.%f" + (" %Z" if series.dt.tz is not None else "")
)
if not pd.isnull(d)
else d
)


class QueryDryRunError(Exception):
    """Raised when a query dry-run fails against the backend (see `dry_run`)."""

    pass
29 changes: 26 additions & 3 deletions ibis-server/app/model/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@
import base64
from enum import Enum, StrEnum, auto
from json import loads
from urllib.parse import urlparse

import ibis
from google.oauth2 import service_account
from ibis import BaseBackend

from app.model import (
BigQueryConnectionInfo,
ClickHouseConnectionInfo,
ConnectionInfo,
ConnectionUrl,
MSSqlConnectionInfo,
MySqlConnectionInfo,
PostgresConnectionInfo,
QueryBigQueryDTO,
QueryClickHouseDTO,
QueryDTO,
QueryMSSqlDTO,
QueryMySqlDTO,
Expand All @@ -27,6 +30,7 @@

class DataSource(StrEnum):
bigquery = auto()
clickhouse = auto()
mssql = auto()
mysql = auto()
postgres = auto()
Expand All @@ -47,6 +51,7 @@ def get_dto_type(self):

class DataSourceExtension(Enum):
bigquery = QueryBigQueryDTO
clickhouse = QueryClickHouseDTO
mssql = QueryMSSqlDTO
mysql = QueryMySqlDTO
postgres = QueryPostgresDTO
Expand All @@ -73,6 +78,20 @@ def get_bigquery_connection(info: BigQueryConnectionInfo) -> BaseBackend:
credentials=credentials,
)

@staticmethod
def get_clickhouse_connection(
    info: ConnectionUrl | ClickHouseConnectionInfo,
) -> BaseBackend:
    """Open an ibis ClickHouse backend.

    *info* is either a `ConnectionUrl` (its `connection_url` is used as-is)
    or a `ClickHouseConnectionInfo`, from which a url is assembled.
    """
    url = getattr(info, "connection_url", None)
    if not url:
        url = f"clickhouse://{info.user}:{info.password}@{info.host}:{info.port}/{info.database}"
    # ibis misses the port embedded in the connection url, so pass it explicitly.
    return ibis.connect(url, port=urlparse(url).port)

@staticmethod
def get_mssql_connection(info: MSSqlConnectionInfo) -> BaseBackend:
# mssql in ibis does not support connection url
Expand All @@ -89,10 +108,14 @@ def get_mssql_connection(info: MSSqlConnectionInfo) -> BaseBackend:
def get_mysql_connection(
    info: ConnectionUrl | MySqlConnectionInfo,
) -> BaseBackend:
    """Open an ibis MySQL backend.

    *info* is either a `ConnectionUrl` (its `connection_url` is used as-is)
    or a `MySqlConnectionInfo`, from which a url is assembled. Mirrors
    `get_clickhouse_connection` so both url-capable sources stay consistent.
    """
    connection_url = (
        getattr(info, "connection_url", None)
        or f"mysql://{info.user}:{info.password}@{info.host}:{info.port}/{info.database}"
    )
    return ibis.connect(
        connection_url,
        # ibis misses the port embedded in the connection url, so pass it explicitly.
        port=urlparse(connection_url).port,
    )

@staticmethod
Expand Down
6 changes: 2 additions & 4 deletions ibis-server/app/routers/v2/ibis.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@
QueryDTO,
ValidateDTO,
)
from app.model.connector import (
Connector,
to_json,
)
from app.model.connector import Connector
from app.model.data_source import DataSource
from app.model.metadata.dto import Constraint, MetadataDTO, Table
from app.model.metadata.factory import MetadataFactory
from app.model.validator import Validator
from app.util import to_json

router = APIRouter(prefix="/ibis")

Expand Down
60 changes: 60 additions & 0 deletions ibis-server/app/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import calendar
import datetime
import decimal

import orjson
import pandas as pd


def to_json(df: pd.DataFrame, column_dtypes: dict[str, str] | None = None) -> dict:
    """Serialize *df* into a JSON-compatible dict via `_to_json_obj`.

    When *column_dtypes* is given, the named columns are first cast in place
    with `_to_specific_types` — note this mutates the caller's DataFrame.

    Fix: the annotation was `dict[str, str] = None`, an implicit-Optional
    that contradicts the `None` default (PEP 484); made the Optional explicit.
    """
    if column_dtypes:
        _to_specific_types(df, column_dtypes)
    return _to_json_obj(df)


def _to_specific_types(df: pd.DataFrame, column_dtypes: dict[str, str]):
for column, dtype in column_dtypes.items():
if dtype == "datetime64":
df[column] = _to_datetime_and_format(df[column])
else:
df[column] = df[column].astype(dtype)


def _to_datetime_and_format(series: pd.Series) -> pd.Series:
series = pd.to_datetime(series, errors="coerce")
return series.apply(
lambda d: d.strftime(
"%Y-%m-%d %H:%M:%S.%f" + (" %Z" if series.dt.tz is not None else "")
)
if not pd.isnull(d)
else d
)


def _to_json_obj(df: pd.DataFrame) -> dict:
data = df.to_dict(orient="split", index=False)

def default(d):
match d:
case decimal.Decimal():
return float(d)
case pd.Timestamp():
return d.value // 10**6
case datetime.datetime():
return int(d.timestamp())
case datetime.date():
return calendar.timegm(d.timetuple()) * 1000
case _:
raise d

json_obj = orjson.loads(
orjson.dumps(
data,
option=orjson.OPT_SERIALIZE_NUMPY
| orjson.OPT_PASSTHROUGH_DATETIME
| orjson.OPT_SERIALIZE_UUID,
default=default,
)
)
json_obj["dtypes"] = df.dtypes.astype(str).to_dict()
return json_obj
Loading

0 comments on commit a137bb5

Please sign in to comment.