Skip to content

Commit

Permalink
Merge branch 'main' into insert-soft-delete-same-batch
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Dec 1, 2023
2 parents 4a4cae6 + 864e502 commit 6880dfa
Show file tree
Hide file tree
Showing 18 changed files with 360 additions and 326 deletions.
4 changes: 4 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(

limits := &peerflow.CDCFlowLimits{
TotalSyncFlows: 0,
ExitAfterRecords: -1,
TotalNormalizeFlows: 0,
MaxBatchSize: maxBatchSize,
}
Expand All @@ -163,13 +164,15 @@ func (h *FlowRequestHandler) CreateCDCFlow(
if req.CreateCatalogEntry {
err := h.createCdcJobEntry(ctx, req, workflowID)
if err != nil {
log.Errorf("unable to create flow job entry: %v", err)
return nil, fmt.Errorf("unable to create flow job entry: %w", err)
}
}

var err error
err = h.updateFlowConfigInCatalog(cfg)
if err != nil {
log.Errorf("unable to update flow config in catalog: %v", err)
return nil, fmt.Errorf("unable to update flow config in catalog: %w", err)
}

Expand All @@ -183,6 +186,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
state, // workflow state
)
if err != nil {
log.Errorf("unable to start PeerFlow workflow: %v", err)
return nil, fmt.Errorf("unable to start PeerFlow workflow: %w", err)
}

Expand Down
4 changes: 2 additions & 2 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (h *FlowRequestHandler) GetSchemas(

defer peerPool.Close()
rows, err := peerPool.Query(ctx, "SELECT schema_name"+
" FROM information_schema.schemata;")
" FROM information_schema.schemata WHERE schema_name !~ '^pg_' AND schema_name <> 'information_schema';")
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (h *FlowRequestHandler) GetAllTables(

defer peerPool.Close()
rows, err := peerPool.Query(ctx, "SELECT table_schema || '.' || table_name AS schema_table "+
"FROM information_schema.tables;")
"FROM information_schema.tables WHERE table_schema !~ '^pg_' AND table_schema <> 'information_schema'")
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}
Expand Down
19 changes: 10 additions & 9 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,6 @@ func (c *EventHubConnector) processBatch(
}

func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
return fmt.Sprintf("syncing records to eventhub with"+
" push parallelism %d and push batch size %d",
req.PushParallelism, req.PushBatchSize)
})
defer func() {
shutdown <- true
}()

maxParallelism := req.PushParallelism
if maxParallelism <= 0 {
maxParallelism = 10
Expand All @@ -229,6 +220,16 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
batch := req.Records
var numRecords uint32

shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
return fmt.Sprintf(
"processed %d records for flow %s",
numRecords, req.FlowJobName,
)
})
defer func() {
shutdown <- true
}()

// if env var PEERDB_BETA_EVENTHUB_PUSH_ASYNC=true
// we kick off processBatch in a goroutine and return immediately.
// otherwise, we block until processBatch is done.
Expand Down
73 changes: 58 additions & 15 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,6 @@ func (p *PostgresCDCSource) consumeStream(
clientXLogPos pglogrepl.LSN,
records *model.CDCRecordStream,
) error {
standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

defer func() {
err := conn.Close(p.ctx)
if err != nil {
Expand Down Expand Up @@ -211,19 +208,26 @@ func (p *PostgresCDCSource) consumeStream(
}()

tablePKeyLastSeen := make(map[model.TableWithPkey]int)
standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

addRecord := func(rec model.Record) {
records.AddRecord(rec)
localRecords = append(localRecords, rec)

if len(localRecords) == 1 {
records.SignalAsNotEmpty()
log.Infof("pushing the standby deadline to %s", time.Now().Add(standbyMessageTimeout))
log.Infof("num records accumulated: %d", len(localRecords))
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
}
}

pkmRequiresResponse := false
waitingForCommit := false

for {
if time.Now().After(nextStandbyMessageDeadline) ||
(len(localRecords) >= int(req.MaxBatchSize)) {
if pkmRequiresResponse {
// Update XLogPos to the last processed position, we can only confirm
// that this is the last row committed on the destination.
err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
Expand All @@ -232,26 +236,64 @@ func (p *PostgresCDCSource) consumeStream(
return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err)
}

numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(localRecords))

if time.Since(standByLastLogged) > 10*time.Second {
numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(localRecords))
log.Infof("Sent Standby status message. %s", numRowsProcessedMessage)
standByLastLogged = time.Now()
}

nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
pkmRequiresResponse = false
}

if !p.commitLock && (len(localRecords) >= int(req.MaxBatchSize)) {
return nil
if (len(localRecords) >= int(req.MaxBatchSize)) && !p.commitLock {
return nil
}

if waitingForCommit && !p.commitLock {
log.Infof(
"[%s] commit received, returning currently accumulated records - %d",
req.FlowJobName,
len(localRecords),
)
return nil
}

// if we are past the next standby deadline (?)
if time.Now().After(nextStandbyMessageDeadline) {
if len(localRecords) > 0 {
log.Infof("[%s] standby deadline reached, have %d records, will return at next commit",
req.FlowJobName,
len(localRecords),
)

if !p.commitLock {
// immediate return if we are not waiting for a commit
return nil
}

waitingForCommit = true
} else {
log.Infof("[%s] standby deadline reached, no records accumulated, continuing to wait",
req.FlowJobName,
)
}
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
}

var ctx context.Context
var cancel context.CancelFunc

if len(localRecords) == 0 {
ctx, cancel = context.WithCancel(p.ctx)
} else {
ctx, cancel = context.WithDeadline(p.ctx, nextStandbyMessageDeadline)
}

ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline)
rawMsg, err := conn.ReceiveMessage(ctx)
cancel()
if err != nil && !p.commitLock {
if pgconn.Timeout(err) {
log.Infof("Idle timeout reached, returning currently accumulated records - %d", len(localRecords))
log.Infof("Stand-by deadline reached, returning currently accumulated records - %d", len(localRecords))
return nil
} else {
return fmt.Errorf("ReceiveMessage failed: %w", err)
Expand Down Expand Up @@ -281,9 +323,10 @@ func (p *PostgresCDCSource) consumeStream(
if pkm.ServerWALEnd > clientXLogPos {
clientXLogPos = pkm.ServerWALEnd
}
if pkm.ReplyRequested {
nextStandbyMessageDeadline = time.Time{}
}

// always reply to keepalive messages
// instead of `pkm.ReplyRequested`
pkmRequiresResponse = true

case pglogrepl.XLogDataByteID:
xld, err := pglogrepl.ParseXLogData(msg.Data[1:])
Expand Down
60 changes: 30 additions & 30 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() {

// TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores.
limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 1,
MaxBatchSize: 1,
ExitAfterRecords: 0,
MaxBatchSize: 1,
}

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, nil, &limits, nil)
Expand Down Expand Up @@ -156,8 +156,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 1,
MaxBatchSize: 1,
ExitAfterRecords: 0,
MaxBatchSize: 1,
}

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -201,8 +201,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 1,
MaxBatchSize: 1,
ExitAfterRecords: 0,
MaxBatchSize: 1,
}

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -249,8 +249,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 10,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -318,8 +318,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 4,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -387,8 +387,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 0,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -449,8 +449,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 1,
MaxBatchSize: 100,
ExitAfterRecords: 11,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -523,8 +523,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 6,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -592,8 +592,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 4,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -661,8 +661,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {

limits := peerflow.CDCFlowLimits{

TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 1,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -737,8 +737,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 2,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -799,8 +799,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 10,
MaxBatchSize: 100,
ExitAfterRecords: 1,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -903,8 +903,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 10,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -978,8 +978,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 20,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -1056,8 +1056,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 10,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down
Loading

0 comments on commit 6880dfa

Please sign in to comment.