Skip to content

Commit

Permalink
Merge branch 'main' into setup-dst-table-before-truncate
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Dec 1, 2023
2 parents a28b3a6 + 40117c5 commit be47af0
Show file tree
Hide file tree
Showing 31 changed files with 644 additions and 506 deletions.
5 changes: 4 additions & 1 deletion docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ services:
catalog:
condition: service_healthy
environment:
- DB=postgresql
- DB=postgres12
- DB_PORT=5432
- POSTGRES_USER=postgres
- POSTGRES_PWD=postgres
Expand Down Expand Up @@ -86,11 +86,14 @@ services:
image: temporalio/admin-tools:1.22
stdin_open: true
tty: true
entrypoint: ["bash", "/etc/temporal/entrypoint.sh"]
healthcheck:
test: ["CMD", "tctl", "workflow", "list"]
interval: 1s
timeout: 5s
retries: 30
volumes:
- ./scripts/mirror-name-search.sh:/etc/temporal/entrypoint.sh

temporal-ui:
container_name: temporal-ui
Expand Down
5 changes: 4 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ services:
catalog:
condition: service_healthy
environment:
- DB=postgresql
- DB=postgres12
- DB_PORT=5432
- POSTGRES_USER=postgres
- POSTGRES_PWD=postgres
Expand All @@ -73,11 +73,14 @@ services:
image: temporalio/admin-tools:1.22
stdin_open: true
tty: true
entrypoint: ["bash", "/etc/temporal/entrypoint.sh"]
healthcheck:
test: ["CMD", "tctl", "workflow", "list"]
interval: 1s
timeout: 5s
retries: 30
volumes:
- ./scripts/mirror-name-search.sh:/etc/temporal/entrypoint.sh

temporal-ui:
container_name: temporal-ui
Expand Down
13 changes: 13 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ func (h *FlowRequestHandler) CreateCDCFlow(
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: h.peerflowTaskQueueID,
SearchAttributes: map[string]interface{}{
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
},
}

maxBatchSize := int(cfg.MaxBatchSize)
Expand All @@ -139,6 +142,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(

limits := &peerflow.CDCFlowLimits{
TotalSyncFlows: 0,
ExitAfterRecords: -1,
TotalNormalizeFlows: 0,
MaxBatchSize: maxBatchSize,
}
Expand All @@ -160,13 +164,15 @@ func (h *FlowRequestHandler) CreateCDCFlow(
if req.CreateCatalogEntry {
err := h.createCdcJobEntry(ctx, req, workflowID)
if err != nil {
log.Errorf("unable to create flow job entry: %v", err)
return nil, fmt.Errorf("unable to create flow job entry: %w", err)
}
}

var err error
err = h.updateFlowConfigInCatalog(cfg)
if err != nil {
log.Errorf("unable to update flow config in catalog: %v", err)
return nil, fmt.Errorf("unable to update flow config in catalog: %w", err)
}

Expand All @@ -180,6 +186,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
state, // workflow state
)
if err != nil {
log.Errorf("unable to start PeerFlow workflow: %v", err)
return nil, fmt.Errorf("unable to start PeerFlow workflow: %w", err)
}

Expand Down Expand Up @@ -229,6 +236,9 @@ func (h *FlowRequestHandler) CreateQRepFlow(
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: h.peerflowTaskQueueID,
SearchAttributes: map[string]interface{}{
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
},
}
if req.CreateCatalogEntry {
err := h.createQrepJobEntry(ctx, req, workflowID)
Expand Down Expand Up @@ -311,6 +321,9 @@ func (h *FlowRequestHandler) ShutdownFlow(
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: h.peerflowTaskQueueID,
SearchAttributes: map[string]interface{}{
shared.MirrorNameSearchAttribute: req.FlowJobName,
},
}
dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(
ctx, // context
Expand Down
4 changes: 2 additions & 2 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (h *FlowRequestHandler) GetSchemas(

defer peerPool.Close()
rows, err := peerPool.Query(ctx, "SELECT schema_name"+
" FROM information_schema.schemata;")
" FROM information_schema.schemata WHERE schema_name !~ '^pg_' AND schema_name <> 'information_schema';")
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (h *FlowRequestHandler) GetAllTables(

defer peerPool.Close()
rows, err := peerPool.Query(ctx, "SELECT table_schema || '.' || table_name AS schema_table "+
"FROM information_schema.tables;")
"FROM information_schema.tables WHERE table_schema !~ '^pg_' AND table_schema <> 'information_schema'")
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}
Expand Down
19 changes: 10 additions & 9 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,6 @@ func (c *EventHubConnector) processBatch(
}

func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
return fmt.Sprintf("syncing records to eventhub with"+
" push parallelism %d and push batch size %d",
req.PushParallelism, req.PushBatchSize)
})
defer func() {
shutdown <- true
}()

maxParallelism := req.PushParallelism
if maxParallelism <= 0 {
maxParallelism = 10
Expand All @@ -229,6 +220,16 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
batch := req.Records
var numRecords uint32

shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
return fmt.Sprintf(
"processed %d records for flow %s",
numRecords, req.FlowJobName,
)
})
defer func() {
shutdown <- true
}()

// if env var PEERDB_BETA_EVENTHUB_PUSH_ASYNC=true
// we kick off processBatch in a goroutine and return immediately.
// otherwise, we block until processBatch is done.
Expand Down
73 changes: 58 additions & 15 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,6 @@ func (p *PostgresCDCSource) consumeStream(
clientXLogPos pglogrepl.LSN,
records *model.CDCRecordStream,
) error {
standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

defer func() {
err := conn.Close(p.ctx)
if err != nil {
Expand Down Expand Up @@ -211,19 +208,26 @@ func (p *PostgresCDCSource) consumeStream(
}()

tablePKeyLastSeen := make(map[model.TableWithPkey]int)
standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

addRecord := func(rec model.Record) {
records.AddRecord(rec)
localRecords = append(localRecords, rec)

if len(localRecords) == 1 {
records.SignalAsNotEmpty()
log.Infof("pushing the standby deadline to %s", time.Now().Add(standbyMessageTimeout))
log.Infof("num records accumulated: %d", len(localRecords))
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
}
}

pkmRequiresResponse := false
waitingForCommit := false

for {
if time.Now().After(nextStandbyMessageDeadline) ||
(len(localRecords) >= int(req.MaxBatchSize)) {
if pkmRequiresResponse {
// Update XLogPos to the last processed position, we can only confirm
// that this is the last row committed on the destination.
err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
Expand All @@ -232,26 +236,64 @@ func (p *PostgresCDCSource) consumeStream(
return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err)
}

numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(localRecords))

if time.Since(standByLastLogged) > 10*time.Second {
numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(localRecords))
log.Infof("Sent Standby status message. %s", numRowsProcessedMessage)
standByLastLogged = time.Now()
}

nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
pkmRequiresResponse = false
}

if !p.commitLock && (len(localRecords) >= int(req.MaxBatchSize)) {
return nil
if (len(localRecords) >= int(req.MaxBatchSize)) && !p.commitLock {
return nil
}

if waitingForCommit && !p.commitLock {
log.Infof(
"[%s] commit received, returning currently accumulated records - %d",
req.FlowJobName,
len(localRecords),
)
return nil
}

// if we are past the next standby deadline (?)
if time.Now().After(nextStandbyMessageDeadline) {
if len(localRecords) > 0 {
log.Infof("[%s] standby deadline reached, have %d records, will return at next commit",
req.FlowJobName,
len(localRecords),
)

if !p.commitLock {
// immediate return if we are not waiting for a commit
return nil
}

waitingForCommit = true
} else {
log.Infof("[%s] standby deadline reached, no records accumulated, continuing to wait",
req.FlowJobName,
)
}
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
}

var ctx context.Context
var cancel context.CancelFunc

if len(localRecords) == 0 {
ctx, cancel = context.WithCancel(p.ctx)
} else {
ctx, cancel = context.WithDeadline(p.ctx, nextStandbyMessageDeadline)
}

ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline)
rawMsg, err := conn.ReceiveMessage(ctx)
cancel()
if err != nil && !p.commitLock {
if pgconn.Timeout(err) {
log.Infof("Idle timeout reached, returning currently accumulated records - %d", len(localRecords))
log.Infof("Stand-by deadline reached, returning currently accumulated records - %d", len(localRecords))
return nil
} else {
return fmt.Errorf("ReceiveMessage failed: %w", err)
Expand Down Expand Up @@ -281,9 +323,10 @@ func (p *PostgresCDCSource) consumeStream(
if pkm.ServerWALEnd > clientXLogPos {
clientXLogPos = pkm.ServerWALEnd
}
if pkm.ReplyRequested {
nextStandbyMessageDeadline = time.Time{}
}

// always reply to keepalive messages
// instead of `pkm.ReplyRequested`
pkmRequiresResponse = true

case pglogrepl.XLogDataByteID:
xld, err := pglogrepl.ParseXLogData(msg.Data[1:])
Expand Down
11 changes: 11 additions & 0 deletions flow/connectors/utils/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,14 @@ func GetEnvInt(name string, defaultValue int) int {

return i
}

// GetEnvString looks up the environment variable called name through GetEnv
// and returns its value. When GetEnv reports that the variable is not set,
// defaultValue is returned instead.
func GetEnvString(name string, defaultValue string) string {
	// Prefer the looked-up value whenever GetEnv says the variable exists.
	if value, found := GetEnv(name); found {
		return value
	}

	return defaultValue
}
Loading

0 comments on commit be47af0

Please sign in to comment.