Skip to content

Commit

Permalink
data going in raw table
Browse files Browse the repository at this point in the history
  • Loading branch information
pankaj-peerdb committed Jan 22, 2024
1 parent f5846c9 commit 063c63f
Show file tree
Hide file tree
Showing 12 changed files with 281 additions and 208 deletions.
75 changes: 37 additions & 38 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ x-flow-worker-env: &flow-worker-env
# For GCS, these will be your HMAC keys instead
# For more information:
# https://cloud.google.com/storage/docs/authentication/managing-hmackeys
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-}
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-AKIASB7EBZDCEVIMB4XH}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-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}
# For GCS, set this to "auto" without the quotes
AWS_REGION: ${AWS_REGION:-}
AWS_REGION: ${AWS_REGION:-us-east-2}
# For GCS, set this as: https://storage.googleapis.com
AWS_ENDPOINT: ${AWS_ENDPOINT:-}
# enables worker profiling using Grafana Pyroscope
Expand Down Expand Up @@ -178,41 +178,40 @@ services:
temporal-admin-tools:
condition: service_healthy

peerdb:
container_name: peerdb-server
stop_signal: SIGINT
build:
context: .
dockerfile: stacks/peerdb-server.Dockerfile
environment:
<<: *catalog-config
PEERDB_LOG_DIR: /var/log/peerdb
PEERDB_PASSWORD: peerdb
PEERDB_FLOW_SERVER_ADDRESS: grpc://flow_api:8112
RUST_LOG: info
RUST_BACKTRACE: 1
ports:
- 9900:9900
depends_on:
catalog:
condition: service_healthy

peerdb-ui:
container_name: peerdb-ui
build:
context: .
dockerfile: stacks/peerdb-ui.Dockerfile
ports:
- 3000:3000
environment:
<<: *catalog-config
DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres
PEERDB_FLOW_SERVER_HTTP: http://flow_api:8113
PEERDB_PASSWORD:
NEXTAUTH_SECRET: __changeme__
NEXTAUTH_URL: http://localhost:3000
depends_on:
- flow-api
# peerdb:
# container_name: peerdb-server
# stop_signal: SIGINT
# build:
# context: .
# dockerfile: stacks/peerdb-server.Dockerfile
# environment:
# <<: *catalog-config
# PEERDB_LOG_DIR: /var/log/peerdb
# PEERDB_PASSWORD: peerdb
# PEERDB_FLOW_SERVER_ADDRESS: grpc://flow_api:8112
# RUST_LOG: info
# RUST_BACKTRACE: 1
# ports:
# - 9900:9900
# depends_on:
# catalog:
# condition: service_healthy

# peerdb-ui:
# container_name: peerdb-ui
# build:
# context: .
# dockerfile: stacks/peerdb-ui.Dockerfile
# ports:
# - 3000:3000
# environment:
# <<: *catalog-config
# DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres
# PEERDB_FLOW_SERVER_HTTP: http://flow_api:8113
# PEERDB_PASSWORD:
# NEXTAUTH_SECRET: __changeme__
# NEXTAUTH_URL: http://localhost:3000
# depends_on:
# - flow-api
volumes:
pgdata:
1 change: 1 addition & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ func (a *FlowableActivity) StartNormalize(
) (*model.NormalizeResponse, error) {
conn := input.FlowConnectionConfigs
ctx = context.WithValue(ctx, shared.FlowNameKey, conn.FlowJobName)
fmt.Printf("\n*********************** in StartNormalize %+v\n", conn)
dstConn, err := connectors.GetCDCNormalizeConnector(ctx, conn.Destination)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
Expand Down
2 changes: 2 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (h *FlowRequestHandler) createQrepJobEntry(ctx context.Context,
func (h *FlowRequestHandler) CreateCDCFlow(
ctx context.Context, req *protos.CreateCDCFlowRequest,
) (*protos.CreateCDCFlowResponse, error) {
fmt.Printf("\n******************************** CreateCDCFlow")
cfg := req.ConnectionConfigs
_, validateErr := h.ValidateCDCMirror(ctx, req)
if validateErr != nil {
Expand Down Expand Up @@ -230,6 +231,7 @@ func (h *FlowRequestHandler) removeFlowEntryInCatalog(
func (h *FlowRequestHandler) CreateQRepFlow(
ctx context.Context, req *protos.CreateQRepFlowRequest,
) (*protos.CreateQRepFlowResponse, error) {
fmt.Printf("\n******************************** CreateQRepFlow")
cfg := req.QrepConfig
workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
Expand Down
Loading

0 comments on commit 063c63f

Please sign in to comment.