Skip to content

Commit

Permalink
add more streams back after testing
Browse files Browse the repository at this point in the history
  • Loading branch information
pnadolny13 committed Oct 22, 2024
1 parent ad8a080 commit 8ebbb0e
Show file tree
Hide file tree
Showing 3 changed files with 324 additions and 26 deletions.
67 changes: 62 additions & 5 deletions tap_linkedin_ads/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
from functools import cached_property
from importlib import resources

from singer_sdk import metrics
from singer_sdk.authenticators import BearerTokenAuthenticator
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.pagination import BaseAPIPaginator # noqa: TCH002
from singer_sdk.pagination import BaseAPIPaginator # noqa: TCH002 # noqa: TCH002
from singer_sdk.streams import RESTStream
from singer_sdk.pagination import BaseAPIPaginator # noqa: TCH002

from tap_linkedin_ads.auth import LinkedInAdsOAuthAuthenticator

Expand All @@ -28,6 +28,7 @@ class LinkedInAdsStream(RESTStream):

# Update this value if necessary or override `parse_response`.
records_jsonpath = "$.elements[*]"
path = "/adAccounts"

# Update this value if necessary or override `get_new_paginator`.
next_page_token_jsonpath = "$.metadata.nextPageToken" # noqa: S105
Expand Down Expand Up @@ -85,9 +86,7 @@ def get_url_params(
Returns:
A dictionary of URL query parameters.
"""
params: dict = {
"q": "search",
}
params: dict = {}
if next_page_token:
params["pageToken"] = next_page_token
# if self.replication_key:
Expand All @@ -105,3 +104,61 @@ def parse_response(self, response: requests.Response) -> t.Iterable[dict]:
Each record from the source.
"""
yield from extract_jsonpath(self.records_jsonpath, input=response.json())

def get_unescaped_params(self, context: Context | None) -> dict:
return {}

def request_records(self, context: Context | None) -> t.Iterable[dict]:
"""Request records from REST endpoint(s), returning response records.
If pagination is detected, pages will be recursed automatically.
Args:
context: Stream partition or context dictionary.
Yields:
An item for every record in the response.
"""
paginator = self.get_new_paginator()
decorated_request = self.request_decorator(self._request)
pages = 0

with metrics.http_request_counter(self.name, self.path) as request_counter:
request_counter.context = context

while not paginator.finished:
prepared_request = self.prepare_request(
context,
next_page_token=paginator.current_value,
)
resp = decorated_request(prepared_request, context)
# Patch to add unescaped params to the path and url
if self.get_unescaped_params(context):
prepared_request.url = (
prepared_request.url
+ "?"
+ "&".join(
[
f"{k}={v}"
for k, v in self.get_unescaped_params().items()
],
)
)

request_counter.increment()
self.update_sync_costs(prepared_request, resp, context)
records = iter(self.parse_response(resp))
try:
first_record = next(records)
except StopIteration:
self.logger.info(
"Pagination stopped after %d pages because no records were "
"found in the last response",
pages,
)
break
yield first_record
yield from records
pages += 1

paginator.advance(resp)
Loading

0 comments on commit 8ebbb0e

Please sign in to comment.