From 89daa3d076a1b082379500610d51cf7ebc5b79cd Mon Sep 17 00:00:00 2001 From: Pat Nadolny Date: Fri, 8 Nov 2024 13:27:14 -0500 Subject: [PATCH] fix schema date parsing and init GL stream properly --- tap_intacct/sage.py | 29 ++++++++++++----------------- tap_intacct/streams.py | 9 +++++++-- tap_intacct/tap.py | 23 ++++++++++++++++------- 3 files changed, 35 insertions(+), 26 deletions(-) diff --git a/tap_intacct/sage.py b/tap_intacct/sage.py index c78e62c..e7f96ad 100644 --- a/tap_intacct/sage.py +++ b/tap_intacct/sage.py @@ -12,6 +12,7 @@ import backoff import requests import xmltodict +from singer_sdk import typing as th from tap_intacct.exceptions import ( AuthFailure, @@ -339,10 +340,7 @@ def load_schema_from_api(self, stream: str): schema_dict """ - schema_dict = {} - schema_dict["type"] = "object" - schema_dict["properties"] = {} - + properties: list[th.Property] = [] required_list = ["RECORDNO", "WHENMODIFIED"] fields_data_response = self.get_fields_data_using_schema_name( object_type=stream @@ -352,22 +350,19 @@ def load_schema_from_api(self, stream: str): if rec["ID"] in IGNORE_FIELDS: continue if rec["DATATYPE"] in ["PERCENT", "DECIMAL"]: - type_data_type = "number" + type_data_type = th.NumberType elif rec["DATATYPE"] == "BOOLEAN": - type_data_type = "boolean" + type_data_type = th.BooleanType elif rec["DATATYPE"] in ["DATE", "TIMESTAMP"]: - type_data_type = "date-time" - else: - type_data_type = "string" - if type_data_type in ["string", "boolean", "number"]: - format_dict = {"type": ["null", type_data_type]} + type_data_type = th.DateTimeType else: - if type_data_type in ["date", "date-time"]: - format_dict = {"type": ["null", "string"], "format": type_data_type} - - schema_dict["properties"][rec["ID"]] = format_dict - schema_dict["required"] = required_list - return schema_dict + type_data_type = th.StringType + properties.append( + th.Property( + rec["ID"], type_data_type, required=(rec["ID"] in required_list) + ) + ) + return th.PropertiesList(*properties).to_dict() def get_client( diff --git a/tap_intacct/streams.py b/tap_intacct/streams.py index 50bb961..62820e3 100644 --- a/tap_intacct/streams.py +++ b/tap_intacct/streams.py @@ -65,7 +65,7 @@ def __init__( self.datetime_fields = [ i for i, t in self.schema["properties"].items() - if t.get("format") == "date-time" + if t.get("format", "") == "date-time" ] def _get_session_id(self) -> str: @@ -354,7 +354,12 @@ def parse_response(self, response: requests.Response) -> t.Iterable[dict]: remaining = api_response["result"]["data"].get("@numremaining", None) if total and remaining: progress = int(total) - int(remaining) + # TODO: this is showing 0/0 on the last iteration, paginator should skip last request self.logger.info(f"{progress} of {total} records processed") + if self.intacct_obj_name not in api_response["result"]["data"]: + # TODO: use module name instead of object name for GL + self.logger.info(f"Pagination complete for {self.intacct_obj_name}") + return [] return api_response["result"]["data"][self.intacct_obj_name] self.logger.error(f"Intacct error response: {api_response}") @@ -466,7 +471,7 @@ def __init__( **kwargs, ): # Add MODULEKEY to discovered schema so it can be manually added in post_process - kwargs["schema"]["properties"]["MODULEKEY"] = th.StringType + kwargs["schema"]["properties"]["MODULEKEY"] = th.StringType().to_dict() super().__init__(*args, **kwargs) def _get_query_filter( diff --git a/tap_intacct/tap.py b/tap_intacct/tap.py index 96e8e03..ff9119e 100644 --- a/tap_intacct/tap.py +++ b/tap_intacct/tap.py @@ -74,13 +74,22 @@ def discover_streams(self) -> list[streams.TableStream]: ) for stream_name in INTACCT_OBJECTS: schema = sage_client.load_schema_from_api(stream_name) - stream = streams.IntacctStream( - tap=self, - name=stream_name, - schema=schema, - intacct_obj_name=INTACCT_OBJECTS[stream_name], - replication_key="WHENMODIFIED", - ) + if stream_name == "general_ledger_details": + stream = streams.GeneralLedgerDetailsStream( + tap=self, + name=stream_name, + schema=schema, + intacct_obj_name=INTACCT_OBJECTS[stream_name], + replication_key="WHENMODIFIED", + ) + else: + stream = streams.IntacctStream( + tap=self, + name=stream_name, + schema=schema, + intacct_obj_name=INTACCT_OBJECTS[stream_name], + replication_key="WHENMODIFIED", + ) discovered_streams.append(stream) # audit_stream = streams.SageStream( # tap=self,