Skip to content

Commit

Permalink
analytics creative stream
Browse files Browse the repository at this point in the history
  • Loading branch information
pnadolny13 committed Oct 23, 2024
1 parent aa28cbd commit 1a22117
Show file tree
Hide file tree
Showing 2 changed files with 272 additions and 0 deletions.
271 changes: 271 additions & 0 deletions tap_linkedin_ads/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,12 @@ def get_url_params(
**super().get_url_params(context, next_page_token),
}

def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict:
"""Return a context dictionary for a child stream."""
return {
"creative_urn": record["id"],
}


class VideoAdsStream(LinkedInAdsStream):
"""https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads/advertising-targeting/create-and-manage-video#finders."""
Expand Down Expand Up @@ -977,3 +983,268 @@ def merge_dicts(self, *dict_args: dict) -> dict:
for dictionary in dict_args:
result.update(dictionary)
return result


class AdAnalyticsByCreativeInit(LinkedInAdsStream):
"""https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads-reporting/ads-reporting#analytics-finder."""

name = "AdAnalyticsByCreativeInit"
path = "/adAnalytics"
parent_stream_type = CreativesStream

schema = PropertiesList(
Property("landingPageClicks", IntegerType),
Property("reactions", IntegerType),
Property("adUnitClicks", IntegerType),
Property("creative_id", StringType),
Property("documentCompletions", IntegerType),
Property("documentFirstQuartileCompletions", IntegerType),
Property("clicks", IntegerType),
Property("documentMidpointCompletions", IntegerType),
Property("documentThirdQuartileCompletions", IntegerType),
Property("downloadClicks", IntegerType),
Property("jobApplications", StringType),
Property("jobApplyClicks", StringType),
Property("postViewJobApplications", StringType),
Property("costInUsd", StringType),
Property("postViewRegistrations", StringType),
Property("registrations", StringType),
Property("talentLeads", IntegerType),
Property("viralDocumentCompletions", IntegerType),
Property("viralDocumentFirstQuartileCompletions", IntegerType),
Property("viralDocumentMidpointCompletions", IntegerType),
Property("viralDocumentThirdQuartileCompletions", IntegerType),
Property("viralDownloadClicks", IntegerType),
Property("viralJobApplications", StringType),
Property("viralJobApplyClicks", StringType),
Property("costInLocalCurrency", StringType),
Property("viralRegistrations", IntegerType),
Property("approximateUniqueImpressions", IntegerType),
Property("cardClicks", IntegerType),
Property("cardImpressions", IntegerType),
Property("commentLikes", IntegerType),
Property("viralCardClicks", IntegerType),
Property("viralCardImpressions", IntegerType),
Property("viralCommentLikes", IntegerType),
Property("actionClicks", IntegerType),
Property("comments", IntegerType),
Property("companyPageClicks", IntegerType),
Property("conversionValueInLocalCurrency", StringType),
Property(
"dateRange",
ObjectType(
Property(
"end",
ObjectType(
Property("day", IntegerType),
Property("month", IntegerType),
Property("year", IntegerType),
additional_properties=False,
),
),
Property(
"start",
ObjectType(
Property("day", IntegerType),
Property("month", IntegerType),
Property("year", IntegerType),
additional_properties=False,
),
),
),
),
Property("day", StringType),
Property("externalWebsiteConversions", IntegerType),
Property("externalWebsitePostClickConversions", IntegerType),
Property("externalWebsitePostViewConversions", IntegerType),
Property("follows", IntegerType),
Property("fullScreenPlays", IntegerType),
Property("impressions", IntegerType),
Property("landingPageClicks", IntegerType),
Property("leadGenerationMailContactInfoShares", IntegerType),
Property("leadGenerationMailInterestedClicks", IntegerType),
Property("likes", IntegerType),
Property("oneClickLeadFormOpens", IntegerType),
Property("oneClickLeads", IntegerType),
Property("opens", IntegerType),
Property("otherEngagements", IntegerType),
Property("sends", IntegerType),
Property("shares", IntegerType),
Property("textUrlClicks", IntegerType),
Property("totalEngagements", IntegerType),
Property("videoCompletions", IntegerType),
Property("videoFirstQuartileCompletions", IntegerType),
Property("videoMidpointCompletions", IntegerType),
Property("videoStarts", IntegerType),
Property("videoThirdQuartileCompletions", IntegerType),
Property("videoViews", IntegerType),
Property("viralClicks", IntegerType),
Property("viralComments", IntegerType),
Property("viralCompanyPageClicks", IntegerType),
Property("viralExternalWebsiteConversions", IntegerType),
Property("viralExternalWebsitePostClickConversions", IntegerType),
Property("viralExternalWebsitePostViewConversions", IntegerType),
Property("viralFollows", IntegerType),
Property("viralFullScreenPlays", IntegerType),
Property("viralImpressions", IntegerType),
Property("viralLandingPageClicks", IntegerType),
Property("viralLikes", IntegerType),
Property("viralOneClickLeadFormOpens", IntegerType),
Property("viralOneclickLeads", IntegerType),
Property("viralOtherEngagements", IntegerType),
Property("viralReactions", IntegerType),
Property("viralShares", IntegerType),
Property("viralTotalEngagements", IntegerType),
Property("viralVideoCompletions", IntegerType),
Property("viralVideoFirstQuartileCompletions", IntegerType),
Property("viralVideoMidpointCompletions", IntegerType),
Property("viralVideoStarts", IntegerType),
Property("viralVideoThirdQuartileCompletions", IntegerType),
Property("viralVideoViews", IntegerType),
).to_dict()

@property
def adanalyticscolumns(self) -> list[str]:
"""List of columns for adanalytics endpoint."""
return [
"viralLandingPageClicks,viralExternalWebsitePostClickConversions,externalWebsiteConversions,viralVideoFirstQuartileCompletions,leadGenerationMailContactInfoShares,clicks,viralClicks,shares,viralFullScreenPlays,videoMidpointCompletions,viralCardClicks,viralExternalWebsitePostViewConversions,viralTotalEngagements,viralCompanyPageClicks,actionClicks,viralShares,videoCompletions,comments,externalWebsitePostViewConversions,dateRange",
"costInUsd,landingPageClicks,oneClickLeadFormOpens,talentLeads,sends,viralOneClickLeadFormOpens,conversionValueInLocalCurrency,viralFollows,otherEngagements,viralVideoCompletions,cardImpressions,leadGenerationMailInterestedClicks,opens,totalEngagements,videoViews,viralImpressions,viralVideoViews,commentLikes,viralDocumentThirdQuartileCompletions,viralLikes",
"adUnitClicks,videoThirdQuartileCompletions,cardClicks,likes,viralComments,viralVideoMidpointCompletions,viralVideoThirdQuartileCompletions,oneClickLeads,fullScreenPlays,viralCardImpressions,follows,videoStarts,videoFirstQuartileCompletions,textUrlClicks,reactions,viralReactions,externalWebsitePostClickConversions,viralOtherEngagements,costInLocalCurrency",
"viralVideoStarts,viralRegistrations,viralJobApplyClicks,viralJobApplications,jobApplications,jobApplyClicks,viralExternalWebsiteConversions,postViewRegistrations,companyPageClicks,documentCompletions,documentFirstQuartileCompletions,documentMidpointCompletions,documentThirdQuartileCompletions,downloadClicks,viralDocumentCompletions,viralDocumentFirstQuartileCompletions,viralDocumentMidpointCompletions,approximateUniqueImpressions,viralDownloadClicks,impressions",
]

def get_url_params(
self,
context: dict | None, # noqa: ARG002
next_page_token: t.Any | None, # noqa: ANN401
) -> dict[str, t.Any]:
"""Return a dictionary of values to be used in URL parameterization.
Args:
context: The stream context.
next_page_token: The next page index or value.
Returns:
A dictionary of URL query parameters.
"""
return {
"q": "analytics",
**super().get_url_params(context, next_page_token),
}

def get_unescaped_params(self, context: Context | None) -> dict:
start_date = pendulum.parse(self.config["start_date"])
end_date = pendulum.parse(self.config["end_date"])
creative_urn = context["creative_urn"]
creative_id = creative_urn.split(":")[-1]
return {
"pivot": "(value:CREATIVE)",
"timeGranularity": "(value:DAILY)",
"creatives": f"List(urn%3Ali%3AsponsoredCreative%3A{creative_id})",
"dateRange": f"(start:(year:{start_date.year},month:{start_date.month},day:{start_date.day}),end:(year:{end_date.year},month:{end_date.month},day:{end_date.day}))",
"fields": self.adanalyticscolumns[0],
}

def post_process(self, row: dict, context: dict | None = None) -> dict | None:
# This function extracts day, month, and year from date range column
# These values are parsed with datetime function and the date is added to the day column
date_range = row.get("dateRange", {})
start_date = date_range.get("start", {})

if start_date:
row["day"] = datetime.strptime(
f'{start_date.get("year")}-{start_date.get("month")}-{start_date.get("day")}',
"%Y-%m-%d",
).astimezone(UTC)

viral_registrations = row.pop("viralRegistrations", None)
if viral_registrations:
row["viralRegistrations"] = int(viral_registrations)

return super().post_process(row, context)


class AdAnalyticsByCreativeStream(AdAnalyticsByCreativeInit):
name = "ad_analytics_by_creative"

def get_unescaped_params(self, context: Context | None) -> dict:
return {
**super().get_unescaped_params(context),
# Overwrite fields with this column subset
"fields": self.adanalyticscolumns[1],
}

def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]:
"""Return a dictionary of records from adAnalytics classes.
Combines request columns from multiple calls to the api, which are limited to 20 columns
each.
Uses `merge_dicts` to combine responses from each class
super().get_records calls only the records from adAnalyticsByCreative class
zip() Iterates over the records of adAnalytics classes and merges them with merge_dicts()
function list() converts each stream context into lists
Args:
context: The stream context.
Returns:
A dictionary of records given from adAnalytics streams
"""
adanalyticsinit_stream = AdAnalyticsByCreativeInit(
self._tap,
schema={"properties": {}},
)
adanalyticsecond_stream = AdAnalyticsByCreativeSecond(
self._tap,
schema={"properties": {}},
)
adanalyticsthird_stream = AdAnalyticsByCreativeThird(
self._tap,
schema={"properties": {}},
)
return [
self.merge_dicts(x, y, z, p)
for x, y, z, p in zip(
list(adanalyticsinit_stream.get_records(context)),
list(super().get_records(context)),
list(adanalyticsecond_stream.get_records(context)),
list(adanalyticsthird_stream.get_records(context)),
)
]

def merge_dicts(self, *dict_args: dict) -> dict:
"""Return a merged dictionary of adAnalytics responses.
Args:
*dict_args: dictionaries with adAnalytics response data.
Returns:
A merged dictionary of adAnalytics responses
"""
result = {}
for dictionary in dict_args:
result.update(dictionary)
return result


class AdAnalyticsByCreativeSecond(AdAnalyticsByCreativeInit):
name = "adanalyticsbycreative_second"

def get_unescaped_params(self, context: Context | None) -> dict:
return {
**super().get_unescaped_params(context),
# Overwrite fields with this column subset
"fields": self.adanalyticscolumns[2],
}


class AdAnalyticsByCreativeThird(AdAnalyticsByCreativeInit):
name = "adanalyticsbycreative_third"

def get_unescaped_params(self, context: Context | None) -> dict:
return {
**super().get_unescaped_params(context),
# Overwrite fields with this column subset
"fields": self.adanalyticscolumns[3],
}
1 change: 1 addition & 0 deletions tap_linkedin_ads/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def discover_streams(self) -> list[streams.LinkedInAdsStream]:
streams.CreativesStream(self),
streams.VideoAdsStream(self),
streams.AdAnalyticsByCampaignStream(self),
streams.AdAnalyticsByCreativeStream(self),
]


Expand Down

0 comments on commit 1a22117

Please sign in to comment.