Merge branch 'main' into ui/validate-start-end-qrep
heavycrystal authored Apr 30, 2024
2 parents c96b1bb + e3f6ef6 commit 904b617
Showing 11 changed files with 623 additions and 127 deletions.
15 changes: 8 additions & 7 deletions flow/activities/flowable_core.go
@@ -187,13 +187,14 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 
 	syncStartTime = time.Now()
 	res, err = sync(dstConn, errCtx, &model.SyncRecordsRequest[Items]{
-		SyncBatchID:    syncBatchID,
-		Records:        recordBatch,
-		ConsumedOffset: &consumedOffset,
-		FlowJobName:    flowName,
-		TableMappings:  options.TableMappings,
-		StagingPath:    config.CdcStagingPath,
-		Script:         config.Script,
+		SyncBatchID:            syncBatchID,
+		Records:                recordBatch,
+		ConsumedOffset:         &consumedOffset,
+		FlowJobName:            flowName,
+		TableMappings:          options.TableMappings,
+		StagingPath:            config.CdcStagingPath,
+		Script:                 config.Script,
+		TableNameSchemaMapping: options.TableNameSchemaMapping,
 	})
 	if err != nil {
 		a.Alerter.LogFlowError(ctx, flowName, err)
250 changes: 169 additions & 81 deletions flow/connectors/connelasticsearch/elasticsearch.go
@@ -4,26 +4,36 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/google/uuid"
"go.temporal.io/sdk/log"
"golang.org/x/exp/maps"

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)

const (
actionIndex = "index"
actionDelete = "delete"
)

type ElasticsearchConnector struct {
*metadataStore.PostgresMetadata
client *elasticsearch.Client
@@ -78,101 +88,178 @@ func (esc *ElasticsearchConnector) Close() error {
return nil
}

func (esc *ElasticsearchConnector) SetupQRepMetadataTables(ctx context.Context,
config *protos.QRepConfig,
// ES is queue-like, no raw table staging needed
func (esc *ElasticsearchConnector) CreateRawTable(ctx context.Context,
req *protos.CreateRawTableInput,
) (*protos.CreateRawTableOutput, error) {
return &protos.CreateRawTableOutput{TableIdentifier: "n/a"}, nil
}

// we handle schema changes by not handling them since no mapping is being enforced right now
func (esc *ElasticsearchConnector) ReplayTableSchemaDeltas(ctx context.Context,
flowJobName string, schemaDeltas []*protos.TableSchemaDelta,
) error {
return nil
}

func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config *protos.QRepConfig,
partition *protos.QRepPartition, stream *model.QRecordStream,
) (int, error) {
startTime := time.Now()
func recordItemsProcessor(items model.RecordItems) ([]byte, error) {
qRecordJsonMap := make(map[string]any)

for key, val := range items.ColToVal {
if r, ok := val.(qvalue.QValueJSON); ok { // JSON is stored as a string, fix that
qRecordJsonMap[key] = json.RawMessage(
shared.UnsafeFastStringToReadOnlyBytes(r.Val))
} else {
qRecordJsonMap[key] = val.Value()
}
}

schema := stream.Schema()
return json.Marshal(qRecordJsonMap)
}

var bulkIndexFatalError error
var bulkIndexErrors []error
var bulkIndexMutex sync.Mutex
var docId string
numRecords := 0
bulkIndexerHasShutdown := false

// -1 means use UUID, >=0 means column in the record
upsertColIndex := -1
// only support single upsert column for now
if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_UPSERT &&
len(config.WriteMode.UpsertKeyColumns) == 1 {
for i, field := range schema.Fields {
if config.WriteMode.UpsertKeyColumns[0] == field.Name {
upsertColIndex = i
func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
req *model.SyncRecordsRequest[model.RecordItems],
) (*model.SyncResponse, error) {
tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
// atomics for counts will be unnecessary in other destinations, using a mutex instead
var recordCountsUpdateMutex sync.Mutex
// we're taking a mutex anyway, avoid atomic
var lastSeenLSN atomic.Int64
var numRecords atomic.Int64

// no I don't like this either
esBulkIndexerCache := make(map[string]esutil.BulkIndexer)
bulkIndexersHaveShutdown := false
// true if we saw errors while closing
cacheCloser := func() bool {
closeHasErrors := false
if bulkIndexersHaveShutdown {
for _, esBulkIndexer := range maps.Values(esBulkIndexerCache) {
err := esBulkIndexer.Close(context.Background())
if err != nil {
esc.logger.Error("[es] failed to close bulk indexer", slog.Any("error", err))
closeHasErrors = true
}
}
bulkIndexersHaveShutdown = true
}
return closeHasErrors
}
defer cacheCloser()

esBulkIndexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: config.DestinationTableIdentifier,
Client: esc.client,
// parallelism comes from the workflow design itself, no need for this
NumWorkers: 1,
FlushInterval: 10 * time.Second,
})
if err != nil {
esc.logger.Error("[es] failed to initialize bulk indexer", slog.Any("error", err))
return 0, fmt.Errorf("[es] failed to initialize bulk indexer: %w", err)
}
defer func() {
if !bulkIndexerHasShutdown {
err := esBulkIndexer.Close(context.Background())
if err != nil {
esc.logger.Error("[es] failed to close bulk indexer", slog.Any("error", err))
flushLoopDone := make(chan struct{})
// we only update lastSeenLSN in the OnSuccess call, so this should be safe even if race
// between loop breaking and closing flushLoopDone
go func() {
ticker := time.NewTicker(peerdbenv.PeerDBQueueFlushTimeoutSeconds())
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-flushLoopDone:
return
case <-ticker.C:
lastSeen := lastSeenLSN.Load()
if lastSeen > req.ConsumedOffset.Load() {
if err := esc.SetLastOffset(ctx, req.FlowJobName, lastSeen); err != nil {
esc.logger.Warn("[es] SetLastOffset error", slog.Any("error", err))
} else {
shared.AtomicInt64Max(req.ConsumedOffset, lastSeen)
esc.logger.Info("processBatch", slog.Int64("updated last offset", lastSeen))
}
}
}
}
}()

for qRecord := range stream.Records {
qRecordJsonMap := make(map[string]any)
var docId string
var bulkIndexFatalError error
var bulkIndexErrors []error
var bulkIndexOnFailureMutex sync.Mutex

if upsertColIndex >= 0 {
docId = fmt.Sprintf("%v", qRecord[upsertColIndex].Value())
} else {
docId = uuid.New().String()
for record := range req.Records.GetRecords() {
var bodyBytes []byte
var err error
action := actionIndex

switch record.(type) {
case *model.InsertRecord[model.RecordItems], *model.UpdateRecord[model.RecordItems]:
bodyBytes, err = recordItemsProcessor(record.GetItems())
if err != nil {
esc.logger.Error("[es] failed to json.Marshal record", slog.Any("error", err))
return nil, fmt.Errorf("[es] failed to json.Marshal record: %w", err)
}
case *model.DeleteRecord[model.RecordItems]:
action = actionDelete
// no need to supply the document since we are deleting
bodyBytes = nil
}
for i, field := range schema.Fields {
switch r := qRecord[i].(type) {
// JSON is stored as a string, fix that
case qvalue.QValueJSON:
qRecordJsonMap[field.Name] = json.RawMessage(shared.
UnsafeFastStringToReadOnlyBytes(r.Val))
default:
qRecordJsonMap[field.Name] = r.Value()

bulkIndexer, ok := esBulkIndexerCache[record.GetDestinationTableName()]
if !ok {
bulkIndexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: record.GetDestinationTableName(),
Client: esc.client,
// can't really ascertain how many tables present to provide a reasonable value
NumWorkers: 1,
FlushInterval: 10 * time.Second,
})
if err != nil {
esc.logger.Error("[es] failed to initialize bulk indexer", slog.Any("error", err))
return nil, fmt.Errorf("[es] failed to initialize bulk indexer: %w", err)
}
esBulkIndexerCache[record.GetDestinationTableName()] = bulkIndexer
}
qRecordJsonBytes, err := json.Marshal(qRecordJsonMap)
if err != nil {
esc.logger.Error("[es] failed to json.Marshal record", slog.Any("error", err))
return 0, fmt.Errorf("[es] failed to json.Marshal record: %w", err)

if len(req.TableNameSchemaMapping[record.GetDestinationTableName()].PrimaryKeyColumns) == 1 {
qValue, err := record.GetItems().GetValueByColName(
req.TableNameSchemaMapping[record.GetDestinationTableName()].PrimaryKeyColumns[0])
if err != nil {
esc.logger.Error("[es] failed to process record", slog.Any("error", err))
return nil, fmt.Errorf("[es] failed to process record: %w", err)
}
docId = fmt.Sprint(qValue.Value())
} else {
tablePkey, err := model.RecToTablePKey(req.TableNameSchemaMapping, record)
if err != nil {
esc.logger.Error("[es] failed to process record", slog.Any("error", err))
return nil, fmt.Errorf("[es] failed to process record: %w", err)
}
docId = base64.RawURLEncoding.EncodeToString(tablePkey.PkeyColVal[:])
}

err = esBulkIndexer.Add(ctx, esutil.BulkIndexerItem{
Action: "index",
err = bulkIndexer.Add(ctx, esutil.BulkIndexerItem{
Action: action,
DocumentID: docId,
Body: bytes.NewReader(qRecordJsonBytes),
Body: bytes.NewReader(bodyBytes),

OnSuccess: func(_ context.Context, _ esutil.BulkIndexerItem, _ esutil.BulkIndexerResponseItem) {
shared.AtomicInt64Max(&lastSeenLSN, record.GetCheckpointID())
numRecords.Add(1)
recordCountsUpdateMutex.Lock()
defer recordCountsUpdateMutex.Unlock()
record.PopulateCountMap(tableNameRowsMapping)
},
// OnFailure is called for each failed operation, log and let parent handle
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem,
res esutil.BulkIndexerResponseItem, err error,
) {
bulkIndexMutex.Lock()
defer bulkIndexMutex.Unlock()
// attempt to delete a record that wasn't present, possible from no initial load
if item.Action == actionDelete && res.Status == 404 {
return
}
bulkIndexOnFailureMutex.Lock()
defer bulkIndexOnFailureMutex.Unlock()
if err != nil {
bulkIndexErrors = append(bulkIndexErrors, err)
} else {
causeString := ""
if res.Error.Cause.Type != "" || res.Error.Cause.Reason != "" {
causeString = fmt.Sprintf("(caused by type:%s reason:%s)", res.Error.Cause.Type, res.Error.Cause.Reason)
}
cbErr := fmt.Errorf("id:%s type:%s reason:%s %s", item.DocumentID, res.Error.Type,
cbErr := fmt.Errorf("id:%s action:%s type:%s reason:%s %s", item.DocumentID, item.Action, res.Error.Type,
res.Error.Reason, causeString)
bulkIndexErrors = append(bulkIndexErrors, cbErr)
if res.Error.Type == "illegal_argument_exception" {
@@ -183,36 +270,37 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config *
})
if err != nil {
esc.logger.Error("[es] failed to add record to bulk indexer", slog.Any("error", err))
return 0, fmt.Errorf("[es] failed to add record to bulk indexer: %w", err)
return nil, fmt.Errorf("[es] failed to add record to bulk indexer: %w", err)
}
if bulkIndexFatalError != nil {
esc.logger.Error("[es] fatal error while indexing record", slog.Any("error", bulkIndexFatalError))
return 0, fmt.Errorf("[es] fatal error while indexing record: %w", bulkIndexFatalError)
return nil, fmt.Errorf("[es] fatal error while indexing record: %w", bulkIndexFatalError)
}

// update here instead of OnSuccess, if we close successfully it should match
numRecords++
}
// "Receive on a closed channel yields the zero value after all elements in the channel are received."
close(flushLoopDone)

if err := stream.Err(); err != nil {
esc.logger.Error("[es] failed to get record from stream", slog.Any("error", err))
return 0, fmt.Errorf("[es] failed to get record from stream: %w", err)
if cacheCloser() {
esc.logger.Error("[es] failed to close bulk indexer(s)")
return nil, errors.New("[es] failed to close bulk indexer(s)")
}
if err := esBulkIndexer.Close(ctx); err != nil {
esc.logger.Error("[es] failed to close bulk indexer", slog.Any("error", err))
return 0, fmt.Errorf("[es] failed to close bulk indexer: %w", err)
}
bulkIndexerHasShutdown = true
bulkIndexersHaveShutdown = true
if len(bulkIndexErrors) > 0 {
for _, err := range bulkIndexErrors {
esc.logger.Error("[es] failed to index record", slog.Any("err", err))
}
}

err = esc.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime)
if err != nil {
esc.logger.Error("[es] failed to log partition info", slog.Any("error", err))
return 0, fmt.Errorf("[es] failed to log partition info: %w", err)
lastCheckpoint := req.Records.GetLastCheckpoint()
if err := esc.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint); err != nil {
return nil, err
}
return numRecords, nil

return &model.SyncResponse{
CurrentSyncBatchID: req.SyncBatchID,
LastSyncedCheckpointID: lastCheckpoint,
NumRecordsSynced: numRecords.Load(),
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
}, nil
}
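
The document-ID logic in this diff uses the destination table's primary key from TableNameSchemaMapping: a single key column is turned into the ID directly, while a composite key is serialized and base64-encoded. Below is a minimal standalone sketch of that pattern only; the docID helper and the plain map row are hypothetical stand-ins, not PeerDB's model.RecordItems or RecToTablePKey.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// docID derives an Elasticsearch document ID from a row's primary key:
// a single key column is used directly; a composite key is serialized
// with a separator byte and base64-encoded, mirroring the two branches
// in the connector's SyncRecords above.
func docID(pkeyCols []string, row map[string]any) string {
	if len(pkeyCols) == 1 {
		return fmt.Sprint(row[pkeyCols[0]])
	}
	var buf []byte
	for _, col := range pkeyCols {
		buf = append(buf, fmt.Sprint(row[col])...)
		buf = append(buf, 0) // separator between key parts
	}
	return base64.RawURLEncoding.EncodeToString(buf)
}

func main() {
	fmt.Println(docID([]string{"id"}, map[string]any{"id": 42}))
	fmt.Println(docID([]string{"tenant", "id"}, map[string]any{"tenant": "a", "id": 42}))
}
```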