Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Test with singer-sdk @ main #236

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 19 additions & 20 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ sqlalchemy = "<3"
sshtunnel = "0.4.0"

[tool.poetry.dependencies.singer-sdk]
version = "~=0.40.0a1"
extras = ["faker"]
git = "https://github.com/meltano/sdk.git"

[tool.poetry.group.dev.dependencies]
faker = ">=18.5.1"
Expand All @@ -56,7 +55,7 @@ types-jsonschema = ">=4.19.0.3"
types-psycopg2 = ">=2.9.21.20240118"

[tool.poetry.dev-dependencies.singer-sdk]
version = "*"
git = "https://github.com/meltano/sdk.git"
extras = ["testing"]

[tool.mypy]
Expand Down
174 changes: 47 additions & 127 deletions tap_postgres/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from __future__ import annotations

import datetime
import functools
import json
import select
import typing as t
Expand All @@ -16,19 +17,59 @@
import psycopg2
import singer_sdk.helpers._typing
import sqlalchemy as sa
import sqlalchemy.types
from psycopg2 import extras
from singer_sdk import SQLConnector, SQLStream
from singer_sdk import typing as th
from singer_sdk.connectors.sql import SQLToJSONSchema
from singer_sdk.helpers._state import increment_state
from singer_sdk.helpers._typing import TypeConformanceLevel
from singer_sdk.streams.core import REPLICATION_INCREMENTAL
from sqlalchemy.dialects import postgresql

if TYPE_CHECKING:
from singer_sdk.helpers.types import Context
from sqlalchemy.dialects import postgresql
from sqlalchemy.engine import Engine
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.types import TypeEngine


class PostgresSQLToJSONSchema(SQLToJSONSchema):
"""Custom SQL to JSON Schema conversion for Postgres."""

def __init__(self, dates_as_string: bool, *args, **kwargs):
"""Initialize the SQL to JSON Schema converter."""
super().__init__(*args, **kwargs)
self.dates_as_string = dates_as_string

@SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined]
def array_to_jsonschema(self, column_type: postgresql.ARRAY) -> dict:
"""Override the default mapping for NUMERIC columns.

For example, a scale of 4 translates to a multipleOf 0.0001.
"""
return {
"type": "array",
"items": self.to_jsonschema(column_type.item_type),
}

@SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined]
def json_to_jsonschema(self, column_type: postgresql.JSON) -> dict:
"""Override the default mapping for JSON and JSONB columns."""
return {"type": ["string", "number", "integer", "array", "object", "boolean"]}

@SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined]
def datetime_to_jsonschema(self, column_type: sqlalchemy.types.DateTime) -> dict:
"""Override the default mapping for DATETIME columns."""
if self.dates_as_string:
return {"type": ["string", "null"]}
return super().datetime_to_jsonschema(column_type)

@SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined]
def date_to_jsonschema(self, column_type: sqlalchemy.types.Date) -> dict:
"""Override the default mapping for DATE columns."""
if self.dates_as_string:
return {"type": ["string", "null"]}
return super().date_to_jsonschema(column_type)


def patched_conform(
Expand Down Expand Up @@ -115,131 +156,10 @@ def __init__(

super().__init__(config=config, sqlalchemy_url=sqlalchemy_url)

# Note super is static, we can get away with this because this is called once
# and is luckily referenced via the instance of the class
def to_jsonschema_type( # type: ignore[override]
self,
sql_type: str | TypeEngine | type[TypeEngine] | postgresql.ARRAY | Any,
) -> dict:
"""Return a JSON Schema representation of the provided type.

Overridden from SQLConnector to correctly handle JSONB and Arrays.

Also Overridden in order to call our instance method `sdk_typing_object()`
instead of the static version

By default will call `typing.to_jsonschema_type()` for strings and SQLAlchemy
types.

Args:
sql_type: The string representation of the SQL type, a SQLAlchemy
TypeEngine class or object, or a custom-specified object.

Raises:
ValueError: If the type received could not be translated to jsonschema.

Returns:
The JSON Schema representation of the provided type.

"""
type_name = None
if isinstance(sql_type, str):
type_name = sql_type
elif isinstance(sql_type, sa.types.TypeEngine):
type_name = type(sql_type).__name__

if (
type_name is not None
and isinstance(sql_type, sa.dialects.postgresql.ARRAY)
and type_name == "ARRAY"
):
array_type = self.sdk_typing_object(sql_type.item_type)
return th.ArrayType(array_type).type_dict
return self.sdk_typing_object(sql_type).type_dict

def sdk_typing_object(
self,
from_type: str | TypeEngine | type[TypeEngine],
) -> (
th.DateTimeType
| th.NumberType
| th.IntegerType
| th.DateType
| th.StringType
| th.BooleanType
| th.CustomType
):
"""Return the JSON Schema dict that describes the sql type.

Args:
from_type: The SQL type as a string or as a TypeEngine. If a TypeEngine is
provided, it may be provided as a class or a specific object instance.

Raises:
ValueError: If the `from_type` value is not of type `str` or `TypeEngine`.

Returns:
A compatible JSON Schema type definition.
"""
# NOTE: This is an ordered mapping, with earlier mappings taking precedence. If
# the SQL-provided type contains the type name on the left, the mapping will
# return the respective singer type.
# NOTE: jsonb and json should theoretically be th.AnyType().type_dict but that
# causes problems down the line with an error like:
# singer_sdk.helpers._typing.EmptySchemaTypeError: Could not detect type from
# empty type_dict. Did you forget to define a property in the stream schema?
sqltype_lookup: dict[
str,
th.DateTimeType
| th.NumberType
| th.IntegerType
| th.DateType
| th.StringType
| th.BooleanType
| th.CustomType,
] = {
"jsonb": th.CustomType(
{"type": ["string", "number", "integer", "array", "object", "boolean"]}
),
"json": th.CustomType(
{"type": ["string", "number", "integer", "array", "object", "boolean"]}
),
"timestamp": th.DateTimeType(),
"datetime": th.DateTimeType(),
"date": th.DateType(),
"int": th.IntegerType(),
"numeric": th.NumberType(),
"decimal": th.NumberType(),
"double": th.NumberType(),
"float": th.NumberType(),
"real": th.NumberType(),
"float4": th.NumberType(),
"string": th.StringType(),
"text": th.StringType(),
"char": th.StringType(),
"bool": th.BooleanType(),
"variant": th.StringType(),
}
if self.config["dates_as_string"] is True:
sqltype_lookup["date"] = th.StringType()
sqltype_lookup["datetime"] = th.StringType()
if isinstance(from_type, str):
type_name = from_type
elif isinstance(from_type, sa.types.TypeEngine):
type_name = type(from_type).__name__
elif isinstance(from_type, type) and issubclass(from_type, sa.types.TypeEngine):
type_name = from_type.__name__
else:
raise ValueError(
"Expected `str` or a SQLAlchemy `TypeEngine` object or type."
)

# Look for the type name within the known SQL type names:
for sqltype, jsonschema_type in sqltype_lookup.items():
if sqltype.lower() in type_name.lower():
return jsonschema_type

return sqltype_lookup["string"] # safe failover to str
@functools.cached_property
def sql_to_jsonschema(self):
"""Return a mapping of SQL types to JSON Schema types."""
return PostgresSQLToJSONSchema(dates_as_string=self.config["dates_as_string"])

def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]:
"""Return a list of schema names in DB, or overrides with user-provided values.
Expand Down