From aa28cbd46c62d4801d95fc6885559e427faede77 Mon Sep 17 00:00:00 2001 From: Pat Nadolny Date: Wed, 23 Oct 2024 18:27:34 -0400 Subject: [PATCH] refactor unescaped params and add first analytics stream --- tap_linkedin_ads/client.py | 7 +- tap_linkedin_ads/streams.py | 267 ++++++++++++++++++++++++++++++++++++ tap_linkedin_ads/tap.py | 1 + 3 files changed, 271 insertions(+), 4 deletions(-) diff --git a/tap_linkedin_ads/client.py b/tap_linkedin_ads/client.py index 787279e..fb9042a 100644 --- a/tap_linkedin_ads/client.py +++ b/tap_linkedin_ads/client.py @@ -131,20 +131,19 @@ def request_records(self, context: Context | None) -> t.Iterable[dict]: context, next_page_token=paginator.current_value, ) - resp = decorated_request(prepared_request, context) # Patch to add unescaped params to the path and url if self.get_unescaped_params(context): prepared_request.url = ( prepared_request.url - + "?" + + "&" + "&".join( [ f"{k}={v}" - for k, v in self.get_unescaped_params().items() + for k, v in self.get_unescaped_params(context).items() ], ) ) - + resp = decorated_request(prepared_request, context) request_counter.increment() self.update_sync_costs(prepared_request, resp, context) records = iter(self.parse_response(resp)) diff --git a/tap_linkedin_ads/streams.py b/tap_linkedin_ads/streams.py index 1191dd4..976d5e0 100644 --- a/tap_linkedin_ads/streams.py +++ b/tap_linkedin_ads/streams.py @@ -2,9 +2,12 @@ from __future__ import annotations +import contextlib import typing as t +from datetime import datetime, timezone from importlib import resources +import pendulum from singer_sdk.helpers.types import Context from singer_sdk.typing import ( ArrayType, @@ -20,6 +23,7 @@ from tap_linkedin_ads.client import LinkedInAdsStream SCHEMAS_DIR = resources.files(__package__) / "schemas" +UTC = timezone.utc class AccountsStream(LinkedInAdsStream): @@ -460,6 +464,12 @@ def get_unescaped_params(self, context: Context | None) -> dict: "search": "(status:(values:List(ACTIVE,PAUSED,ARCHIVED,COMPLETED,CANCELED,DRAFT,PENDING_DELETION,REMOVED)))" } + def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict: + """Return a context dictionary for a child stream.""" + return { + "campaign_id": record["id"], + } + class CampaignGroupsStream(LinkedInAdsStream): """https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-campaign-groups#search-for-campaign-groups.""" @@ -710,3 +720,260 @@ def post_process(self, row: dict, context: dict | None = None) -> dict | None: # tz=UTC, # ).isoformat() return super().post_process(row, context) + + +class AdAnalyticsByCampaignInit(LinkedInAdsStream): + """https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads-reporting/ads-reporting#analytics-finder.""" + + name = "AdAnalyticsByCampaignInit" + path = "/adAnalytics" + parent_stream_type = CampaignsStream + + schema = PropertiesList( + Property("campaign_id", StringType), + Property("documentCompletions", IntegerType), + Property("documentFirstQuartileCompletions", IntegerType), + Property("clicks", IntegerType), + Property("documentMidpointCompletions", IntegerType), + Property("documentThirdQuartileCompletions", IntegerType), + Property("downloadClicks", IntegerType), + Property("jobApplications", StringType), + Property("jobApplyClicks", StringType), + Property("postViewJobApplications", StringType), + Property("costInUsd", StringType), + Property("postViewRegistrations", StringType), + Property("registrations", StringType), + Property("talentLeads", IntegerType), + Property("viralDocumentCompletions", IntegerType), + Property("viralDocumentFirstQuartileCompletions", IntegerType), + Property("viralDocumentMidpointCompletions", IntegerType), + Property("viralDocumentThirdQuartileCompletions", IntegerType), + Property("viralDownloadClicks", IntegerType), + Property("viralJobApplications", StringType), + Property("viralJobApplyClicks", StringType), + Property("costInLocalCurrency", StringType), + Property("viralRegistrations", StringType), + Property("approximateUniqueImpressions", IntegerType), + Property("cardClicks", IntegerType), + Property("cardImpressions", IntegerType), + Property("commentLikes", IntegerType), + Property("viralCardClicks", IntegerType), + Property("viralCardImpressions", IntegerType), + Property("viralCommentLikes", IntegerType), + Property("actionClicks", IntegerType), + Property("adUnitClicks", IntegerType), + Property("comments", IntegerType), + Property("companyPageClicks", IntegerType), + Property("conversionValueInLocalCurrency", StringType), + Property( + "dateRange", + ObjectType( + Property( + "end", + ObjectType( + Property("day", IntegerType), + Property("month", IntegerType), + Property("year", IntegerType), + additional_properties=False, + ), + ), + Property( + "start", + ObjectType( + Property("day", IntegerType), + Property("month", IntegerType), + Property("year", IntegerType), + additional_properties=False, + ), + ), + ), + ), + Property("day", StringType), + Property("externalWebsiteConversions", IntegerType), + Property("externalWebsitePostClickConversions", IntegerType), + Property("externalWebsitePostViewConversions", IntegerType), + Property("follows", IntegerType), + Property("fullScreenPlays", IntegerType), + Property("impressions", IntegerType), + Property("landingPageClicks", IntegerType), + Property("leadGenerationMailContactInfoShares", IntegerType), + Property("leadGenerationMailInterestedClicks", IntegerType), + Property("likes", IntegerType), + Property("oneClickLeadFormOpens", IntegerType), + Property("oneClickLeads", IntegerType), + Property("opens", IntegerType), + Property("otherEngagements", IntegerType), + Property("sends", IntegerType), + Property("shares", IntegerType), + Property("textUrlClicks", IntegerType), + Property("totalEngagements", IntegerType), + Property("videoCompletions", IntegerType), + Property("videoFirstQuartileCompletions", IntegerType), + Property("videoMidpointCompletions", IntegerType), + Property("videoStarts", IntegerType), + Property("videoThirdQuartileCompletions", IntegerType), + Property("videoViews", IntegerType), + Property("viralClicks", IntegerType), + Property("viralComments", IntegerType), + Property("viralCompanyPageClicks", IntegerType), + Property("viralExternalWebsiteConversions", IntegerType), + Property("viralExternalWebsitePostClickConversions", IntegerType), + Property("viralExternalWebsitePostViewConversions", IntegerType), + Property("viralFollows", IntegerType), + Property("viralFullScreenPlays", IntegerType), + Property("viralImpressions", IntegerType), + Property("viralLandingPageClicks", IntegerType), + Property("viralLikes", IntegerType), + Property("viralOneClickLeadFormOpens", IntegerType), + Property("viralOneclickLeads", IntegerType), + Property("viralOtherEngagements", IntegerType), + Property("viralReactions", IntegerType), + Property("reactions", IntegerType), + Property("viralShares", IntegerType), + Property("viralTotalEngagements", IntegerType), + Property("viralVideoCompletions", IntegerType), + Property("viralVideoFirstQuartileCompletions", IntegerType), + Property("viralVideoMidpointCompletions", IntegerType), + Property("viralVideoStarts", IntegerType), + Property("viralVideoThirdQuartileCompletions", IntegerType), + Property("viralVideoViews", IntegerType), + ).to_dict() + + @property + def adanalyticscolumns(self) -> list[str]: + return [ + "viralLandingPageClicks,viralExternalWebsitePostClickConversions,externalWebsiteConversions,viralVideoFirstQuartileCompletions,leadGenerationMailContactInfoShares,clicks,viralClicks,shares,viralFullScreenPlays,videoMidpointCompletions,viralCardClicks,viralExternalWebsitePostViewConversions,viralTotalEngagements,viralCompanyPageClicks,actionClicks,viralShares,videoCompletions,comments,externalWebsitePostViewConversions,dateRange", + "costInUsd,landingPageClicks,oneClickLeadFormOpens,talentLeads,sends,viralOneClickLeadFormOpens,conversionValueInLocalCurrency,viralFollows,otherEngagements,viralVideoCompletions,cardImpressions,leadGenerationMailInterestedClicks,opens,totalEngagements,videoViews,viralImpressions,viralVideoViews,commentLikes,viralDocumentThirdQuartileCompletions,viralLikes", + "adUnitClicks,videoThirdQuartileCompletions,cardClicks,likes,viralComments,viralVideoMidpointCompletions,viralVideoThirdQuartileCompletions,oneClickLeads,fullScreenPlays,viralCardImpressions,follows,videoStarts,videoFirstQuartileCompletions,textUrlClicks,reactions,viralReactions,externalWebsitePostClickConversions,viralOtherEngagements,costInLocalCurrency", + "viralVideoStarts,viralRegistrations,viralJobApplyClicks,viralJobApplications,jobApplications,jobApplyClicks,viralExternalWebsiteConversions,postViewRegistrations,companyPageClicks,documentCompletions,documentFirstQuartileCompletions,documentMidpointCompletions,documentThirdQuartileCompletions,downloadClicks,viralDocumentCompletions,viralDocumentFirstQuartileCompletions,viralDocumentMidpointCompletions,approximateUniqueImpressions,viralDownloadClicks,impressions", + ] + + def get_url_params( + self, + context: dict | None, # noqa: ARG002 + next_page_token: t.Any | None, # noqa: ANN401 + ) -> dict[str, t.Any]: + """Return a dictionary of values to be used in URL parameterization. + + Args: + context: The stream context. + next_page_token: The next page index or value. + + Returns: + A dictionary of URL query parameters. + """ + return { + "q": "analytics", + **super().get_url_params(context, next_page_token), + } + + def get_unescaped_params(self, context: Context | None) -> dict: + start_date = pendulum.parse(self.config["start_date"]) + end_date = pendulum.parse(self.config["end_date"]) + return { + "pivot": "(value:CAMPAIGN)", + "timeGranularity": "(value:DAILY)", + "campaigns": f"List(urn%3Ali%3AsponsoredCampaign%3A{context['campaign_id']})", + "dateRange": f"(start:(year:{start_date.year},month:{start_date.month},day:{start_date.day}),end:(year:{end_date.year},month:{end_date.month},day:{end_date.day}))", + "fields": self.adanalyticscolumns[0], + } + + def post_process(self, row: dict, context: dict | None = None) -> dict | None: + # This function extracts day, month, and year from date range column + # These values are parsed with datetime function and the date is added to the day column + date_range = row.get("dateRange", {}) + start_date = date_range.get("start", {}) + + if start_date: + row["day"] = datetime.strptime( + f'{start_date.get("year")}-{start_date.get("month")}-{start_date.get("day")}', + "%Y-%m-%d", + ).astimezone(UTC) + + return super().post_process(row, context) + + +class AdAnalyticsByCampaignSecond(AdAnalyticsByCampaignInit): + name = "adanalyticsbycampaign_second" + + def get_unescaped_params(self, context: Context | None) -> dict: + return { + **super().get_unescaped_params(context), + # Overwrite fields with this column subset + "fields": self.adanalyticscolumns[0], + } + + +class AdAnalyticsByCampaignThird(AdAnalyticsByCampaignInit): + name = "adanalyticsbycampaign_third" + + def get_unescaped_params(self, context: Context | None) -> dict: + return { + **super().get_unescaped_params(context), + # Overwrite fields with this column subset + "fields": self.adanalyticscolumns[3], + } + + +class AdAnalyticsByCampaignStream(AdAnalyticsByCampaignInit): + name = "ad_analytics_by_campaign" + + def get_unescaped_params(self, context: Context | None) -> dict: + return { + **super().get_unescaped_params(context), + # Overwrite fields with this column subset + "fields": self.adanalyticscolumns[1], + } + + def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]: + """Return a dictionary of records from adAnalytics classes. + + Combines request columns from multiple calls to the api, which are limited to 20 columns + each. + + Uses `merge_dicts` to combine responses from each class + super().get_records calls only the records from the adAnalyticsByCampaign class + zip() Iterates over the records of adAnalytics classes and merges them with merge_dicts() + function list() converts each stream context into lists + + Args: + context: The stream context. + + Returns: + A dictionary of records given from adAnalytics streams + """ + adanalyticsinit_stream = AdAnalyticsByCampaignInit( + self._tap, + schema={"properties": {}}, + ) + adanalyticsecond_stream = AdAnalyticsByCampaignSecond( + self._tap, + schema={"properties": {}}, + ) + adanalyticsthird_stream = AdAnalyticsByCampaignThird( + self._tap, + schema={"properties": {}}, + ) + return [ + self.merge_dicts(x, y, z, p) + for x, y, z, p in zip( + list(adanalyticsinit_stream.get_records(context)), + list(super().get_records(context)), + list(adanalyticsecond_stream.get_records(context)), + list(adanalyticsthird_stream.get_records(context)), + ) + ] + + def merge_dicts(self, *dict_args: dict) -> dict: + """Return a merged dictionary of adAnalytics responses. + + Args: + *dict_args: dictionaries with adAnalytics response data. + + Returns: + A merged dictionary of adAnalytics responses + """ + result = {} + for dictionary in dict_args: + result.update(dictionary) + return result diff --git a/tap_linkedin_ads/tap.py b/tap_linkedin_ads/tap.py index 3d8cf78..716cb50 100644 --- a/tap_linkedin_ads/tap.py +++ b/tap_linkedin_ads/tap.py @@ -82,6 +82,7 @@ def discover_streams(self) -> list[streams.LinkedInAdsStream]: streams.CampaignGroupsStream(self), streams.CreativesStream(self), streams.VideoAdsStream(self), + streams.AdAnalyticsByCampaignStream(self), ]