From 62d4e5a5075ce0a7cc41d42ea50da4b1ee4b1e3a Mon Sep 17 00:00:00 2001 From: Pat Nadolny Date: Wed, 23 Oct 2024 17:18:16 -0400 Subject: [PATCH] video ads and fix pks --- tap_linkedin_ads/streams.py | 99 ++++++++++++++++++++++++++++++++++--- tap_linkedin_ads/tap.py | 1 + 2 files changed, 94 insertions(+), 6 deletions(-) diff --git a/tap_linkedin_ads/streams.py b/tap_linkedin_ads/streams.py index 0c82144..1191dd4 100644 --- a/tap_linkedin_ads/streams.py +++ b/tap_linkedin_ads/streams.py @@ -26,7 +26,7 @@ class AccountsStream(LinkedInAdsStream): """https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-accounts#search-for-accounts.""" name = "accounts" - primary_keys = ["last_modified_time", "id", "status"] + primary_keys: t.ClassVar[list[str]] = ["id"] schema = PropertiesList( Property( @@ -82,7 +82,10 @@ class AccountsStream(LinkedInAdsStream): def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict: """Return a context dictionary for a child stream.""" - return {"account_id": record["id"]} + return { + "account_id": record["id"], + "owner_urn": record["reference"], + } def get_url_params( self, @@ -110,7 +113,7 @@ class AccountUsersStream(LinkedInAdsStream): name = "account_users" parent_stream_type = AccountsStream - primary_keys = ["user_person_id", "last_modified_time"] + primary_keys: t.ClassVar[list[str]] = ["account"] path = "/adAccountUsers" schema = PropertiesList( @@ -168,7 +171,7 @@ class CampaignsStream(LinkedInAdsStream): """https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-campaigns#search-for-campaigns.""" name = "campaigns" - primary_keys = ["last_modified_time", "id", "status"] + primary_keys: t.ClassVar[list[str]] = ["id"] parent_stream_type = AccountsStream next_page_token_jsonpath = ( "$.metadata.nextPageToken" # Or override `get_next_page_token`. # noqa: S105 @@ -463,7 +466,7 @@ class CampaignGroupsStream(LinkedInAdsStream): name = "campaign_groups" parent_stream_type = AccountsStream - primary_keys = ["last_modified_time", "id", "status"] + primary_keys: t.ClassVar[list[str]] = ["id"] schema = PropertiesList( Property( @@ -555,7 +558,7 @@ class CreativesStream(LinkedInAdsStream): name = "creatives" parent_stream_type = AccountsStream - primary_keys = ["lastModifiedAt", "id"] + primary_keys: t.ClassVar[list[str]] = ["id"] schema = PropertiesList( Property("account", StringType), @@ -623,3 +626,87 @@ def get_url_params( "q": "criteria", **super().get_url_params(context, next_page_token), } + + +class VideoAdsStream(LinkedInAdsStream): + """https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads/advertising-targeting/create-and-manage-video#finders.""" + + name = "video_ads" + path = "/adDirectSponsoredContents" + parent_stream_type = AccountsStream + + schema = PropertiesList( + Property("account", StringType), + Property("account_id", IntegerType), + Property( + "changeAuditStamps", + ObjectType( + Property( + "created", + ObjectType( + Property("time", IntegerType), + additional_properties=False, + ), + ), + Property( + "lastModified", + ObjectType( + Property("time", IntegerType), + additional_properties=False, + ), + ), + ), + ), + Property("created_time", StringType), + Property("last_modified_time", StringType), + Property("content_reference", StringType), + Property("content_reference_ucg_post_id", IntegerType), + Property("content_reference_share_id", IntegerType), + Property("name", StringType), + Property("type", StringType), + ).to_dict() + + @property + def url_base(self) -> str: + """Return the API URL root, configurable via tap settings.""" + return "https://api.linkedin.com/v2" + + def get_url_params( + self, + context: dict | None, # noqa: ARG002 + next_page_token: t.Any | None, # noqa: ANN401 + ) -> dict[str, t.Any]: + """Return a dictionary of values to be used in URL parameterization. + + Args: + context: The stream context. + next_page_token: The next page index or value. + + Returns: + A dictionary of URL query parameters. + """ + return { + "q": "account", + "account": f"urn:li:sponsoredAccount:{context['account_id']}", + "owner": context["owner_urn"], + **super().get_url_params(context, next_page_token), + } + + # TODO: handle timestamp parsing more generically + def post_process(self, row: dict, context: dict | None = None) -> dict | None: + # This function extracts day, month, and year from date range column + # These values are parse with datetime function and the date is added to the day column + # with contextlib.suppress(Exception): + # created_time = row.get("changeAuditStamps", {}).get("created", {}).get("time") + # last_modified_time = ( + # row.get("changeAuditStamps", {}).get("lastModified", {}).get("time") + # ) + # row["created_time"] = datetime.fromtimestamp( + # int(created_time) / 1000, + # tz=UTC, + # ).isoformat() + # row["last_modified_time"] = datetime.fromtimestamp( + # int(last_modified_time) / 1000, + # tz=UTC, + # ).isoformat() + return super().post_process(row, context) diff --git a/tap_linkedin_ads/tap.py b/tap_linkedin_ads/tap.py index acd5aa7..3d8cf78 100644 --- a/tap_linkedin_ads/tap.py +++ b/tap_linkedin_ads/tap.py @@ -81,6 +81,7 @@ def discover_streams(self) -> list[streams.LinkedInAdsStream]: streams.CampaignsStream(self), streams.CampaignGroupsStream(self), streams.CreativesStream(self), + streams.VideoAdsStream(self), ]