Skip to content

Commit

Permalink
remove sage client from streams
Browse files Browse the repository at this point in the history
  • Loading branch information
pnadolny13 committed Nov 8, 2024
1 parent 4e881c7 commit e187539
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 8 deletions.
62 changes: 55 additions & 7 deletions tap_intacct/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from datetime import datetime, timezone
from urllib.parse import unquote

import requests
import xmltodict
from singer_sdk import typing as th # JSON schema typing helpers
from singer_sdk.pagination import (
Expand Down Expand Up @@ -54,21 +55,67 @@ def __init__(
*args,
intacct_obj_name=None,
replication_key=None,
sage_client=None,
**kwargs,
):
super().__init__(*args, **kwargs)
self.primary_key = KEY_PROPERTIES[self.name]
self.intacct_obj_name = intacct_obj_name
self.replication_key = replication_key
self.sage_client = sage_client
self.session_id = sage_client._SageIntacctSDK__session_id
self.session_id = self._get_session_id()
self.datetime_fields = [
i
for i, t in self.schema["properties"].items()
if t.get("format") == "date-time"
]

def _get_session_id(self) -> str:
timestamp = datetime.now(timezone.utc)
dict_body = {
"request": {
"control": {
"senderid": self.config["sender_id"],
"password": self.config["sender_password"],
"controlid": timestamp,
"uniqueid": False,
"dtdversion": 3.0,
"includewhitespace": False,
},
"operation": {
"authentication": {
"login": {
"userid": self.config["user_id"],
"companyid": self.config["company_id"],
"password": self.config["user_password"],
}
},
"content": {
"function": {
"@controlid": str(uuid.uuid4()),
"getAPISession": None,
}
},
},
}
}
payload_data = xmltodict.unparse(dict_body)
response = requests.post(
self.url_base,
headers=self.http_headers,
data=payload_data,
timeout=30,
)
parsed_xml = xmltodict.parse(response.text)
parsed_response = json.loads(json.dumps(parsed_xml))
if (
parsed_response["response"]["control"]["status"] == "success"
and parsed_response["response"]["operation"]["authentication"]["status"]
== "success"
):
return parsed_response["response"]["operation"]["result"]["data"]["api"][
"sessionid"
]
raise SageIntacctSDKError("Error: {0}".format(parsed_response["errormessage"]))

@property
def url_base(self) -> str:
"""Return the API URL root, configurable via tap settings."""
Expand Down Expand Up @@ -303,10 +350,11 @@ def parse_response(self, response: requests.Response) -> t.Iterable[dict]:
)

if api_response["result"]["status"] == "success":
total = int(api_response["result"]["data"]["@totalcount"])
remaining = int(api_response["result"]["data"]["@numremaining"])
progress = total - remaining
self.logger.info(f"{progress} of {total} records processed")
total = api_response["result"]["data"].get("@totalcount", None)
remaining = api_response["result"]["data"].get("@numremaining", None)
if total and remaining:
progress = int(total) - int(remaining)
self.logger.info(f"{progress} of {total} records processed")
return api_response["result"]["data"][self.intacct_obj_name]

self.logger.error(f"Intacct error response: {api_response}")
Expand Down
1 change: 0 additions & 1 deletion tap_intacct/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ def discover_streams(self) -> list[streams.TableStream]:
schema=schema,
intacct_obj_name=INTACCT_OBJECTS[stream_name],
replication_key="WHENMODIFIED",
sage_client=sage_client,
)
discovered_streams.append(stream)
# audit_stream = streams.SageStream(
Expand Down

0 comments on commit e187539

Please sign in to comment.