TDL-20793: Improve Tap-Asana Reliability (#53)
* TDL-20795: Add missing tap-tester tests (#47)

* added missing tap-tester tests and assertions

* resolved integration test failure

* updated the code as per PEP8 standards

* resolved pagination test failure

* resolved test case failure

* upgraded the asana-python SDK and added missing fields

* Revert "updated the code as per PEP8 standards"

This reverts commit 8839e3a.

* updated code as per PEP8 standards

* added parameterized in config.yml file

* updated all fields test case

* reverted datatype update

* upgraded asana SDK and singer-python version

* - updated integration tests
- removed pagination integration test

* - removed pagination test

* - fixed the review comments

* TDL-20794: Upgrade SDK Version and add missing fields. (#48)

* added date formats to the fields as per the documentation

* schema fields addition

* added few fields which are used as opt_fields

* fixed all fields integration test

---------

Co-authored-by: “rdeshmukh15” <[email protected]>
Co-authored-by: RushiT0122 <[email protected]>

---------

Co-authored-by: RushiT0122 <[email protected]>
Co-authored-by: “rdeshmukh15” <[email protected]>

* resolved the PR comments

* added date-time format

* sdk version updated to 3.1.0

---------

Co-authored-by: Harsh <[email protected]>
Co-authored-by: RushiT0122 <[email protected]>
3 people authored Feb 27, 2023
1 parent 86f794f commit ec14e55
Showing 31 changed files with 2,788 additions and 1,213 deletions.
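The date-format and date-time bullets in the commit message above correspond to Singer schema annotations like the following. This is an illustrative sketch, not an excerpt from the changed schema files, and the field name is hypothetical.

# Illustrative only: a timestamp property annotated with "format": "date-time",
# which singer-python's Transformer then emits as an RFC 3339 date-time string.
modified_at_property = {
    "modified_at": {
        "type": ["null", "string"],
        "format": "date-time",
    }
}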
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -16,7 +16,7 @@ jobs:
name: 'Unit Tests'
command: |
source /usr/local/share/virtualenvs/tap-asana/bin/activate
pip install nose coverage
pip install nose coverage parameterized
nosetests --with-coverage --cover-erase --cover-package=tap_asana --cover-html-dir=htmlcov tests/unittests
coverage html
- store_test_results:
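The parameterized package added to the pip install line above backs table-driven unit tests. A minimal sketch of typical usage, assuming a plain unittest-style test class; the class, test names, and values are illustrative, not from this repository.

import unittest

from parameterized import parameterized


class ExampleParameterizedTest(unittest.TestCase):
    """Illustrative only: one decorated method expands into a test per tuple."""

    @parameterized.expand([
        ("no_token", {}, False),
        ("with_token", {"access_token": "abc"}, True),
    ])
    def test_access_token_present(self, _name, config, expected):
        # The first tuple element becomes part of the generated test name.
        self.assertEqual(bool(config.get("access_token")), expected)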
4 changes: 2 additions & 2 deletions setup.py
@@ -10,8 +10,8 @@
classifiers=["Programming Language :: Python :: 3 :: Only"],
py_modules=["tap_asana"],
install_requires=[
"asana==0.10.2",
'singer-python==5.12.2'
"asana==3.1.0",
'singer-python==5.13.0'
],
extras_require={
'test': [
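Not part of the diff: a quick sanity check that the upgraded pins above are what actually resolved in the tap's virtualenv (assumes Python 3.8+ for importlib.metadata).

# Run inside the tap-asana virtualenv after reinstalling; versions should
# match the new pins in setup.py.
from importlib.metadata import version

print(version("asana"))          # expected: 3.1.0
print(version("singer-python"))  # expected: 5.13.0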
93 changes: 53 additions & 40 deletions tap_asana/__init__.py
@@ -12,14 +12,14 @@
from singer import Transformer
from tap_asana.asana import Asana
from tap_asana.context import Context
import tap_asana.streams # Load stream objects into Context
import tap_asana.streams # Load stream objects into Context

REQUIRED_CONFIG_KEYS = [
"start_date",
"client_id",
"client_secret",
"redirect_uri",
"refresh_token"
"refresh_token",
]


@@ -32,14 +32,15 @@ def get_abs_path(path):

# Load schemas from schemas folder
def load_schemas():
"""Load schemas for catalog"""
schemas = {}

# This schema represents many of the currency values as JSON schema
# 'number's, which may result in lost precision.
for filename in os.listdir(get_abs_path('schemas')):
path = get_abs_path('schemas') + '/' + filename
schema_name = filename.replace('.json', '')
with open(path) as file: # pylint: disable=unspecified-encoding
for filename in os.listdir(get_abs_path("schemas")):
path = get_abs_path("schemas") + "/" + filename
schema_name = filename.replace(".json", "")
with open(path) as file: # pylint: disable=unspecified-encoding
try:
schemas[schema_name] = json.load(file)
except ValueError:
@@ -49,23 +50,33 @@ def load_schemas():


def get_discovery_metadata(stream, schema):
"""Generate metadata"""
mdata = metadata.new()
mdata = metadata.write(mdata, (), 'table-key-properties', stream.key_properties)
mdata = metadata.write(mdata, (), 'forced-replication-method', stream.replication_method)
mdata = metadata.write(mdata, (), "table-key-properties", stream.key_properties)
mdata = metadata.write(
mdata, (), "forced-replication-method", stream.replication_method
)

if stream.replication_key:
mdata = metadata.write(mdata, (), 'valid-replication-keys', [stream.replication_key])
mdata = metadata.write(
mdata, (), "valid-replication-keys", [stream.replication_key]
)

for field_name in schema['properties'].keys():
for field_name in schema["properties"].keys():
if field_name in stream.key_properties or field_name == stream.replication_key:
mdata = metadata.write(mdata, ('properties', field_name), 'inclusion', 'automatic')
mdata = metadata.write(
mdata, ("properties", field_name), "inclusion", "automatic"
)
else:
mdata = metadata.write(mdata, ('properties', field_name), 'inclusion', 'available')
mdata = metadata.write(
mdata, ("properties", field_name), "inclusion", "available"
)

return metadata.to_list(mdata)


def discover():
"""Discover logic for tap"""
LOGGER.info("Starting discover")
raw_schemas = load_schemas()

@@ -78,28 +89,28 @@ def discover():

stream = Context.stream_objects[schema_name]()

# create and add catalog entry
# Create and add catalog entry
catalog_entry = {
'stream': schema_name,
'tap_stream_id': schema_name,
'schema': singer.resolve_schema_references(schema, refs),
'metadata' : get_discovery_metadata(stream, schema),
'key_properties': stream.key_properties,
'replication_key': stream.replication_key,
'replication_method': stream.replication_method
"stream": schema_name,
"tap_stream_id": schema_name,
"schema": singer.resolve_schema_references(schema, refs),
"metadata": get_discovery_metadata(stream, schema),
"key_properties": stream.key_properties,
"replication_key": stream.replication_key,
"replication_method": stream.replication_method,
}
streams.append(catalog_entry)

LOGGER.info("Finished discover")

return {'streams': streams}
return {"streams": streams}


def shuffle_streams(stream_name):
'''
"""
Takes the name of the first stream to sync and reshuffles the order
of the list to put it at the top
'''
"""
matching_index = 0
for i, catalog_entry in enumerate(Context.catalog["streams"]):
if catalog_entry["tap_stream_id"] == stream_name:
@@ -110,6 +121,7 @@ def shuffle_streams(stream_name):


def sync():
"""Sync logic for tap"""
# Emit all schemas first so we have them for child streams
for stream in Context.catalog["streams"]:
if Context.is_selected(stream["tap_stream_id"]):
@@ -119,15 +131,15 @@ def sync():
Context.counts[stream["tap_stream_id"]] = 0

# Loop over streams in catalog
for catalog_entry in Context.catalog['streams']:
stream_id = catalog_entry['tap_stream_id']
for catalog_entry in Context.catalog["streams"]:
stream_id = catalog_entry["tap_stream_id"]
stream = Context.stream_objects[stream_id]()

if not Context.is_selected(stream_id):
LOGGER.info('Skipping stream: %s', stream_id)
LOGGER.info("Skipping stream: %s", stream_id)
continue

LOGGER.info('Syncing stream: %s', stream_id)
LOGGER.info("Syncing stream: %s", stream_id)

if not Context.state.get('bookmarks'):
Context.state['bookmarks'] = {}
@@ -136,34 +148,35 @@ def sync():
with Transformer() as transformer:
for rec in stream.sync():
extraction_time = singer.utils.now()
record_schema = catalog_entry['schema']
record_metadata = metadata.to_map(catalog_entry['metadata'])
record_schema = catalog_entry["schema"]
record_metadata = metadata.to_map(catalog_entry["metadata"])
rec = transformer.transform(rec, record_schema, record_metadata)
singer.write_record(stream_id,
rec,
time_extracted=extraction_time)
singer.write_record(stream_id, rec, time_extracted=extraction_time)
Context.counts[stream_id] += 1

Context.state['bookmarks'].pop('currently_sync_stream')
Context.state["bookmarks"].pop("currently_sync_stream")
singer.write_state(Context.state)

LOGGER.info('----------------------')
LOGGER.info("----------------------")
for stream_id, stream_count in Context.counts.items():
LOGGER.info('%s: %d', stream_id, stream_count)
LOGGER.info('----------------------')
LOGGER.info("%s: %d", stream_id, stream_count)
LOGGER.info("----------------------")


@utils.handle_top_exception(LOGGER)
def main():
"""
Run discover mode or sync mode.
"""
# Parse command line arguments
args = utils.parse_args(REQUIRED_CONFIG_KEYS)

# Set context.
creds = {
"client_id": args.config['client_id'],
"client_secret": args.config['client_secret'],
"redirect_uri": args.config['redirect_uri'],
"refresh_token": args.config['refresh_token']
"client_id": args.config["client_id"],
"client_secret": args.config["client_secret"],
"redirect_uri": args.config["redirect_uri"],
"refresh_token": args.config["refresh_token"],
}

# As we passed 'request_timeout', we need to add a whole 'args.config' rather than adding 'creds'
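For reference, the metadata that get_discovery_metadata() compiles above lands in each catalog entry as a list of breadcrumb/metadata pairs. An illustrative example for a hypothetical stream keyed on gid with replication key modified_at; the field names are chosen for illustration, not copied from the tap's catalog.

# Illustrative shape of metadata.to_list() output for a single stream.
example_stream_metadata = [
    {
        "breadcrumb": [],
        "metadata": {
            "table-key-properties": ["gid"],
            "forced-replication-method": "INCREMENTAL",
            "valid-replication-keys": ["modified_at"],
        },
    },
    {"breadcrumb": ["properties", "gid"], "metadata": {"inclusion": "automatic"}},
    {"breadcrumb": ["properties", "modified_at"], "metadata": {"inclusion": "automatic"}},
    {"breadcrumb": ["properties", "name"], "metadata": {"inclusion": "available"}},
]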
78 changes: 49 additions & 29 deletions tap_asana/asana.py
@@ -1,37 +1,57 @@

import asana
import singer

LOGGER = singer.get_logger()


""" Simple wrapper for Asana. """
class Asana():

def __init__(self, client_id, client_secret, redirect_uri, refresh_token, access_token=None): # pylint: disable=too-many-arguments
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
self.refresh_token = refresh_token
self.access_token = access_token
self._client = self._oauth_auth() or self._access_token_auth()
self.refresh_access_token()

def _oauth_auth(self):
if self.client_id is None or self.client_secret is None or self.redirect_uri is None or self.refresh_token is None:
LOGGER.debug("OAuth authentication unavailable.")
return None
return asana.Client.oauth(client_id=self.client_id, client_secret=self.client_secret, redirect_uri=self.redirect_uri)

def _access_token_auth(self):
if self.access_token is None:
LOGGER.debug("OAuth authentication unavailable.")
return None
return asana.Client.access_token(self.access_token)

def refresh_access_token(self):
return self._client.session.refresh_token(self._client.session.token_url, client_id=self.client_id, client_secret=self.client_secret, refresh_token=self.refresh_token)

@property
def client(self):
return self._client

class Asana():
"""Base class for tap-asana"""

def __init__(
self, client_id, client_secret, redirect_uri, refresh_token, access_token=None
): # pylint: disable=too-many-arguments
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
self.refresh_token = refresh_token
self.access_token = access_token
self._client = self._oauth_auth() or self._access_token_auth()
self.refresh_access_token()

def _oauth_auth(self):
"""Oauth authentication for tap"""
if (
self.client_id is None
or self.client_secret is None
or self.redirect_uri is None
or self.refresh_token is None
):
LOGGER.debug("OAuth authentication unavailable.")
return None
return asana.Client.oauth(
client_id=self.client_id,
client_secret=self.client_secret,
redirect_uri=self.redirect_uri,
)

def _access_token_auth(self):
"""Check for access token"""
if self.access_token is None:
LOGGER.debug("OAuth authentication unavailable.")
return None
return asana.Client.access_token(self.access_token)

def refresh_access_token(self):
return self._client.session.refresh_token(
self._client.session.token_url,
client_id=self.client_id,
client_secret=self.client_secret,
refresh_token=self.refresh_token,
)

@property
def client(self):
return self._client
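A hedged usage sketch of the wrapper above: the credential values are placeholders, and the keyword arguments mirror the creds dict that main() builds from the tap config.

from tap_asana.asana import Asana

# Placeholder credentials; the tap reads the real values from its config file.
asana_wrapper = Asana(
    client_id="my-client-id",
    client_secret="my-client-secret",
    redirect_uri="urn:ietf:wg:oauth:2.0:oob",
    refresh_token="my-refresh-token",
)

# Constructing the wrapper already refreshed the OAuth access token;
# .client exposes the underlying authenticated asana.Client instance.
sdk_client = asana_wrapper.client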
11 changes: 5 additions & 6 deletions tap_asana/context.py
@@ -1,6 +1,3 @@



from singer import metadata


@@ -15,12 +12,14 @@ class Context():

@classmethod
def get_catalog_entry(cls, stream_name):
"""Get entry from catalog"""
if not cls.stream_map:
cls.stream_map = {s["tap_stream_id"]: s for s in cls.catalog['streams']}
cls.stream_map = {s["tap_stream_id"]: s for s in cls.catalog["streams"]}
return cls.stream_map[stream_name]

@classmethod
def is_selected(cls, stream_name):
"""Return selected data"""
stream = cls.get_catalog_entry(stream_name)
stream_metadata = metadata.to_map(stream['metadata'])
return metadata.get(stream_metadata, (), 'selected')
stream_metadata = metadata.to_map(stream["metadata"])
return metadata.get(stream_metadata, (), "selected")
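A minimal sketch (not taken from the tap or its tests) of how is_selected() reads the top-level selected flag that a Singer runner writes into catalog metadata:

from tap_asana.context import Context

# Fake catalog with one selected stream, using the breadcrumb/metadata layout
# that singer-python's metadata helpers expect.
Context.catalog = {
    "streams": [
        {
            "tap_stream_id": "projects",
            "metadata": [{"breadcrumb": [], "metadata": {"selected": True}}],
        }
    ]
}

print(Context.is_selected("projects"))  # True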