Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Easy last ts #521

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 84 additions & 29 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewEventHubConnector(
return nil, err
}

hubManager := NewEventHubManager(ctx, defaultAzureCreds, config)
hubManager := NewEventHubManager(defaultAzureCreds, config)
metadataSchemaName := "peerdb_eventhub_metadata" // #nosec G101
pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, config.GetMetadataDb(),
metadataSchemaName)
Expand All @@ -62,14 +62,14 @@ func (c *EventHubConnector) Close() error {
var allErrors error

// close all the eventhub connections.
err := c.hubManager.Close()
if err != nil {
log.Errorf("failed to close eventhub connections: %v", err)
allErrors = errors.Join(allErrors, err)
}
// err := c.hubManager.Close()
// if err != nil {
// log.Errorf("failed to close eventhub connections: %v", err)
// allErrors = errors.Join(allErrors, err)
// }

// close the postgres metadata store.
err = c.pgMetadata.Close()
err := c.pgMetadata.Close()
if err != nil {
log.Errorf("failed to close postgres metadata store: %v", err)
allErrors = errors.Join(allErrors, err)
Expand Down Expand Up @@ -129,15 +129,7 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error
return nil
}

func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
return fmt.Sprintf("syncing records to eventhub with"+
" push parallelism %d and push batch size %d",
req.PushParallelism, req.PushBatchSize)
})
defer func() {
shutdown <- true
}()
func (c *EventHubConnector) syncRecordBatchAsync(req *model.SyncRecordsRequest) error {
tableNameRowsMapping := cmap.New[uint32]()
batch := req.Records
eventsPerHeartBeat := 1000
Expand All @@ -150,21 +142,22 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
maxParallelism = 10
}

ctx := context.Background()
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

startTime := time.Now()
for i, record := range batch.Records {
json, err := record.GetItems().ToJSONWithOpts(toJSONOpts)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to convert record to json: %v", err)
return nil, err
return err
}

flushBatch := func() error {
err := c.sendEventBatch(batchPerTopic, maxParallelism,
err := c.sendEventBatch(
ctx, batchPerTopic, maxParallelism,
req.FlowJobName, tableNameRowsMapping)
if err != nil {
log.WithFields(log.Fields{
Expand All @@ -181,15 +174,15 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to get topic name: %v", err)
return nil, err
return err
}

err = batchPerTopic.AddEvent(topicName, json)
err = batchPerTopic.AddEvent(ctx, topicName, json)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to add event to batch: %v", err)
return nil, err
return err
}

if i%eventsPerHeartBeat == 0 {
Expand All @@ -199,24 +192,49 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
if (i+1)%eventsPerBatch == 0 {
err := flushBatch()
if err != nil {
return nil, err
return err
}
}
}

// send the remaining events.
if batchPerTopic.Len() > 0 {
err := c.sendEventBatch(batchPerTopic, maxParallelism,
err := c.sendEventBatch(
ctx, batchPerTopic, maxParallelism,
req.FlowJobName, tableNameRowsMapping)
if err != nil {
return nil, err
return err
}
}
rowsSynced := len(batch.Records)
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("[total] successfully sent %d records to event hub", rowsSynced)

return nil
}

func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
return fmt.Sprintf("syncing records to eventhub with"+
" push parallelism %d and push batch size %d",
req.PushParallelism, req.PushBatchSize)
})
defer func() {
shutdown <- true
}()

startTime := time.Now()

// fire and forget the sync.
go func() {
err := c.syncRecordBatchAsync(req)
if err != nil {
log.Errorf("failed to sync record batch: %v", err)
}
}()

batch := req.Records
err := c.updateLastOffset(req.FlowJobName, batch.LastCheckPointID)
if err != nil {
log.Errorf("failed to update last offset: %v", err)
Expand All @@ -228,18 +246,55 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
return nil, err
}

// get the latest timestamp from the records
var latestTimestamp time.Time
for _, record := range batch.Records {
tsColVal := record.GetItems().GetColumnValue("source_last_timestamp")
if tsColVal == nil {
continue
}

if tsColVal.Value != nil {
// see if its already a time.Time
if ts, ok := tsColVal.Value.(time.Time); ok {
if ts.After(latestTimestamp) {
latestTimestamp = ts
}
} else {
ts, err := time.Parse(time.RFC3339, tsColVal.Value.(string))
if err != nil {
log.Errorf("failed to parse timestamp: %v", err)
continue
}
if ts.After(latestTimestamp) {
latestTimestamp = ts
}
}
}
}

if !latestTimestamp.IsZero() {
err = c.pgMetadata.SetSourceLastTimestamp(req.FlowJobName, latestTimestamp)
if err != nil {
log.Errorf("failed to set source last timestamp: %v", err)
return nil, err
}
}

rowsSynced := len(batch.Records)
metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(rowsSynced), time.Since(startTime))
metrics.LogNormalizeMetrics(c.ctx, req.FlowJobName, int64(rowsSynced),
time.Since(startTime), int64(rowsSynced))
return &model.SyncResponse{
FirstSyncedCheckPointID: batch.FirstCheckPointID,
LastSyncedCheckPointID: batch.LastCheckPointID,
NumRecordsSynced: int64(len(batch.Records)),
TableNameRowsMapping: tableNameRowsMapping.Items(),
TableNameRowsMapping: make(map[string]uint32),
}, nil
}

func (c *EventHubConnector) sendEventBatch(
ctx context.Context,
events *HubBatches,
maxParallelism int64,
flowName string,
Expand Down Expand Up @@ -268,7 +323,7 @@ func (c *EventHubConnector) sendEventBatch(
}()

numEvents := eventBatch.NumEvents()
err := c.sendBatch(tblName, eventBatch)
err := c.sendBatch(ctx, tblName, eventBatch)
if err != nil {
once.Do(func() { firstErr = err })
return
Expand Down Expand Up @@ -298,8 +353,8 @@ func (c *EventHubConnector) sendEventBatch(
return nil
}

func (c *EventHubConnector) sendBatch(tblName ScopedEventhub, events *azeventhubs.EventDataBatch) error {
subCtx, cancel := context.WithTimeout(c.ctx, 5*time.Minute)
func (c *EventHubConnector) sendBatch(ctx context.Context, tblName ScopedEventhub, events *azeventhubs.EventDataBatch) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
line is 126 characters (lll)

subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()

hub, err := c.hubManager.GetOrCreateHubClient(tblName)
Expand Down
10 changes: 6 additions & 4 deletions flow/connectors/eventhub/hub_batches.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package conneventhub

import (
"context"
"fmt"
"strings"

Expand All @@ -20,14 +21,14 @@ func NewHubBatches(manager *EventHubManager) *HubBatches {
}
}

func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error {
func (h *HubBatches) AddEvent(ctx context.Context, name ScopedEventhub, event string) error {
batches, ok := h.batches[name]
if !ok {
batches = []*azeventhubs.EventDataBatch{}
}

if len(batches) == 0 {
newBatch, err := h.manager.CreateEventDataBatch(name)
newBatch, err := h.manager.CreateEventDataBatch(ctx, name)
if err != nil {
return err
}
Expand All @@ -36,7 +37,7 @@ func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error {

if err := tryAddEventToBatch(event, batches[len(batches)-1]); err != nil {
if strings.Contains(err.Error(), "too large for the batch") {
overflowBatch, err := h.handleBatchOverflow(name, event)
overflowBatch, err := h.handleBatchOverflow(ctx, name, event)
if err != nil {
return fmt.Errorf("failed to handle batch overflow: %v", err)
}
Expand All @@ -51,10 +52,11 @@ func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error {
}

func (h *HubBatches) handleBatchOverflow(
ctx context.Context,
name ScopedEventhub,
event string,
) (*azeventhubs.EventDataBatch, error) {
newBatch, err := h.manager.CreateEventDataBatch(name)
newBatch, err := h.manager.CreateEventDataBatch(ctx, name)
if err != nil {
return nil, err
}
Expand Down
39 changes: 18 additions & 21 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
)

type EventHubManager struct {
ctx context.Context
creds *azidentity.DefaultAzureCredential
// eventhub peer name -> config
peerConfig cmap.ConcurrentMap[string, *protos.EventHubConfig]
Expand All @@ -25,7 +24,6 @@ type EventHubManager struct {
}

func NewEventHubManager(
ctx context.Context,
creds *azidentity.DefaultAzureCredential,
groupConfig *protos.EventHubGroupConfig,
) *EventHubManager {
Expand All @@ -36,7 +34,6 @@ func NewEventHubManager(
}

return &EventHubManager{
ctx: ctx,
creds: creds,
peerConfig: peerConfig,
}
Expand Down Expand Up @@ -69,30 +66,30 @@ func (m *EventHubManager) GetOrCreateHubClient(name ScopedEventhub) (*azeventhub
return hub.(*azeventhubs.ProducerClient), nil
}

func (m *EventHubManager) Close() error {
var globalErr error
m.hubs.Range(func(key, value interface{}) bool {
hub := value.(*azeventhubs.ProducerClient)
err := hub.Close(m.ctx)
if err != nil {
log.Errorf("failed to close eventhub client: %v", err)
globalErr = fmt.Errorf("failed to close eventhub client: %v", err)
return false
}
return true
})

return globalErr
}

func (m *EventHubManager) CreateEventDataBatch(name ScopedEventhub) (*azeventhubs.EventDataBatch, error) {
// func (m *EventHubManager) Close() error {
// var globalErr error
// m.hubs.Range(func(key, value interface{}) bool {
// hub := value.(*azeventhubs.ProducerClient)
// err := hub.Close(m.ctx)
// if err != nil {
// log.Errorf("failed to close eventhub client: %v", err)
// globalErr = fmt.Errorf("failed to close eventhub client: %v", err)
// return false
// }
// return true
// })

// return globalErr
// }

func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, name ScopedEventhub) (*azeventhubs.EventDataBatch, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
line is 127 characters (lll)

hub, err := m.GetOrCreateHubClient(name)
if err != nil {
return nil, err
}

opts := &azeventhubs.EventDataBatchOptions{}
batch, err := hub.NewEventDataBatch(m.ctx, opts)
batch, err := hub.NewEventDataBatch(ctx, opts)
if err != nil {
return nil, fmt.Errorf("failed to create event data batch: %v", err)
}
Expand Down
24 changes: 23 additions & 1 deletion flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package connmetadata

import (
"context"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
Expand Down Expand Up @@ -86,7 +87,8 @@ func (p *PostgresMetadataStore) SetupMetadata() error {
job_name TEXT PRIMARY KEY NOT NULL,
last_offset BIGINT NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
sync_batch_id BIGINT NOT NULL
sync_batch_id BIGINT NOT NULL,
source_last_timestamp TIMESTAMP
)
`)
if err != nil {
Expand Down Expand Up @@ -217,6 +219,26 @@ func (p *PostgresMetadataStore) IncrementID(jobName string) error {
return nil
}

// SetSourceLastTimestamp stores the given timestamp in the
// source_last_timestamp column of the last-sync-state row for jobName.
// NOTE(review): this is a plain UPDATE — if no row exists for jobName it
// matches zero rows and still returns nil; callers must ensure the job's
// row was created beforehand (e.g. by the offset-update path).
func (p *PostgresMetadataStore) SetSourceLastTimestamp(jobName string, timestamp time.Time) error {
	log.WithFields(log.Fields{
		"flowName": jobName,
	}).Infof("setting source last timestamp for job `%s` to `%s`", jobName, timestamp)
	// Schema and table names are internal constants, so concatenating them
	// into the query is safe; the user-supplied values go through $1/$2
	// placeholders.
	_, err := p.pool.Exec(p.ctx, `
	UPDATE `+p.schemaName+`.`+lastSyncStateTableName+`
	SET source_last_timestamp=$2 WHERE job_name=$1
	`, jobName, timestamp)

	if err != nil {
		log.WithFields(log.Fields{
			"flowName": jobName,
		}).Errorf("failed to set source last timestamp: %v", err)
		return err
	}

	return nil
}

func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
_, err := p.pool.Exec(p.ctx, `
DELETE FROM `+p.schemaName+`.`+lastSyncStateTableName+`
Expand Down
Loading