diff --git a/tap_linkedin_ads/client.py b/tap_linkedin_ads/client.py index dba3f7c..787279e 100644 --- a/tap_linkedin_ads/client.py +++ b/tap_linkedin_ads/client.py @@ -6,11 +6,11 @@ from functools import cached_property from importlib import resources +from singer_sdk import metrics from singer_sdk.authenticators import BearerTokenAuthenticator from singer_sdk.helpers.jsonpath import extract_jsonpath -from singer_sdk.pagination import BaseAPIPaginator # noqa: TCH002 +from singer_sdk.pagination import BaseAPIPaginator # noqa: TCH002 # noqa: TCH002 from singer_sdk.streams import RESTStream -from singer_sdk.pagination import BaseAPIPaginator # noqa: TCH002 from tap_linkedin_ads.auth import LinkedInAdsOAuthAuthenticator @@ -28,6 +28,7 @@ class LinkedInAdsStream(RESTStream): # Update this value if necessary or override `parse_response`. records_jsonpath = "$.elements[*]" + path = "/adAccounts" # Update this value if necessary or override `get_new_paginator`. next_page_token_jsonpath = "$.metadata.nextPageToken" # noqa: S105 @@ -85,9 +86,7 @@ def get_url_params( Returns: A dictionary of URL query parameters. """ - params: dict = { - "q": "search", - } + params: dict = {} if next_page_token: params["pageToken"] = next_page_token # if self.replication_key: @@ -105,3 +104,61 @@ def parse_response(self, response: requests.Response) -> t.Iterable[dict]: Each record from the source. """ yield from extract_jsonpath(self.records_jsonpath, input=response.json()) + + def get_unescaped_params(self, context: Context | None) -> dict: + return {} + + def request_records(self, context: Context | None) -> t.Iterable[dict]: + """Request records from REST endpoint(s), returning response records. + + If pagination is detected, pages will be recursed automatically. + + Args: + context: Stream partition or context dictionary. + + Yields: + An item for every record in the response. + """ + paginator = self.get_new_paginator() + decorated_request = self.request_decorator(self._request) + pages = 0 + + with metrics.http_request_counter(self.name, self.path) as request_counter: + request_counter.context = context + + while not paginator.finished: + prepared_request = self.prepare_request( + context, + next_page_token=paginator.current_value, + ) + resp = decorated_request(prepared_request, context) + # Patch to add unescaped params to the path and url + if self.get_unescaped_params(context): + prepared_request.url = ( + prepared_request.url + + "?" + + "&".join( + [ + f"{k}={v}" + for k, v in self.get_unescaped_params().items() + ], + ) + ) + + request_counter.increment() + self.update_sync_costs(prepared_request, resp, context) + records = iter(self.parse_response(resp)) + try: + first_record = next(records) + except StopIteration: + self.logger.info( + "Pagination stopped after %d pages because no records were " + "found in the last response", + pages, + ) + break + yield first_record + yield from records + pages += 1 + + paginator.advance(resp) diff --git a/tap_linkedin_ads/streams.py b/tap_linkedin_ads/streams.py index 66395b1..0c82144 100644 --- a/tap_linkedin_ads/streams.py +++ b/tap_linkedin_ads/streams.py @@ -5,6 +5,7 @@ import typing as t from importlib import resources +from singer_sdk.helpers.types import Context from singer_sdk.typing import ( ArrayType, BooleanType, @@ -26,7 +27,6 @@ class AccountsStream(LinkedInAdsStream): name = "accounts" primary_keys = ["last_modified_time", "id", "status"] - path = "/adAccounts" schema = PropertiesList( Property( @@ -80,6 +80,10 @@ class AccountsStream(LinkedInAdsStream): ), ).to_dict() + def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict: + """Return a context dictionary for a child stream.""" + return {"account_id": record["id"]} + def get_url_params( self, context: dict | None, # noqa: ARG002 @@ -94,35 +98,81 @@ def get_url_params( Returns: A dictionary of URL query parameters. """ - params = super().get_url_params(context, next_page_token) + return { + "q": "search", + "sortOrder": "ASCENDING", + **super().get_url_params(context, next_page_token), + } - params["q"] = "search" - params["sortOrder"] = "ASCENDING" - return params - def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict: - """Return a context dictionary for a child stream.""" - return {"account_id": record["id"]} +class AccountUsersStream(LinkedInAdsStream): + """https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-account-users#find-ad-account-users-by-accounts.""" + + name = "account_users" + parent_stream_type = AccountsStream + primary_keys = ["user_person_id", "last_modified_time"] + path = "/adAccountUsers" + + schema = PropertiesList( + Property("account", StringType), + Property("campaign_contact", BooleanType), + Property("account_id", IntegerType), + Property( + "changeAuditStamps", + ObjectType( + Property( + "created", + ObjectType( + Property("time", IntegerType), + additional_properties=False, + ), + ), + Property( + "lastModified", + ObjectType( + Property("time", IntegerType), + additional_properties=False, + ), + ), + ), + ), + Property("created_time", StringType), + Property("last_modified_time", StringType), + Property("role", StringType), + Property("user", StringType), + Property("user_person_id", StringType), + ).to_dict() + def get_url_params( + self, + context: dict | None, # noqa: ARG002 + next_page_token: t.Any | None, # noqa: ANN401 + ) -> dict[str, t.Any]: + """Return a dictionary of values to be used in URL parameterization. + + Args: + context: The stream context. + next_page_token: The next page index or value. + + Returns: + A dictionary of URL query parameters. + """ + return { + "q": "accounts", + "accounts": f"urn:li:sponsoredAccount:{context['account_id']}", + **super().get_url_params(context, next_page_token), + } class CampaignsStream(LinkedInAdsStream): """https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-campaigns#search-for-campaigns.""" - """ - columns: columns which will be added to fields parameter in api - name: stream name - path: path which will be added to api url in client.py - schema: instream schema - primary_keys = primary keys for the table - replication_keys = datetime keys for replication - """ - name = "campaigns" - path = "/adAccounts" primary_keys = ["last_modified_time", "id", "status"] parent_stream_type = AccountsStream - next_page_token_jsonpath = "$.metadata.nextPageToken" # Or override `get_next_page_token`. # noqa: S105 + next_page_token_jsonpath = ( + "$.metadata.nextPageToken" # Or override `get_next_page_token`. # noqa: S105 + ) schema = PropertiesList( Property("storyDeliveryEnabled", BooleanType), Property( @@ -380,10 +430,196 @@ def get_url(self, context: dict | None) -> str: Returns: A URL, optionally targeted to a specific partition or context. """ - self.logger.info(context["account_id"]) return super().get_url(context) + f'/{context["account_id"]}/adCampaigns' - def get_unescaped_params(self): + def get_url_params( + self, + context: dict | None, # noqa: ARG002 + next_page_token: t.Any | None, # noqa: ANN401 + ) -> dict[str, t.Any]: + """Return a dictionary of values to be used in URL parameterization. + + Args: + context: The stream context. + next_page_token: The next page index or value. + + Returns: + A dictionary of URL query parameters. + """ + return { + "q": "search", + "sortOrder": "ASCENDING", + **super().get_url_params(context, next_page_token), + } + + def get_unescaped_params(self, context: Context | None) -> dict: return { "search": "(status:(values:List(ACTIVE,PAUSED,ARCHIVED,COMPLETED,CANCELED,DRAFT,PENDING_DELETION,REMOVED)))" } + + +class CampaignGroupsStream(LinkedInAdsStream): + """https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-campaign-groups#search-for-campaign-groups.""" + + name = "campaign_groups" + parent_stream_type = AccountsStream + primary_keys = ["last_modified_time", "id", "status"] + + schema = PropertiesList( + Property( + "runSchedule", + ObjectType(Property("start", IntegerType), Property("end", IntegerType)), + ), + Property( + "changeAuditStamps", + ObjectType( + Property( + "created", + ObjectType( + Property("time", IntegerType), + additional_properties=False, + ), + ), + Property( + "lastModified", + ObjectType( + Property("time", IntegerType), + additional_properties=False, + ), + ), + ), + ), + Property("created_time", DateTimeType), + Property("last_modified_time", DateTimeType), + Property("name", StringType), + Property("servingStatuses", ArrayType(StringType)), + Property("backfilled", BooleanType), + Property("id", IntegerType), + Property("account", StringType), + Property("account_id", IntegerType), + Property("status", StringType), + Property( + "total_budget", + ObjectType( + Property("currency_code", StringType), + Property("amount", StringType), + ), + ), + Property("test", BooleanType), + Property("allowed_campaign_types", ArrayType(StringType)), + Property("run_schedule_start", DateTimeType), + Property("run_schedule_end", StringType), + ).to_dict() + + def get_url(self, context: dict | None) -> str: + """Get stream entity URL. + + Developers override this method to perform dynamic URL generation. + + Args: + context: Stream partition or context dictionary. + + Returns: + A URL, optionally targeted to a specific partition or context. + """ + return super().get_url(context) + f'/{context["account_id"]}/adCampaignGroups' + + def get_url_params( + self, + context: dict | None, # noqa: ARG002 + next_page_token: t.Any | None, # noqa: ANN401 + ) -> dict[str, t.Any]: + """Return a dictionary of values to be used in URL parameterization. + + Args: + context: The stream context. + next_page_token: The next page index or value. + + Returns: + A dictionary of URL query parameters. + """ + return { + "q": "search", + "sortOrder": "ASCENDING", + **super().get_url_params(context, next_page_token), + } + + def get_unescaped_params(self, context: Context | None) -> dict: + return { + "search": "(status:(values:List(ACTIVE,ARCHIVED,CANCELED,DRAFT,PAUSED,PENDING_DELETION,REMOVED)))" + } + + +class CreativesStream(LinkedInAdsStream): + """https://learn.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-creatives?view=li-lms-2023-05&tabs=http%2Chttp-update-a-creative#search-for-creatives.""" + + name = "creatives" + parent_stream_type = AccountsStream + primary_keys = ["lastModifiedAt", "id"] + + schema = PropertiesList( + Property("account", StringType), + Property("account_id", IntegerType), + Property("campaign", StringType), + Property("campaign_id", StringType), + Property( + "content", + ObjectType( + Property( + "spotlight", + ObjectType( + Property("showMemberProfilePhoto", BooleanType), + Property("organizationName", StringType), + Property("landingPage", StringType), + Property("description", StringType), + Property("logo", StringType), + Property("headline", StringType), + Property("callToAction", StringType), + additional_properties=False, + ), + ), + ), + ), + Property("createdAt", IntegerType), + Property("createdBy", StringType), + Property("lastModifiedAt", IntegerType), + Property("lastModifiedBy", StringType), + Property("id", StringType), + Property("intendedStatus", StringType), + Property("isServing", BooleanType), + Property("isTest", BooleanType), + Property("servingHoldReasons", ArrayType(Property("items", StringType))), + ).to_dict() + + def get_url(self, context: dict | None) -> str: + """Get stream entity URL. + + Developers override this method to perform dynamic URL generation. + + Args: + context: Stream partition or context dictionary. + + Returns: + A URL, optionally targeted to a specific partition or context. + """ + # TODO: optional filter 'urn%3Ali%3AsponsoredCreative%3A{self.config["creative"]}' + return super().get_url(context) + f'/{context["account_id"]}/creatives' + + def get_url_params( + self, + context: dict | None, # noqa: ARG002 + next_page_token: t.Any | None, # noqa: ANN401 + ) -> dict[str, t.Any]: + """Return a dictionary of values to be used in URL parameterization. + + Args: + context: The stream context. + next_page_token: The next page index or value. + + Returns: + A dictionary of URL query parameters. + """ + return { + "q": "criteria", + **super().get_url_params(context, next_page_token), + } diff --git a/tap_linkedin_ads/tap.py b/tap_linkedin_ads/tap.py index 5ade6f0..acd5aa7 100644 --- a/tap_linkedin_ads/tap.py +++ b/tap_linkedin_ads/tap.py @@ -1,6 +1,7 @@ """LinkedInAds tap class.""" from __future__ import annotations + import datetime from singer_sdk import Tap @@ -10,6 +11,7 @@ NOW = datetime.datetime.now(tz=datetime.timezone.utc) + class TapLinkedInAds(Tap): """LinkedInAds tap class.""" @@ -75,7 +77,10 @@ def discover_streams(self) -> list[streams.LinkedInAdsStream]: """ return [ streams.AccountsStream(self), + streams.AccountUsersStream(self), streams.CampaignsStream(self), + streams.CampaignGroupsStream(self), + streams.CreativesStream(self), ]