From ee73106e30216f46ba563e3955bf0ecf2b3c9244 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sun, 15 Oct 2023 11:57:20 -0400 Subject: [PATCH 1/4] abc --- flow/connectors/eventhub/eventhub.go | 35 ++++++++++++++++++++++ flow/connectors/external_metadata/store.go | 24 ++++++++++++++- 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 51dba78b7d..7adac2b0c7 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -228,6 +228,41 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S return nil, err } + // get the latest timestamp from the records + var latestTimestamp time.Time + for _, record := range batch.Records { + tsColVal := record.GetItems().GetColumnValue("source_last_timestamp") + if tsColVal == nil { + continue + } + + if tsColVal.Value != nil { + // see if its already a time.Time + if ts, ok := tsColVal.Value.(time.Time); ok { + if ts.After(latestTimestamp) { + latestTimestamp = ts + } + } else { + ts, err := time.Parse(time.RFC3339, tsColVal.Value.(string)) + if err != nil { + log.Errorf("failed to parse timestamp: %v", err) + continue + } + if ts.After(latestTimestamp) { + latestTimestamp = ts + } + } + } + } + + if !latestTimestamp.IsZero() { + err = c.pgMetadata.SetSourceLastTimestamp(req.FlowJobName, latestTimestamp) + if err != nil { + log.Errorf("failed to set source last timestamp: %v", err) + return nil, err + } + } + metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(rowsSynced), time.Since(startTime)) metrics.LogNormalizeMetrics(c.ctx, req.FlowJobName, int64(rowsSynced), time.Since(startTime), int64(rowsSynced)) diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 6ec7c07142..f6678a0d4f 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -2,6 +2,7 @@ package connmetadata import ( "context" + "time" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -86,7 +87,8 @@ func (p *PostgresMetadataStore) SetupMetadata() error { job_name TEXT PRIMARY KEY NOT NULL, last_offset BIGINT NOT NULL, updated_at TIMESTAMP NOT NULL DEFAULT NOW(), - sync_batch_id BIGINT NOT NULL + sync_batch_id BIGINT NOT NULL, + source_last_timestamp TIMESTAMP ) `) if err != nil { @@ -217,6 +219,26 @@ func (p *PostgresMetadataStore) IncrementID(jobName string) error { return nil } +// set source_last_timestamp for a job +func (p *PostgresMetadataStore) SetSourceLastTimestamp(jobName string, timestamp time.Time) error { + log.WithFields(log.Fields{ + "flowName": jobName, + }).Infof("setting source last timestamp for job `%s` to `%s`", jobName, timestamp) + _, err := p.pool.Exec(p.ctx, ` + UPDATE `+p.schemaName+`.`+lastSyncStateTableName+` + SET source_last_timestamp=$2 WHERE job_name=$1 + `, jobName, timestamp) + + if err != nil { + log.WithFields(log.Fields{ + "flowName": jobName, + }).Errorf("failed to set source last timestamp: %v", err) + return err + } + + return nil +} + func (p *PostgresMetadataStore) DropMetadata(jobName string) error { _, err := p.pool.Exec(p.ctx, ` DELETE FROM `+p.schemaName+`.`+lastSyncStateTableName+` From 13356df03a38ab904205c5f059cd4a9819a2660d Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sun, 15 Oct 2023 14:08:06 -0400 Subject: [PATCH 2/4] fire async --- flow/connectors/eventhub/eventhub.go | 48 ++++++++++++++++++---------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 7adac2b0c7..af47c01a85 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -129,15 +129,7 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error return nil } -func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { - shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string { - return fmt.Sprintf("syncing records to eventhub with"+ - " push parallelism %d and push batch size %d", - req.PushParallelism, req.PushBatchSize) - }) - defer func() { - shutdown <- true - }() +func (c *EventHubConnector) syncRecordBatchAsync(req *model.SyncRecordsRequest) error { tableNameRowsMapping := cmap.New[uint32]() batch := req.Records eventsPerHeartBeat := 1000 @@ -153,14 +145,13 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) - startTime := time.Now() for i, record := range batch.Records { json, err := record.GetItems().ToJSONWithOpts(toJSONOpts) if err != nil { log.WithFields(log.Fields{ "flowName": req.FlowJobName, }).Infof("failed to convert record to json: %v", err) - return nil, err + return err } flushBatch := func() error { @@ -181,7 +172,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S log.WithFields(log.Fields{ "flowName": req.FlowJobName, }).Infof("failed to get topic name: %v", err) - return nil, err + return err } err = batchPerTopic.AddEvent(topicName, json) @@ -189,7 +180,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S log.WithFields(log.Fields{ "flowName": req.FlowJobName, }).Infof("failed to add event to batch: %v", err) - return nil, err + return err } if i%eventsPerHeartBeat == 0 { @@ -199,7 +190,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S if (i+1)%eventsPerBatch == 0 { err := flushBatch() if err != nil { - return nil, err + return err } } } @@ -209,7 +200,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S err := c.sendEventBatch(batchPerTopic, maxParallelism, req.FlowJobName, tableNameRowsMapping) if err != nil { - return nil, err + return err } } rowsSynced := len(batch.Records) @@ -217,6 +208,30 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S "flowName": req.FlowJobName, }).Infof("[total] successfully sent %d records to event hub", rowsSynced) + return nil +} + +func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { + shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string { + return fmt.Sprintf("syncing records to eventhub with"+ + " push parallelism %d and push batch size %d", + req.PushParallelism, req.PushBatchSize) + }) + defer func() { + shutdown <- true + }() + + startTime := time.Now() + + // fire and forget the sync. + go func() { + err := c.syncRecordBatchAsync(req) + if err != nil { + log.Errorf("failed to sync record batch: %v", err) + } + }() + + batch := req.Records err := c.updateLastOffset(req.FlowJobName, batch.LastCheckPointID) if err != nil { log.Errorf("failed to update last offset: %v", err) @@ -263,6 +278,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S } } + rowsSynced := len(batch.Records) metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(rowsSynced), time.Since(startTime)) metrics.LogNormalizeMetrics(c.ctx, req.FlowJobName, int64(rowsSynced), time.Since(startTime), int64(rowsSynced)) @@ -270,7 +286,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S FirstSyncedCheckPointID: batch.FirstCheckPointID, LastSyncedCheckPointID: batch.LastCheckPointID, NumRecordsSynced: int64(len(batch.Records)), - TableNameRowsMapping: tableNameRowsMapping.Items(), + TableNameRowsMapping: make(map[string]uint32), }, nil } From 7cd2525b84f74faed0715aeecb6463a7369ba14a Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sun, 15 Oct 2023 14:19:40 -0400 Subject: [PATCH 3/4] fix contexts --- flow/connectors/eventhub/eventhub.go | 16 +++++----- flow/connectors/eventhub/hub_batches.go | 10 ++++--- flow/connectors/eventhub/hubmanager.go | 39 ++++++++++++------------- 3 files changed, 32 insertions(+), 33 deletions(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index af47c01a85..48e1544f9f 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -40,7 +40,7 @@ func NewEventHubConnector( return nil, err } - hubManager := NewEventHubManager(ctx, defaultAzureCreds, config) + hubManager := NewEventHubManager(defaultAzureCreds, config) metadataSchemaName := "peerdb_eventhub_metadata" // #nosec G101 pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, config.GetMetadataDb(), metadataSchemaName) @@ -62,14 +62,14 @@ func (c *EventHubConnector) Close() error { var allErrors error // close all the eventhub connections. - err := c.hubManager.Close() - if err != nil { - log.Errorf("failed to close eventhub connections: %v", err) - allErrors = errors.Join(allErrors, err) - } + // err := c.hubManager.Close() + // if err != nil { + // log.Errorf("failed to close eventhub connections: %v", err) + // allErrors = errors.Join(allErrors, err) + // } // close the postgres metadata store. - err = c.pgMetadata.Close() + err := c.pgMetadata.Close() if err != nil { log.Errorf("failed to close postgres metadata store: %v", err) allErrors = errors.Join(allErrors, err) @@ -175,7 +175,7 @@ func (c *EventHubConnector) syncRecordBatchAsync(req *model.SyncRecordsRequest) return err } - err = batchPerTopic.AddEvent(topicName, json) + err = batchPerTopic.AddEvent(context.Background(), topicName, json) if err != nil { log.WithFields(log.Fields{ "flowName": req.FlowJobName, diff --git a/flow/connectors/eventhub/hub_batches.go b/flow/connectors/eventhub/hub_batches.go index 97d1e568a4..652e10d45f 100644 --- a/flow/connectors/eventhub/hub_batches.go +++ b/flow/connectors/eventhub/hub_batches.go @@ -1,6 +1,7 @@ package conneventhub import ( + "context" "fmt" "strings" @@ -20,14 +21,14 @@ func NewHubBatches(manager *EventHubManager) *HubBatches { } } -func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error { +func (h *HubBatches) AddEvent(ctx context.Context, name ScopedEventhub, event string) error { batches, ok := h.batches[name] if !ok { batches = []*azeventhubs.EventDataBatch{} } if len(batches) == 0 { - newBatch, err := h.manager.CreateEventDataBatch(name) + newBatch, err := h.manager.CreateEventDataBatch(ctx, name) if err != nil { return err } @@ -36,7 +37,7 @@ func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error { if err := tryAddEventToBatch(event, batches[len(batches)-1]); err != nil { if strings.Contains(err.Error(), "too large for the batch") { - overflowBatch, err := h.handleBatchOverflow(name, event) + overflowBatch, err := h.handleBatchOverflow(ctx, name, event) if err != nil { return fmt.Errorf("failed to handle batch overflow: %v", err) } @@ -51,10 +52,11 @@ func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error { } func (h *HubBatches) handleBatchOverflow( + ctx context.Context, name ScopedEventhub, event string, ) (*azeventhubs.EventDataBatch, error) { - newBatch, err := h.manager.CreateEventDataBatch(name) + newBatch, err := h.manager.CreateEventDataBatch(ctx, name) if err != nil { return nil, err } diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index be2681110d..1b5caafdd6 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -16,7 +16,6 @@ import ( ) type EventHubManager struct { - ctx context.Context creds *azidentity.DefaultAzureCredential // eventhub peer name -> config peerConfig cmap.ConcurrentMap[string, *protos.EventHubConfig] @@ -25,7 +24,6 @@ type EventHubManager struct { } func NewEventHubManager( - ctx context.Context, creds *azidentity.DefaultAzureCredential, groupConfig *protos.EventHubGroupConfig, ) *EventHubManager { @@ -36,7 +34,6 @@ func NewEventHubManager( } return &EventHubManager{ - ctx: ctx, creds: creds, peerConfig: peerConfig, } @@ -69,30 +66,30 @@ func (m *EventHubManager) GetOrCreateHubClient(name ScopedEventhub) (*azeventhub return hub.(*azeventhubs.ProducerClient), nil } -func (m *EventHubManager) Close() error { - var globalErr error - m.hubs.Range(func(key, value interface{}) bool { - hub := value.(*azeventhubs.ProducerClient) - err := hub.Close(m.ctx) - if err != nil { - log.Errorf("failed to close eventhub client: %v", err) - globalErr = fmt.Errorf("failed to close eventhub client: %v", err) - return false - } - return true - }) - - return globalErr -} - -func (m *EventHubManager) CreateEventDataBatch(name ScopedEventhub) (*azeventhubs.EventDataBatch, error) { +// func (m *EventHubManager) Close() error { +// var globalErr error +// m.hubs.Range(func(key, value interface{}) bool { +// hub := value.(*azeventhubs.ProducerClient) +// err := hub.Close(m.ctx) +// if err != nil { +// log.Errorf("failed to close eventhub client: %v", err) +// globalErr = fmt.Errorf("failed to close eventhub client: %v", err) +// return false +// } +// return true +// }) + +// return globalErr +// } + +func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, name ScopedEventhub) (*azeventhubs.EventDataBatch, error) { hub, err := m.GetOrCreateHubClient(name) if err != nil { return nil, err } opts := &azeventhubs.EventDataBatchOptions{} - batch, err := hub.NewEventDataBatch(m.ctx, opts) + batch, err := hub.NewEventDataBatch(ctx, opts) if err != nil { return nil, fmt.Errorf("failed to create event data batch: %v", err) } From 10fed2998f8ce441656ee25029a89093fe52851c Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sun, 15 Oct 2023 14:25:11 -0400 Subject: [PATCH 4/4] fix ctx 2 --- flow/connectors/eventhub/eventhub.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 48e1544f9f..35dfe16def 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -142,6 +142,7 @@ func (c *EventHubConnector) syncRecordBatchAsync(req *model.SyncRecordsRequest) maxParallelism = 10 } + ctx := context.Background() batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) @@ -155,7 +156,8 @@ func (c *EventHubConnector) syncRecordBatchAsync(req *model.SyncRecordsRequest) } flushBatch := func() error { - err := c.sendEventBatch(batchPerTopic, maxParallelism, + err := c.sendEventBatch( + ctx, batchPerTopic, maxParallelism, req.FlowJobName, tableNameRowsMapping) if err != nil { log.WithFields(log.Fields{ @@ -175,7 +177,7 @@ func (c *EventHubConnector) syncRecordBatchAsync(req *model.SyncRecordsRequest) return err } - err = batchPerTopic.AddEvent(context.Background(), topicName, json) + err = batchPerTopic.AddEvent(ctx, topicName, json) if err != nil { log.WithFields(log.Fields{ "flowName": req.FlowJobName, @@ -197,7 +199,8 @@ func (c *EventHubConnector) syncRecordBatchAsync(req *model.SyncRecordsRequest) // send the remaining events. if batchPerTopic.Len() > 0 { - err := c.sendEventBatch(batchPerTopic, maxParallelism, + err := c.sendEventBatch( + ctx, batchPerTopic, maxParallelism, req.FlowJobName, tableNameRowsMapping) if err != nil { return err @@ -291,6 +294,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S } func (c *EventHubConnector) sendEventBatch( + ctx context.Context, events *HubBatches, maxParallelism int64, flowName string, @@ -319,7 +323,7 @@ func (c *EventHubConnector) sendEventBatch( }() numEvents := eventBatch.NumEvents() - err := c.sendBatch(tblName, eventBatch) + err := c.sendBatch(ctx, tblName, eventBatch) if err != nil { once.Do(func() { firstErr = err }) return @@ -349,8 +353,8 @@ func (c *EventHubConnector) sendEventBatch( return nil } -func (c *EventHubConnector) sendBatch(tblName ScopedEventhub, events *azeventhubs.EventDataBatch) error { - subCtx, cancel := context.WithTimeout(c.ctx, 5*time.Minute) +func (c *EventHubConnector) sendBatch(ctx context.Context, tblName ScopedEventhub, events *azeventhubs.EventDataBatch) error { + subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() hub, err := c.hubManager.GetOrCreateHubClient(tblName)