dynamic-singer, Python API, Dynamic source, Dynamic target, N targets, Prometheus exporter, realtime transformation for Singer ETL
This library is an extension for singer.io that adds easier deployment, metrics, schema auto-detection, realtime transformation, and sinking to multiple targets. Read more about singer.io at https://www.singer.io/.
dynamic-singer can also run inside a Jupyter Notebook.
pip install dynamic-singer
If you are familiar with Singer, you know that sourcing from a Tap to a Target requires the Bash pipe operator `|`, for example,
tap-fixerio --config fixerio-config.json | target-gsheet --config gsheet-config.json
With dynamic-singer, you can run the same pipeline through a Python interface,
import dynamic_singer as dsinger
source = dsinger.Source('tap-fixerio --config fixerio-config.json')
source.add('target-gsheet --config gsheet-config.json')
source.start()
Check the Google spreadsheet here: link
Full example, check example/fixerio-gsheet.ipynb.
Now we want to keep track of metrics from the Tap and Targets. By default we cannot do this with plain Singer because Singer relies on the Bash pipe `|`; to solve that, we would need to do something like,
tap | prometheus | target | prometheus
But prometheus would need to understand the pipe, and nobody got time for that. Do not worry: dynamic-singer enables a Prometheus exporter by default. dynamic-singer captures,
- output rates from tap
- data size from tap
- output rates from target
- data size from target
import dynamic_singer as dsinger
source = dsinger.Source('tap-fixerio --config fixerio-config.json')
source.add('target-gsheet --config gsheet-config.json')
source.start()
So if you go to http://localhost:8000, you will see something like,
# HELP total_tap_fixerio_total total rows tap_fixerio
# TYPE total_tap_fixerio_total counter
total_tap_fixerio_total 4.0
# TYPE total_tap_fixerio_created gauge
total_tap_fixerio_created 1.5887420455044758e+09
# HELP data_size_tap_fixerio summary of data size tap_fixerio (KB)
# TYPE data_size_tap_fixerio summary
data_size_tap_fixerio_count 4.0
data_size_tap_fixerio_sum 0.738
# TYPE data_size_tap_fixerio_created gauge
data_size_tap_fixerio_created 1.588742045504552e+09
# HELP total_target_gsheet_total total rows target_gsheet
# TYPE total_target_gsheet_total counter
total_target_gsheet_total 4.0
# TYPE total_target_gsheet_created gauge
total_target_gsheet_created 1.588742045529744e+09
# HELP data_size_target_gsheet summary of data size target_gsheet (KB)
# TYPE data_size_target_gsheet summary
data_size_target_gsheet_count 4.0
data_size_target_gsheet_sum 0.196
# TYPE data_size_target_gsheet_created gauge
data_size_target_gsheet_created 1.5887420455298738e+09
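If you want to consume these metrics programmatically rather than through a Prometheus server, here is a minimal sketch using the requests library (not part of dynamic-singer) to scrape the exporter:

import requests

# fetch the raw Prometheus exposition text from the built-in exporter
metrics = requests.get('http://localhost:8000').text

# keep only the samples, skipping # HELP / # TYPE comment lines
for line in metrics.splitlines():
    if line and not line.startswith('#'):
        print(line)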
The metric naming convention is simply derived from the tap / target name.
Let's say I want more than one target, for example saving to two different spreadsheets at the same time. With plain Singer, we would need to initiate the pipe twice,
tap-fixerio --config fixerio-config.json | target-gsheet --config gsheet-config1.json
tap-fixerio --config fixerio-config.json | target-gsheet --config gsheet-config2.json
If we do this, both sheets will probably end up with different data! Oh no!
To add more than one target using dynamic-singer,
import dynamic_singer as dsinger
source = dsinger.Source('tap-fixerio --config fixerio-config.json')
source.add('target-gsheet --config gsheet-config.json')
source.add('target-gsheet --config gsheet-config1.json')
source.start()
Check the first Google spreadsheet here: link
Check the second Google spreadsheet here: link
Full example, check example/fixerio-gsheet-twice.ipynb.
Now let's say I want to stream data from Python code as a Tap; with plain Singer, I would need to write it like,
python3 tap.py | target-gsheet --config gsheet-config.json
The good thing is, with dynamic-singer you can transfer data from a Python object directly into Targets.
import dynamic_singer as dsinger
from datetime import datetime

class Example:
    def __init__(self, size):
        self.size = size
        self.count = 0

    def emit(self):
        if self.count < self.size:
            self.count += 1
            # `tap_key` ('timestamp' below) must exist in every emitted row
            return {'data': self.count, 'timestamp': datetime.now().isoformat()}
        # returning None terminates the tap

example = Example(20)
source = dsinger.Source(example, tap_name = 'example', tap_key = 'timestamp')
source.add('target-gsheet --config gsheet-config.json')
source.start()
Check the Google spreadsheet here: link
Full example, check example/iterator-gsheet.ipynb.
- The tap must have an `emit` method. If not, it will throw an error,
ValueError: tap must a string or an object with method `emit`
- `emit` must return a dict; to terminate the tap, simply return None. If not, it will throw an error,
ValueError: tap.emit() must returned a dict
- `tap_schema` must be a dict or None. If None, the schema is auto-generated from `tap.emit()`; to supply your own, see the sketch after this list.
- `tap_name` is required; it is the name for the tap.
- `tap_key` is required; it acts as the primary key for the tap. If `tap_key` is not inside the emitted dictionary, it will throw an error,
ValueError: tap key not exist in elements from tap
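If you would rather define the schema yourself than rely on auto-generation, here is a minimal sketch of passing `tap_schema` explicitly; the JSON-Schema-style dict below is an assumption modeled on the Example tap above:

import dynamic_singer as dsinger

# hypothetical explicit schema for the Example tap above, JSON Schema style
tap_schema = {
    'type': 'object',
    'properties': {
        'data': {'type': 'integer'},
        'timestamp': {'type': 'string'},
    },
}

example = Example(20)  # Example class as defined above
source = dsinger.Source(
    example,
    tap_schema = tap_schema,  # skip auto-generation, use this schema
    tap_name = 'example',
    tap_key = 'timestamp',
)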
Now if we look at the targets provided by singer.io, for example https://github.com/singer-io/target-gsheet or https://github.com/RealSelf/target-bigquery, building a target is complicated, and it must be able to parse values from the terminal pipe.
But with dynamic-singer, creating a target is very simple.
Let's say I want to build a target that saves every row from fixer-io to a text file,
import dynamic_singer as dsinger

class Target:
    def __init__(self, filename):
        self.f = open(filename, 'a')

    def parse(self, row):
        self.f.write(row)
        return row

target = Target('test.txt')
source = dsinger.Source('tap-fixerio --config fixer-config.json')
source.add(target)
source.start()
After that, check test.txt,
{"type": "SCHEMA", "stream": "exchange_rate", "schema": {"type": "object", "properties": {"date": {"type": "string", "format": "date-time"}}, "additionalProperties": true}, "key_properties": ["date"]}{"type": "RECORD", "stream": "exchange_rate", "record": {"GBP": "0.871002", "JPY": "115.375629", "EUR": "1.0", "date": "2020-05-05T00:00:00Z"}}{"type": "RECORD", "stream": "exchange_rate", "record": {"GBP": "0.872634", "JPY": "114.804452", "EUR": "1.0", "date": "2020-05-06T00:00:00Z"}}{"type": "STATE", "value": {"start_date": "2020-05-06"}}
A Singer tap always sends schema information, so remember to parse it properly.
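For instance, here is a sketch of a target that keeps only RECORD rows and skips SCHEMA and STATE messages; it assumes every row arrives as a JSON string like the output above:

import json

class RecordOnlyTarget:
    def __init__(self, filename):
        self.f = open(filename, 'a')

    def parse(self, row):
        parsed = json.loads(row)
        # Singer messages carry a `type`: SCHEMA, RECORD or STATE
        if parsed.get('type') == 'RECORD':
            self.f.write(json.dumps(parsed['record']) + '\n')
        return row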
Full example, check example/fixerio-writefile.ipynb.
- The target must have a `parse` method. If not, it will throw an error,
ValueError: target must a string or an object with method `parse`
When talking about transformation, we usually want to,
- add new values to a row.
- edit existing values in a row.
- filter rows based on certain conditions.
dynamic-singer supports realtime transformation as simply as,
import dynamic_singer as dsinger

count = 0

def transformation(row):
    global count
    row['extra'] = count  # add a new value to every row
    count += 1
    return row

example = Example(20)  # Example class as defined above
source = dsinger.Source(example, tap_name = 'example-transformation', tap_key = 'timestamp')
source.add('target-gsheet --config gsheet-config.json')
source.start(transformation = transformation)
Even though we added new values to the row, dynamic-singer will auto-generate a new schema.
Full example, check example/iterator-transformation-gsheet.ipynb.
import dynamic_singer as dsinger

def transformation(row):
    # rows that do not match return None implicitly and are dropped
    if row['data'] > 5:
        return row

example = Example(20)  # Example class as defined above
source = dsinger.Source(example, tap_name = 'example-transformation', tap_key = 'timestamp')
source.add('target-gsheet --config gsheet-config.json')
source.start(transformation = transformation)
Full example, check example/iterator-filter-gsheet.ipynb.
Some databases do not support auto-generated schemas, so we need to map the types ourselves.
If you want to define the schema types for returned values, simply return a tuple of (row, schema),
count = 0

def transformation(row):
    global count
    row['extra'] = count
    count += 1
    return row, {'extra': 'int'}
For an example, check example/postgres-bq-transformation.ipynb.
Again, this is not necessary for most unstructured targets, but we recommend including it.
- Tap from fixerio and target to gsheet.
- Tap from fixerio and target to multiple gsheets.
- Use a Python object as a Tap and target to gsheet.
- Tap from fixerio and save to a file using a Python object as a Target.
- Tap from fixerio and save to gsheet, save to a file using a Python object as a Target, and save to BigQuery.
- Use a Python object as a Tap, transform in realtime, and target to gsheet.
- Use a Python object as a Tap, filter in realtime, and target to gsheet.
- Use dynamic_singer.extra.postgres.Tap to pull data from postgres and dump to BigQuery.
- Use dynamic_singer.extra.postgres.Tap to pull data from postgres, do a transformation, and dump to BigQuery.
class Source:
    def __init__(
        self,
        tap,
        tap_schema: Dict = None,
        tap_name: str = None,
        tap_key: str = None,
        port: int = 8000,
    ):
        """
        Parameters
        ----------
        tap: str / object
            tap source.
        tap_schema: Dict, (default=None)
            data schema if tap is an object. If `tap_schema` is None, the schema is auto-generated.
        tap_name: str, (default=None)
            name for the tap, required if tap is an object; throws an error if tap is an object and this is not a string.
        tap_key: str, (default=None)
            important non-duplicate key from `tap.emit()`, usually a timestamp.
        port: int, (default=8000)
            prometheus exporter port.
        """
    def add(self, target):
        """
        Parameters
        ----------
        target: str / object
            target sink.
        """
    def get_targets(self):
        """
        Returns
        -------
        result: list of targets
        """
    def delete_target(self, index: int):
        """
        Parameters
        ----------
        index: int
            target index from `get_targets()`.
        """
    def start(
        self,
        transformation: Callable = None,
        asynchronous: bool = False,
        debug: bool = True,
        ignore_null: bool = True,
        graceful_shutdown: int = 30,
        post_function: Callable = None,
    ):
        """
        Parameters
        ----------
        transformation: Callable, (default=None)
            a callable to transform tap data; a new data schema is auto-generated.
        asynchronous: bool, (default=False)
            If True, emit to targets in an async manner; otherwise loop from the first target to the last.
        debug: bool, (default=True)
            If True, print every row emitted and parsed.
        ignore_null: bool, (default=True)
            If False, throw an exception when one of the schema values is Null.
        graceful_shutdown: int, (default=30)
            If greater than 0, automatically shut down after sleeping this many seconds whenever an error happens.
        post_function: Callable, (default=None)
            If callable, metadata is passed to this function.
        """
`graceful_shutdown` is useful if we deploy in an environment that can auto-restart, like Kubernetes.
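For example, a deployment-oriented start call might look like this; the values are illustrative, not recommendations:

source.start(
    asynchronous = True,      # emit to all targets in an async manner
    debug = False,            # silence per-row logging in production
    graceful_shutdown = 60,   # on any error, sleep 60 seconds then shut down
)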
def bigquery_schema(schema: str, table: str, connection):
    """
    Generate BigQuery schema.

    Parameters
    ----------
    schema: str
        postgres schema.
    table: str
        table name.
    connection: object
        psycopg2 connection object.

    Returns
    -------
    result : dict
    """
Full example, check example/postgres-bq.ipynb or example/postgres-bq-transformation.ipynb.
class Tap:
    def __init__(
        self,
        schema: str,
        table: str,
        primary_key: str,
        connection,
        persistent,
        batch_size: int = 100,
        rest_time: int = 10,
        filter: str = '',
    ):
        """
        Postgres Tap using query statements.

        Parameters
        ----------
        schema: str
            postgres schema.
        table: str
            table name.
        primary_key: str
            column that acts as the primary key.
        connection: object
            psycopg2 connection object.
        persistent: object
            a python object that must have `pull` and `push` methods to persist the primary_key state.
        batch_size: int, (default=100)
            number of rows per pull from postgres.
        rest_time: int, (default=10)
            rest for rest_time seconds after each pull.
        filter: str, (default='')
            SQL WHERE clause for additional filtering, e.g. 'price > 0 and discount > 10', depending on the table definition.
        """
Full example, check example/postgres-bq.ipynb or example/postgres-bq-transformation.ipynb.
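The persistent object only needs `pull` and `push` methods; here is a minimal file-based sketch, with the interface assumed from the docstring above (pull returns the last stored primary_key, push stores a new one):

import os

class FilePersistent:
    """Persist the last primary_key in a local file."""

    def __init__(self, filename):
        self.filename = filename

    def pull(self):
        # return the last stored primary_key, or None on the first run
        if not os.path.exists(self.filename):
            return None
        with open(self.filename) as f:
            return f.read().strip() or None

    def push(self, value):
        # store the latest primary_key
        with open(self.filename, 'w') as f:
            f.write(str(value))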
class BQ_GCS:
    def __init__(
        self,
        bq_client,
        bucket,
        project: str,
        schema: str,
        table: str,
        primary_key: str,
        prefix: str = 'singer_record',
    ):
        """
        Persistency layer for BQ combined with GCS.

        Parameters
        ----------
        bq_client: object
            initiated from `from google.cloud import bigquery`.
        bucket: object
            initiated from `from google.cloud import storage`.
        project: str
            project id.
        schema: str
            BQ schema.
        table: str
            table name.
        primary_key: str
            column that acts as the primary key.
        prefix: str, (default='singer_record')
            prefix path for GCS.
        """
class GCS:
    def __init__(
        self,
        bucket,
        project: str,
        schema: str,
        table: str,
        primary_key: str,
        prefix: str = 'singer_record',
    ):
        """
        Persistency layer using GCS.

        Parameters
        ----------
        bucket: object
            initiated from `from google.cloud import storage`.
        project: str
            project id.
        schema: str
            BQ schema.
        table: str
            table name.
        primary_key: str
            column that acts as the primary key.
        prefix: str, (default='singer_record')
            prefix path for GCS.
        """