Commit

Merge branch 'main' into queue-initial-load-ui
Amogh-Bharadwaj authored May 9, 2024
2 parents 4435f18 + e551e73 commit 935dd8c
Showing 28 changed files with 168 additions and 200 deletions.
36 changes: 15 additions & 21 deletions dev-peerdb.sh
@@ -1,33 +1,27 @@
 #!/bin/sh
-if test -z "$USE_PODMAN"
+set -Eeu
+
+DOCKER="docker"
+EXTRA_ARGS="--no-attach temporal --no-attach pyroscope --no-attach temporal-ui"
+
+if test -n "${USE_PODMAN:=}"
 then
-    if ! command -v docker &> /dev/null
-    then
-        if command -v podman-compose
-        then
-            echo "docker could not be found on PATH, using podman-compose"
+    # 0 is found, checking for not found so we check for podman then
+    if $(docker compose &>/dev/null) && [ $? -ne 0 ]; then
+        if $(podman compose &>/dev/null) && [ $? -eq 0 ]; then
+            echo "docker could not be found on PATH, using podman compose"
             USE_PODMAN=1
         else
-            echo "docker could not be found on PATH"
+            echo "docker compose could not be found on PATH"
             exit 1
         fi
     fi
 fi
 
-if test -z "$USE_PODMAN"
-then
-    DOCKER="docker compose"
-    EXTRA_ARGS="--no-attach temporal --no-attach pyroscope --no-attach temporal-ui"
-else
-    DOCKER="podman-compose --podman-run-args=--replace"
-    EXTRA_ARGS=""
-fi
-
-# check if peerdb_network exists if not create it
-if ! $DOCKER network inspect peerdb_network &> /dev/null
-then
-    $DOCKER network create peerdb_network
+if test -n "$USE_PODMAN"; then
+    DOCKER="podman"
+    EXTRA_ARGS="--podman-run-args=--replace"
 fi
 
 export PEERDB_VERSION_SHA_SHORT=local-$(git rev-parse --short HEAD)
-exec $DOCKER -f docker-compose-dev.yml up --build $EXTRA_ARGS
+exec $DOCKER compose -f docker-compose-dev.yml up --build $EXTRA_ARGS
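Reviewer note: the script no longer looks for a docker binary and a separate podman-compose tool; it probes `docker compose` (the compose v2 plugin) directly and falls back to `podman compose`. A minimal Go sketch of the same probe-and-fall-back idea — illustration only, not part of this commit; composeCommand is a hypothetical helper:

    package main

    import (
        "fmt"
        "os"
        "os/exec"
    )

    // composeCommand prefers `docker compose` and falls back to
    // `podman compose`, mirroring the detection the shell script performs.
    func composeCommand() (string, error) {
        if exec.Command("docker", "compose", "version").Run() == nil {
            return "docker", nil
        }
        if exec.Command("podman", "compose", "version").Run() == nil {
            return "podman", nil
        }
        return "", fmt.Errorf("neither docker compose nor podman compose found on PATH")
    }

    func main() {
        runtime, err := composeCommand()
        if err != nil {
            fmt.Fprintln(os.Stderr, err)
            os.Exit(1)
        }
        fmt.Println("using", runtime, "compose")
    }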
1 change: 0 additions & 1 deletion docker-compose-dev.yml
@@ -221,5 +221,4 @@ volumes:
 
 networks:
   default:
-    external: true
     name: peerdb_network
1 change: 0 additions & 1 deletion docker-compose.yml
@@ -192,5 +192,4 @@ volumes:
 
 networks:
   default:
-    external: true
     name: peerdb_network
22 changes: 6 additions & 16 deletions flow/cmd/handler.go
@@ -142,27 +142,19 @@ func (h *FlowRequestHandler) CreateCDCFlow(
 
 	if req.ConnectionConfigs.SoftDeleteColName == "" {
 		req.ConnectionConfigs.SoftDeleteColName = "_PEERDB_IS_DELETED"
-	} else {
-		// make them all uppercase
-		req.ConnectionConfigs.SoftDeleteColName = strings.ToUpper(req.ConnectionConfigs.SoftDeleteColName)
 	}
 
 	if req.ConnectionConfigs.SyncedAtColName == "" {
 		req.ConnectionConfigs.SyncedAtColName = "_PEERDB_SYNCED_AT"
-	} else {
-		// make them all uppercase
-		req.ConnectionConfigs.SyncedAtColName = strings.ToUpper(req.ConnectionConfigs.SyncedAtColName)
 	}
 
-	if req.CreateCatalogEntry {
-		err := h.createCdcJobEntry(ctx, req, workflowID)
-		if err != nil {
-			slog.Error("unable to create flow job entry", slog.Any("error", err))
-			return nil, fmt.Errorf("unable to create flow job entry: %w", err)
-		}
+	err := h.createCdcJobEntry(ctx, req, workflowID)
+	if err != nil {
+		slog.Error("unable to create flow job entry", slog.Any("error", err))
+		return nil, fmt.Errorf("unable to create flow job entry: %w", err)
 	}
 
-	err := h.updateFlowConfigInCatalog(ctx, cfg)
+	err = h.updateFlowConfigInCatalog(ctx, cfg)
 	if err != nil {
 		slog.Error("unable to update flow config in catalog", slog.Any("error", err))
 		return nil, fmt.Errorf("unable to update flow config in catalog: %w", err)
@@ -258,10 +250,8 @@ func (h *FlowRequestHandler) CreateQRepFlow(
 
 	if req.QrepConfig.SyncedAtColName == "" {
 		cfg.SyncedAtColName = "_PEERDB_SYNCED_AT"
-	} else {
-		// make them all uppercase
-		cfg.SyncedAtColName = strings.ToUpper(req.QrepConfig.SyncedAtColName)
 	}
 
 	_, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state)
 	if err != nil {
 		slog.Error("unable to start QRepFlow workflow",
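Reviewer note: with the CreateCatalogEntry guard removed, err is now declared at function scope by the createCdcJobEntry call, so the later updateFlowConfigInCatalog call must assign with = rather than redeclare with :=; Go rejects a second `err := ...` in the same scope with "no new variables on left side of :=". A standalone sketch of that rule, illustration only:

    package main

    import (
        "errors"
        "fmt"
    )

    func stepOne() error { return nil }
    func stepTwo() error { return errors.New("boom") }

    func main() {
        err := stepOne() // first use declares err
        if err != nil {
            fmt.Println("step one:", err)
            return
        }

        // `err := stepTwo()` would not compile here: no new variables on left side of :=
        err = stepTwo() // plain assignment reuses the existing err
        if err != nil {
            fmt.Println("step two:", err)
        }
    }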
28 changes: 15 additions & 13 deletions flow/cmd/validate_mirror.go
@@ -17,7 +17,7 @@ import (
 func (h *FlowRequestHandler) ValidateCDCMirror(
 	ctx context.Context, req *protos.CreateCDCFlowRequest,
 ) (*protos.ValidateCDCMirrorResponse, error) {
-	if req.CreateCatalogEntry && !req.ConnectionConfigs.Resync {
+	if !req.ConnectionConfigs.Resync {
 		mirrorExists, existCheckErr := h.CheckIfMirrorNameExists(ctx, req.ConnectionConfigs.FlowJobName)
 		if existCheckErr != nil {
 			slog.Error("/validatecdc failed to check if mirror name exists", slog.Any("error", existCheckErr))
@@ -46,7 +46,9 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
 	sourcePeerConfig := req.ConnectionConfigs.Source.GetPostgresConfig()
 	if sourcePeerConfig == nil {
 		slog.Error("/validatecdc source peer config is nil", slog.Any("peer", req.ConnectionConfigs.Source))
-		return nil, errors.New("source peer config is nil")
+		return &protos.ValidateCDCMirrorResponse{
+			Ok: false,
+		}, errors.New("source peer config is nil")
 	}
 
 	pgPeer, err := connpostgres.NewPostgresConnector(ctx, sourcePeerConfig)
@@ -103,17 +105,17 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
 	}
 
 	pubName := req.ConnectionConfigs.PublicationName
-	if pubName != "" {
-		err = pgPeer.CheckSourceTables(ctx, sourceTables, pubName)
-		if err != nil {
-			displayErr := fmt.Errorf("provided source tables invalidated: %v", err)
-			h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName,
-				fmt.Sprint(displayErr),
-			)
-			return &protos.ValidateCDCMirrorResponse{
-				Ok: false,
-			}, displayErr
-		}
+
+	err = pgPeer.CheckSourceTables(ctx, sourceTables, pubName)
+	if err != nil {
+		displayErr := fmt.Errorf("provided source tables invalidated: %v", err)
+		slog.Error(displayErr.Error())
+		h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName,
+			fmt.Sprint(displayErr),
+		)
+		return &protos.ValidateCDCMirrorResponse{
+			Ok: false,
+		}, displayErr
 	}
 
 	return &protos.ValidateCDCMirrorResponse{
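Reviewer note: every failure path in ValidateCDCMirror now returns a concrete ValidateCDCMirrorResponse with Ok set to false together with the error, instead of a bare nil response. A minimal sketch of the pattern with a stand-in type, not the generated proto:

    package main

    import (
        "errors"
        "fmt"
    )

    // ValidateResponse stands in for protos.ValidateCDCMirrorResponse.
    type ValidateResponse struct {
        Ok bool
    }

    func validate(pass bool) (*ValidateResponse, error) {
        if !pass {
            // Pair the error with a non-nil response so callers that only
            // inspect the response still observe a failed status.
            return &ValidateResponse{Ok: false}, errors.New("source peer config is nil")
        }
        return &ValidateResponse{Ok: true}, nil
    }

    func main() {
        resp, err := validate(false)
        fmt.Println(resp.Ok, err) // false source peer config is nil
    }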
14 changes: 4 additions & 10 deletions flow/connectors/connelasticsearch/elasticsearch.go
@@ -121,25 +121,23 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
 	req *model.SyncRecordsRequest[model.RecordItems],
 ) (*model.SyncResponse, error) {
 	tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
-	// atomics for counts will be unnecessary in other destinations, using a mutex instead
-	var recordCountsUpdateMutex sync.Mutex
-	// we're taking a mutex anyway, avoid atomic
 	var lastSeenLSN atomic.Int64
-	var numRecords atomic.Int64
+	var numRecords int64
 
 	// no I don't like this either
 	esBulkIndexerCache := make(map[string]esutil.BulkIndexer)
 	bulkIndexersHaveShutdown := false
 	// true if we saw errors while closing
 	cacheCloser := func() bool {
 		closeHasErrors := false
-		if bulkIndexersHaveShutdown {
+		if !bulkIndexersHaveShutdown {
 			for _, esBulkIndexer := range maps.Values(esBulkIndexerCache) {
 				err := esBulkIndexer.Close(context.Background())
 				if err != nil {
 					esc.logger.Error("[es] failed to close bulk indexer", slog.Any("error", err))
 					closeHasErrors = true
 				}
+				numRecords += int64(esBulkIndexer.Stats().NumFlushed)
 			}
 			bulkIndexersHaveShutdown = true
 		}
@@ -237,9 +235,6 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
 
 			OnSuccess: func(_ context.Context, _ esutil.BulkIndexerItem, _ esutil.BulkIndexerResponseItem) {
 				shared.AtomicInt64Max(&lastSeenLSN, record.GetCheckpointID())
-				numRecords.Add(1)
-				recordCountsUpdateMutex.Lock()
-				defer recordCountsUpdateMutex.Unlock()
 				record.PopulateCountMap(tableNameRowsMapping)
 			},
 			// OnFailure is called for each failed operation, log and let parent handle
@@ -284,7 +279,6 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
 		esc.logger.Error("[es] failed to close bulk indexer(s)")
 		return nil, errors.New("[es] failed to close bulk indexer(s)")
 	}
-	bulkIndexersHaveShutdown = true
 	if len(bulkIndexErrors) > 0 {
 		for _, err := range bulkIndexErrors {
 			esc.logger.Error("[es] failed to index record", slog.Any("err", err))
@@ -299,7 +293,7 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
 	return &model.SyncResponse{
 		CurrentSyncBatchID:     req.SyncBatchID,
 		LastSyncedCheckpointID: lastCheckpoint,
-		NumRecordsSynced:       numRecords.Load(),
+		NumRecordsSynced:       numRecords,
 		TableNameRowsMapping:   tableNameRowsMapping,
 		TableSchemaDeltas:      req.Records.SchemaDeltas,
 	}, nil
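Reviewer note: the per-record atomic and the mutex taken in OnSuccess are gone; the total is now read from each bulk indexer's own Stats().NumFlushed during the single-threaded teardown in cacheCloser. An illustrative sketch of counting at teardown instead of on the hot path — fakeIndexer is a stand-in for esutil.BulkIndexer, not the real API:

    package main

    import (
        "fmt"
        "sync"
    )

    // fakeIndexer tracks its own flush count, the way esutil.BulkIndexer
    // exposes Stats().NumFlushed.
    type fakeIndexer struct {
        mu      sync.Mutex
        flushed int64
    }

    func (f *fakeIndexer) Add() {
        f.mu.Lock()
        f.flushed++
        f.mu.Unlock()
    }

    func (f *fakeIndexer) Close() int64 {
        f.mu.Lock()
        defer f.mu.Unlock()
        return f.flushed
    }

    func main() {
        indexers := []*fakeIndexer{{}, {}}
        var wg sync.WaitGroup
        for _, idx := range indexers {
            wg.Add(1)
            go func(ix *fakeIndexer) {
                defer wg.Done()
                for i := 0; i < 1000; i++ {
                    ix.Add()
                }
            }(idx)
        }
        wg.Wait()

        // Single-threaded teardown: sum each indexer's count after the
        // workers stop, so the hot path needs no shared counter.
        var numRecords int64
        for _, idx := range indexers {
            numRecords += idx.Close()
        }
        fmt.Println("records synced:", numRecords) // 2000
    }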
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
@@ -62,11 +62,11 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
 	// create a separate connection pool for non-replication queries as replication connections cannot
 	// be used for extended query protocol, i.e. prepared statements
 	connConfig, err := pgx.ParseConfig(connectionString)
-	replConfig := connConfig.Copy()
 	if err != nil {
 		return nil, fmt.Errorf("failed to parse connection string: %w", err)
 	}
 
+	replConfig := connConfig.Copy()
 	runtimeParams := connConfig.Config.RuntimeParams
 	runtimeParams["idle_in_transaction_session_timeout"] = "0"
 	runtimeParams["statement_timeout"] = "0"
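Reviewer note: pgx.ParseConfig returns a nil config alongside an error, so the old code could dereference nil by calling connConfig.Copy() before checking err. A minimal reproduction of the hazard, assuming pgx v5:

    package main

    import (
        "fmt"

        "github.com/jackc/pgx/v5"
    )

    func main() {
        // An invalid connection string makes ParseConfig return (nil, error).
        connConfig, err := pgx.ParseConfig("definitely not a dsn")
        // Calling connConfig.Copy() at this point would panic with a nil
        // pointer dereference; the error must be checked first, as the
        // corrected code above now does.
        if err != nil {
            fmt.Println("parse failed:", err)
            return
        }
        replConfig := connConfig.Copy()
        fmt.Println("replication host:", replConfig.Host)
    }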
33 changes: 18 additions & 15 deletions flow/connectors/postgres/validate.go
@@ -34,28 +34,31 @@ func (c *PostgresConnector) CheckSourceTables(ctx context.Context,
 	}
 
 	tableStr := strings.Join(tableArr, ",")
-	// Check if publication exists
-	err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil)
-	if err != nil {
-		if err == pgx.ErrNoRows {
-			return fmt.Errorf("publication does not exist: %s", pubName)
-		}
-		return fmt.Errorf("error while checking for publication existence: %w", err)
-	}
 
-	// Check if tables belong to publication
-	var pubTableCount int
-	err = c.conn.QueryRow(ctx, fmt.Sprintf(`
+	if pubName != "" {
+		// Check if publication exists
+		err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil)
+		if err != nil {
+			if err == pgx.ErrNoRows {
+				return fmt.Errorf("publication does not exist: %s", pubName)
+			}
+			return fmt.Errorf("error while checking for publication existence: %w", err)
+		}
+
+		// Check if tables belong to publication
+		var pubTableCount int
+		err = c.conn.QueryRow(ctx, fmt.Sprintf(`
 		with source_table_components (sname, tname) as (values %s)
 		select COUNT(DISTINCT(schemaname,tablename)) from pg_publication_tables
 		INNER JOIN source_table_components stc
 		ON schemaname=stc.sname and tablename=stc.tname where pubname=$1;`, tableStr), pubName).Scan(&pubTableCount)
-	if err != nil {
-		return err
-	}
+		if err != nil {
+			return err
+		}
 
-	if pubTableCount != len(tableNames) {
-		return errors.New("not all tables belong to publication")
+		if pubTableCount != len(tableNames) {
+			return errors.New("not all tables belong to publication")
+		}
 	}
 
 	return nil
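Reviewer note: the publication checks now run only when a publication name was actually supplied, matching the handler change above that made CheckSourceTables unconditional. The existence probe itself — Scan(nil) to discard the value, pgx.ErrNoRows to mean "absent" — is a reusable idiom; a self-contained sketch, assuming pgx v5 and a hypothetical publicationExists helper:

    package pgvalidate

    import (
        "context"
        "errors"
        "fmt"

        "github.com/jackc/pgx/v5"
    )

    // publicationExists reports whether pubName exists, distinguishing
    // "no such publication" from a genuine query failure.
    func publicationExists(ctx context.Context, conn *pgx.Conn, pubName string) (bool, error) {
        err := conn.QueryRow(ctx,
            "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil)
        if err != nil {
            if errors.Is(err, pgx.ErrNoRows) {
                return false, nil
            }
            return false, fmt.Errorf("error while checking for publication existence: %w", err)
        }
        return true, nil
    }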
8 changes: 5 additions & 3 deletions flow/connectors/utils/monitoring/monitoring.go
@@ -123,14 +123,16 @@ func AddCDCBatchTablesForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobNa
 	}()
 
 	for destinationTableName, rowCounts := range tableNameRowsMapping {
-		numRows := rowCounts.InsertCount + rowCounts.UpdateCount + rowCounts.DeleteCount
+		inserts := rowCounts.InsertCount.Load()
+		updates := rowCounts.UpdateCount.Load()
+		deletes := rowCounts.DeleteCount.Load()
 		_, err = insertBatchTablesTx.Exec(ctx,
 			`INSERT INTO peerdb_stats.cdc_batch_table
 				(flow_name,batch_id,destination_table_name,num_rows,
 				insert_count,update_count,delete_count)
 				VALUES($1,$2,$3,$4,$5,$6,$7) ON CONFLICT DO NOTHING`,
-			flowJobName, batchID, destinationTableName, numRows,
-			rowCounts.InsertCount, rowCounts.UpdateCount, rowCounts.DeleteCount)
+			flowJobName, batchID, destinationTableName,
+			inserts+updates+deletes, inserts, updates, deletes)
 		if err != nil {
 			return fmt.Errorf("error while inserting statistics into cdc_batch_table: %w", err)
 		}
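Reviewer note: with the counters now atomics, each is loaded exactly once into a local before building the row, so num_rows always equals the insert_count, update_count, and delete_count values written in the same row, even if another goroutine increments mid-iteration. A compact illustration:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // Counts mirrors the shape of model.RecordTypeCounts after this commit.
    type Counts struct {
        InsertCount atomic.Int32
        UpdateCount atomic.Int32
        DeleteCount atomic.Int32
    }

    func main() {
        var c Counts
        c.InsertCount.Add(2)
        c.UpdateCount.Add(1)

        // Load each counter once and reuse the locals, so the derived
        // total is consistent with the individual values recorded.
        inserts := c.InsertCount.Load()
        updates := c.UpdateCount.Load()
        deletes := c.DeleteCount.Load()
        fmt.Println("num_rows:", inserts+updates+deletes, inserts, updates, deletes)
    }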
6 changes: 1 addition & 5 deletions flow/connectors/utils/stream.go
@@ -130,11 +130,7 @@ func recordToQRecordOrError[Items model.Items](batchID int64, record model.Recor
 func InitialiseTableRowsMap(tableMaps []*protos.TableMapping) map[string]*model.RecordTypeCounts {
 	tableNameRowsMapping := make(map[string]*model.RecordTypeCounts, len(tableMaps))
 	for _, mapping := range tableMaps {
-		tableNameRowsMapping[mapping.DestinationTableIdentifier] = &model.RecordTypeCounts{
-			InsertCount: 0,
-			UpdateCount: 0,
-			DeleteCount: 0,
-		}
+		tableNameRowsMapping[mapping.DestinationTableIdentifier] = &model.RecordTypeCounts{}
 	}
 
 	return tableNameRowsMapping
6 changes: 3 additions & 3 deletions flow/e2e/bigquery/peer_flow_bq_test.go
@@ -1110,8 +1110,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
 		Source:            e2e.GeneratePostgresPeer(),
 		CdcStagingPath:    connectionGen.CdcStagingPath,
 		SoftDelete:        true,
-		SoftDeleteColName: "_PEERDB_IS_DELETED",
-		SyncedAtColName:   "_PEERDB_SYNCED_AT",
+		SoftDeleteColName: "_custom_deleted",
+		SyncedAtColName:   "_custom_synced",
 		MaxBatchSize:      100,
 	}
 
@@ -1141,7 +1141,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
 	e2e.EnvWaitForEqualTables(env, s, "normalizing tx", "test_softdel_iud", "id,c1,c2,t")
 	e2e.EnvWaitFor(s.t, env, 3*time.Minute, "checking soft delete", func() bool {
 		newerSyncedAtQuery := fmt.Sprintf(
-			"SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED",
+			"SELECT COUNT(*) FROM `%s.%s` WHERE _custom_deleted",
 			s.bqHelper.Config.DatasetId, dstTableName)
 		numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
 		e2e.EnvNoError(s.t, env, err)
8 changes: 5 additions & 3 deletions flow/model/qrecord_stream.go
@@ -1,13 +1,15 @@
 package model
 
 import (
+	"sync/atomic"
+
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
 )
 
 type RecordTypeCounts struct {
-	InsertCount int
-	UpdateCount int
-	DeleteCount int
+	InsertCount atomic.Int32
+	UpdateCount atomic.Int32
+	DeleteCount atomic.Int32
 }
 
 type QRecordStream struct {
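Reviewer note: atomic.Int32 (Go 1.19+) is what lets PopulateCountMap in flow/model/record.go increment these counters without the Elasticsearch connector's old mutex, and its usable zero value is why InitialiseTableRowsMap can now build a bare &model.RecordTypeCounts{}. A small demonstration:

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    type RecordTypeCounts struct {
        InsertCount atomic.Int32
        UpdateCount atomic.Int32
        DeleteCount atomic.Int32
    }

    func main() {
        counts := &RecordTypeCounts{} // zero value is ready to use

        var wg sync.WaitGroup
        for i := 0; i < 100; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                counts.InsertCount.Add(1) // safe without any external lock
            }()
        }
        wg.Wait()
        fmt.Println(counts.InsertCount.Load()) // 100
    }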
6 changes: 3 additions & 3 deletions flow/model/record.go
@@ -58,7 +58,7 @@ func (r *InsertRecord[T]) GetItems() T {
 func (r *InsertRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) {
 	recordCount, ok := mapOfCounts[r.DestinationTableName]
 	if ok {
-		recordCount.InsertCount++
+		recordCount.InsertCount.Add(1)
 	}
 }
 
@@ -91,7 +91,7 @@ func (r *UpdateRecord[T]) GetItems() T {
 func (r *UpdateRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) {
 	recordCount, ok := mapOfCounts[r.DestinationTableName]
 	if ok {
-		recordCount.UpdateCount++
+		recordCount.UpdateCount.Add(1)
 	}
 }
 
@@ -122,7 +122,7 @@ func (r *DeleteRecord[T]) GetItems() T {
 func (r *DeleteRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) {
 	recordCount, ok := mapOfCounts[r.DestinationTableName]
 	if ok {
-		recordCount.DeleteCount++
+		recordCount.DeleteCount.Add(1)
 	}
 }
 