Skip to content

Commit

Permalink
pagination cleanup, sorted flag, simplify context val parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
pnadolny13 committed Nov 8, 2024
1 parent 89daa3d commit 5f46ce3
Showing 1 changed file with 58 additions and 40 deletions.
98 changes: 58 additions & 40 deletions tap_intacct/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,38 @@
PAGE_SIZE = 1000


class IntacctOffsetPaginator(BaseOffsetPaginator):
    """Offset paginator that stops based on the counters in each response.

    Each response's ``data`` element is read for its ``@totalcount`` and
    ``@numremaining`` attributes; paging continues while records remain.
    """

    def __init__(
        self,
        *args,
        logger=None,
        **kwargs,
    ):
        """Initialize the paginator.

        Args:
            *args: Positional arguments forwarded to ``BaseOffsetPaginator``.
            logger: Optional logger used to report paging progress. When
                ``None``, progress messages are skipped.
            **kwargs: Keyword arguments forwarded to ``BaseOffsetPaginator``.
        """
        self.logger = logger
        super().__init__(*args, **kwargs)

    def has_more(self, response: requests.Response) -> bool:
        """Check whether the endpoint has any pages left.

        Args:
            response: API response object whose body is the XML payload.

        Returns:
            ``True`` when the response reports at least one remaining record.

        Raises:
            KeyError: If the payload lacks the
                ``response/operation/result/data`` path.
        """
        # The parsed mapping is read directly; `.get` on the `data` node is
        # all that is needed, so no JSON round-trip is performed.
        parsed_xml = xmltodict.parse(response.text)
        data = parsed_xml["response"]["operation"]["result"]["data"]

        # Attribute values arrive as strings; convert once and reuse.
        total = int(data.get("@totalcount", 0))
        remaining = int(data.get("@numremaining", 0))

        # The logger is optional (defaults to None), so guard before using it.
        if self.logger is not None:
            self.logger.info(
                "%s of %s records processed", total - remaining, total
            )
        return remaining > 0


class IntacctStream(RESTStream):
"""Intacct stream class."""

Expand All @@ -68,6 +100,18 @@ def __init__(
if t.get("format", "") == "date-time"
]

@property
def is_sorted(self) -> bool:
    """Declare that this stream's records are expected to arrive sorted.

    A sorted stream lets incremental syncs attempt to resume after an
    unexpected interruption.

    Returns:
        Always `True` for this stream, overriding the default of `False`.
    """
    return True

def _get_session_id(self) -> str:
timestamp = datetime.now(timezone.utc)
dict_body = {
Expand Down Expand Up @@ -148,9 +192,10 @@ def get_new_paginator(self) -> BaseAPIPaginator:
Returns:
A pagination helper instance.
"""
return BaseOffsetPaginator(
return IntacctOffsetPaginator(
start_value=0,
page_size=PAGE_SIZE,
logger=self.logger,
)

def _format_date_for_intacct(self, datetime: datetime) -> str:
Expand Down Expand Up @@ -350,17 +395,7 @@ def parse_response(self, response: requests.Response) -> t.Iterable[dict]:
)

if api_response["result"]["status"] == "success":
total = api_response["result"]["data"].get("@totalcount", None)
remaining = api_response["result"]["data"].get("@numremaining", None)
if total and remaining:
progress = int(total) - int(remaining)
# TODO: this is showing 0/0 on the last iteration, paginator should skip last request
self.logger.info(f"{progress} of {total} records processed")
if self.intacct_obj_name not in api_response["result"]["data"]:
# TODO: use module name instead of object name for GL
self.logger.info(f"Pagination complete for {self.intacct_obj_name}")
return []
return api_response["result"]["data"][self.intacct_obj_name]
return api_response["result"]["data"].get(self.intacct_obj_name, [])

self.logger.error(f"Intacct error response: {api_response}")
error = (
Expand Down Expand Up @@ -484,7 +519,7 @@ def _get_query_filter(
**super()._get_query_filter(rep_key, context),
"equalto": {
"field": "MODULEKEY",
"value": context["key"],
"value": context["MODULEKEY"],
},
}
}
Expand All @@ -502,31 +537,14 @@ def partitions(self) -> list[dict] | None:
A list of partition key dicts (if applicable), otherwise `None`.
"""
return [
{"key": "2.GL", "name": "General Ledger"},
{"key": "3.AP", "name": "Accounts Payable"},
{"key": "4.AR", "name": "Accounts Receivable"},
{"key": "6.EE", "name": "Employee Expenses"},
{"key": "7.INV", "name": "Inventory Control"},
{"key": "8.SO", "name": "Order Entry"},
{"key": "9.PO", "name": "Purchasing"},
{"key": "11.CM", "name": "Cash Management"},
{"key": "48.PROJACCT", "name": "Project and Resource Management"},
{"key": "55.CONTRACT", "name": "Contracts and Revenue Management"},
{"MODULEKEY": "2.GL", "name": "General Ledger"},
{"MODULEKEY": "3.AP", "name": "Accounts Payable"},
{"MODULEKEY": "4.AR", "name": "Accounts Receivable"},
{"MODULEKEY": "6.EE", "name": "Employee Expenses"},
{"MODULEKEY": "7.INV", "name": "Inventory Control"},
{"MODULEKEY": "8.SO", "name": "Order Entry"},
{"MODULEKEY": "9.PO", "name": "Purchasing"},
{"MODULEKEY": "11.CM", "name": "Cash Management"},
{"MODULEKEY": "48.PROJACCT", "name": "Project and Resource Management"},
{"MODULEKEY": "55.CONTRACT", "name": "Contracts and Revenue Management"},
]

def post_process(
    self,
    row: dict,
    context: Context | None = None,
) -> dict | None:
    """Attach the partition's module key to the record.

    Args:
        row: An individual record from the stream; it is updated in place.
        context: The stream context. When provided, its ``"key"`` entry is
            copied onto the record as ``MODULEKEY``.

    Returns:
        The updated record dictionary. When ``context`` is ``None`` the
        record is returned unchanged.
    """
    # The signature allows a missing context; without one there is no
    # module key to copy, so avoid subscripting None.
    if context is None:
        return row
    row["MODULEKEY"] = context["key"]
    return row

0 comments on commit 5f46ce3

Please sign in to comment.