Skip to content

Commit

Permalink
[source-tiktok-marketing] - Removes stream_state interpolation, cus…
Browse files Browse the repository at this point in the history
…tom cursor (#53645)
  • Loading branch information
pnilan authored Feb 26, 2025
1 parent 575eaf1 commit ea3110d
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 4bfac00d-ce15-44ff-95b9-9e3c3e8fbd35
dockerImageTag: 4.4.0-rc1
dockerImageTag: 4.4.0-rc2
dockerRepository: airbyte/source-tiktok-marketing
documentationUrl: https://docs.airbyte.com/integrations/sources/tiktok-marketing
githubIssueLabel: source-tiktok-marketing
Expand All @@ -22,12 +22,11 @@ data:
registryOverrides:
cloud:
enabled: true
dockerImageTag: 4.3.7
oss:
enabled: true
releases:
rolloutConfiguration:
enableProgressiveRollout: false
enableProgressiveRollout: true
breakingChanges:
4.0.0:
message:
Expand Down
22 changes: 10 additions & 12 deletions airbyte-integrations/connectors/source-tiktok-marketing/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "4.4.0-rc1"
version = "4.4.0-rc2"
name = "source-tiktok-marketing"
description = "Source implementation for Tiktok Marketing."
authors = [ "Airbyte <[email protected]>",]
Expand All @@ -24,7 +24,7 @@ freezegun = "^1.1.0"
source-tiktok-marketing = "source_tiktok_marketing.run:run"

[tool.poetry.group.dev.dependencies]
requests-mock = "==1.9.3"
requests-mock = "==1.12.1"
timeout-decorator = "==0.5.0"
pytest-mock = "^3.6.1"
pytest = "^6.1"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

import json
from typing import Any, Iterable, Mapping

import dpath.util
import dpath

from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.types import StreamSlice
Expand Down Expand Up @@ -39,7 +39,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:

def get_partition_value_from_config(self) -> str:
for path in self._path_to_partition_in_config:
config_value = dpath.util.get(self.config, path, default=None)
config_value = dpath.get(self.config, path, default=None)
if config_value:
return config_value

Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ definitions:
response_filters:
- predicate: "{{ response.get('code') == 40100 }}"
action: RATE_LIMITED
error_message: "TikTok Marketing API rate limit exceeded. Please verify that only one Airbyte connection with the same credentials is running at a time. If the problem persists, even after ensuring that only one connection is running, please contact Airbyte support. {{ response.get('message', '') }}"
- predicate: "{{ response.get('code') != 0 }}"
action: FAIL
error_message: "{{ response['message'] }}"
Expand All @@ -38,15 +39,6 @@ definitions:
type: DpathExtractor
field_path: ["data", "list"]

record_selector_with_filter_by_modify_time:
$ref: "#/definitions/record_selector"
record_filter:
type: CustomRecordFilter
class_name: "source_tiktok_marketing.components.semi_incremental_record_filter.PerPartitionRecordFilter"
condition: "{{ record['modify_time'] >= stream_state.get('modify_time', config.get('start_date', '')) }}"
$parameters:
partition_field: advertiser_id

retriever:
type: SimpleRetriever
requester:
Expand Down Expand Up @@ -370,23 +362,11 @@ definitions:
schema:
$ref: "#/definitions/schemas/creative_assets_videos"

record_selector_for_daily_reports_streams:
$ref: "#/definitions/record_selector"
record_filter:
type: CustomRecordFilter
class_name: "source_tiktok_marketing.components.semi_incremental_record_filter.PerPartitionRecordFilter"
condition: "{{ record['dimensions']['stat_time_day'] >= stream_state.get('stat_time_day', config.get('start_date', '')) }}"
$parameters:
partition_field: advertiser_id

record_selector_for_hourly_reports_streams:
$ref: "#/definitions/record_selector"
record_filter:
type: CustomRecordFilter
class_name: "source_tiktok_marketing.components.semi_incremental_record_filter.PerPartitionRecordFilter"
condition: "{{ record['dimensions']['stat_time_hour'] >= stream_state.get('stat_time_hour', config.get('start_date', '')) }}"
$parameters:
partition_field: advertiser_id
type: RecordFilter
condition: "{{ record['dimensions']['stat_time_hour'] >= stream_interval['start_time'] }}"

base_report_retriever:
type: SimpleRetriever
Expand All @@ -405,7 +385,7 @@ definitions:
$ref: "#/definitions/authenticator"
request_body_json: {}
record_selector:
$ref: "#/definitions/record_selector_for_daily_reports_streams"
$ref: "#/definitions/record_selector"
paginator:
$ref: "#/definitions/paginator_page_increment"
pagination_strategy:
Expand All @@ -431,28 +411,27 @@ definitions:
datetime_format: "%Y-%m-%d"
end_datetime:
type: MinMaxDatetime
datetime: "{{ config.get('end_date', today_utc()) }}"
datetime: "{{ config.get('end_date') if config.get('end_date') else format_datetime(now_utc(), '%Y-%m-%d') }}"
datetime_format: "%Y-%m-%d"

report_hourly_incremental_sync:
type: CustomIncrementalSync
class_name: source_tiktok_marketing.components.hourly_datetime_based_cursor.HourlyDatetimeBasedCursor
type: DatetimeBasedCursor
cursor_field: "stat_time_hour"
lookback_window: "P{{ config.get('attribution_window', 0) }}D"
cursor_granularity: "PT1H"
step: P1D
cursor_datetime_formats:
- "%Y-%m-%d %H:%M:%S"
- "%Y-%m-%dT%H:%M:%SZ"
datetime_format: "%Y-%m-%d"
datetime_format: "%Y-%m-%d %H:%M:%S"
start_datetime:
type: MinMaxDatetime
datetime: "{{ config.get('start_date', '2016-09-01') }}"
datetime_format: "%Y-%m-%d"
datetime: "{{ format_datetime(config.get('start_date', '2016-09-01'), '%Y-%m-%d %H:%M:%S', '%Y-%m-%d') }}"
datetime_format: "%Y-%m-%d %H:%M:%S"
end_datetime:
type: MinMaxDatetime
datetime: "{{ config.get('end_date', today_utc()) }}"
datetime_format: "%Y-%m-%d"
datetime: "{{ format_datetime((str_to_datetime(config.get('end_date')) if config.get('end_date') else now_utc()) + duration('P1D'), '%Y-%m-%d %H:%M:%S') }}"
datetime_format: "%Y-%m-%d %H:%M:%S"

base_report_daily:
schema_loader:
Expand All @@ -462,17 +441,44 @@ definitions:
retriever:
$ref: "#/definitions/base_report_retriever"
record_selector:
$ref: "#/definitions/record_selector_for_daily_reports_streams"
$ref: "#/definitions/record_selector"
incremental_sync:
$ref: "#/definitions/report_daily_incremental_sync"

base_report_hourly_retriever:
type: SimpleRetriever
requester:
$ref: "#/definitions/requester"
request_parameters:
service_type: "AUCTION"
report_type: "BASIC"
data_level: '{{ parameters["data_level"] }}'
dimensions: '{{ parameters["dimensions"] | string }}'
metrics: '{{ (parameters.get("report_metrics", []) + ["spend", "cpc", "cpm", "impressions", "clicks", "ctr", "reach", "cost_per_1000_reached", "frequency", "video_play_actions", "video_watched_2s", "video_watched_6s", "average_video_play", "average_video_play_per_user", "video_views_p25", "video_views_p50", "video_views_p75", "video_views_p100", "profile_visits", "likes", "comments", "shares", "follows", "clicks_on_music_disc", "real_time_app_install", "real_time_app_install_cost", "app_install"]) | string }}'
start_date: "{{ format_datetime(stream_interval['start_time'], '%Y-%m-%d', '%Y-%m-%d %H:%M:%S') }}"
end_date: "{{ format_datetime(stream_interval['start_time'], '%Y-%m-%d', '%Y-%m-%d %H:%M:%S') }}"
filtering: '{{ parameters["filtering"] | string if parameters.get("filtering") and config.get("include_deleted", False)}}'
authenticator:
$ref: "#/definitions/authenticator"
request_body_json: {}
record_selector:
$ref: "#/definitions/record_selector_for_hourly_reports_streams"
paginator:
$ref: "#/definitions/paginator_page_increment"
pagination_strategy:
type: "PageIncrement"
page_size: 1000
start_from_page: 1
partition_router:
$ref: "#/definitions/single_id_partition_router"

base_report_hourly:
schema_loader:
type: InlineSchemaLoader
schema:
$ref: "#/definitions/schemas/base_report"
retriever:
$ref: "#/definitions/base_report_retriever"
$ref: "#/definitions/base_report_hourly_retriever"
record_selector:
$ref: "#/definitions/record_selector_for_hourly_reports_streams"
incremental_sync:
Expand Down Expand Up @@ -709,7 +715,7 @@ definitions:
$ref: "#/definitions/authenticator"
request_body_json: {}
record_selector:
$ref: "#/definitions/record_selector_for_daily_reports_streams"
$ref: "#/definitions/record_selector"
paginator:
$ref: "#/definitions/paginator_page_increment"
pagination_strategy:
Expand Down Expand Up @@ -4314,7 +4320,7 @@ spec:
# If time allows it, we can definitely try to scope this number a bit better empirically.
concurrency_level:
type: ConcurrencyLevel
default_concurrency: 3
default_concurrency: "{{ config.get('concurrency_level', 3) }}"
max_concurrency: 20

metadata:
Expand Down
Loading

0 comments on commit ea3110d

Please sign in to comment.