From 68bf3ce9d1c050a7f8232f6ea8e67542fcc2bf1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 14 Dec 2023 16:58:29 +0000 Subject: [PATCH] Fix everything --- flow/activities/flowable.go | 10 +++-- flow/connectors/bigquery/bigquery.go | 23 +++++++---- flow/connectors/core.go | 5 ++- flow/connectors/eventhub/eventhub.go | 10 ++--- flow/connectors/external_metadata/store.go | 13 ++---- flow/connectors/postgres/cdc.go | 22 ++++------- flow/connectors/postgres/client.go | 2 +- flow/connectors/postgres/postgres.go | 29 ++++++++------ flow/connectors/s3/s3.go | 8 ++-- flow/connectors/snowflake/snowflake.go | 46 +++++++++++++++++----- flow/model/model.go | 8 ++-- flow/shared/constants.go | 7 ++-- flow/workflows/sync_flow.go | 10 ++--- 13 files changed, 115 insertions(+), 78 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 93ce97b38e..2e2616c207 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -81,10 +81,10 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot func (a *FlowableActivity) GetLastSyncedID( ctx context.Context, config *protos.GetLastSyncedIDInput, -) (*protos.LastSyncState, error) { +) (int64, error) { dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig) if err != nil { - return nil, fmt.Errorf("failed to get connector: %w", err) + return 0, fmt.Errorf("failed to get connector: %w", err) } defer connectors.CloseConnector(dstConn) @@ -115,7 +115,6 @@ func (a *FlowableActivity) CreateRawTable( ctx context.Context, config *protos.CreateRawTableInput, ) (*protos.CreateRawTableOutput, error) { - ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogPool) dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig) if err != nil { return nil, fmt.Errorf("failed to get connector: %w", err) @@ -251,7 +250,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, FlowJobName: 
input.FlowConnectionConfigs.FlowJobName, SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping, TableNameMapping: tblNameMapping, - LastSyncState: input.LastSyncState, + LastOffset: input.LastSyncState.Checkpoint, MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize), IdleTimeout: peerdbenv.GetPeerDBCDCIdleTimeoutSeconds(), TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping, @@ -259,6 +258,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName, RelationMessageMapping: input.RelationMessageMapping, RecordStream: recordBatch, + SetLastOffset: func(lastOffset int64) error { + return dstConn.SetLastOffset(input.FlowConnectionConfigs.FlowJobName, lastOffset) + }, }) }) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index eb6ac8d198..9a76281056 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -316,30 +316,39 @@ func (c *BigQueryConnector) SetupMetadataTables() error { } // GetLastOffset returns the last synced ID. 
-func (c *BigQueryConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { +func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) { query := fmt.Sprintf("SELECT offset FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName) q := c.client.Query(query) it, err := q.Read(c.ctx) if err != nil { err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err) - return nil, err + return 0, err } var row []bigquery.Value err = it.Next(&row) if err != nil { c.logger.Info("no row found, returning nil") - return nil, nil + return 0, nil } if row[0] == nil { c.logger.Info("no offset found, returning nil") - return nil, nil + return 0, nil } else { - return &protos.LastSyncState{ - Checkpoint: row[0].(int64), - }, nil + return row[0].(int64), nil + } +} + +func (c *BigQueryConnector) SetLastOffset(jobName string, lastOffset int64) error { + query := fmt.Sprintf("UPDATE %s.%s SET offset = GREATEST(offset, %d) WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, lastOffset, jobName) + q := c.client.Query(query) + _, err := q.Read(c.ctx) + if err != nil { + return fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err) } + + return nil } func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) { diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 1c9352f7a8..f45d0cd208 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -60,7 +60,10 @@ type CDCSyncConnector interface { SetupMetadataTables() error // GetLastOffset gets the last offset from the metadata table on the destination - GetLastOffset(jobName string) (*protos.LastSyncState, error) + GetLastOffset(jobName string) (int64, error) + + // SetLastOffset updates the last offset on the metadata table on the destination + SetLastOffset(jobName string, lastOffset int64) error // GetLastSyncBatchID gets the last batch synced to the destination from the metadata table 
GetLastSyncBatchID(jobName string) (int64, error) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 11679e1dab..f42d38a2de 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -110,16 +110,16 @@ func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) { return syncBatchID, nil } -func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { +func (c *EventHubConnector) GetLastOffset(jobName string) (int64, error) { res, err := c.pgMetadata.FetchLastOffset(jobName) if err != nil { - return nil, err + return 0, err } return res, nil } -func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error { +func (c *EventHubConnector) SetLastOffset(jobName string, offset int64) error { err := c.pgMetadata.UpdateLastOffset(jobName, offset) if err != nil { c.logger.Error(fmt.Sprintf("failed to update last offset: %v", err)) @@ -197,7 +197,7 @@ func (c *EventHubConnector) processBatch( } if lastSeenLSN > lastUpdatedOffset { - err = c.updateLastOffset(flowJobName, lastSeenLSN) + err = c.SetLastOffset(flowJobName, lastSeenLSN) lastUpdatedOffset = lastSeenLSN c.logger.Info("processBatch", slog.Int64("updated last offset", lastSeenLSN)) if err != nil { @@ -243,7 +243,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S return nil, err } - err = c.updateLastOffset(req.FlowJobName, lastCheckpoint) + err = c.SetLastOffset(req.FlowJobName, lastCheckpoint) if err != nil { c.logger.Error("failed to update last offset", slog.Any("error", err)) return nil, err diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index ddfd6b63b0..99d40ff1c1 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -136,7 +136,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error { return nil } -func (p 
*PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyncState, error) { +func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) { rows := p.pool.QueryRow(p.ctx, ` SELECT last_offset FROM `+p.schemaName+`.`+lastSyncStateTableName+` @@ -145,22 +145,17 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyn var offset pgtype.Int8 err := rows.Scan(&offset) if err != nil { - // if the job doesn't exist, return 0 if err.Error() == "no rows in result set" { - return &protos.LastSyncState{ - Checkpoint: 0, - }, nil + return 0, nil } p.logger.Error("failed to get last offset", slog.Any("error", err)) - return nil, err + return 0, err } p.logger.Info("got last offset for job", slog.Int64("offset", offset.Int64)) - return &protos.LastSyncState{ - Checkpoint: offset.Int64, - }, nil + return offset.Int64, nil } func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) { diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 0cf8a9c762..a7c86f19a8 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -24,12 +24,11 @@ import ( type PostgresCDCSource struct { ctx context.Context - catalogPool *pgxpool.Pool replPool *pgxpool.Pool SrcTableIDNameMapping map[uint32]string TableNameMapping map[string]model.NameAndExclude slot string - metadataSchema string + SetLastOffset func(int64) error publication string relationMessageMapping model.RelationMessageMapping typeMap *pgtype.Map @@ -43,14 +42,13 @@ type PostgresCDCSource struct { type PostgresCDCConfig struct { AppContext context.Context - CatalogPool *pgxpool.Pool Connection *pgxpool.Pool Slot string - MetadataSchema string Publication string SrcTableIDNameMapping map[uint32]string TableNameMapping map[string]model.NameAndExclude RelationMessageMapping model.RelationMessageMapping + SetLastOffset func(int64) error } // Create a new PostgresCDCSource @@ -63,12 +61,11 @@ func 
NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32 flowName, _ := cdcConfig.AppContext.Value(shared.FlowNameKey).(string) return &PostgresCDCSource{ ctx: cdcConfig.AppContext, - catalogPool: cdcConfig.CatalogPool, replPool: cdcConfig.Connection, SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping, TableNameMapping: cdcConfig.TableNameMapping, slot: cdcConfig.Slot, - metadataSchema: cdcConfig.MetadataSchema, + SetLastOffset: cdcConfig.SetLastOffset, publication: cdcConfig.Publication, relationMessageMapping: cdcConfig.RelationMessageMapping, typeMap: pgtype.NewMap(), @@ -148,9 +145,9 @@ func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error { // start replication var clientXLogPos, startLSN pglogrepl.LSN - if req.LastSyncState != nil && req.LastSyncState.Checkpoint > 0 { - p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastSyncState.Checkpoint)) - clientXLogPos = pglogrepl.LSN(req.LastSyncState.Checkpoint) + if req.LastOffset > 0 { + p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastOffset)) + clientXLogPos = pglogrepl.LSN(req.LastOffset) startLSN = clientXLogPos + 1 } @@ -184,12 +181,7 @@ func (p *PostgresCDCSource) consumeStream( if clientXLogPos > 0 { consumedXLogPos = clientXLogPos - _, err := p.catalogPool.Exec( - p.ctx, - fmt.Sprintf(updateMetadataForLsnSQL, p.metadataSchema, mirrorJobsTableIdentifier), - int64(consumedXLogPos), - req.FlowJobName, - ) + err := p.SetLastOffset(int64(consumedXLogPos)) if err != nil { return fmt.Errorf("[initial-flush] storing updated LSN failed: %w", err) } diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 6edc0c7c76..0c14ad1035 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -32,6 +32,7 @@ const ( createRawTableDstTableIndexSQL = "CREATE INDEX IF NOT EXISTS %s_dst_table_idx ON 
%s.%s(_peerdb_destination_table_name)" getLastOffsetSQL = "SELECT lsn_offset FROM %s.%s WHERE mirror_job_name=$1" + setLastOffsetSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1) WHERE mirror_job_name=$2" getLastSyncBatchID_SQL = "SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name=$1" getLastNormalizeBatchID_SQL = "SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name=$1" createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)" @@ -39,7 +40,6 @@ const ( insertJobMetadataSQL = "INSERT INTO %s.%s VALUES ($1,$2,$3,$4)" checkIfJobMetadataExistsSQL = "SELECT COUNT(1)::TEXT::BOOL FROM %s.%s WHERE mirror_job_name=$1" updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1), sync_batch_id=$2 WHERE mirror_job_name=$3" - updateMetadataForLsnSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1) WHERE mirror_job_name=$2" updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET normalize_batch_id=$1 WHERE mirror_job_name=$2" getTableNameToUnchangedToastColsSQL = `SELECT _peerdb_destination_table_name, diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index f124bf48d4..5cf9f0e76f 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -168,31 +168,39 @@ func (c *PostgresConnector) SetupMetadataTables() error { } // GetLastOffset returns the last synced offset for a job. -func (c *PostgresConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { +func (c *PostgresConnector) GetLastOffset(jobName string) (int64, error) { rows, err := c.pool. 
Query(c.ctx, fmt.Sprintf(getLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName) if err != nil { - return nil, fmt.Errorf("error getting last offset for job %s: %w", jobName, err) + return 0, fmt.Errorf("error getting last offset for job %s: %w", jobName, err) } defer rows.Close() if !rows.Next() { c.logger.Info("No row found, returning nil") - return nil, nil + return 0, nil } var result pgtype.Int8 err = rows.Scan(&result) if err != nil { - return nil, fmt.Errorf("error while reading result row: %w", err) + return 0, fmt.Errorf("error while reading result row: %w", err) } + if result.Int64 == 0 { - c.logger.Warn("Assuming zero offset means no sync has happened, returning nil") - return nil, nil + c.logger.Warn("Assuming zero offset means no sync has happened") } + return result.Int64, nil +} - return &protos.LastSyncState{ - Checkpoint: result.Int64, - }, nil +// SetLastOffset updates the last synced offset for a job. +func (c *PostgresConnector) SetLastOffset(jobName string, lastOffset int64) error { + _, err := c.pool. + Exec(c.ctx, fmt.Sprintf(setLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), lastOffset, jobName) + if err != nil { + return fmt.Errorf("error setting last offset for job %s: %w", jobName, err) + } + + return nil } // PullRecords pulls records from the source. 
@@ -233,14 +241,13 @@ func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.Pu cdc, err := NewPostgresCDCSource(&PostgresCDCConfig{ AppContext: c.ctx, - CatalogPool: catalogPool, Connection: c.replPool.Pool, SrcTableIDNameMapping: req.SrcTableIDNameMapping, Slot: slotName, - MetadataSchema: c.metadataSchema, Publication: publicationName, TableNameMapping: req.TableNameMapping, RelationMessageMapping: req.RelationMessageMapping, + SetLastOffset: req.SetLastOffset, }, c.customTypesMapping) if err != nil { return fmt.Errorf("failed to create cdc source: %w", err) diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 7dd102a305..7d6fbc93aa 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -175,17 +175,17 @@ func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) { return syncBatchID, nil } -func (c *S3Connector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { +func (c *S3Connector) GetLastOffset(jobName string) (int64, error) { res, err := c.pgMetadata.FetchLastOffset(jobName) if err != nil { - return nil, err + return 0, err } return res, nil } // update offset for a job -func (c *S3Connector) updateLastOffset(jobName string, offset int64) error { +func (c *S3Connector) SetLastOffset(jobName string, offset int64) error { err := c.pgMetadata.UpdateLastOffset(jobName, offset) if err != nil { c.logger.Error("failed to update last offset: ", slog.Any("error", err)) @@ -227,7 +227,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes return nil, fmt.Errorf("failed to get last checkpoint: %w", err) } - err = c.updateLastOffset(req.FlowJobName, lastCheckpoint) + err = c.SetLastOffset(req.FlowJobName, lastCheckpoint) if err != nil { c.logger.Error("failed to update last offset for s3 cdc", slog.Any("error", err)) return nil, err diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 1d7dcba312..a8367f0a21 
100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -73,6 +73,7 @@ const ( WHERE TABLE_SCHEMA=? and TABLE_NAME=?` checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?" getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?" + setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?" getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?" getLastNormalizeBatchID_SQL = "SELECT NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?" dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s" @@ -284,11 +285,11 @@ func (c *SnowflakeConnector) getTableSchemaForTable(tableName string) (*protos.T return res, nil } -func (c *SnowflakeConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { +func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) { rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName) if err != nil { - return nil, fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err) + return 0, fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err) } defer func() { // not sure if the errors these two return are same or different? 
@@ -299,21 +300,48 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (*protos.LastSyncStat }() if !rows.Next() { - c.logger.Warn("No row found ,returning nil") - return nil, nil + c.logger.Warn("No row found, returning 0") + return 0, nil } var result pgtype.Int8 err = rows.Scan(&result) if err != nil { - return nil, fmt.Errorf("error while reading result row: %w", err) + return 0, fmt.Errorf("error while reading result row: %w", err) } if result.Int64 == 0 { c.logger.Warn("Assuming zero offset means no sync has happened, returning nil") - return nil, nil + return 0, nil } - return &protos.LastSyncState{ - Checkpoint: result.Int64, - }, nil + return result.Int64, nil +} + +func (c *SnowflakeConnector) SetLastOffset(jobName string, lastOffset int64) error { + rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(setLastOffsetSQL, + c.metadataSchema, mirrorJobsTableIdentifier), lastOffset, jobName) + if err != nil { + return fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err) + } + defer func() { + // not sure if the errors these two return are same or different? 
+ err = errors.Join(rows.Close(), rows.Err()) + if err != nil { + c.logger.Error("error while closing rows for reading last offset", slog.Any("error", err)) + } + }() + + if !rows.Next() { + c.logger.Warn("No row found while setting last offset") + return nil + } + var result pgtype.Int8 + err = rows.Scan(&result) + if err != nil { + return fmt.Errorf("error while reading result row: %w", err) + } + if result.Int64 == 0 { + c.logger.Warn("Assuming zero offset means no sync has happened") + } + return nil } func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) { diff --git a/flow/model/model.go b/flow/model/model.go index 320ad47e61..487616c531 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -30,13 +30,13 @@ func NewNameAndExclude(name string, exclude []string) NameAndExclude { type PullRecordsRequest struct { // FlowJobName is the name of the flow job. FlowJobName string - // LastSyncedID is the last ID that was synced. - LastSyncState *protos.LastSyncState + // LastOffset is the latest LSN that was synced. + LastOffset int64 // MaxBatchSize is the max number of records to fetch. MaxBatchSize uint32 // IdleTimeout is the timeout to wait for new records. 
IdleTimeout time.Duration - //relId to name Mapping + // relId to name Mapping SrcTableIDNameMapping map[uint32]string // source to destination table name mapping TableNameMapping map[string]NameAndExclude @@ -50,6 +50,8 @@ type PullRecordsRequest struct { RelationMessageMapping RelationMessageMapping // record batch for pushing changes into RecordStream *CDCRecordStream + // last offset may be forwarded while processing records + SetLastOffset func(int64) error } type Record interface { diff --git a/flow/shared/constants.go b/flow/shared/constants.go index e49de60189..8379b6718f 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -23,10 +23,9 @@ const ( ShutdownSignal PauseSignal - CDCMirrorMonitorKey ContextKey = "cdcMirrorMonitor" - FlowNameKey ContextKey = "flowName" - PartitionIDKey ContextKey = "partitionId" - DeploymentUIDKey ContextKey = "deploymentUid" + FlowNameKey ContextKey = "flowName" + PartitionIDKey ContextKey = "partitionId" + DeploymentUIDKey ContextKey = "deploymentUid" ) type TaskQueueID int64 diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 0b207bc65f..471617a7f1 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -69,13 +69,13 @@ func (s *SyncFlowExecution) executeSyncFlow( } lastSyncFuture := workflow.ExecuteActivity(syncMetaCtx, flowable.GetLastSyncedID, lastSyncInput) - var dstSyncState *protos.LastSyncState - if err := lastSyncFuture.Get(syncMetaCtx, &dstSyncState); err != nil { + var dstLastOffset int64 + if err := lastSyncFuture.Get(syncMetaCtx, &dstLastOffset); err != nil { return nil, fmt.Errorf("failed to get last synced ID from destination peer: %w", err) } - if dstSyncState != nil { - msg := fmt.Sprintf("last synced ID from destination peer - %d\n", dstSyncState.Checkpoint) + if dstLastOffset != 0 { + msg := fmt.Sprintf("last synced ID from destination peer - %d\n", dstLastOffset) s.logger.Info(msg) } else { s.logger.Info("no last synced ID from destination 
peer") @@ -89,7 +89,7 @@ func (s *SyncFlowExecution) executeSyncFlow( // execute StartFlow on the peers to start the flow startFlowInput := &protos.StartFlowInput{ FlowConnectionConfigs: config, - LastSyncState: dstSyncState, + LastSyncState: &protos.LastSyncState{Checkpoint: dstLastOffset}, SyncFlowOptions: opts, RelationMessageMapping: relationMessageMapping, }