Skip to content

Commit

Permalink
refactor unescaped params and add first analytics stream
Browse files Browse the repository at this point in the history
  • Loading branch information
pnadolny13 committed Oct 23, 2024
1 parent 62d4e5a commit aa28cbd
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 4 deletions.
7 changes: 3 additions & 4 deletions tap_linkedin_ads/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,20 +131,19 @@ def request_records(self, context: Context | None) -> t.Iterable[dict]:
context,
next_page_token=paginator.current_value,
)
resp = decorated_request(prepared_request, context)
# Patch to add unescaped params to the path and url
if self.get_unescaped_params(context):
prepared_request.url = (
prepared_request.url
+ "?"
+ "&"
+ "&".join(
[
f"{k}={v}"
for k, v in self.get_unescaped_params().items()
for k, v in self.get_unescaped_params(context).items()
],
)
)

resp = decorated_request(prepared_request, context)
request_counter.increment()
self.update_sync_costs(prepared_request, resp, context)
records = iter(self.parse_response(resp))
Expand Down
267 changes: 267 additions & 0 deletions tap_linkedin_ads/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

from __future__ import annotations

import contextlib
import typing as t
from datetime import datetime, timezone
from importlib import resources

import pendulum
from singer_sdk.helpers.types import Context
from singer_sdk.typing import (
ArrayType,
Expand All @@ -20,6 +23,7 @@
from tap_linkedin_ads.client import LinkedInAdsStream

SCHEMAS_DIR = resources.files(__package__) / "schemas"
UTC = timezone.utc


class AccountsStream(LinkedInAdsStream):
Expand Down Expand Up @@ -460,6 +464,12 @@ def get_unescaped_params(self, context: Context | None) -> dict:
"search": "(status:(values:List(ACTIVE,PAUSED,ARCHIVED,COMPLETED,CANCELED,DRAFT,PENDING_DELETION,REMOVED)))"
}

def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict:
"""Return a context dictionary for a child stream."""
return {
"campaign_id": record["id"],
}


class CampaignGroupsStream(LinkedInAdsStream):
"""https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-campaign-groups#search-for-campaign-groups."""
Expand Down Expand Up @@ -710,3 +720,260 @@ def post_process(self, row: dict, context: dict | None = None) -> dict | None:
# tz=UTC,
# ).isoformat()
return super().post_process(row, context)


class AdAnalyticsByCampaignInit(LinkedInAdsStream):
"""https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads-reporting/ads-reporting#analytics-finder."""

name = "AdAnalyticsByCampaignInit"
path = "/adAnalytics"
parent_stream_type = CampaignsStream

schema = PropertiesList(
Property("campaign_id", StringType),
Property("documentCompletions", IntegerType),
Property("documentFirstQuartileCompletions", IntegerType),
Property("clicks", IntegerType),
Property("documentMidpointCompletions", IntegerType),
Property("documentThirdQuartileCompletions", IntegerType),
Property("downloadClicks", IntegerType),
Property("jobApplications", StringType),
Property("jobApplyClicks", StringType),
Property("postViewJobApplications", StringType),
Property("costInUsd", StringType),
Property("postViewRegistrations", StringType),
Property("registrations", StringType),
Property("talentLeads", IntegerType),
Property("viralDocumentCompletions", IntegerType),
Property("viralDocumentFirstQuartileCompletions", IntegerType),
Property("viralDocumentMidpointCompletions", IntegerType),
Property("viralDocumentThirdQuartileCompletions", IntegerType),
Property("viralDownloadClicks", IntegerType),
Property("viralJobApplications", StringType),
Property("viralJobApplyClicks", StringType),
Property("costInLocalCurrency", StringType),
Property("viralRegistrations", StringType),
Property("approximateUniqueImpressions", IntegerType),
Property("cardClicks", IntegerType),
Property("cardImpressions", IntegerType),
Property("commentLikes", IntegerType),
Property("viralCardClicks", IntegerType),
Property("viralCardImpressions", IntegerType),
Property("viralCommentLikes", IntegerType),
Property("actionClicks", IntegerType),
Property("adUnitClicks", IntegerType),
Property("comments", IntegerType),
Property("companyPageClicks", IntegerType),
Property("conversionValueInLocalCurrency", StringType),
Property(
"dateRange",
ObjectType(
Property(
"end",
ObjectType(
Property("day", IntegerType),
Property("month", IntegerType),
Property("year", IntegerType),
additional_properties=False,
),
),
Property(
"start",
ObjectType(
Property("day", IntegerType),
Property("month", IntegerType),
Property("year", IntegerType),
additional_properties=False,
),
),
),
),
Property("day", StringType),
Property("externalWebsiteConversions", IntegerType),
Property("externalWebsitePostClickConversions", IntegerType),
Property("externalWebsitePostViewConversions", IntegerType),
Property("follows", IntegerType),
Property("fullScreenPlays", IntegerType),
Property("impressions", IntegerType),
Property("landingPageClicks", IntegerType),
Property("leadGenerationMailContactInfoShares", IntegerType),
Property("leadGenerationMailInterestedClicks", IntegerType),
Property("likes", IntegerType),
Property("oneClickLeadFormOpens", IntegerType),
Property("oneClickLeads", IntegerType),
Property("opens", IntegerType),
Property("otherEngagements", IntegerType),
Property("sends", IntegerType),
Property("shares", IntegerType),
Property("textUrlClicks", IntegerType),
Property("totalEngagements", IntegerType),
Property("videoCompletions", IntegerType),
Property("videoFirstQuartileCompletions", IntegerType),
Property("videoMidpointCompletions", IntegerType),
Property("videoStarts", IntegerType),
Property("videoThirdQuartileCompletions", IntegerType),
Property("videoViews", IntegerType),
Property("viralClicks", IntegerType),
Property("viralComments", IntegerType),
Property("viralCompanyPageClicks", IntegerType),
Property("viralExternalWebsiteConversions", IntegerType),
Property("viralExternalWebsitePostClickConversions", IntegerType),
Property("viralExternalWebsitePostViewConversions", IntegerType),
Property("viralFollows", IntegerType),
Property("viralFullScreenPlays", IntegerType),
Property("viralImpressions", IntegerType),
Property("viralLandingPageClicks", IntegerType),
Property("viralLikes", IntegerType),
Property("viralOneClickLeadFormOpens", IntegerType),
Property("viralOneclickLeads", IntegerType),
Property("viralOtherEngagements", IntegerType),
Property("viralReactions", IntegerType),
Property("reactions", IntegerType),
Property("viralShares", IntegerType),
Property("viralTotalEngagements", IntegerType),
Property("viralVideoCompletions", IntegerType),
Property("viralVideoFirstQuartileCompletions", IntegerType),
Property("viralVideoMidpointCompletions", IntegerType),
Property("viralVideoStarts", IntegerType),
Property("viralVideoThirdQuartileCompletions", IntegerType),
Property("viralVideoViews", IntegerType),
).to_dict()

@property
def adanalyticscolumns(self) -> list[str]:
return [
"viralLandingPageClicks,viralExternalWebsitePostClickConversions,externalWebsiteConversions,viralVideoFirstQuartileCompletions,leadGenerationMailContactInfoShares,clicks,viralClicks,shares,viralFullScreenPlays,videoMidpointCompletions,viralCardClicks,viralExternalWebsitePostViewConversions,viralTotalEngagements,viralCompanyPageClicks,actionClicks,viralShares,videoCompletions,comments,externalWebsitePostViewConversions,dateRange",
"costInUsd,landingPageClicks,oneClickLeadFormOpens,talentLeads,sends,viralOneClickLeadFormOpens,conversionValueInLocalCurrency,viralFollows,otherEngagements,viralVideoCompletions,cardImpressions,leadGenerationMailInterestedClicks,opens,totalEngagements,videoViews,viralImpressions,viralVideoViews,commentLikes,viralDocumentThirdQuartileCompletions,viralLikes",
"adUnitClicks,videoThirdQuartileCompletions,cardClicks,likes,viralComments,viralVideoMidpointCompletions,viralVideoThirdQuartileCompletions,oneClickLeads,fullScreenPlays,viralCardImpressions,follows,videoStarts,videoFirstQuartileCompletions,textUrlClicks,reactions,viralReactions,externalWebsitePostClickConversions,viralOtherEngagements,costInLocalCurrency",
"viralVideoStarts,viralRegistrations,viralJobApplyClicks,viralJobApplications,jobApplications,jobApplyClicks,viralExternalWebsiteConversions,postViewRegistrations,companyPageClicks,documentCompletions,documentFirstQuartileCompletions,documentMidpointCompletions,documentThirdQuartileCompletions,downloadClicks,viralDocumentCompletions,viralDocumentFirstQuartileCompletions,viralDocumentMidpointCompletions,approximateUniqueImpressions,viralDownloadClicks,impressions",
]

def get_url_params(
self,
context: dict | None, # noqa: ARG002
next_page_token: t.Any | None, # noqa: ANN401
) -> dict[str, t.Any]:
"""Return a dictionary of values to be used in URL parameterization.
Args:
context: The stream context.
next_page_token: The next page index or value.
Returns:
A dictionary of URL query parameters.
"""
return {
"q": "analytics",
**super().get_url_params(context, next_page_token),
}

def get_unescaped_params(self, context: Context | None) -> dict:
start_date = pendulum.parse(self.config["start_date"])
end_date = pendulum.parse(self.config["end_date"])
return {
"pivot": "(value:CAMPAIGN)",
"timeGranularity": "(value:DAILY)",
"campaigns": f"List(urn%3Ali%3AsponsoredCampaign%3A{context['campaign_id']})",
"dateRange": f"(start:(year:{start_date.year},month:{start_date.month},day:{start_date.day}),end:(year:{end_date.year},month:{end_date.month},day:{end_date.day}))",
"fields": self.adanalyticscolumns[0],
}

def post_process(self, row: dict, context: dict | None = None) -> dict | None:
# This function extracts day, month, and year from date range column
# These values are parsed with datetime function and the date is added to the day column
date_range = row.get("dateRange", {})
start_date = date_range.get("start", {})

if start_date:
row["day"] = datetime.strptime(
f'{start_date.get("year")}-{start_date.get("month")}-{start_date.get("day")}',
"%Y-%m-%d",
).astimezone(UTC)

return super().post_process(row, context)


class AdAnalyticsByCampaignSecond(AdAnalyticsByCampaignInit):
name = "adanalyticsbycampaign_second"

def get_unescaped_params(self, context: Context | None) -> dict:
return {
**super().get_unescaped_params(context),
# Overwrite fields with this column subset
"fields": self.adanalyticscolumns[0],
}


class AdAnalyticsByCampaignThird(AdAnalyticsByCampaignInit):
name = "adanalyticsbycampaign_third"

def get_unescaped_params(self, context: Context | None) -> dict:
return {
**super().get_unescaped_params(context),
# Overwrite fields with this column subset
"fields": self.adanalyticscolumns[3],
}


class AdAnalyticsByCampaignStream(AdAnalyticsByCampaignInit):
name = "ad_analytics_by_campaign"

def get_unescaped_params(self, context: Context | None) -> dict:
return {
**super().get_unescaped_params(context),
# Overwrite fields with this column subset
"fields": self.adanalyticscolumns[1],
}

def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]:
"""Return a dictionary of records from adAnalytics classes.
Combines request columns from multiple calls to the api, which are limited to 20 columns
each.
Uses `merge_dicts` to combine responses from each class
super().get_records calls only the records from the adAnalyticsByCampaign class
zip() Iterates over the records of adAnalytics classes and merges them with merge_dicts()
function list() converts each stream context into lists
Args:
context: The stream context.
Returns:
A dictionary of records given from adAnalytics streams
"""
adanalyticsinit_stream = AdAnalyticsByCampaignInit(
self._tap,
schema={"properties": {}},
)
adanalyticsecond_stream = AdAnalyticsByCampaignSecond(
self._tap,
schema={"properties": {}},
)
adanalyticsthird_stream = AdAnalyticsByCampaignThird(
self._tap,
schema={"properties": {}},
)
return [
self.merge_dicts(x, y, z, p)
for x, y, z, p in zip(
list(adanalyticsinit_stream.get_records(context)),
list(super().get_records(context)),
list(adanalyticsecond_stream.get_records(context)),
list(adanalyticsthird_stream.get_records(context)),
)
]

def merge_dicts(self, *dict_args: dict) -> dict:
"""Return a merged dictionary of adAnalytics responses.
Args:
*dict_args: dictionaries with adAnalytics response data.
Returns:
A merged dictionary of adAnalytics responses
"""
result = {}
for dictionary in dict_args:
result.update(dictionary)
return result
1 change: 1 addition & 0 deletions tap_linkedin_ads/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def discover_streams(self) -> list[streams.LinkedInAdsStream]:
streams.CampaignGroupsStream(self),
streams.CreativesStream(self),
streams.VideoAdsStream(self),
streams.AdAnalyticsByCampaignStream(self),
]


Expand Down

0 comments on commit aa28cbd

Please sign in to comment.