dynamic-singer, Python API, Dynamic source, Dynamic target, N targets, Prometheus exporter, realtime transformation for Singer ETL

This library is an extension for singer.io for easier deployment, metrics, schema auto-detection, realtime transformation, and sinking to multiple targets. Read more about singer.io at https://www.singer.io/.

dynamic-singer is also able to run in a Jupyter Notebook.


Installing from PyPI

pip install dynamic-singer

How-to

Run using Python

If you are familiar with singer, you know that sourcing from a Tap to a Target requires the Bash pipe `|`, for example,

tap-fixerio --config fixerio-config.json | target-gsheet --config gsheet-config.json

With dynamic-singer, you can run the same pipeline using the Python interface,

import dynamic_singer as dsinger
source = dsinger.Source('tap-fixerio --config fixerio-config.json')
source.add('target-gsheet --config gsheet-config.json')
source.start()

Check the google spreadsheet here, link


Full example, check example/fixerio-gsheet.ipynb.

Prometheus exporter

Now we want to keep track of metrics from the Tap and Targets. By default we cannot do this with singer because singer uses the Bash pipe `|`; to solve it, we would need to do something like,

tap | prometheus | target | prometheus

But prometheus would need to understand the pipe, and nobody has time for that. Do not worry, dynamic-singer enables a prometheus exporter by default. dynamic-singer captures,

  1. output rates from tap
  2. data size from tap
  3. output rates from target
  4. data size from target
The same pipeline as before exposes these metrics automatically,

import dynamic_singer as dsinger
source = dsinger.Source('tap-fixerio --config fixerio-config.json')
source.add('target-gsheet --config gsheet-config.json')
source.start()

So if you go to http://localhost:8000, you will see something like,

# HELP total_tap_fixerio_total total rows tap_fixerio
# TYPE total_tap_fixerio_total counter
total_tap_fixerio_total 4.0
# TYPE total_tap_fixerio_created gauge
total_tap_fixerio_created 1.5887420455044758e+09
# HELP data_size_tap_fixerio summary of data size tap_fixerio (KB)
# TYPE data_size_tap_fixerio summary
data_size_tap_fixerio_count 4.0
data_size_tap_fixerio_sum 0.738
# TYPE data_size_tap_fixerio_created gauge
data_size_tap_fixerio_created 1.588742045504552e+09

# HELP total_target_gsheet_total total rows target_gsheet
# TYPE total_target_gsheet_total counter
total_target_gsheet_total 4.0
# TYPE total_target_gsheet_created gauge
total_target_gsheet_created 1.588742045529744e+09
# HELP data_size_target_gsheet summary of data size target_gsheet (KB)
# TYPE data_size_target_gsheet summary
data_size_target_gsheet_count 4.0
data_size_target_gsheet_sum 0.196
# TYPE data_size_target_gsheet_created gauge
data_size_target_gsheet_created 1.5887420455298738e+09

The naming convention is simply taken from the tap / target name.
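
To sanity-check the exporter from Python, here is a minimal sketch, assuming the default port 8000 and the `requests` library,

import requests

# fetch the raw prometheus exposition text from the exporter
metrics = requests.get('http://localhost:8000').text

# keep only the counters and summaries dynamic-singer emits
for line in metrics.splitlines():
    if line.startswith(('total_', 'data_size_')):
        print(line)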

N targets

Let's say I want more than one target, for example saving to two different spreadsheets at the same time. With plain singer, we need to initiate the pipe twice.

tap-fixerio --config fixerio-config.json | target-gsheet --config gsheet-config1.json
tap-fixerio --config fixerio-config.json | target-gsheet --config gsheet-config2.json

If we do this, both sheets will probably end up with different data, because the tap runs twice! Oh no!

So to add more than one target using dynamic-singer,

import dynamic_singer as dsinger
source = dsinger.Source('tap-fixerio --config fixerio-config.json')
source.add('target-gsheet --config gsheet-config.json')
source.add('target-gsheet --config gsheet-config1.json')
source.start()

Check the first google spreadsheet here, link


Check the second google spreadsheet here, link


Full example, check example/fixerio-gsheet-twice.ipynb.

Tap Python object

Now let's say I want to transfer data from Python code as a Tap; with plain singer I would need to write it like,

python3 tap.py | target-gsheet --config gsheet-config.json

The good thing is, with dynamic-singer you can transfer data from a Python object directly into Targets.

import dynamic_singer as dsinger
from datetime import datetime

class Example:
    def __init__(self, size):
        self.size = size
        self.count = 0

    def emit(self):
        # returning None terminates the tap
        if self.count < self.size:
            self.count += 1
            # include the key used as `tap_key` below
            return {'data': self.count, 'timestamp': datetime.utcnow().isoformat()}


example = Example(20)
source = dsinger.Source(example, tap_name = 'example', tap_key = 'timestamp')
source.add('target-gsheet --config gsheet-config.json')
source.start()

Check the google spreadsheet here, link


Full example, check example/iterator-gsheet.ipynb.

Rules if we use an object

  1. Must have an `emit` method.

If not, it will throw an error,

ValueError: tap must a string or an object with method `emit`

  2. `emit` must return a dict; to terminate, simply return None.

If not, it will throw an error,

ValueError: tap.emit() must returned a dict

  3. `tap_schema` must be a dict or None. If None, it will auto-generate the schema based on `tap.emit()`; see the sketch after this list for supplying one explicitly.
  4. `tap_name` is necessary; this is the name for the tap.
  5. `tap_key` is necessary; it acts as the primary key for the tap.

If `tap_key` is not inside the dictionary, it will throw an error,

ValueError: tap key not exist in elements from tap
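
For rule 3, a sketch of supplying `tap_schema` explicitly; the JSON-schema layout below mirrors the SCHEMA message shown in test.txt later in this README, so treat the exact format as an assumption,

schema = {
    'type': 'object',
    'properties': {
        'data': {'type': 'integer'},
        'timestamp': {'type': 'string', 'format': 'date-time'},
    },
}
source = dsinger.Source(
    example, tap_schema = schema, tap_name = 'example', tap_key = 'timestamp'
)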

Target Python object

Now, if we look at targets provided by singer.io, for example https://github.com/singer-io/target-gsheet or https://github.com/RealSelf/target-bigquery, building a target is complicated: it must be able to parse values from the terminal pipe.

But with dynamic-singer, creating a target is very simple.

Let's say I want to build a target that saves every row from fixerio to a text file,

import dynamic_singer as dsinger

class Target:
    def __init__(self, filename):
        # append every parsed row to a text file
        self.f = open(filename, 'a')

    def parse(self, row):
        self.f.write(row)
        return row

target = Target('test.txt')
source = dsinger.Source('tap-fixerio --config fixer-config.json')
source.add(target)
source.start()

After that, check test.txt,

{"type": "SCHEMA", "stream": "exchange_rate", "schema": {"type": "object", "properties": {"date": {"type": "string", "format": "date-time"}}, "additionalProperties": true}, "key_properties": ["date"]}{"type": "RECORD", "stream": "exchange_rate", "record": {"GBP": "0.871002", "JPY": "115.375629", "EUR": "1.0", "date": "2020-05-05T00:00:00Z"}}{"type": "RECORD", "stream": "exchange_rate", "record": {"GBP": "0.872634", "JPY": "114.804452", "EUR": "1.0", "date": "2020-05-06T00:00:00Z"}}{"type": "STATE", "value": {"start_date": "2020-05-06"}}

A Singer tap always sends schema information, so remember to parse it properly.
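
For instance, a sketch of a target that keeps only RECORD messages; `RecordOnlyTarget` and the newline-delimited output are illustrative choices, not part of the library,

import json

class RecordOnlyTarget:
    def __init__(self, filename):
        self.f = open(filename, 'a')

    def parse(self, row):
        # singer sends SCHEMA, RECORD and STATE messages as JSON strings
        message = json.loads(row)
        if message.get('type') == 'RECORD':
            # keep only the actual record, one JSON object per line
            self.f.write(json.dumps(message['record']) + '\n')
        return row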

Full example, check example/fixerio-writefile.ipynb.

Rules if we use an object

  1. Must have a `parse` method.

If not, it will throw an error,

ValueError: target must a string or an object with method `parse`

Realtime Transformation

When talking about transformation, we usually want to,

  1. Add new values to a row.
  2. Edit existing values in a row.
  3. Filter rows based on certain conditions.

Add new keys

dynamic-singer supports realtime transformation as simply as,

import dynamic_singer as dsinger
from datetime import datetime

count = 0
def transformation(row):
    global count
    row['extra'] = count
    count += 1
    return row

example = Example(20)
source = dsinger.Source(example, tap_name = 'example-transformation', tap_key = 'timestamp')
source.add('target-gsheet --config gsheet-config.json')
source.start(transformation = transformation)

Even though we added new values to the row, dynamic-singer will auto-generate a new schema.


Full example, check example/iterator-transformation-gsheet.ipynb.

Filter rows based on conditions

import dynamic_singer as dsinger
from datetime import datetime

def transformation(row):
    # returning None drops the row from every target
    if row['data'] > 5:
        return row

example = Example(20)
source = dsinger.Source(example, tap_name = 'example-transformation', tap_key = 'timestamp')
source.add('target-gsheet --config gsheet-config.json')
source.start(transformation = transformation)


Full example, check example/iterator-filter-gsheet.ipynb.

Define schema

Some databases do not support auto-generated schemas, so we need to map the types ourselves.

If you want to define the schema type for a returned value, simply,

count = 0
def transformation(row):
    global count
    row['extra'] = count
    count += 1
    # return the row plus a mapping from new key to schema type
    return row, {'extra': 'int'}

For an example, see example/postgres-bq-transformation.ipynb.

Again, this is not necessary for most unstructured targets, but we recommend including it.

Example

  1. fixerio-gsheet.ipynb

Tap from fixerio and target to gsheet.

  2. fixerio-gsheet-twice.ipynb

Tap from fixerio and target to multiple gsheets.

  3. iterator-gsheet.ipynb

Use a Python object as a Tap and target to gsheet.

  4. fixerio-writefile.ipynb

Tap from fixerio and save to a file using a Python object as a Target.

  5. fixerio-gsheet-writefile-bq.ipynb

Tap from fixerio, save to gsheet, save to a file using a Python object as a Target, and save to bigquery.

  6. iterator-transformation-gsheet.ipynb

Use a Python object as a Tap, transform in realtime, and target to gsheet.

  7. iterator-filter-gsheet.ipynb

Use a Python object as a Tap, filter in realtime, and target to gsheet.

  8. postgres-bq.ipynb

Use dynamic_singer.extra.postgres.Tap to pull data from postgres and dump to bigquery.

  9. postgres-bq-transformation.ipynb

Use dynamic_singer.extra.postgres.Tap to pull data from postgres, do a transformation, and dump to bigquery.

Usage

dynamic_singer.Source

class Source:
    def __init__(
        self,
        tap,
        tap_schema: Dict = None,
        tap_name: str = None,
        tap_key: str = None,
        port: int = 8000,
    ):
        """
        Parameters
        ----------
        tap: str / object
            tap source.
        tap_schema: Dict, (default=None)
            data schema if tap is an object. If `tap_schema` is None, it will auto-generate the schema.
        tap_name: str, (default=None)
            name for the tap, necessary if tap is an object. It will throw an error if tap is an object and `tap_name` is not a string.
        tap_key: str, (default=None)
            important non-duplicate key from `tap.emit()`, usually a timestamp.
        port: int, (default=8000)
            prometheus exporter port.
        """

dynamic_singer.Source.add

def add(self, target):
    """
    Parameters
    ----------
    target: str / object
        target to add.
    """

dynamic_singer.Source.get_targets

def get_targets(self):
    """
    Returns
    ----------
    result: list of targets
    """

dynamic_singer.Source.delete_target

def delete_target(self, index: int):
    """
    Parameters
    ----------
    index: int
        target index from `get_targets()`.
    """

dynamic_singer.Source.start

def start(
    self,
    transformation: Callable = None,
    asynchronous: bool = False,
    debug: bool = True,
    ignore_null: bool = True,
    graceful_shutdown: int = 30,
    post_function: Callable = None,
):
    """
    Parameters
    ----------
    transformation: Callable, (default=None)
        a callable to transform tap data; this will auto-generate a new data schema.
    asynchronous: bool, (default=False)
        If True, emit to targets asynchronously; else, loop from the first target to the last.
    debug: bool, (default=True)
        If True, will print every row emitted and parsed.
    ignore_null: bool, (default=True)
        If False, it will throw an exception when one of the schema values is Null.
    graceful_shutdown: int, (default=30)
        If bigger than 0, on any error it will automatically shut down after sleeping for this many seconds.
    post_function: Callable, (default=None)
        If callable, metadata will be passed to the function.
    """

graceful_shutdown is useful if we deploy in an environment that can auto-restart, like Kubernetes.
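
A sketch of `start` with non-default options; `log_metadata` is a made-up callable for illustration,

def log_metadata(metadata):
    # receives the metadata dynamic-singer passes to post_function
    print('emitted:', metadata)

source.start(
    asynchronous = True,      # emit to all targets asynchronously
    debug = False,            # silence per-row logging
    graceful_shutdown = 30,   # on any error, sleep 30 seconds then shut down
    post_function = log_metadata,
)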

Extra

Postgres

bigquery_schema

def bigquery_schema(schema: str, table: str, connection):
    """
    Generate bigquery schema.

    Parameters
    ----------
    schema: str
        postgres schema.
    table: str
        table name.
    connection: object
        psycopg2 connection object.

    Returns
    -------
    result : dict
    """

Full example, check example/postgres-bq.ipynb or example/postgres-bq-transformation.ipynb.

Tap

class Tap:
    def __init__(
        self,
        schema: str,
        table: str,
        primary_key: str,
        connection,
        persistent,
        batch_size: int = 100,
        rest_time: int = 10,
        filter: str = '',
    ):

        """
        Postgres Tap using query statement.

        Parameters
        ----------
        schema: str
            postgres schema.
        table: str
            table name.
        primary_key: str
            column acted as primary key.
        connection: object
            psycopg2 connection object.
        persistent: object
            a Python object that must have `pull` and `push` methods to persist the `primary_key` state.
        batch_size: int, (default=100)
            size of rows for each pull from postgres.
        rest_time: int, (default=10)
            rest for `rest_time` seconds after each pull is done.
        filter: str, (default='')
            sql where statement for additional filtering. Example, 'price > 0 and discount > 10', depending on the table definition.

        """

Full example, check example/postgres-bq.ipynb or example/postgres-bq-transformation.ipynb.
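
Because `persistent` only needs `pull` and `push` methods, a minimal in-memory object is enough to sketch the wiring; the method signatures and the Source arguments below are assumptions, and in production you would use the persistency classes documented next,

import psycopg2
import dynamic_singer as dsinger
from dynamic_singer.extra.postgres import Tap

class MemoryPersistent:
    # keeps the primary_key state in memory; `pull` / `push` signatures are assumed
    def __init__(self):
        self.state = None

    def pull(self):
        # return the last persisted primary key state
        return self.state

    def push(self, value):
        # persist the latest primary key state
        self.state = value

connection = psycopg2.connect(
    host = 'localhost', dbname = 'mydb', user = 'postgres', password = 'postgres'
)
tap = Tap(
    schema = 'public',
    table = 'employees',
    primary_key = 'updated_at',
    connection = connection,
    persistent = MemoryPersistent(),
)
source = dsinger.Source(tap, tap_name = 'employees', tap_key = 'updated_at')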

Persistent

BQ_GCS

class BQ_GCS:
    def __init__(
        self,
        bq_client,
        bucket,
        project: str,
        schema: str,
        table: str,
        primary_key: str,
        prefix: str = 'singer_record',
    ):

        """
        Persistency layer for BQ combined with GCS.

        Parameters
        ----------
        bq_client: object
            initiated from `from google.cloud import bigquery`.
        bucket: object
            initiated from `from google.cloud import storage`.
        project: str
            project id.
        schema: str
            BQ schema.
        table: str
            table name.
        primary_key: str
            column acted as primary key.
        prefix: str
            prefix path for GCS.
        """

GCS

class GCS:
    def __init__(
        self,
        bucket,
        project: str,
        schema: str,
        table: str,
        primary_key: str,
        prefix: str = 'singer_record',
    ):

        """
        Persistency layer using GCS.

        Parameters
        ----------
        bucket: object
            initiated from `from google.cloud import storage`.
        project: str
            project id.
        schema: str
            BQ schema.
        table: str
            table name.
        primary_key: str
            column acted as primary key.
        prefix: str
            prefix path for GCS.
        """
