Skip to content

Commit

Permalink
video ads and fix pks
Browse files Browse the repository at this point in the history
  • Loading branch information
pnadolny13 committed Oct 23, 2024
1 parent 8ebbb0e commit 62d4e5a
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 6 deletions.
99 changes: 93 additions & 6 deletions tap_linkedin_ads/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class AccountsStream(LinkedInAdsStream):
"""https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-accounts#search-for-accounts."""

name = "accounts"
primary_keys = ["last_modified_time", "id", "status"]
primary_keys: t.ClassVar[list[str]] = ["id"]

schema = PropertiesList(
Property(
Expand Down Expand Up @@ -82,7 +82,10 @@ class AccountsStream(LinkedInAdsStream):

def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict:
"""Return a context dictionary for a child stream."""
return {"account_id": record["id"]}
return {
"account_id": record["id"],
"owner_urn": record["reference"],
}

def get_url_params(
self,
Expand Down Expand Up @@ -110,7 +113,7 @@ class AccountUsersStream(LinkedInAdsStream):

name = "account_users"
parent_stream_type = AccountsStream
primary_keys = ["user_person_id", "last_modified_time"]
primary_keys: t.ClassVar[list[str]] = ["account"]
path = "/adAccountUsers"

schema = PropertiesList(
Expand Down Expand Up @@ -168,7 +171,7 @@ class CampaignsStream(LinkedInAdsStream):
"""https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-campaigns#search-for-campaigns."""

name = "campaigns"
primary_keys = ["last_modified_time", "id", "status"]
primary_keys: t.ClassVar[list[str]] = ["id"]
parent_stream_type = AccountsStream
next_page_token_jsonpath = (
"$.metadata.nextPageToken" # Or override `get_next_page_token`. # noqa: S105
Expand Down Expand Up @@ -463,7 +466,7 @@ class CampaignGroupsStream(LinkedInAdsStream):

name = "campaign_groups"
parent_stream_type = AccountsStream
primary_keys = ["last_modified_time", "id", "status"]
primary_keys: t.ClassVar[list[str]] = ["id"]

schema = PropertiesList(
Property(
Expand Down Expand Up @@ -555,7 +558,7 @@ class CreativesStream(LinkedInAdsStream):

name = "creatives"
parent_stream_type = AccountsStream
primary_keys = ["lastModifiedAt", "id"]
primary_keys: t.ClassVar[list[str]] = ["id"]

schema = PropertiesList(
Property("account", StringType),
Expand Down Expand Up @@ -623,3 +626,87 @@ def get_url_params(
"q": "criteria",
**super().get_url_params(context, next_page_token),
}


class VideoAdsStream(LinkedInAdsStream):
"""https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads/advertising-targeting/create-and-manage-video#finders."""

name = "video_ads"
path = "/adDirectSponsoredContents"
parent_stream_type = AccountsStream

schema = PropertiesList(
Property("account", StringType),
Property("account_id", IntegerType),
Property(
"changeAuditStamps",
ObjectType(
Property(
"created",
ObjectType(
Property("time", IntegerType),
additional_properties=False,
),
),
Property(
"lastModified",
ObjectType(
Property("time", IntegerType),
additional_properties=False,
),
),
),
),
Property("created_time", StringType),
Property("last_modified_time", StringType),
Property("content_reference", StringType),
Property("content_reference_ucg_post_id", IntegerType),
Property("content_reference_share_id", IntegerType),
Property("name", StringType),
Property("type", StringType),
).to_dict()

@property
def url_base(self) -> str:
"""Return the API URL root, configurable via tap settings."""
return "https://api.linkedin.com/v2"

def get_url_params(
self,
context: dict | None, # noqa: ARG002
next_page_token: t.Any | None, # noqa: ANN401
) -> dict[str, t.Any]:
"""Return a dictionary of values to be used in URL parameterization.
Args:
context: The stream context.
next_page_token: The next page index or value.
Returns:
A dictionary of URL query parameters.
"""
return {
"q": "account",
"account": f"urn:li:sponsoredAccount:{context['account_id']}",
"owner": context["owner_urn"],
**super().get_url_params(context, next_page_token),
}

# TODO: handle timestamp parsing more generically
def post_process(self, row: dict, context: dict | None = None) -> dict | None:
# This function extracts day, month, and year from date range column
# These values are parse with datetime function and the date is added to the day column
# with contextlib.suppress(Exception):
# created_time = row.get("changeAuditStamps", {}).get("created", {}).get("time")
# last_modified_time = (
# row.get("changeAuditStamps", {}).get("lastModified", {}).get("time")
# )
# row["created_time"] = datetime.fromtimestamp(
# int(created_time) / 1000,
# tz=UTC,
# ).isoformat()
# row["last_modified_time"] = datetime.fromtimestamp(
# int(last_modified_time) / 1000,
# tz=UTC,
# ).isoformat()
return super().post_process(row, context)
1 change: 1 addition & 0 deletions tap_linkedin_ads/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def discover_streams(self) -> list[streams.LinkedInAdsStream]:
streams.CampaignsStream(self),
streams.CampaignGroupsStream(self),
streams.CreativesStream(self),
streams.VideoAdsStream(self),
]


Expand Down

0 comments on commit 62d4e5a

Please sign in to comment.