Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Developers can now more easily override the mapping from SQL column type to JSON schema #2618

Merged
8 changes: 8 additions & 0 deletions docs/classes/singer_sdk.connectors.sql.SQLToJSONSchema.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
singer_sdk.connectors.sql.SQLToJSONSchema
=========================================

.. currentmodule:: singer_sdk.connectors.sql

.. autoclass:: SQLToJSONSchema
:members:
:special-members: __init__, __call__
4 changes: 4 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@
"https://json-schema.org/understanding-json-schema/reference/%s",
"%s",
),
"column_type": (
"https://docs.sqlalchemy.org/en/20/core/type_basics.html#sqlalchemy.types.%s",
"%s",
),
}

# -- Options for intersphinx -----------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions docs/guides/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ porting
pagination-classes
custom-clis
config-schema
sql-tap
```
54 changes: 54 additions & 0 deletions docs/guides/sql-tap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Building SQL taps

## Default type mapping

The Singer SDK automatically handles the most common SQLAlchemy column types, using [`functools.singledispatchmethod`](inv:python:py:class:#functools.singledispatchmethod) to process each type. See the [`SQLToJSONSchema`](connectors.sql.SQLToJSONSchema) reference documentation for details.

## Custom type mapping

If the class above doesn't cover all the types supported by the SQLAlchemy dialect in your tap, you can subclass it and register a new method, either to override an existing mapping or to add one for a type you need to support:

```python
import functools

from sqlalchemy import Numeric
from singer_sdk import typing as th
from singer_sdk.connectors import SQLConnector
from singer_sdk.connectors.sql import SQLToJSONSchema

from my_sqlalchemy_dialect import VectorType


class CustomSQLToJSONSchema(SQLToJSONSchema):
    """Type mapping with overrides that only apply to this tap.

    The subclass declares its own ``to_jsonschema`` dispatcher. Handlers are
    registered on that dispatcher rather than on
    ``SQLToJSONSchema.to_jsonschema``, so the default mapping used by other
    connectors in the same process is left untouched.
    """

    @functools.singledispatchmethod
    def to_jsonschema(self, column_type):
        """Fall back to the default mapping for types not registered below."""
        return super().to_jsonschema(column_type)

    @to_jsonschema.register
    def custom_number_to_jsonschema(self, column_type: Numeric):
        """Override the default mapping for NUMERIC columns.

        For example, a scale of 4 translates to a multipleOf 0.0001.
        """
        schema = {"type": ["number"]}
        # ``scale`` is None when the column was declared without one; only
        # emit ``multipleOf`` when there is a scale to derive it from.
        if column_type.scale is not None:
            schema["multipleOf"] = 10**-column_type.scale
        return schema

    @to_jsonschema.register(VectorType)
    def vector_to_json_schema(self, column_type):
        """Custom vector to JSON schema."""
        return th.ArrayType(th.NumberType()).to_dict()
```

````{tip}
You can also use a type annotation to specify the type of the column when registering a new method:

```python
# The type to dispatch on is taken from the annotation of ``column_type``.
# Registering through ``SQLToJSONSchema.to_jsonschema`` adds the handler to the
# dispatcher that is defined on the base class.
@SQLToJSONSchema.to_jsonschema.register
def vector_to_json_schema(self, column_type: VectorType):
    return th.ArrayType(th.NumberType()).to_dict()
```
````

Then, you need to use your custom type mapping in your connector:

```python
class MyConnector(SQLConnector):
    # ``cached_property`` builds the mapper once per connector instance and
    # reuses it on every later access.
    @functools.cached_property
    def type_mapping(self):
        return CustomSQLToJSONSchema()
```
9 changes: 9 additions & 0 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,12 @@ Batch

batch.BaseBatcher
batch.JSONLinesBatcher

Other
-----

.. autosummary::
:toctree: classes
:template: class.rst

connectors.sql.SQLToJSONSchema
110 changes: 107 additions & 3 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import functools
import logging
import sys
import typing as t
Expand Down Expand Up @@ -109,6 +110,83 @@ def prepare_part(self, part: str) -> str: # noqa: PLR6301
return part


class SQLToJSONSchema:
    """SQLAlchemy to JSON Schema type mapping helper.

    This class provides a mapping from SQLAlchemy types to JSON Schema types.

    The mapping is implemented with :func:`functools.singledispatchmethod`:
    ``to_jsonschema`` dispatches on the class of ``column_type`` and each
    ``*_to_jsonschema`` method below is registered for one SQLAlchemy type.
    Column types with no registered handler fall back to a string schema.
    """

    # Default handler: any column type without a more specific registration
    # below is represented as a plain string.
    @functools.singledispatchmethod
    def to_jsonschema(self, column_type: sa.types.TypeEngine) -> dict:  # noqa: ARG002, D102, PLR6301
        return th.StringType.type_dict  # type: ignore[no-any-return]

    @to_jsonschema.register
    def datetime_to_jsonschema(self, column_type: sa.types.DateTime) -> dict:  # noqa: ARG002, PLR6301
        """Return a JSON Schema representation of a generic datetime type.

        Args:
            column_type (:column_type:`DateTime`): The column type.
        """
        return th.DateTimeType.type_dict  # type: ignore[no-any-return]

    @to_jsonschema.register
    def date_to_jsonschema(self, column_type: sa.types.Date) -> dict:  # noqa: ARG002, PLR6301
        """Return a JSON Schema representation of a date type.

        Args:
            column_type (:column_type:`Date`): The column type.
        """
        return th.DateType.type_dict  # type: ignore[no-any-return]

    @to_jsonschema.register
    def time_to_jsonschema(self, column_type: sa.types.Time) -> dict:  # noqa: ARG002, PLR6301
        """Return a JSON Schema representation of a time type.

        Args:
            column_type (:column_type:`Time`): The column type.
        """
        return th.TimeType.type_dict  # type: ignore[no-any-return]

    @to_jsonschema.register
    def integer_to_jsonschema(self, column_type: sa.types.Integer) -> dict:  # noqa: ARG002, PLR6301
        """Return a JSON Schema representation of an integer type.

        Args:
            column_type (:column_type:`Integer`): The column type.
        """
        return th.IntegerType.type_dict  # type: ignore[no-any-return]

    # Despite the method name, this handler is registered for ``Numeric``.
    @to_jsonschema.register
    def float_to_jsonschema(self, column_type: sa.types.Numeric) -> dict:  # noqa: ARG002, PLR6301
        """Return a JSON Schema representation of a generic number type.

        Args:
            column_type (:column_type:`Numeric`): The column type.
        """
        return th.NumberType.type_dict  # type: ignore[no-any-return]

    @to_jsonschema.register
    def string_to_jsonschema(self, column_type: sa.types.String) -> dict:  # noqa: ARG002, PLR6301
        """Return a JSON Schema representation of a generic string type.

        The declared column length is currently ignored (see the TODO below).

        Args:
            column_type (:column_type:`String`): The column type.
        """
        # TODO: Enable support for maxLength.
        # if column_type.length:
        #     return th.StringType(max_length=column_type.length).type_dict  # noqa: ERA001
        return th.StringType.type_dict  # type: ignore[no-any-return]

    @to_jsonschema.register
    def boolean_to_jsonschema(self, column_type: sa.types.Boolean) -> dict:  # noqa: ARG002, PLR6301
        """Return a JSON Schema representation of a boolean type.

        Args:
            column_type (:column_type:`Boolean`): The column type.
        """
        return th.BooleanType.type_dict  # type: ignore[no-any-return]


class SQLConnector: # noqa: PLR0904
"""Base class for SQLAlchemy-based connectors.

Expand Down Expand Up @@ -162,6 +240,17 @@ def logger(self) -> logging.Logger:
"""
return logging.getLogger("sqlconnector")

@functools.cached_property
def type_mapping(self) -> SQLToJSONSchema:
    """Return the type mapper object.

    Override this property to provide a custom mapping for your SQL dialect.
    Because it is a ``cached_property``, the mapper is created on first access
    and the same object is reused by this connector instance afterwards.

    Returns:
        The type mapper object.
    """
    return SQLToJSONSchema()

@contextmanager
def _connect(self) -> t.Iterator[sa.engine.Connection]:
with self._engine.connect().execution_options(stream_results=True) as conn:
Expand Down Expand Up @@ -266,8 +355,8 @@ def get_sqlalchemy_url(self, config: dict[str, t.Any]) -> str: # noqa: PLR6301

return t.cast(str, config["sqlalchemy_url"])

@staticmethod
def to_jsonschema_type(
self,
sql_type: (
str # noqa: ANN401
| sa.types.TypeEngine
Expand All @@ -293,10 +382,25 @@ def to_jsonschema_type(
Returns:
The JSON Schema representation of the provided type.
"""
if isinstance(sql_type, (str, sa.types.TypeEngine)):
if isinstance(sql_type, sa.types.TypeEngine):
return self.type_mapping.to_jsonschema(sql_type)

if isinstance(sql_type, str): # pragma: no cover
warnings.warn(
"Passing string types to `to_jsonschema_type` is deprecated. "
"Please pass a SQLAlchemy type object instead.",
DeprecationWarning,
stacklevel=2,
)
return th.to_jsonschema_type(sql_type)

if isinstance(sql_type, type):
if isinstance(sql_type, type): # pragma: no cover
warnings.warn(
"Passing type classes to `to_jsonschema_type` is deprecated. "
"Please pass a SQLAlchemy type object instead.",
DeprecationWarning,
stacklevel=2,
)
if issubclass(sql_type, sa.types.TypeEngine):
return th.to_jsonschema_type(sql_type)

Expand Down
17 changes: 13 additions & 4 deletions singer_sdk/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from __future__ import annotations

import json
import sys
import typing as t

import sqlalchemy as sa
Expand All @@ -65,9 +66,13 @@
get_datelike_property_type,
)

if t.TYPE_CHECKING:
import sys
if sys.version_info < (3, 13):
from typing_extensions import deprecated
else:
from typing import deprecated # noqa: ICN003 # pragma: no cover


if t.TYPE_CHECKING:
from jsonschema.protocols import Validator

if sys.version_info >= (3, 10):
Expand Down Expand Up @@ -1086,6 +1091,10 @@ def __iter__(self) -> t.Iterator[Property]:
return self.wrapped.values().__iter__()


@deprecated(
"Use `SQLToJSONSchema` instead.",
category=DeprecationWarning,
)
def to_jsonschema_type(
from_type: str | sa.types.TypeEngine | type[sa.types.TypeEngine],
) -> dict:
Expand Down Expand Up @@ -1119,9 +1128,9 @@ def to_jsonschema_type(
"bool": BooleanType.type_dict,
"variant": StringType.type_dict,
}
if isinstance(from_type, str):
if isinstance(from_type, str): # pragma: no cover
type_name = from_type
elif isinstance(from_type, sa.types.TypeEngine):
elif isinstance(from_type, sa.types.TypeEngine): # pragma: no cover
type_name = type(from_type).__name__
elif issubclass(from_type, sa.types.TypeEngine):
type_name = from_type.__name__
Expand Down
85 changes: 84 additions & 1 deletion tests/core/test_connector_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from samples.sample_duckdb import DuckDBConnector
from singer_sdk.connectors import SQLConnector
from singer_sdk.connectors.sql import FullyQualifiedName
from singer_sdk.connectors.sql import FullyQualifiedName, SQLToJSONSchema
from singer_sdk.exceptions import ConfigValidationError

if t.TYPE_CHECKING:
Expand All @@ -22,6 +22,10 @@ def stringify(in_dict):
return {k: str(v) for k, v in in_dict.items()}


# Minimal custom SQLAlchemy type used by the tests below to exercise
# registration of a mapping for a type the default mapper has no handler for.
class MyType(sa.types.TypeDecorator):
    impl = sa.types.LargeBinary


class TestConnectorSQL: # noqa: PLR0904
"""Test the SQLConnector class."""

Expand Down Expand Up @@ -392,3 +396,82 @@ def prepare_part(self, part: str) -> str:
def test_fully_qualified_name_empty_error():
with pytest.raises(ValueError, match="Could not generate fully qualified name"):
FullyQualifiedName()


@pytest.mark.parametrize(
    "sql_type, expected_jsonschema_type",
    [
        pytest.param(sa.types.VARCHAR(), {"type": ["string"]}, id="varchar"),
        pytest.param(
            sa.types.VARCHAR(length=127),
            {"type": ["string"], "maxLength": 127},
            marks=pytest.mark.xfail,
            id="varchar-length",
        ),
        pytest.param(sa.types.TEXT(), {"type": ["string"]}, id="text"),
        pytest.param(sa.types.INTEGER(), {"type": ["integer"]}, id="integer"),
        pytest.param(sa.types.BOOLEAN(), {"type": ["boolean"]}, id="boolean"),
        pytest.param(sa.types.DECIMAL(), {"type": ["number"]}, id="decimal"),
        pytest.param(sa.types.FLOAT(), {"type": ["number"]}, id="float"),
        pytest.param(sa.types.REAL(), {"type": ["number"]}, id="real"),
        pytest.param(sa.types.NUMERIC(), {"type": ["number"]}, id="numeric"),
        pytest.param(
            sa.types.DATE(),
            {"type": ["string"], "format": "date"},
            id="date",
        ),
        pytest.param(
            sa.types.DATETIME(),
            {"type": ["string"], "format": "date-time"},
            id="datetime",
        ),
        pytest.param(
            sa.types.TIMESTAMP(),
            {"type": ["string"], "format": "date-time"},
            id="timestamp",
        ),
        pytest.param(
            sa.types.TIME(),
            {"type": ["string"], "format": "time"},
            id="time",
        ),
        pytest.param(
            sa.types.BLOB(),
            {"type": ["string"]},
            id="unknown",
        ),
    ],
)
def test_sql_to_json_schema_map(
    sql_type: sa.types.TypeEngine,
    expected_jsonschema_type: dict,
):
    """Check the default mapper's output for each built-in SQLAlchemy type.

    The ``varchar-length`` case is expected to fail until ``maxLength``
    support is enabled; the ``unknown`` case covers the string fallback.
    """
    default_mapper = SQLToJSONSchema()
    actual_jsonschema_type = default_mapper.to_jsonschema(sql_type)
    assert actual_jsonschema_type == expected_jsonschema_type


def test_custom_type():
    """Custom handlers on a subclass apply there and nowhere else.

    The subclass declares its own ``singledispatchmethod`` and registers the
    custom handlers on it. Registering on ``SQLToJSONSchema.to_jsonschema``
    instead would add them to the dispatcher shared by the base class, leaking
    the overrides into every other ``SQLToJSONSchema`` instance for the rest
    of the test session.
    """
    # Local import so the test does not depend on the module's import block.
    import functools  # noqa: PLC0415

    class MyMap(SQLToJSONSchema):
        @functools.singledispatchmethod
        def to_jsonschema(self, column_type: sa.types.TypeEngine) -> dict:
            """Defer to the default mapping for types not registered below."""
            return super().to_jsonschema(column_type)

        @to_jsonschema.register
        def custom_number_to_jsonschema(self, column_type: sa.types.NUMERIC) -> dict:
            """Custom number to JSON schema.

            For example, a scale of 4 translates to a multipleOf 0.0001.
            """
            return {"type": ["number"], "multipleOf": 10**-column_type.scale}

        @to_jsonschema.register(MyType)
        def my_type_to_jsonschema(self, column_type) -> dict:  # noqa: ARG002
            return {"type": ["string"], "contentEncoding": "base64"}

    m = MyMap()

    assert m.to_jsonschema(MyType()) == {
        "type": ["string"],
        "contentEncoding": "base64",
    }
    assert m.to_jsonschema(sa.types.NUMERIC(scale=2)) == {
        "type": ["number"],
        "multipleOf": 0.01,
    }
    # Unregistered types still resolve through the inherited default mapping.
    assert m.to_jsonschema(sa.types.BOOLEAN()) == {"type": ["boolean"]}

    # The base mapper must be unaffected by the subclass registrations.
    default = SQLToJSONSchema()
    assert default.to_jsonschema(sa.types.NUMERIC(scale=2)) == {"type": ["number"]}
    assert default.to_jsonschema(MyType()) == {"type": ["string"]}
Loading
Loading