diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index af7fed1fd6..6dbb170740 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -56,7 +56,7 @@ services: catalog: condition: service_healthy environment: - - DB=postgresql + - DB=postgres12 - DB_PORT=5432 - POSTGRES_USER=postgres - POSTGRES_PWD=postgres @@ -86,11 +86,14 @@ services: image: temporalio/admin-tools:1.22 stdin_open: true tty: true + entrypoint: ["bash", "/etc/temporal/entrypoint.sh"] healthcheck: test: ["CMD", "tctl", "workflow", "list"] interval: 1s timeout: 5s retries: 30 + volumes: + - ./scripts/mirror-name-search.sh:/etc/temporal/entrypoint.sh temporal-ui: container_name: temporal-ui diff --git a/docker-compose.yml b/docker-compose.yml index a4f2205cec..440f06545d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -49,7 +49,7 @@ services: catalog: condition: service_healthy environment: - - DB=postgresql + - DB=postgres12 - DB_PORT=5432 - POSTGRES_USER=postgres - POSTGRES_PWD=postgres @@ -73,11 +73,14 @@ services: image: temporalio/admin-tools:1.22 stdin_open: true tty: true + entrypoint: ["bash", "/etc/temporal/entrypoint.sh"] healthcheck: test: ["CMD", "tctl", "workflow", "list"] interval: 1s timeout: 5s retries: 30 + volumes: + - ./scripts/mirror-name-search.sh:/etc/temporal/entrypoint.sh temporal-ui: container_name: temporal-ui diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 8cd4951787..99c25cbd28 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -129,6 +129,9 @@ func (h *FlowRequestHandler) CreateCDCFlow( workflowOptions := client.StartWorkflowOptions{ ID: workflowID, TaskQueue: h.peerflowTaskQueueID, + SearchAttributes: map[string]interface{}{ + shared.MirrorNameSearchAttribute: cfg.FlowJobName, + }, } maxBatchSize := int(cfg.MaxBatchSize) @@ -139,6 +142,7 @@ func (h *FlowRequestHandler) CreateCDCFlow( limits := &peerflow.CDCFlowLimits{ TotalSyncFlows: 0, + ExitAfterRecords: -1, TotalNormalizeFlows: 0, MaxBatchSize: maxBatchSize, } @@ -160,6 +164,7 @@ func (h *FlowRequestHandler) CreateCDCFlow( if req.CreateCatalogEntry { err := h.createCdcJobEntry(ctx, req, workflowID) if err != nil { + log.Errorf("unable to create flow job entry: %v", err) return nil, fmt.Errorf("unable to create flow job entry: %w", err) } } @@ -167,6 +172,7 @@ func (h *FlowRequestHandler) CreateCDCFlow( var err error err = h.updateFlowConfigInCatalog(cfg) if err != nil { + log.Errorf("unable to update flow config in catalog: %v", err) return nil, fmt.Errorf("unable to update flow config in catalog: %w", err) } @@ -180,6 +186,7 @@ func (h *FlowRequestHandler) CreateCDCFlow( state, // workflow state ) if err != nil { + log.Errorf("unable to start PeerFlow workflow: %v", err) return nil, fmt.Errorf("unable to start PeerFlow workflow: %w", err) } @@ -229,6 +236,9 @@ func (h *FlowRequestHandler) CreateQRepFlow( workflowOptions := client.StartWorkflowOptions{ ID: workflowID, TaskQueue: h.peerflowTaskQueueID, + SearchAttributes: map[string]interface{}{ + shared.MirrorNameSearchAttribute: cfg.FlowJobName, + }, } if req.CreateCatalogEntry { err := h.createQrepJobEntry(ctx, req, workflowID) @@ -311,6 +321,9 @@ func (h *FlowRequestHandler) ShutdownFlow( workflowOptions := client.StartWorkflowOptions{ ID: workflowID, TaskQueue: h.peerflowTaskQueueID, + SearchAttributes: map[string]interface{}{ + shared.MirrorNameSearchAttribute: req.FlowJobName, + }, } dropFlowHandle, err := h.temporalClient.ExecuteWorkflow( ctx, // context diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index 14c8ba43cc..8aecff6229 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -44,7 +44,7 @@ func (h *FlowRequestHandler) GetSchemas( defer peerPool.Close() rows, err := peerPool.Query(ctx, "SELECT schema_name"+ - " FROM information_schema.schemata;") + " FROM information_schema.schemata WHERE schema_name !~ '^pg_' AND schema_name <> 'information_schema';") if err != nil { return &protos.PeerSchemasResponse{Schemas: nil}, err } @@ -106,7 +106,7 @@ func (h *FlowRequestHandler) GetAllTables( defer peerPool.Close() rows, err := peerPool.Query(ctx, "SELECT table_schema || '.' || table_name AS schema_table "+ - "FROM information_schema.tables;") + "FROM information_schema.tables WHERE table_schema !~ '^pg_' AND table_schema <> 'information_schema'") if err != nil { return &protos.AllTablesResponse{Tables: nil}, err } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 6c0f6b4ac9..73c8ed528c 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -211,15 +211,6 @@ func (c *EventHubConnector) processBatch( } func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { - shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string { - return fmt.Sprintf("syncing records to eventhub with"+ - " push parallelism %d and push batch size %d", - req.PushParallelism, req.PushBatchSize) - }) - defer func() { - shutdown <- true - }() - maxParallelism := req.PushParallelism if maxParallelism <= 0 { maxParallelism = 10 @@ -229,6 +220,16 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S batch := req.Records var numRecords uint32 + shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string { + return fmt.Sprintf( + "processed %d records for flow %s", + numRecords, req.FlowJobName, + ) + }) + defer func() { + shutdown <- true + }() + // if env var PEERDB_BETA_EVENTHUB_PUSH_ASYNC=true // we kick off processBatch in a goroutine and return immediately. // otherwise, we block until processBatch is done. diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 552f372a04..cfb0425515 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -164,9 +164,6 @@ func (p *PostgresCDCSource) consumeStream( clientXLogPos pglogrepl.LSN, records *model.CDCRecordStream, ) error { - standbyMessageTimeout := req.IdleTimeout - nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout) - defer func() { err := conn.Close(p.ctx) if err != nil { @@ -211,6 +208,8 @@ func (p *PostgresCDCSource) consumeStream( }() tablePKeyLastSeen := make(map[model.TableWithPkey]int) + standbyMessageTimeout := req.IdleTimeout + nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout) addRecord := func(rec model.Record) { records.AddRecord(rec) @@ -218,12 +217,17 @@ func (p *PostgresCDCSource) consumeStream( if len(localRecords) == 1 { records.SignalAsNotEmpty() + log.Infof("pushing the standby deadline to %s", time.Now().Add(standbyMessageTimeout)) + log.Infof("num records accumulated: %d", len(localRecords)) + nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) } } + pkmRequiresResponse := false + waitingForCommit := false + for { - if time.Now().After(nextStandbyMessageDeadline) || - (len(localRecords) >= int(req.MaxBatchSize)) { + if pkmRequiresResponse { // Update XLogPos to the last processed position, we can only confirm // that this is the last row committed on the destination. err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, @@ -232,26 +236,64 @@ func (p *PostgresCDCSource) consumeStream( return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err) } - numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(localRecords)) - if time.Since(standByLastLogged) > 10*time.Second { + numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(localRecords)) log.Infof("Sent Standby status message. %s", numRowsProcessedMessage) standByLastLogged = time.Now() } - nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) + pkmRequiresResponse = false + } - if !p.commitLock && (len(localRecords) >= int(req.MaxBatchSize)) { - return nil + if (len(localRecords) >= int(req.MaxBatchSize)) && !p.commitLock { + return nil + } + + if waitingForCommit && !p.commitLock { + log.Infof( + "[%s] commit received, returning currently accumulated records - %d", + req.FlowJobName, + len(localRecords), + ) + return nil + } + + // if we are past the next standby deadline (?) + if time.Now().After(nextStandbyMessageDeadline) { + if len(localRecords) > 0 { + log.Infof("[%s] standby deadline reached, have %d records, will return at next commit", + req.FlowJobName, + len(localRecords), + ) + + if !p.commitLock { + // immediate return if we are not waiting for a commit + return nil + } + + waitingForCommit = true + } else { + log.Infof("[%s] standby deadline reached, no records accumulated, continuing to wait", + req.FlowJobName, + ) } + nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) + } + + var ctx context.Context + var cancel context.CancelFunc + + if len(localRecords) == 0 { + ctx, cancel = context.WithCancel(p.ctx) + } else { + ctx, cancel = context.WithDeadline(p.ctx, nextStandbyMessageDeadline) } - ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline) rawMsg, err := conn.ReceiveMessage(ctx) cancel() if err != nil && !p.commitLock { if pgconn.Timeout(err) { - log.Infof("Idle timeout reached, returning currently accumulated records - %d", len(localRecords)) + log.Infof("Stand-by deadline reached, returning currently accumulated records - %d", len(localRecords)) return nil } else { return fmt.Errorf("ReceiveMessage failed: %w", err) @@ -281,9 +323,10 @@ func (p *PostgresCDCSource) consumeStream( if pkm.ServerWALEnd > clientXLogPos { clientXLogPos = pkm.ServerWALEnd } - if pkm.ReplyRequested { - nextStandbyMessageDeadline = time.Time{} - } + + // always reply to keepalive messages + // instead of `pkm.ReplyRequested` + pkmRequiresResponse = true case pglogrepl.XLogDataByteID: xld, err := pglogrepl.ParseXLogData(msg.Data[1:]) diff --git a/flow/connectors/utils/env.go b/flow/connectors/utils/env.go index 2911e3d8ef..6d3065dae4 100644 --- a/flow/connectors/utils/env.go +++ b/flow/connectors/utils/env.go @@ -45,3 +45,14 @@ func GetEnvInt(name string, defaultValue int) int { return i } + +// GetEnvString returns the value of the environment variable with the given name +// or defaultValue if the environment variable is not set. +func GetEnvString(name string, defaultValue string) string { + val, ok := GetEnv(name) + if !ok { + return defaultValue + } + + return val +} diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 1618e84326..ba8c2cb54c 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -111,8 +111,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { // TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores. limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, - MaxBatchSize: 1, + ExitAfterRecords: 0, + MaxBatchSize: 1, } env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, nil, &limits, nil) @@ -156,8 +156,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, - MaxBatchSize: 1, + ExitAfterRecords: 0, + MaxBatchSize: 1, } env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -201,8 +201,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, - MaxBatchSize: 1, + ExitAfterRecords: 0, + MaxBatchSize: 1, } env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -249,8 +249,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -318,8 +318,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 4, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -387,8 +387,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 0, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -449,8 +449,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, - MaxBatchSize: 100, + ExitAfterRecords: 11, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -523,8 +523,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 6, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -592,8 +592,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 4, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -661,8 +661,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -737,8 +737,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 2, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -799,8 +799,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 10, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -903,8 +903,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -978,8 +978,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 20, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1056,8 +1056,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 45666f0369..3f5718bb72 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -45,8 +45,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -107,8 +107,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 10, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -271,8 +271,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 4, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -350,8 +350,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 20, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -431,8 +431,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 4, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index aaca2a125a..85993e2fe8 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -24,7 +24,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { srcTableName := s.attachSchemaSuffix("test_simple_flow_s3") dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_s3", "test_simple_flow_s3") - flowJobName := s.attachSuffix("test_simple_flow") + flowJobName := s.attachSuffix("test_simple_flow_s3") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s ( id SERIAL PRIMARY KEY, @@ -44,8 +44,9 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 5, - MaxBatchSize: 5, + TotalSyncFlows: 4, + ExitAfterRecords: 20, + MaxBatchSize: 5, } go func() { @@ -95,7 +96,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { srcTableName := s.attachSchemaSuffix("test_simple_flow_gcs_interop") dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_gcs_interop", "test_simple_flow_gcs_interop") - flowJobName := s.attachSuffix("test_simple_flow") + flowJobName := s.attachSuffix("test_simple_flow_gcs_interop") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s ( id SERIAL PRIMARY KEY, @@ -115,8 +116,9 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 5, - MaxBatchSize: 5, + TotalSyncFlows: 4, + ExitAfterRecords: 20, + MaxBatchSize: 5, } go func() { @@ -131,6 +133,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { `, srcTableName), testKey, testValue) s.NoError(err) } + fmt.Println("Inserted 20 rows into the source table") s.NoError(err) }() diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index e489545e3f..e5b7c588f9 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -149,15 +149,15 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 20, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert 15 rows into the source table + // and then insert 20 rows into the source table go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - // insert 15 rows into the source table + // insert 20 rows into the source table for i := 0; i < 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) @@ -166,7 +166,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { `, srcTableName), testKey, testValue) s.NoError(err) } - fmt.Println("Inserted 10 rows into the source table") + fmt.Println("Inserted 20 rows into the source table") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -225,15 +225,15 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - // insert 10 rows into the source table + // insert 4 invalid shapes and 6 valid shapes into the source table for i := 0; i < 4; i++ { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (line,poly) VALUES ($1,$2) @@ -269,7 +269,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { s.Contains(err.Error(), "continue as new") // We inserted 4 invalid shapes in each. - // They should have filtered out as null on destination + // They should have been filtered out as null on destination lineCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "line") s.NoError(err) s.Equal(6, lineCount) @@ -312,8 +312,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 4, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -382,12 +382,10 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 0, + MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* transaction updating no rows */ @@ -398,7 +396,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { END; `, srcTableName, srcTableName)) s.NoError(err) - fmt.Println("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -444,8 +441,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 11, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -518,8 +515,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 6, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -587,8 +584,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 4, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -656,8 +653,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -731,8 +728,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 2, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -790,8 +787,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 10, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -954,8 +951,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 5, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1029,8 +1026,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 20, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1106,8 +1103,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 4, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1187,8 +1184,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { } limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup diff --git a/flow/go.mod b/flow/go.mod index 40d5ee7fe0..f3343af084 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -8,7 +8,7 @@ require ( cloud.google.com/go/storage v1.35.1 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0 github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.0.2 - github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.1.1 + github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0 github.com/aws/aws-sdk-go v1.47.9 github.com/cenkalti/backoff/v4 v4.2.1 github.com/google/uuid v1.4.0 @@ -30,7 +30,7 @@ require ( github.com/twpayne/go-geos v0.14.0 github.com/urfave/cli/v2 v2.25.7 github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a - go.temporal.io/api v1.25.0 + go.temporal.io/api v1.26.0 go.temporal.io/sdk v1.25.1 go.uber.org/atomic v1.11.0 go.uber.org/automaxprocs v1.5.3 diff --git a/flow/go.sum b/flow/go.sum index 77d0b13010..ed4b41ab72 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -28,12 +28,12 @@ github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0 h1:d81/ng9rET2YqdVkVwkb6EX github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0/go.mod h1:s4kgfzA0covAXNicZHDMN58jExvcng2mC/DepXiF1EI= github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.0.2 h1:ujuMdFIUqhfohvpjjt7YmWn6Wk5Vlw9cwtGC0/BEwLU= github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.0.2/go.mod h1:P39PnDHXbDhUV+BVw/8Nb7wQnM76jKUA7qx5T7eS+BU= -github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.1.1 h1:gZ1ZZvrVUhDNsGNpbo2N87Y0CJB8p3IS5UH9Z4Ui97g= -github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.1.1/go.mod h1:7fQVOnRA11ScLE8dOCWanXHQa2NMFOM2i0u/1VRICXA= -github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal v1.1.2 h1:mLY+pNLjCUeKhgnAJWAKhEUQM+RJQo2H1fuGSw1Ky1E= -github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal v1.1.2/go.mod h1:FbdwsQ2EzwvXxOPcMFYO8ogEc9uMMIj3YkmCdXdAFmk= -github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.0.0 h1:ECsQtyERDVz3NP3kvDOTLvbQhqWp/x9EsGKtb4ogUr8= -github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.0.0/go.mod h1:s1tW/At+xHqjNFvWU4G0c0Qv33KOhvbGNj0RCTQDV8s= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0 h1:+dggnR89/BIIlRlQ6d19dkhhdd/mQUiQbXhyHUFiB4w= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0/go.mod h1:tI9M2Q/ueFi287QRkdrhb9LHm6ZnXgkVYLRC3FhYkPw= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal/v2 v2.0.0 h1:PTFGRSlMKCQelWwxUyYVEUqseBJVemLyqWJjvMyt0do= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal/v2 v2.0.0/go.mod h1:LRr2FzBTQlONPPa5HREE5+RjSCTXl7BwOvYOaWTqCaI= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.1.1 h1:7CBQ+Ei8SP2c6ydQTGCCrS35bDxgTMfoP2miAwK++OU= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.1.1/go.mod h1:c/wcGeGx5FUPbM/JltUYHZcKmigwyVLJlDq+4HdtXaw= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.2.0 h1:Ma67P/GGprNwsslzEH6+Kb8nybI8jpDTm4Wmzu2ReK8= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.2.0/go.mod h1:c+Lifp3EDEamAkPVzMooRNOK6CZjNSdEnf1A7jsI9u4= github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azkeys v1.0.0 h1:yfJe15aSwEQ6Oo6J+gdfdulPNoZ3TEhmbhLIoxZcA+U= @@ -248,8 +248,6 @@ github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA= github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= -github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA= -github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -299,8 +297,8 @@ github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= -github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -344,8 +342,8 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.temporal.io/api v1.25.0 h1:V6lIYuQlfmM1dc2vn6mIG5F2cY3EQ+xEjfTZ801Vpx8= -go.temporal.io/api v1.25.0/go.mod h1:LTJM9iMOIuiE5hRtym4Ne6I4rKlDGioUiscdD9D6N2Y= +go.temporal.io/api v1.26.0 h1:N4V0Daqa0qqK5+9LELSZV7clBYrwB4l33iaFfKgycPk= +go.temporal.io/api v1.26.0/go.mod h1:uVAcpQJ6bM4mxZ3m7vSHU65fHjrwy9ktGQMtsNfMZQQ= go.temporal.io/sdk v1.25.1 h1:jC9l9vHHz5OJ7PR6OjrpYSN4+uEG0bLe5rdF9nlMSGk= go.temporal.io/sdk v1.25.1/go.mod h1:X7iFKZpsj90BfszfpFCzLX8lwEJXbnRrl351/HyEgmU= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/flow/shared/constants.go b/flow/shared/constants.go index d1dfbdd6e1..a10d529189 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -2,7 +2,8 @@ package shared import ( "fmt" - "os" + + "github.com/PeerDB-io/peer-flow/connectors/utils" ) const ( @@ -11,6 +12,8 @@ const ( CDCFlowSignalName = "peer-flow-signal" ) +const MirrorNameSearchAttribute = "MirrorName" + type CDCFlowSignal int64 type ContextKey string @@ -32,13 +35,20 @@ const ( const FetchAndChannelSize = 256 * 1024 func GetPeerFlowTaskQueueName(taskQueueID TaskQueueID) (string, error) { - deploymentUID := os.Getenv("PEERDB_DEPLOYMENT_UID") switch taskQueueID { case PeerFlowTaskQueueID: - return deploymentUID + "-" + peerFlowTaskQueue, nil + return prependUIDToTaskQueueName(peerFlowTaskQueue), nil case SnapshotFlowTaskQueueID: - return deploymentUID + "-" + snapshotFlowTaskQueue, nil + return prependUIDToTaskQueueName(snapshotFlowTaskQueue), nil default: return "", fmt.Errorf("unknown task queue id %d", taskQueueID) } } + +func prependUIDToTaskQueueName(taskQueueName string) string { + deploymentUID := utils.GetEnvString("PEERDB_DEPLOYMENT_UID", "") + if deploymentUID == "" { + return taskQueueName + } + return fmt.Sprintf("%s-%s", deploymentUID, taskQueueName) +} diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index bbf134bf09..eb9b42cc03 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -33,6 +33,8 @@ type CDCFlowLimits struct { TotalNormalizeFlows int // Maximum number of rows in a sync flow batch. MaxBatchSize int + // Rows synced after which we can say a test is done. + ExitAfterRecords int } type CDCFlowWorkflowState struct { @@ -189,6 +191,10 @@ func CDCFlowWorkflowWithConfig( } } + mirrorNameSearch := map[string]interface{}{ + shared.MirrorNameSearchAttribute: cfg.FlowJobName, + } + // start the SetupFlow workflow as a child workflow, and wait for it to complete // it should return the table schema for the source peer setupFlowID, err := GetChildWorkflowID(ctx, "setup-flow", cfg.FlowJobName) @@ -201,6 +207,7 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, + SearchAttributes: mirrorNameSearch, } setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts) setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg) @@ -226,7 +233,8 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - TaskQueue: taskQueue, + TaskQueue: taskQueue, + SearchAttributes: mirrorNameSearch, } snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts) snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg) @@ -283,6 +291,7 @@ func CDCFlowWorkflowWithConfig( } currentSyncFlowNum := 0 + totalRecordsSynced := 0 for { // check and act on signals before a fresh flow starts. @@ -318,11 +327,20 @@ func CDCFlowWorkflowWithConfig( } currentSyncFlowNum++ + // check if total records synced have been completed + if totalRecordsSynced == limits.ExitAfterRecords { + w.logger.Warn("All the records have been synced successfully, so ending the flow") + break + } + syncFlowID, err := GetChildWorkflowID(ctx, "sync-flow", cfg.FlowJobName) if err != nil { return state, err } + mirrorNameSearch := map[string]interface{}{ + shared.MirrorNameSearchAttribute: cfg.FlowJobName, + } // execute the sync flow as a child workflow childSyncFlowOpts := workflow.ChildWorkflowOptions{ WorkflowID: syncFlowID, @@ -330,6 +348,7 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, + SearchAttributes: mirrorNameSearch, } ctx = workflow.WithChildOptions(ctx, childSyncFlowOpts) syncFlowOptions.RelationMessageMapping = *state.RelationMessageMapping @@ -348,9 +367,12 @@ func CDCFlowWorkflowWithConfig( state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes) if childSyncFlowRes != nil { state.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping + totalRecordsSynced += int(childSyncFlowRes.NumRecordsSynced) } } + w.logger.Info("Total records synced: ", totalRecordsSynced) + normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName) if err != nil { return state, err @@ -362,6 +384,7 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, + SearchAttributes: mirrorNameSearch, } ctx = workflow.WithChildOptions(ctx, childNormalizeFlowOpts) diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 479b670876..9ee7d5dc0b 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -206,6 +206,9 @@ func (q *QRepFlowExecution) startChildWorkflow( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, + SearchAttributes: map[string]interface{}{ + shared.MirrorNameSearchAttribute: q.config.FlowJobName, + }, }) future := workflow.ExecuteChildWorkflow( diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 1c08a40fac..ae12ecc7a4 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -145,7 +145,11 @@ func (s *SnapshotFlowExecution) cloneTable( if len(mapping.Exclude) != 0 { for _, v := range s.config.TableNameSchemaMapping { if v.TableIdentifier == srcName { - from = strings.Join(maps.Keys(v.Columns), ",") + cols := maps.Keys(v.Columns) + for i, col := range cols { + cols[i] = fmt.Sprintf(`"%s"`, col) + } + from = strings.Join(cols, ",") break } } diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index d59de7cf69..0b207bc65f 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -83,7 +83,7 @@ func (s *SyncFlowExecution) executeSyncFlow( startFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 72 * time.Hour, - HeartbeatTimeout: 5 * time.Minute, + HeartbeatTimeout: 30 * time.Second, }) // execute StartFlow on the peers to start the flow diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index 21d09f0d08..e82f078039 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -112,7 +112,7 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" dependencies = [ - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -122,7 +122,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" dependencies = [ "anstyle", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -450,9 +450,9 @@ dependencies = [ [[package]] name = "cargo_toml" -version = "0.17.0" +version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ca592ad99e6a0fd4b95153406138b997cc26ccd3cd0aecdfd4fbdbf1519bd77" +checksum = "4d1ece59890e746567b467253aea0adbe8a21784d0b025d8a306f66c391c2957" dependencies = [ "serde", "toml 0.8.8", @@ -585,7 +585,7 @@ checksum = "2674ec482fbc38012cf31e6c42ba0177b431a0cb6f15fe40efa5aab1bda516f6" dependencies = [ "is-terminal", "lazy_static", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -725,7 +725,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.2", + "hashbrown 0.14.3", "lock_api", "once_cell", "parking_lot_core", @@ -873,7 +873,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f258a7194e7f7c2a7837a8913aeab7fd8c383457034fa20ce4dd3dcb813e8eb8" dependencies = [ "libc", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -897,7 +897,7 @@ dependencies = [ "cfg-if", "libc", "redox_syscall 0.3.5", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -962,9 +962,9 @@ checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" [[package]] name = "form_urlencoded" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" dependencies = [ "percent-encoding", ] @@ -1131,9 +1131,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.28.0" +version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" [[package]] name = "glob" @@ -1171,9 +1171,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.2" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f93e7192158dbcda357bdec5fb5788eebf8bbac027f3f33e719d29135ae84156" +checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" [[package]] name = "hdrhistogram" @@ -1221,7 +1221,7 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5444c27eef6923071f7ebcc33e3444508466a76f7a2b93da00ed6e19f30c1ddb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -1354,9 +1354,9 @@ dependencies = [ [[package]] name = "idna" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" dependencies = [ "unicode-bidi", "unicode-normalization", @@ -1379,7 +1379,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ "equivalent", - "hashbrown 0.14.2", + "hashbrown 0.14.3", ] [[package]] @@ -1406,7 +1406,7 @@ checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi", "rustix", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -1600,7 +1600,7 @@ checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" dependencies = [ "libc", "wasi", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -1742,9 +1742,9 @@ checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" [[package]] name = "openssl" -version = "0.10.59" +version = "0.10.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a257ad03cd8fb16ad4172fedf8094451e1af1c4b70097636ef2eac9a5f0cc33" +checksum = "79a4c6c3a2b158f7f8f2a2fc5a969fa3a068df6fc9dbb4a43845436e3af7c800" dependencies = [ "bitflags 2.4.1", "cfg-if", @@ -1783,9 +1783,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.95" +version = "0.9.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40a4130519a360279579c2053038317e40eff64d13fd3f004f9e1b72b8a6aaf9" +checksum = "3812c071ba60da8b5677cc12bcb1d42989a65553772897a7e0355545a819838f" dependencies = [ "cc", "libc", @@ -2072,9 +2072,9 @@ dependencies = [ [[package]] name = "percent-encoding" -version = "2.3.0" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "petgraph" @@ -2367,18 +2367,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.69" +version = "1.0.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" +checksum = "39278fbbf5fb4f646ce651690877f89d1c5811a3d4acb27700c1cb3cdb78fd3b" dependencies = [ "unicode-ident", ] [[package]] name = "prost" -version = "0.12.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a5a410fc7882af66deb8d01d01737353cf3ad6204c408177ba494291a626312" +checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" dependencies = [ "bytes", "prost-derive", @@ -2386,9 +2386,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.12.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fa3d084c8704911bfefb2771be2f9b6c5c0da7343a71e0021ee3c665cada738" +checksum = "c55e02e35260070b6f716a2423c2ff1c3bb1642ddca6f99e1f26d06268a0e2d2" dependencies = [ "bytes", "heck", @@ -2408,9 +2408,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.12.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "065717a5dfaca4a83d2fe57db3487b311365200000551d7a364e715dbf4346bc" +checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" dependencies = [ "anyhow", "itertools 0.11.0", @@ -2421,9 +2421,9 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.12.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8339f32236f590281e2f6368276441394fcd1b2133b549cc895d0ae80f2f9a52" +checksum = "193898f59edcf43c26227dcd4c8427f00d99d61e95dcde58dabd49fa291d470e" dependencies = [ "prost", ] @@ -2722,7 +2722,7 @@ dependencies = [ "libc", "spin 0.9.8", "untrusted 0.9.0", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -2807,7 +2807,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -2889,7 +2889,7 @@ version = "0.1.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c3733bf4cf7ea0880754e19cb5a462007c4a8c1914bff372ccc95b464f1df88" dependencies = [ - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -3109,14 +3109,14 @@ dependencies = [ [[package]] name = "simple_logger" -version = "4.2.0" +version = "4.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2230cd5c29b815c9b699fb610b49a5ed65588f3509d9f0108be3a885da629333" +checksum = "da0ca6504625ee1aa5fda33913d2005eab98c7a42dd85f116ecce3ff54c9d3ef" dependencies = [ "colored", "log", "time", - "windows-sys 0.42.0", + "windows-sys", ] [[package]] @@ -3186,7 +3186,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ "libc", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -3345,7 +3345,7 @@ dependencies = [ "fastrand", "redox_syscall 0.4.1", "rustix", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -3450,7 +3450,7 @@ dependencies = [ "socket2 0.5.5", "tokio-macros", "tracing", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -3850,9 +3850,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "ureq" -version = "2.9.0" +version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7830e33f6e25723d41a63f77e434159dad02919f18f55a512b5f16f3b1d77138" +checksum = "f8cdd25c339e200129fe4de81451814e5228c9b771d57378817d6117cc2b3f97" dependencies = [ "base64 0.21.5", "encoding_rs", @@ -3869,9 +3869,9 @@ dependencies = [ [[package]] name = "url" -version = "2.4.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "143b538f18257fac9cad154828a57c6bf5157e1aa604d4816b5995bf6de87ae5" +checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" dependencies = [ "form_urlencoded", "idna", @@ -4039,9 +4039,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.25.2" +version = "0.25.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc" +checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10" [[package]] name = "which" @@ -4105,21 +4105,6 @@ dependencies = [ "windows-targets", ] -[[package]] -name = "windows-sys" -version = "0.42.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" -dependencies = [ - "windows_aarch64_gnullvm 0.42.2", - "windows_aarch64_msvc 0.42.2", - "windows_i686_gnu 0.42.2", - "windows_i686_msvc 0.42.2", - "windows_x86_64_gnu 0.42.2", - "windows_x86_64_gnullvm 0.42.2", - "windows_x86_64_msvc 0.42.2", -] - [[package]] name = "windows-sys" version = "0.48.0" @@ -4135,93 +4120,51 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ - "windows_aarch64_gnullvm 0.48.5", - "windows_aarch64_msvc 0.48.5", - "windows_i686_gnu 0.48.5", - "windows_i686_msvc 0.48.5", - "windows_x86_64_gnu 0.48.5", - "windows_x86_64_gnullvm 0.48.5", - "windows_x86_64_msvc 0.48.5", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", ] -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" - [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" -[[package]] -name = "windows_aarch64_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" - [[package]] name = "windows_aarch64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" -[[package]] -name = "windows_i686_gnu" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" - [[package]] name = "windows_i686_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" -[[package]] -name = "windows_i686_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" - [[package]] name = "windows_i686_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" -[[package]] -name = "windows_x86_64_gnu" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" - [[package]] name = "windows_x86_64_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" - [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" -[[package]] -name = "windows_x86_64_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" - [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -4244,7 +4187,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" dependencies = [ "cfg-if", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] diff --git a/scripts/mirror-name-search.sh b/scripts/mirror-name-search.sh new file mode 100644 index 0000000000..da50963d72 --- /dev/null +++ b/scripts/mirror-name-search.sh @@ -0,0 +1,9 @@ +sleep 5 + +# Check if MirrorName attribute exists +if ! temporal operator search-attribute list | grep -w MirrorName >/dev/null 2>&1; then + # If not, create MirrorName attribute + temporal operator search-attribute create --name MirrorName --type Text --namespace default +fi + +tini -s -- sleep infinity diff --git a/ui/app/peers/[peerName]/datatables.tsx b/ui/app/peers/[peerName]/datatables.tsx deleted file mode 100644 index f86f7c5536..0000000000 --- a/ui/app/peers/[peerName]/datatables.tsx +++ /dev/null @@ -1,170 +0,0 @@ -import { CopyButton } from '@/components/CopyButton'; -import TimeLabel from '@/components/TimeComponent'; -import { SlotInfo, StatInfo } from '@/grpc_generated/route'; -import { Label } from '@/lib/Label'; -import { Table, TableCell, TableRow } from '@/lib/Table'; -import { DurationDisplay, SlotNameDisplay } from './helpers'; - -export const SlotTable = ({ data }: { data: SlotInfo[] }) => { - return ( -