-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TDL-19835: Adding new streams and discovery mode #38
base: master
Are you sure you want to change the base?
Changes from 11 commits
1522ee4
8112ab6
45a53aa
6be4e77
41f35f9
959eb09
bf0b898
251ba65
58cf292
332f3cf
a24dc67
c39f276
3974762
6a294ae
d2e9edf
1bc7d2a
426f555
485b2cb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,235 +1,65 @@ | ||||||
#!/usr/bin/env python3 | ||||||
|
||||||
from datetime import datetime, timedelta | ||||||
import os | ||||||
import pytz | ||||||
|
||||||
|
||||||
import sys | ||||||
import json | ||||||
import braintree | ||||||
import singer | ||||||
|
||||||
from singer import utils | ||||||
from .transform import transform_row | ||||||
|
||||||
|
||||||
CONFIG = {} | ||||||
STATE = {} | ||||||
TRAILING_DAYS = timedelta(days=30) | ||||||
DEFAULT_TIMESTAMP = "1970-01-01T00:00:00Z" | ||||||
|
||||||
logger = singer.get_logger() | ||||||
|
||||||
|
||||||
def get_abs_path(path): | ||||||
return os.path.join(os.path.dirname(os.path.realpath(__file__)), path) | ||||||
|
||||||
from tap_braintree.discover import discover | ||||||
from tap_braintree.sync import sync as _sync | ||||||
|
||||||
def load_schema(entity): | ||||||
return utils.load_json(get_abs_path("schemas/{}.json".format(entity))) | ||||||
REQUIRED_CONFIG_KEYS = [ | ||||||
"merchant_id", | ||||||
"public_key", | ||||||
"private_key", | ||||||
"start_date" | ||||||
] | ||||||
|
||||||
LOGGER = singer.get_logger() | ||||||
|
||||||
def get_start(entity): | ||||||
if entity not in STATE: | ||||||
STATE[entity] = CONFIG["start_date"] | ||||||
|
||||||
return STATE[entity] | ||||||
|
||||||
|
||||||
def to_utc(dt): | ||||||
return dt.replace(tzinfo=pytz.UTC) | ||||||
|
||||||
|
||||||
def daterange(start_date, end_date): | ||||||
def do_discover(): | ||||||
""" | ||||||
Generator function that produces an iterable list of days between the two | ||||||
dates start_date and end_date as a tuple pair of datetimes. | ||||||
|
||||||
Note: | ||||||
All times are set to 0:00. Designed to be used in date query where query | ||||||
logic would be record_date >= 2019-01-01 0:00 and record_date < 2019-01-02 0:00 | ||||||
|
||||||
Args: | ||||||
start_date (datetime): start of period | ||||||
end_date (datetime): end of period | ||||||
|
||||||
Yields: | ||||||
tuple: daily period | ||||||
* datetime: day within range | ||||||
* datetime: day within range + 1 day | ||||||
|
||||||
Run discovery mode | ||||||
""" | ||||||
|
||||||
# set to start of day | ||||||
start_date = to_utc( | ||||||
datetime.combine( | ||||||
start_date.date(), | ||||||
datetime.min.time() # set to the 0:00 on the day of the start date | ||||||
) | ||||||
) | ||||||
|
||||||
end_date = to_utc(end_date + timedelta(1)) | ||||||
|
||||||
for n in range(int((end_date - start_date).days)): | ||||||
yield start_date + timedelta(n), start_date + timedelta(n + 1) | ||||||
|
||||||
|
||||||
def sync_transactions(): | ||||||
schema = load_schema("transactions") | ||||||
|
||||||
singer.write_schema("transactions", schema, ["id"], | ||||||
bookmark_properties=['created_at']) | ||||||
|
||||||
latest_updated_at = utils.strptime_to_utc(STATE.get('latest_updated_at', DEFAULT_TIMESTAMP)) | ||||||
|
||||||
run_maximum_updated_at = latest_updated_at | ||||||
|
||||||
latest_disbursement_date = utils.strptime_to_utc(STATE.get('latest_disbursment_date', DEFAULT_TIMESTAMP)) | ||||||
|
||||||
run_maximum_disbursement_date = latest_disbursement_date | ||||||
|
||||||
latest_start_date = utils.strptime_to_utc(get_start("transactions")) | ||||||
|
||||||
period_start = latest_start_date - TRAILING_DAYS | ||||||
|
||||||
period_end = utils.now() | ||||||
|
||||||
logger.info("transactions: Syncing from {}".format(period_start)) | ||||||
|
||||||
logger.info("transactions: latest_updated_at from {}, disbursement_date from {}".format( | ||||||
latest_updated_at, latest_disbursement_date | ||||||
)) | ||||||
|
||||||
logger.info("transactions: latest_start_date from {}".format( | ||||||
latest_start_date | ||||||
)) | ||||||
|
||||||
# increment through each day (20k results max from api) | ||||||
for start, end in daterange(period_start, period_end): | ||||||
LOGGER.info("Starting discovery") | ||||||
catalog = discover() | ||||||
json.dump(catalog.to_dict(), sys.stdout, indent=2) | ||||||
LOGGER.info("Finished discover") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated logger messages as per your suggestions |
||||||
|
||||||
end = min(end, period_end) | ||||||
|
||||||
data = braintree.Transaction.search( | ||||||
braintree.TransactionSearch.created_at.between(start, end)) | ||||||
time_extracted = utils.now() | ||||||
|
||||||
logger.info("transactions: Fetched {} records from {} - {}".format( | ||||||
data.maximum_size, start, end | ||||||
)) | ||||||
|
||||||
row_written_count = 0 | ||||||
row_skipped_count = 0 | ||||||
|
||||||
for row in data: | ||||||
# Ensure updated_at consistency | ||||||
if not getattr(row, 'updated_at'): | ||||||
row.updated_at = row.created_at | ||||||
|
||||||
transformed = transform_row(row, schema) | ||||||
updated_at = to_utc(row.updated_at) | ||||||
|
||||||
# if disbursement is successful, get disbursement date | ||||||
# set disbursement datetime to min if not found | ||||||
|
||||||
if row.disbursement_details is None: | ||||||
disbursement_date = datetime.min | ||||||
|
||||||
else: | ||||||
if row.disbursement_details.disbursement_date is None: | ||||||
row.disbursement_details.disbursement_date = datetime.min | ||||||
|
||||||
disbursement_date = to_utc(datetime.combine( | ||||||
row.disbursement_details.disbursement_date, | ||||||
datetime.min.time())) | ||||||
|
||||||
# Is this more recent than our past stored value of update_at? | ||||||
# Is this more recent than our past stored value of disbursement_date? | ||||||
# Use >= for updated_at due to non monotonic updated_at values | ||||||
# Use > for disbursement_date - confirming all transactions disbursed | ||||||
# at the same time | ||||||
# Update our high water mark for updated_at and disbursement_date | ||||||
# in this run | ||||||
if ( | ||||||
updated_at >= latest_updated_at | ||||||
) or ( | ||||||
disbursement_date >= latest_disbursement_date | ||||||
): | ||||||
|
||||||
if updated_at > run_maximum_updated_at: | ||||||
run_maximum_updated_at = updated_at | ||||||
|
||||||
if disbursement_date > run_maximum_disbursement_date: | ||||||
run_maximum_disbursement_date = disbursement_date | ||||||
|
||||||
singer.write_record("transactions", transformed, | ||||||
time_extracted=time_extracted) | ||||||
row_written_count += 1 | ||||||
|
||||||
else: | ||||||
|
||||||
row_skipped_count += 1 | ||||||
|
||||||
logger.info("transactions: Written {} records from {} - {}".format( | ||||||
row_written_count, start, end | ||||||
)) | ||||||
|
||||||
logger.info("transactions: Skipped {} records from {} - {}".format( | ||||||
row_skipped_count, start, end | ||||||
)) | ||||||
|
||||||
# End day loop | ||||||
logger.info("transactions: Complete. Last updated record: {}".format( | ||||||
run_maximum_updated_at | ||||||
)) | ||||||
|
||||||
logger.info("transactions: Complete. Last disbursement date: {}".format( | ||||||
run_maximum_disbursement_date | ||||||
)) | ||||||
|
||||||
latest_updated_at = run_maximum_updated_at | ||||||
|
||||||
latest_disbursement_date = run_maximum_disbursement_date | ||||||
|
||||||
STATE['latest_updated_at'] = utils.strftime(latest_updated_at) | ||||||
|
||||||
STATE['latest_disbursement_date'] = utils.strftime( | ||||||
latest_disbursement_date) | ||||||
|
||||||
utils.update_state(STATE, "transactions", utils.strftime(end)) | ||||||
|
||||||
singer.write_state(STATE) | ||||||
|
||||||
|
||||||
def do_sync(): | ||||||
logger.info("Starting sync") | ||||||
sync_transactions() | ||||||
logger.info("Sync completed") | ||||||
|
||||||
|
||||||
@utils.handle_top_exception(logger) | ||||||
@utils.handle_top_exception(LOGGER) | ||||||
def main(): | ||||||
args = utils.parse_args( | ||||||
["merchant_id", "public_key", "private_key", "start_date"] | ||||||
) | ||||||
config = args.config | ||||||
|
||||||
environment = getattr( | ||||||
braintree.Environment, config.pop("environment", "Production") | ||||||
parsed_args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS) | ||||||
config = {} | ||||||
state = {} | ||||||
|
||||||
if parsed_args.config: | ||||||
config = parsed_args.config | ||||||
|
||||||
if parsed_args.state: | ||||||
state = parsed_args.state | ||||||
|
||||||
environment = getattr(braintree.Environment, config.pop("environment", "Production")) | ||||||
|
||||||
gateway = braintree.BraintreeGateway( | ||||||
braintree.Configuration( | ||||||
environment, | ||||||
merchant_id = config['merchant_id'], | ||||||
public_key= config["public_key"], | ||||||
private_key=config["private_key"] | ||||||
) | ||||||
) | ||||||
|
||||||
CONFIG['start_date'] = config.pop('start_date') | ||||||
|
||||||
braintree.Configuration.configure(environment, **config) | ||||||
|
||||||
if args.state: | ||||||
STATE.update(args.state) | ||||||
|
||||||
try: | ||||||
do_sync() | ||||||
if parsed_args.discover: | ||||||
do_discover() | ||||||
else: | ||||||
_sync( | ||||||
gateway, | ||||||
config, | ||||||
parsed_args.catalog or discover(), | ||||||
state | ||||||
) | ||||||
except braintree.exceptions.authentication_error.AuthenticationError: | ||||||
logger.critical('Authentication error occured. ' | ||||||
'Please check your merchant_id, public_key, and ' | ||||||
'private_key for errors', exc_info=True) | ||||||
|
||||||
LOGGER.critical('Authentication error occured. Please check your merchant_id, public_key, and private_key for errors', exc_info=True) | ||||||
|
||||||
if __name__ == '__main__': | ||||||
main() |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,30 @@ | ||||||
import singer | ||||||
from singer.catalog import Catalog, CatalogEntry, Schema | ||||||
from tap_braintree.streams import STREAMS | ||||||
from tap_braintree.schema import get_schemas | ||||||
|
||||||
LOGGER = singer.get_logger() | ||||||
|
||||||
def discover(): | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add function doc string for all the functions. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added doc strings for all the fuctions. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Divide this file into two files There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seperated discover.py's code into schema.py and discover.py |
||||||
""" | ||||||
Generate catalog for call the streams | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated doc strings |
||||||
""" | ||||||
|
||||||
schemas, field_metadata = get_schemas() | ||||||
catalog = Catalog([]) | ||||||
|
||||||
for stream_name, schema_dict in schemas.items(): | ||||||
schema = Schema.from_dict(schema_dict) | ||||||
mdata = field_metadata[stream_name] | ||||||
|
||||||
catalog.streams.append( | ||||||
CatalogEntry( | ||||||
stream=stream_name, | ||||||
tap_stream_id=stream_name, | ||||||
key_properties=STREAMS[stream_name].key_properties, | ||||||
schema=schema, | ||||||
metadata=mdata, | ||||||
) | ||||||
) | ||||||
|
||||||
return catalog |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
import os | ||
import json | ||
from singer import metadata | ||
from tap_braintree.streams import STREAMS | ||
|
||
def get_abs_path(path): | ||
""" | ||
Return full path for given argument relative path | ||
""" | ||
|
||
return os.path.join(os.path.dirname(os.path.realpath(__file__)), path) | ||
|
||
def get_schemas(): | ||
""" | ||
Return metadata and schema for all the streams | ||
""" | ||
|
||
schemas = {} | ||
field_metadata = {} | ||
|
||
for stream_name, stream_metadata in STREAMS.items(): | ||
|
||
schema_path = get_abs_path("schemas/{}.json".format(stream_name)) | ||
with open(schema_path,"r") as file: | ||
schema = json.load(file) | ||
schemas[stream_name] = schema | ||
|
||
mdata = metadata.new() | ||
|
||
mdata = metadata.get_standard_metadata( | ||
schema=schema, | ||
key_properties=stream_metadata.key_properties, | ||
valid_replication_keys=stream_metadata.replication_keys, | ||
replication_method=stream_metadata.replication_method, | ||
) | ||
|
||
mdata = metadata.to_map(mdata) | ||
# Loop through all keys and make replication keys of automatic inclusion | ||
for field_name in schema["properties"].keys(): | ||
|
||
if stream_metadata.replication_keys and field_name in stream_metadata.replication_keys: | ||
mdata = metadata.write( | ||
mdata, ("properties", field_name), "inclusion", "automatic", | ||
) | ||
|
||
mdata = metadata.to_list(mdata) | ||
field_metadata[stream_name] = mdata | ||
|
||
return schemas, field_metadata |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated logger messages