Skip to content

Commit

Permalink
fix schema date parsing and init GL stream properly
Browse files Browse the repository at this point in the history
  • Loading branch information
pnadolny13 committed Nov 8, 2024
1 parent e187539 commit 89daa3d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 26 deletions.
29 changes: 12 additions & 17 deletions tap_intacct/sage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import backoff
import requests
import xmltodict
from singer_sdk import typing as th

from tap_intacct.exceptions import (
AuthFailure,
Expand Down Expand Up @@ -339,10 +340,7 @@ def load_schema_from_api(self, stream: str):
schema_dict
"""
schema_dict = {}
schema_dict["type"] = "object"
schema_dict["properties"] = {}

properties: list[th.Property] = []
required_list = ["RECORDNO", "WHENMODIFIED"]
fields_data_response = self.get_fields_data_using_schema_name(
object_type=stream
Expand All @@ -352,22 +350,19 @@ def load_schema_from_api(self, stream: str):
if rec["ID"] in IGNORE_FIELDS:
continue
if rec["DATATYPE"] in ["PERCENT", "DECIMAL"]:
type_data_type = "number"
type_data_type = th.NumberType
elif rec["DATATYPE"] == "BOOLEAN":
type_data_type = "boolean"
type_data_type = th.BooleanType
elif rec["DATATYPE"] in ["DATE", "TIMESTAMP"]:
type_data_type = "date-time"
else:
type_data_type = "string"
if type_data_type in ["string", "boolean", "number"]:
format_dict = {"type": ["null", type_data_type]}
type_data_type = th.DateTimeType
else:
if type_data_type in ["date", "date-time"]:
format_dict = {"type": ["null", "string"], "format": type_data_type}

schema_dict["properties"][rec["ID"]] = format_dict
schema_dict["required"] = required_list
return schema_dict
type_data_type = th.StringType
properties.append(
th.Property(
rec["ID"], type_data_type, required=(rec["ID"] in required_list)
)
)
return th.PropertiesList(*properties).to_dict()


def get_client(
Expand Down
9 changes: 7 additions & 2 deletions tap_intacct/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def __init__(
self.datetime_fields = [
i
for i, t in self.schema["properties"].items()
if t.get("format") == "date-time"
if t.get("format", "") == "date-time"
]

def _get_session_id(self) -> str:
Expand Down Expand Up @@ -354,7 +354,12 @@ def parse_response(self, response: requests.Response) -> t.Iterable[dict]:
remaining = api_response["result"]["data"].get("@numremaining", None)
if total and remaining:
progress = int(total) - int(remaining)
# TODO: this is showing 0/0 on the last iteration, paginator should skip last request
self.logger.info(f"{progress} of {total} records processed")
if self.intacct_obj_name not in api_response["result"]["data"]:
# TODO: use module name instead of object name for GL
self.logger.info(f"Pagination complete for {self.intacct_obj_name}")
return []
return api_response["result"]["data"][self.intacct_obj_name]

self.logger.error(f"Intacct error response: {api_response}")
Expand Down Expand Up @@ -466,7 +471,7 @@ def __init__(
**kwargs,
):
# Add MODULEKEY to discovered schema so it can be manually added in post_process
kwargs["schema"]["properties"]["MODULEKEY"] = th.StringType
kwargs["schema"]["properties"]["MODULEKEY"] = th.StringType().to_dict()
super().__init__(*args, **kwargs)

def _get_query_filter(
Expand Down
23 changes: 16 additions & 7 deletions tap_intacct/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,22 @@ def discover_streams(self) -> list[streams.TableStream]:
)
for stream_name in INTACCT_OBJECTS:
schema = sage_client.load_schema_from_api(stream_name)
stream = streams.IntacctStream(
tap=self,
name=stream_name,
schema=schema,
intacct_obj_name=INTACCT_OBJECTS[stream_name],
replication_key="WHENMODIFIED",
)
if stream_name == "general_ledger_details":
stream = streams.GeneralLedgerDetailsStream(
tap=self,
name=stream_name,
schema=schema,
intacct_obj_name=INTACCT_OBJECTS[stream_name],
replication_key="WHENMODIFIED",
)
else:
stream = streams.IntacctStream(
tap=self,
name=stream_name,
schema=schema,
intacct_obj_name=INTACCT_OBJECTS[stream_name],
replication_key="WHENMODIFIED",
)
discovered_streams.append(stream)
# audit_stream = streams.SageStream(
# tap=self,
Expand Down

0 comments on commit 89daa3d

Please sign in to comment.