Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-19835: Adding new streams and discovery mode #38

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
classifiers=['Programming Language :: Python :: 3 :: Only'],
py_modules=['tap_braintree'],
install_requires=[
'singer-python==5.5.0',
'singer-python==5.12.2',
'requests==2.20.0',
'braintree==3.53.0',
'braintree==4.16.0',
],
extras_require={
'dev': [
Expand Down
264 changes: 47 additions & 217 deletions tap_braintree/__init__.py
Original file line number Diff line number Diff line change
@@ -1,235 +1,65 @@
#!/usr/bin/env python3

from datetime import datetime, timedelta
import os
import pytz


import sys
import json
import braintree
import singer

from singer import utils
from .transform import transform_row


CONFIG = {}
STATE = {}
TRAILING_DAYS = timedelta(days=30)
DEFAULT_TIMESTAMP = "1970-01-01T00:00:00Z"

logger = singer.get_logger()


def get_abs_path(path):
return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)

from tap_braintree.discover import discover
from tap_braintree.sync import sync as _sync

def load_schema(entity):
return utils.load_json(get_abs_path("schemas/{}.json".format(entity)))
REQUIRED_CONFIG_KEYS = [
"merchant_id",
"public_key",
"private_key",
"start_date"
]

LOGGER = singer.get_logger()

def get_start(entity):
if entity not in STATE:
STATE[entity] = CONFIG["start_date"]

return STATE[entity]


def to_utc(dt):
return dt.replace(tzinfo=pytz.UTC)


def daterange(start_date, end_date):
def do_discover():
"""
Generator function that produces an iterable list of days between the two
dates start_date and end_date as a tuple pair of datetimes.

Note:
All times are set to 0:00. Designed to be used in date query where query
logic would be record_date >= 2019-01-01 0:00 and record_date < 2019-01-02 0:00

Args:
start_date (datetime): start of period
end_date (datetime): end of period

Yields:
tuple: daily period
* datetime: day within range
* datetime: day within range + 1 day

Run discovery mode
"""

# set to start of day
start_date = to_utc(
datetime.combine(
start_date.date(),
datetime.min.time() # set to the 0:00 on the day of the start date
)
)

end_date = to_utc(end_date + timedelta(1))

for n in range(int((end_date - start_date).days)):
yield start_date + timedelta(n), start_date + timedelta(n + 1)


def sync_transactions():
schema = load_schema("transactions")

singer.write_schema("transactions", schema, ["id"],
bookmark_properties=['created_at'])

latest_updated_at = utils.strptime_to_utc(STATE.get('latest_updated_at', DEFAULT_TIMESTAMP))

run_maximum_updated_at = latest_updated_at

latest_disbursement_date = utils.strptime_to_utc(STATE.get('latest_disbursment_date', DEFAULT_TIMESTAMP))

run_maximum_disbursement_date = latest_disbursement_date

latest_start_date = utils.strptime_to_utc(get_start("transactions"))

period_start = latest_start_date - TRAILING_DAYS

period_end = utils.now()

logger.info("transactions: Syncing from {}".format(period_start))

logger.info("transactions: latest_updated_at from {}, disbursement_date from {}".format(
latest_updated_at, latest_disbursement_date
))

logger.info("transactions: latest_start_date from {}".format(
latest_start_date
))

# increment through each day (20k results max from api)
for start, end in daterange(period_start, period_end):
LOGGER.info("Starting discovery")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOGGER.info("Starting discovery")
LOGGER.info("Starting discover mode")

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated logger messages

catalog = discover()
json.dump(catalog.to_dict(), sys.stdout, indent=2)
LOGGER.info("Finished discover")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOGGER.info("Finished discover")
LOGGER.info("Finished discover mode")

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated logger messages as per your suggestions


end = min(end, period_end)

data = braintree.Transaction.search(
braintree.TransactionSearch.created_at.between(start, end))
time_extracted = utils.now()

logger.info("transactions: Fetched {} records from {} - {}".format(
data.maximum_size, start, end
))

row_written_count = 0
row_skipped_count = 0

for row in data:
# Ensure updated_at consistency
if not getattr(row, 'updated_at'):
row.updated_at = row.created_at

transformed = transform_row(row, schema)
updated_at = to_utc(row.updated_at)

# if disbursement is successful, get disbursement date
# set disbursement datetime to min if not found

if row.disbursement_details is None:
disbursement_date = datetime.min

else:
if row.disbursement_details.disbursement_date is None:
row.disbursement_details.disbursement_date = datetime.min

disbursement_date = to_utc(datetime.combine(
row.disbursement_details.disbursement_date,
datetime.min.time()))

# Is this more recent than our past stored value of update_at?
# Is this more recent than our past stored value of disbursement_date?
# Use >= for updated_at due to non monotonic updated_at values
# Use > for disbursement_date - confirming all transactions disbursed
# at the same time
# Update our high water mark for updated_at and disbursement_date
# in this run
if (
updated_at >= latest_updated_at
) or (
disbursement_date >= latest_disbursement_date
):

if updated_at > run_maximum_updated_at:
run_maximum_updated_at = updated_at

if disbursement_date > run_maximum_disbursement_date:
run_maximum_disbursement_date = disbursement_date

singer.write_record("transactions", transformed,
time_extracted=time_extracted)
row_written_count += 1

else:

row_skipped_count += 1

logger.info("transactions: Written {} records from {} - {}".format(
row_written_count, start, end
))

logger.info("transactions: Skipped {} records from {} - {}".format(
row_skipped_count, start, end
))

# End day loop
logger.info("transactions: Complete. Last updated record: {}".format(
run_maximum_updated_at
))

logger.info("transactions: Complete. Last disbursement date: {}".format(
run_maximum_disbursement_date
))

latest_updated_at = run_maximum_updated_at

latest_disbursement_date = run_maximum_disbursement_date

STATE['latest_updated_at'] = utils.strftime(latest_updated_at)

STATE['latest_disbursement_date'] = utils.strftime(
latest_disbursement_date)

utils.update_state(STATE, "transactions", utils.strftime(end))

singer.write_state(STATE)


def do_sync():
logger.info("Starting sync")
sync_transactions()
logger.info("Sync completed")


@utils.handle_top_exception(logger)
@utils.handle_top_exception(LOGGER)
def main():
args = utils.parse_args(
["merchant_id", "public_key", "private_key", "start_date"]
)
config = args.config

environment = getattr(
braintree.Environment, config.pop("environment", "Production")
parsed_args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS)
config = {}
state = {}

if parsed_args.config:
config = parsed_args.config

if parsed_args.state:
state = parsed_args.state

environment = getattr(braintree.Environment, config.pop("environment", "Production"))

gateway = braintree.BraintreeGateway(
braintree.Configuration(
environment,
merchant_id = config['merchant_id'],
public_key= config["public_key"],
private_key=config["private_key"]
)
)

CONFIG['start_date'] = config.pop('start_date')

braintree.Configuration.configure(environment, **config)

if args.state:
STATE.update(args.state)

try:
do_sync()
if parsed_args.discover:
do_discover()
else:
_sync(
gateway,
config,
parsed_args.catalog or discover(),
state
)
except braintree.exceptions.authentication_error.AuthenticationError:
logger.critical('Authentication error occured. '
'Please check your merchant_id, public_key, and '
'private_key for errors', exc_info=True)

LOGGER.critical('Authentication error occured. Please check your merchant_id, public_key, and private_key for errors', exc_info=True)

if __name__ == '__main__':
main()
30 changes: 30 additions & 0 deletions tap_braintree/discover.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import singer
from singer.catalog import Catalog, CatalogEntry, Schema
from tap_braintree.streams import STREAMS
from tap_braintree.schema import get_schemas

LOGGER = singer.get_logger()

def discover():

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add function doc string for all the functions.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added doc strings for all the fuctions.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Divide this file into two files schema.py and discover.py because you are doing full refactoring.
Reference: https://github.com/singer-io/tap-github/pull/168/files

Copy link
Author

@jtilala jtilala Jul 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seperated discover.py's code into schema.py and discover.py

"""
Generate catalog for call the streams

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Generate catalog for call the streams
Run the discovery mode, prepare the catalog file, and return the catalog.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated doc strings

"""

schemas, field_metadata = get_schemas()
catalog = Catalog([])

for stream_name, schema_dict in schemas.items():
schema = Schema.from_dict(schema_dict)
mdata = field_metadata[stream_name]

catalog.streams.append(
CatalogEntry(
stream=stream_name,
tap_stream_id=stream_name,
key_properties=STREAMS[stream_name].key_properties,
schema=schema,
metadata=mdata,
)
)

return catalog
49 changes: 49 additions & 0 deletions tap_braintree/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import os
import json
from singer import metadata
from tap_braintree.streams import STREAMS

def get_abs_path(path):
"""
Return full path for given argument relative path
"""

return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)

def get_schemas():
"""
Return metadata and schema for all the streams
"""

schemas = {}
field_metadata = {}

for stream_name, stream_metadata in STREAMS.items():

schema_path = get_abs_path("schemas/{}.json".format(stream_name))
with open(schema_path,"r") as file:
schema = json.load(file)
schemas[stream_name] = schema

mdata = metadata.new()

mdata = metadata.get_standard_metadata(
schema=schema,
key_properties=stream_metadata.key_properties,
valid_replication_keys=stream_metadata.replication_keys,
replication_method=stream_metadata.replication_method,
)

mdata = metadata.to_map(mdata)
# Loop through all keys and make replication keys of automatic inclusion
for field_name in schema["properties"].keys():

if stream_metadata.replication_keys and field_name in stream_metadata.replication_keys:
mdata = metadata.write(
mdata, ("properties", field_name), "inclusion", "automatic",
)

mdata = metadata.to_list(mdata)
field_metadata[stream_name] = mdata

return schemas, field_metadata
Loading