Skip to content

Commit

Permalink
Merge branch 'main' into composite-keys-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Oct 17, 2023
2 parents 03f9d94 + ae666e0 commit 7ca5521
Show file tree
Hide file tree
Showing 33 changed files with 724 additions and 227 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
timeout-minutes: 30
services:
pg_cdc:
image: postgres:15.4-alpine
image: postgis/postgis:15-3.4-alpine
ports:
- 7132:5432
env:
Expand Down
12 changes: 10 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,6 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
shutdown <- true
}()

startTime := time.Now()
partitions, err := srcConn.GetQRepPartitions(config, last)
if err != nil {
return nil, fmt.Errorf("failed to get partitions from source: %w", err)
Expand All @@ -430,7 +429,6 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
ctx,
config,
runUUID,
startTime,
partitions,
)
if err != nil {
Expand All @@ -449,6 +447,11 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
partitions *protos.QRepPartitionBatch,
runUUID string,
) error {
err := a.CatalogMirrorMonitor.UpdateStartTimeForQRepRun(ctx, runUUID)
if err != nil {
return fmt.Errorf("failed to update start time for qrep run: %w", err)
}

numPartitions := len(partitions.Partitions)
log.Infof("replicating partitions for job - %s - batch %d - size: %d\n",
config.FlowJobName, partitions.BatchId, numPartitions)
Expand All @@ -471,6 +474,11 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
partition *protos.QRepPartition,
runUUID string,
) error {
err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition)
if err != nil {
return fmt.Errorf("failed to update start time for partition: %w", err)
}

ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
Expand Down
140 changes: 81 additions & 59 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/PeerDB-io/peer-flow/model"
cmap "github.com/orcaman/concurrent-map/v2"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
)

type EventHubConnector struct {
Expand Down Expand Up @@ -61,15 +60,8 @@ func NewEventHubConnector(
func (c *EventHubConnector) Close() error {
var allErrors error

// close all the eventhub connections.
err := c.hubManager.Close()
if err != nil {
log.Errorf("failed to close eventhub connections: %v", err)
allErrors = errors.Join(allErrors, err)
}

// close the postgres metadata store.
err = c.pgMetadata.Close()
err := c.pgMetadata.Close()
if err != nil {
log.Errorf("failed to close postgres metadata store: %v", err)
allErrors = errors.Join(allErrors, err)
Expand Down Expand Up @@ -129,46 +121,32 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error
return nil
}

func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
return fmt.Sprintf("syncing records to eventhub with"+
" push parallelism %d and push batch size %d",
req.PushParallelism, req.PushBatchSize)
})
defer func() {
shutdown <- true
}()
tableNameRowsMapping := cmap.New[uint32]()
batch := req.Records
eventsPerHeartBeat := 1000
eventsPerBatch := int(req.PushBatchSize)
if eventsPerBatch <= 0 {
eventsPerBatch = 10000
}
maxParallelism := req.PushParallelism
if maxParallelism <= 0 {
maxParallelism = 10
}
func (c *EventHubConnector) processBatch(
flowJobName string,
batch *model.RecordBatch,
eventsPerBatch int,
maxParallelism int64,
) error {
ctx := context.Background()

tableNameRowsMapping := cmap.New[uint32]()
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

startTime := time.Now()
for i, record := range batch.Records {
json, err := record.GetItems().ToJSONWithOpts(toJSONOpts)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
"flowName": flowJobName,
}).Infof("failed to convert record to json: %v", err)
return nil, err
return err
}

flushBatch := func() error {
err := c.sendEventBatch(batchPerTopic, maxParallelism,
req.FlowJobName, tableNameRowsMapping)
err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
"flowName": flowJobName,
}).Infof("failed to send event batch: %v", err)
return err
}
Expand All @@ -179,45 +157,84 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
topicName, err := NewScopedEventhub(record.GetTableName())
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
"flowName": flowJobName,
}).Infof("failed to get topic name: %v", err)
return nil, err
return err
}

err = batchPerTopic.AddEvent(topicName, json)
err = batchPerTopic.AddEvent(ctx, topicName, json)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
"flowName": flowJobName,
}).Infof("failed to add event to batch: %v", err)
return nil, err
}

if i%eventsPerHeartBeat == 0 {
activity.RecordHeartbeat(c.ctx, fmt.Sprintf("sent %d records to hub: %s", i, topicName.ToString()))
return err
}

if (i+1)%eventsPerBatch == 0 {
err := flushBatch()
if err != nil {
return nil, err
return err
}
}
}

// send the remaining events.
if batchPerTopic.Len() > 0 {
err := c.sendEventBatch(batchPerTopic, maxParallelism,
req.FlowJobName, tableNameRowsMapping)
err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping)
if err != nil {
return nil, err
return err
}
}

rowsSynced := len(batch.Records)
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
"flowName": flowJobName,
}).Infof("[total] successfully sent %d records to event hub", rowsSynced)
return nil
}

func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
return fmt.Sprintf("syncing records to eventhub with"+
" push parallelism %d and push batch size %d",
req.PushParallelism, req.PushBatchSize)
})
defer func() {
shutdown <- true
}()

err := c.updateLastOffset(req.FlowJobName, batch.LastCheckPointID)
eventsPerBatch := int(req.PushBatchSize)
if eventsPerBatch <= 0 {
eventsPerBatch = 10000
}
maxParallelism := req.PushParallelism
if maxParallelism <= 0 {
maxParallelism = 10
}

var err error
startTime := time.Now()

batch := req.Records

// if env var PEERDB_BETA_EVENTHUB_PUSH_ASYNC=true
// we kick off processBatch in a goroutine and return immediately.
// otherwise, we block until processBatch is done.
if utils.GetEnvBool("PEERDB_BETA_EVENTHUB_PUSH_ASYNC", false) {
go func() {
err = c.processBatch(req.FlowJobName, batch, eventsPerBatch, maxParallelism)
if err != nil {
log.Errorf("[async] failed to process batch: %v", err)
}
}()
} else {
err = c.processBatch(req.FlowJobName, batch, eventsPerBatch, maxParallelism)
if err != nil {
log.Errorf("failed to process batch: %v", err)
return nil, err
}
}

err = c.updateLastOffset(req.FlowJobName, batch.LastCheckPointID)
if err != nil {
log.Errorf("failed to update last offset: %v", err)
return nil, err
Expand All @@ -228,18 +245,19 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
return nil, err
}

metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(rowsSynced), time.Since(startTime))
metrics.LogNormalizeMetrics(c.ctx, req.FlowJobName, int64(rowsSynced),
time.Since(startTime), int64(rowsSynced))
rowsSynced := int64(len(batch.Records))
metrics.LogSyncMetrics(c.ctx, req.FlowJobName, rowsSynced, time.Since(startTime))
metrics.LogNormalizeMetrics(c.ctx, req.FlowJobName, rowsSynced, time.Since(startTime), rowsSynced)
return &model.SyncResponse{
FirstSyncedCheckPointID: batch.FirstCheckPointID,
LastSyncedCheckPointID: batch.LastCheckPointID,
NumRecordsSynced: int64(len(batch.Records)),
TableNameRowsMapping: tableNameRowsMapping.Items(),
NumRecordsSynced: rowsSynced,
TableNameRowsMapping: make(map[string]uint32),
}, nil
}

func (c *EventHubConnector) sendEventBatch(
ctx context.Context,
events *HubBatches,
maxParallelism int64,
flowName string,
Expand Down Expand Up @@ -268,7 +286,7 @@ func (c *EventHubConnector) sendEventBatch(
}()

numEvents := eventBatch.NumEvents()
err := c.sendBatch(tblName, eventBatch)
err := c.sendBatch(ctx, tblName, eventBatch)
if err != nil {
once.Do(func() { firstErr = err })
return
Expand Down Expand Up @@ -298,8 +316,12 @@ func (c *EventHubConnector) sendEventBatch(
return nil
}

func (c *EventHubConnector) sendBatch(tblName ScopedEventhub, events *azeventhubs.EventDataBatch) error {
subCtx, cancel := context.WithTimeout(c.ctx, 5*time.Minute)
func (c *EventHubConnector) sendBatch(
ctx context.Context,
tblName ScopedEventhub,
events *azeventhubs.EventDataBatch,
) error {
subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()

hub, err := c.hubManager.GetOrCreateHubClient(tblName)
Expand Down
10 changes: 6 additions & 4 deletions flow/connectors/eventhub/hub_batches.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package conneventhub

import (
"context"
"fmt"
"strings"

Expand All @@ -20,14 +21,14 @@ func NewHubBatches(manager *EventHubManager) *HubBatches {
}
}

func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error {
func (h *HubBatches) AddEvent(ctx context.Context, name ScopedEventhub, event string) error {
batches, ok := h.batches[name]
if !ok {
batches = []*azeventhubs.EventDataBatch{}
}

if len(batches) == 0 {
newBatch, err := h.manager.CreateEventDataBatch(name)
newBatch, err := h.manager.CreateEventDataBatch(ctx, name)
if err != nil {
return err
}
Expand All @@ -36,7 +37,7 @@ func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error {

if err := tryAddEventToBatch(event, batches[len(batches)-1]); err != nil {
if strings.Contains(err.Error(), "too large for the batch") {
overflowBatch, err := h.handleBatchOverflow(name, event)
overflowBatch, err := h.handleBatchOverflow(ctx, name, event)
if err != nil {
return fmt.Errorf("failed to handle batch overflow: %v", err)
}
Expand All @@ -51,10 +52,11 @@ func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error {
}

func (h *HubBatches) handleBatchOverflow(
ctx context.Context,
name ScopedEventhub,
event string,
) (*azeventhubs.EventDataBatch, error) {
newBatch, err := h.manager.CreateEventDataBatch(name)
newBatch, err := h.manager.CreateEventDataBatch(ctx, name)
if err != nil {
return nil, err
}
Expand Down
22 changes: 2 additions & 20 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
)

type EventHubManager struct {
ctx context.Context
creds *azidentity.DefaultAzureCredential
// eventhub peer name -> config
peerConfig cmap.ConcurrentMap[string, *protos.EventHubConfig]
Expand All @@ -36,7 +35,6 @@ func NewEventHubManager(
}

return &EventHubManager{
ctx: ctx,
creds: creds,
peerConfig: peerConfig,
}
Expand Down Expand Up @@ -69,30 +67,14 @@ func (m *EventHubManager) GetOrCreateHubClient(name ScopedEventhub) (*azeventhub
return hub.(*azeventhubs.ProducerClient), nil
}

func (m *EventHubManager) Close() error {
var globalErr error
m.hubs.Range(func(key, value interface{}) bool {
hub := value.(*azeventhubs.ProducerClient)
err := hub.Close(m.ctx)
if err != nil {
log.Errorf("failed to close eventhub client: %v", err)
globalErr = fmt.Errorf("failed to close eventhub client: %v", err)
return false
}
return true
})

return globalErr
}

func (m *EventHubManager) CreateEventDataBatch(name ScopedEventhub) (*azeventhubs.EventDataBatch, error) {
func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, name ScopedEventhub) (*azeventhubs.EventDataBatch, error) {
hub, err := m.GetOrCreateHubClient(name)
if err != nil {
return nil, err
}

opts := &azeventhubs.EventDataBatchOptions{}
batch, err := hub.NewEventDataBatch(m.ctx, opts)
batch, err := hub.NewEventDataBatch(ctx, opts)
if err != nil {
return nil, fmt.Errorf("failed to create event data batch: %v", err)
}
Expand Down
Loading

0 comments on commit 7ca5521

Please sign in to comment.