Skip to content

Commit

Permalink
Merge branch 'main' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Aug 30, 2023
2 parents 4f28556 + f52b4dc commit bde9fb6
Show file tree
Hide file tree
Showing 158 changed files with 20,564 additions and 3,164 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/dev-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,13 @@ jobs:
push: ${{ github.ref == 'refs/heads/main' }}
tags: |
ghcr.io/peerdb-io/flow-worker:dev-${{ steps.vars.outputs.sha_short }}
- name: Build (optionally publish) Flow Snapshot Worker Dev Image
uses: depot/build-push-action@v1
with:
token: ${{ secrets.DEPOT_TOKEN }}
context: .
file: stacks/flow-snapshot-worker.Dockerfile
push: ${{ github.ref == 'refs/heads/main' }}
tags: |
ghcr.io/peerdb-io/flow-snapshot-worker:dev-${{ steps.vars.outputs.sha_short }}
9 changes: 9 additions & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ jobs:
name: "snowflake_creds.json"
json: ${{ secrets.SNOWFLAKE_GH_CI_PKEY }}

- name: create hstore extension
run: |
sudo apt-get update
sudo apt-get install -y postgresql-client
psql -h localhost -p 7132 -U postgres -c "CREATE EXTENSION hstore;"
working-directory: ./flow
env:
PGPASSWORD: postgres

- name: run tests
run: |
gotestsum --format testname -- -p 1 ./... -timeout 1200s
Expand Down
12 changes: 11 additions & 1 deletion .github/workflows/stable-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Stable Docker images
on:
push:
tags:
- 'v[0-9]+.[0-9]+.[0-9]+'
- "v[0-9]+.[0-9]+.[0-9]+"

jobs:
docker-build:
Expand Down Expand Up @@ -56,3 +56,13 @@ jobs:
push: true
tags: |
ghcr.io/peerdb-io/flow-worker:${{ github.ref_name }}
- name: Publish Flow Snapshot Worker Stable Image
uses: depot/build-push-action@v1
with:
token: ${{ secrets.DEPOT_TOKEN }}
context: .
file: stacks/flow-snapshot-worker.Dockerfile
push: true
tags: |
ghcr.io/peerdb-io/flow-snapshot-worker:${{ github.ref_name }}
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
.vscode
.env

tmp/
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[submodule "nexus/sqlparser-rs"]
path = nexus/sqlparser-rs
url = git@github.com:PeerDB-io/sqlparser-rs.git
url = https://github.com/PeerDB-io/sqlparser-rs.git
81 changes: 30 additions & 51 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@

<div align="center">

<img class="img-fluid" src="images/banner.jpg" alt="PeerDB Banner" width="512" />
<img src="images/banner.jpg" alt="PeerDB Banner" width="512" />

#### Modern ETL in minutes, with SQL
#### Frustratingly simple ETL for Postgres

[![Workflow Status](https://github.com/PEerDB-io/peerdb/actions/workflows/ci.yml/badge.svg)](https://github.com/Peerdb-io/peerdb/actions/workflows/ci.yml) [![ElV2 License](https://badgen.net/badge/License/Elv2/green?icon=github)](https://github.com/PeerDB-io/peerdb/blob/main/LICENSE.md) [![Slack Community](https://img.shields.io/badge/slack-peerdb-brightgreen.svg?logo=slack)](https://join.slack.com/t/peerdb-public/shared_invite/zt-1wo9jydev-EXInbMtCtpAKFFWdi7QvLQ)
[![Workflow Status](https://github.com/PeerDB-io/peerdb/actions/workflows/ci.yml/badge.svg)](https://github.com/Peerdb-io/peerdb/actions/workflows/ci.yml)
[![ElV2 License](https://badgen.net/badge/License/Elv2/green?icon=github)](https://github.com/PeerDB-io/peerdb/blob/main/LICENSE.md)
[![Slack Community](https://img.shields.io/badge/slack-peerdb-brightgreen.svg?logo=slack)](https://join.slack.com/t/peerdb-public/shared_invite/zt-1wo9jydev-EXInbMtCtpAKFFWdi7QvLQ)

</div>

## PeerDB

PeerDB is a Postgres-compatible SQL interface to seamlessly integrate multiple data-stores. It enables you to **sync**, **transform** and **query** data across your stores using simple SQL commands. PeerDB takes a datastore native approach in engineering — enabling 10x faster and a highly reliable ETL experience for you.

We are starting with Postgres, Snowflake and BigQuery as the supported data-stores and plan to expand to others based on user-feedback.
PeerDB is a Postgres-first data-movement platform that makes moving data in and out of Postgres fast and simple. It enables you to **sync**, **transform** and **query** data across your stores using simple SQL commands. We implement multiple Postgres native and infrastructural optimizations for 10x faster data-movement in and out of PostgreSQL.

You can use PeerDB for any of the below use-cases:

1. Real-time sync (CDC) across stores.
2. Customized ETL across data-stores using SQL
1. Real-time Change Data Capture from PostgreSQL.
2. Real-time Streaming of Query results across data-stores
3. Federated query workloads - Query multiple data-stores through a common SQL interface

## Get started
Expand All @@ -35,71 +35,50 @@ docker compose up
psql "port=9900 host=localhost password=peerdb"
```

<img class="img-fluid" src="images/peerdb_getstarted.jpg" alt="img-verification" width="450" height="272">
<img src="images/peerdb-demo.gif" width="512" />

1. More details on adding PEERs available [here](https://docs.peerdb.io/sql/commands/create-peer)
2. More details on creating MIRRORs available [here](https://docs.peerdb.io/sql/commands/create-mirror)
3. Detailed documentation available [here](https://docs.peerdb.io).
Follow this 5-minute [Quickstart Guide](https://docs.peerdb.io/quickstart#quickstart) to see PeerDB in action i.e. streaming data in real-time across stores.

## Why PeerDB

Existing ETL tools primarily focus on supporting a wide range of data-stores. However, they fall short in providing a rich experience for any two specific data-stores. This becomes evident when your workloads need scale or have demanding feature requirements. It is common for such users to try out these tools and fail – tools not meeting their performance and reliability SLAs or lacking the required features. Such users resort to developing their own in-house solutions, investing a lot of time and resources.
Current data tools prioritize a wide range of connectors, often neglecting to optimize for Postgres users. This can be problematic for those storing large amounts of data in Postgres and frequently transferring it. As a result, many resort to building custom pipelines when existing tools don't meet their needs. We've developed this project to provide a straightforward and reliable solution specifically for Postgres.

### Postgres-first Approach

PeerDB is an ETL/ELT tool built for PostgreSQL. We implement multiple Postgres native and infrastructural optimizations to provide a fast, reliable and a feature-rich experience for moving data in/out of PostgreSQL.

**For performance** - we can parallelize initial load for a large table, still ensuring consistency. Syncing 100s of GB reduces from days to minutes. Our architecture is designed for real-time syncs and implements multiple logical replication related optimizations (tuning Postgres configs, parallel reading of slot etc.). This enables 10x faster Change Data Capture with data-freshness of a few 10s of seconds even at large throughputs (10k+ tps).

### Data-store nativity at it’s core, enabling scalable ETL
**For reliability**, we have mechanisms in place for fault tolerance - state management, automatic retries, handling idempotency and consistency and so on (<https://blog.peerdb.io/using-temporal-to-scale-data-synchronization-at-peerdb>) Configurable batching and parallelism prevent out of memory (OOMs) and crashes.

PeerDB takes a data-store first approach to ETL. It supports a set of highly adopted stores, implements multiple infrastructural and data-store native optimizations, providing a highly scalable and a feature-rich ETL experience. For example, in a sync from Postgres to BigQuery or Snowflake, PeerDB is 10 times faster than other tools. We are database experts and believe that an ETL tool should be datastore centric, than a hodge-podge of too many connectors.
**From a feature richness standpoint**, we support efficient syncing of tables with large (TOAST) columns. We support multiple streaming modes - Log based (CDC) based, Query based streaming etc. We provide rich data-type mapping and plan to support every possible (incl. Custom types) that Postgres supports to the best extent possible on the target data-store.

#### **Postgres-compatible SQL interface to do ETL**

The Postgres-compatible SQL interface for ETL is unique to PeerDB and enables you to operate in a language you are familiar with. You can do ETL the same way you work with your databases.

You can use Postgres’ eco-system to manage your ETL —

1. Client tools like pgadmin, psql to run SQL commands.
2. BI tools like grafana, tableau to visually monitor syncs and transforms.
1. Client tools like pgAdmin, psql to run SQL commands.
2. BI tools like Grafana, Tableau to visually monitor syncs and transforms.
3. Database migration and versioning tools like Flyway to manage your ETL.
4. Any language (Python, Go, Node.JS etc) and Scheduler (AirFlow) for development.
4. Any language (Python, Go, Node.js etc) and Scheduler (AirFlow) for development.
5. And many more

## Status

Currently PeerDB is in development phase. We have not launched yet. Below tables captures different features and their state

### PeerDB Query

Query supported data-stores with a Postgres-compatible SQL interface

| Data-store | Support | Status |
| --- | --- | --- |
| BigQuery | SELECT commands | STABLE |
| Snowflake | SELECT commands | Beta |
| PostgreSQL | DML + SELECT commands | Beta |
We support multiple target connectors to move data from Postgres and a couple of source connectors to move data into Postgres. Check the status of connectors [here](https://docs.peerdb.io/sql/commands/supported-connectors)

### PeerDB MIRROR
#### Metrics for MIRROR

Sync and transform data-from one store to another using CREATE MIRROR SQL command.
Both types of MIRRORs export some crucial metrics with regards to the health of the MIRROR. By default, our development Docker stack does not capture or visualize these metrics. They are available in a Docker Compose profile called `metrics`, which can be enabled by:

#### MIRROR for Streaming Changes (CDC)

Real-time syncing of data from source to target based on change-feed or CDC (logical decoding in the Postgres world)

| Feature | Source | Target | Status |
| --- | --- | --- | --- |
| CDC | PostgreSQL | BigQuery | Beta |
| CDC | PostgreSQL | Snowflake | Beta |
| CDC | PostgreSQL | Kafka | Beta |
| Initial Load | PostgreSQL | BigQuery | Coming Soon! |
| Initial Load | PostgreSQL | Snowflake | Coming Soon! |

#### MIRROR for SELECTs

Continuous syncing of data from source to target based on any SELECT query on the source. So this is basically a pre-transform - i.e. transform data on the source before syncing it to the target.
```bash
# add --profile metrics like this in front of any docker compose command being used.
docker compose --profile metrics up --build
```

| Source | Target | Status |
| --- | --- | --- |
| PostgreSQL | BigQuery | Beta |
| PostgreSQL | Snowflake | Beta |
| PostgreSQL | S3 | Under development |
This sets up both a Prometheus instance on port 9090 that scrapes the metrics from the flow workers, and also a Grafana instance on port 3000 that reads and visualizes the metrics from mirrors in a preconfigured dashboard. To view the dashboard, access the Grafana instance on `localhost:3000` with the user `admin` and the password `peerdb`.

## License

Expand Down
24 changes: 24 additions & 0 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
version: v1
managed:
enabled: true
go_package_prefix:
default: generated/protos
plugins:
- plugin: buf.build/protocolbuffers/go:v1.31.0
out: flow/generated/protos
opt: paths=source_relative
- plugin: buf.build/grpc/go:v1.3.0
out: flow/generated/protos
opt:
- paths=source_relative
- plugin: buf.build/community/neoeinstein-prost:v0.2.3
out: nexus/pt/src
opt:
- compile_well_known_types
- extern_path=.google.protobuf=::pbjson_types
- plugin: buf.build/community/neoeinstein-tonic:v0.3.0
out: nexus/pt/src
- plugin: buf.build/community/neoeinstein-prost-serde:v0.2.3
out: nexus/pt/src
opt:
- ignore_unknown_fields=true
41 changes: 41 additions & 0 deletions dagster/dagster_peerdb/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
*.pyc

# Packages
*.egg
!/tests/**/*.egg
/*.egg-info
/dist/*
build
_build
.cache
*.so

# Installer logs
pip-log.txt

# Unit test / coverage reports
.coverage
.pytest_cache

.DS_Store
.idea/*
.python-version
.vscode/*

__pycache__

/test.py
/test_*.*

/setup.cfg
MANIFEST.in
/setup.py
/docs/site/*
/tests/fixtures/simple_project/setup.py
/tests/fixtures/project_with_extras/setup.py
.mypy_cache

.venv
/releases/*
pip-wheel-metadata
/poetry.toml
Empty file.
3 changes: 3 additions & 0 deletions dagster/dagster_peerdb/dagster_peerdb/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .resources import PeerDBResource
from .ops import peerdb_execute_mirror
from .types import PeerDBMirrorOutput
31 changes: 31 additions & 0 deletions dagster/dagster_peerdb/dagster_peerdb/ops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from dagster import Config, In, Nothing, Out, Output, op, get_dagster_logger
from pydantic import Field

from .resources import PeerDBResource
from .types import PeerDBMirrorOutput


class PeerDBMirrorConfig(Config):
mirror_name: str = Field(
...,
description="The name of the mirror job to execute."
"This job must already have been created by using the "
"`CREATE MIRROR` commad with `disabled = true`.",
)


@op(
ins={"start_after": In(Nothing)},
out=Out(
PeerDBMirrorOutput,
description=("The output of the peerdb mirror operation."),
),
)
def peerdb_execute_mirror(context, config: PeerDBMirrorConfig, peerdb: PeerDBResource):
log = get_dagster_logger()
workflow_id = peerdb.execute_mirror(config.mirror_name)
log.info(f"Executed PeerDB workflow: {workflow_id}")
output = PeerDBMirrorOutput(
workflow_id=workflow_id,
)
return Output(output)
35 changes: 35 additions & 0 deletions dagster/dagster_peerdb/dagster_peerdb/resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import asyncio
import psycopg
from dagster import ConfigurableResource, get_dagster_logger
from temporalio.client import Client


class PeerDBResource(ConfigurableResource):
peerdb_server_jdbc_url: str
temporal_host_port: str

def execute_mirror(
self,
mirror_name: str,
) -> str:
log = get_dagster_logger()
workflow_id = ""
with psycopg.connect(self.peerdb_server_jdbc_url) as conn:
with conn.cursor() as cur:
cur.execute(f"EXECUTE MIRROR {mirror_name}")
cur.execute(
f"SELECT workflow_id FROM flows WHERE name = '{mirror_name}'"
)
workflow_id = cur.fetchone()[0]
log.info(f"started PeerDB workflow: {workflow_id}")
asyncio.run(self.wait_for_workflow_completion(workflow_id))
return workflow_id

async def wait_for_workflow_completion(self, workflow_id: str) -> None:
# sleep for 2 seconds to give the workflow time to start
await asyncio.sleep(2)
log = get_dagster_logger()
client = await Client.connect(self.temporal_host_port, namespace="default")
log.info(f"waiting for PeerDB workflow: {workflow_id}")
handle = client.get_workflow_handle(workflow_id)
await handle.result()
12 changes: 12 additions & 0 deletions dagster/dagster_peerdb/dagster_peerdb/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import Any, Mapping, NamedTuple, Optional


class PeerDBMirrorOutput(
NamedTuple(
"_PeerDBMirrorOutput",
[
("workflow_id", str),
],
)
):
pass
45 changes: 45 additions & 0 deletions dagster/dagster_peerdb/peerdb_demo_dagster/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from dagster import Definitions, load_assets_from_modules, job

from dagster_peerdb import PeerDBResource, peerdb_execute_mirror
from dagster_dbt import dbt_cli_resource, dbt_run_op

from . import assets
from .assets import DBT_PROJECT_DIR

all_assets = load_assets_from_modules([assets])

peerdb_resource = PeerDBResource.configure_at_launch()

dbt_resource = dbt_cli_resource.configured(
{
"project_dir": DBT_PROJECT_DIR,
"profiles_dir": DBT_PROJECT_DIR,
},
)


simple_mirror_op = peerdb_execute_mirror.configured(
{
"mirror_name": "simple_mirror_from_src_to_dst",
},
name="simple_mirror",
)


events_in_usa_op = dbt_run_op.alias(name="events_in_usa_op")


@job
def simple_mirror_transform_job():
dbt_output = events_in_usa_op(start_after=[simple_mirror_op()])
# return dbt_output


defs = Definitions(
assets=all_assets,
jobs=[simple_mirror_transform_job],
resources={
"peerdb": peerdb_resource,
"dbt": dbt_resource,
},
)
Loading

0 comments on commit bde9fb6

Please sign in to comment.