From f7947e0617a36579722e9e79bde8c60929ba54d6 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Tue, 12 Sep 2023 20:17:41 +0530 Subject: [PATCH] refactored PeerFlow -> CDCFlow --- flow/activities/fetch_config.go | 121 ------------ flow/cmd/handler.go | 24 +-- flow/cmd/worker.go | 4 +- flow/e2e/bigquery/peer_flow_bq_test.go | 78 ++++---- flow/e2e/eventhub/peer_flow_eh_test.go | 6 +- flow/e2e/postgres/peer_flow_pg_test.go | 12 +- flow/e2e/postgres/timescale.sql | 53 ++++++ flow/e2e/snowflake/peer_flow_sf_test.go | 66 +++---- .../e2e/sqlserver/qrep_flow_sqlserver_test.go | 2 +- flow/e2e/test_utils.go | 14 +- flow/generated/protos/route.pb.go | 175 +++++++++--------- flow/generated/protos/route_grpc.pb.go | 30 +-- flow/shared/constants.go | 6 +- flow/workflows/activities.go | 5 +- flow/workflows/{peer_flow.go => cdc_flow.go} | 137 +++----------- flow/workflows/setup_flow.go | 28 +-- flow/workflows/sync_flow.go | 22 +-- nexus/pt/src/peerdb_route.rs | 4 +- nexus/pt/src/peerdb_route.serde.rs | 32 ++-- nexus/pt/src/peerdb_route.tonic.rs | 32 ++-- protos/route.proto | 6 +- 21 files changed, 353 insertions(+), 504 deletions(-) delete mode 100644 flow/activities/fetch_config.go create mode 100644 flow/e2e/postgres/timescale.sql rename flow/workflows/{peer_flow.go => cdc_flow.go} (71%) diff --git a/flow/activities/fetch_config.go b/flow/activities/fetch_config.go deleted file mode 100644 index 93e74ec1dd..0000000000 --- a/flow/activities/fetch_config.go +++ /dev/null @@ -1,121 +0,0 @@ -package activities - -import ( - "context" - "fmt" - - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/jackc/pgx/v5/pgxpool" - log "github.com/sirupsen/logrus" - "google.golang.org/protobuf/proto" -) - -// FetchConfigActivityInput is the input for the FetchConfigActivity. -type FetchConfigActivityInput struct { - // The JDBC URL for the catalog database. - CatalogJdbcURL string - // The name of the peer flow to fetch the config for. - PeerFlowName string -} - -// FetchConfigActivity is an activity that fetches the config for the specified peer flow. -// This activity is invoked by the PeerFlowWorkflow. -type FetchConfigActivity struct{} - -// FetchConfig retrieves the source and destination config. -func (a *FetchConfigActivity) FetchConfig( - ctx context.Context, - input *FetchConfigActivityInput, -) (*protos.FlowConnectionConfigs, error) { - pool, err := pgxpool.New(ctx, input.CatalogJdbcURL) - if err != nil { - return nil, fmt.Errorf("failed to create connection pool: %w", err) - } - - sourceConnectionConfig, err := FetchPeerConfig(ctx, pool, input.PeerFlowName, "source_peer") - if err != nil { - return nil, fmt.Errorf("failed to unmarshal source connection config: %w", err) - } - - destinationConnectionConfig, err := FetchPeerConfig(ctx, pool, input.PeerFlowName, "destination_peer") - if err != nil { - return nil, fmt.Errorf("failed to unmarshal destination connection config: %w", err) - } - - query := `SELECT source_table_identifier, destination_table_identifier FROM flows WHERE name = $1` - rows, err := pool.Query(ctx, query, input.PeerFlowName) - if err != nil { - return nil, fmt.Errorf("failed to fetch table identifiers: %w", err) - } - defer rows.Close() - // Create a map to store the mapping of source table to destination table - tableNameMapping := make(map[string]string) - var srcTableIdentifier, dstTableIdentifier string - - // Iterate over all the result rows - for rows.Next() { - err = rows.Scan(&srcTableIdentifier, &dstTableIdentifier) - if err != nil { - return nil, fmt.Errorf("error scanning row %w", err) - } - - // Store the tableNameMapping in the map - tableNameMapping[srcTableIdentifier] = dstTableIdentifier - } - - log.Printf("successfully fetched config for peer flow - %s", input.PeerFlowName) - - return &protos.FlowConnectionConfigs{ - Source: sourceConnectionConfig, - Destination: destinationConnectionConfig, - TableNameMapping: tableNameMapping, - }, nil -} - -// fetchPeerConfig retrieves the config for a given peer by join label. -func FetchPeerConfig(ctx context.Context, pool *pgxpool.Pool, flowName string, label string) (*protos.Peer, error) { - var name string - var dbtype int32 - var opts []byte - - query := fmt.Sprintf( - "SELECT e.name, e.type, e.options FROM flows f JOIN peers e ON f.%s = e.id WHERE f.name = $1", - label) - err := pool.QueryRow(ctx, query, flowName).Scan(&name, &dbtype, &opts) - if err != nil { - return nil, fmt.Errorf("failed to fetch config for %s: %w", label, err) - } - - res := &protos.Peer{ - Name: name, - Type: protos.DBType(dbtype), - } - - switch protos.DBType(dbtype) { - case protos.DBType_BIGQUERY: - var peerConfig protos.BigqueryConfig - err = proto.Unmarshal(opts, &peerConfig) - if err != nil { - return nil, fmt.Errorf("failed to unmarshal bigquery config: %w", err) - } - res.Config = &protos.Peer_BigqueryConfig{BigqueryConfig: &peerConfig} - case protos.DBType_POSTGRES: - var peerConfig protos.PostgresConfig - err = proto.Unmarshal(opts, &peerConfig) - if err != nil { - return nil, fmt.Errorf("failed to unmarshal postgres config: %w", err) - } - res.Config = &protos.Peer_PostgresConfig{PostgresConfig: &peerConfig} - case protos.DBType_SNOWFLAKE: - var peerConfig protos.SnowflakeConfig - err = proto.Unmarshal(opts, &peerConfig) - if err != nil { - return nil, fmt.Errorf("failed to unmarshal snowflake config: %w", err) - } - res.Config = &protos.Peer_SnowflakeConfig{SnowflakeConfig: &peerConfig} - default: - return nil, fmt.Errorf("unsupported database type: %d", dbtype) - } - - return res, nil -} diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 5bfa19bcb7..f5b7be01b1 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -23,8 +23,8 @@ func NewFlowRequestHandler(temporalClient client.Client) *FlowRequestHandler { } } -func (h *FlowRequestHandler) CreatePeerFlow( - ctx context.Context, req *protos.CreatePeerFlowRequest) (*protos.CreatePeerFlowResponse, error) { +func (h *FlowRequestHandler) CreateCDCFlow( + ctx context.Context, req *protos.CreateCDCFlowRequest) (*protos.CreateCDCFlowResponse, error) { cfg := req.ConnectionConfigs workflowID := fmt.Sprintf("%s-peerflow-%s", cfg.FlowJobName, uuid.New()) workflowOptions := client.StartWorkflowOptions{ @@ -38,26 +38,26 @@ func (h *FlowRequestHandler) CreatePeerFlow( cfg.MaxBatchSize = uint32(maxBatchSize) } - limits := &peerflow.PeerFlowLimits{ + limits := &peerflow.CDCFlowLimits{ TotalSyncFlows: 0, TotalNormalizeFlows: 0, MaxBatchSize: maxBatchSize, } - state := peerflow.NewStartedPeerFlowState() + state := peerflow.NewCDCFlowState() _, err := h.temporalClient.ExecuteWorkflow( - ctx, // context - workflowOptions, // workflow start options - peerflow.PeerFlowWorkflowWithConfig, // workflow function - cfg, // workflow input - limits, // workflow limits - state, // workflow state + ctx, // context + workflowOptions, // workflow start options + peerflow.CDCFlowWorkflowWithConfig, // workflow function + cfg, // workflow input + limits, // workflow limits + state, // workflow state ) if err != nil { return nil, fmt.Errorf("unable to start PeerFlow workflow: %w", err) } - return &protos.CreatePeerFlowResponse{ + return &protos.CreateCDCFlowResponse{ WorflowId: workflowID, }, nil } @@ -100,7 +100,7 @@ func (h *FlowRequestHandler) ShutdownFlow( ctx, req.WorkflowId, "", - shared.PeerFlowSignalName, + shared.CDCFlowSignalName, shared.ShutdownSignal, ) if err != nil { diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index 66968bc10f..974c4900ed 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -107,15 +107,13 @@ func WorkerMain(opts *WorkerOptions) error { defer c.Close() w := worker.New(c, shared.PeerFlowTaskQueue, worker.Options{}) - w.RegisterWorkflow(peerflow.PeerFlowWorkflow) - w.RegisterWorkflow(peerflow.PeerFlowWorkflowWithConfig) + w.RegisterWorkflow(peerflow.CDCFlowWorkflowWithConfig) w.RegisterWorkflow(peerflow.SyncFlowWorkflow) w.RegisterWorkflow(peerflow.SetupFlowWorkflow) w.RegisterWorkflow(peerflow.NormalizeFlowWorkflow) w.RegisterWorkflow(peerflow.QRepFlowWorkflow) w.RegisterWorkflow(peerflow.QRepPartitionWorkflow) w.RegisterWorkflow(peerflow.DropFlowWorkflow) - w.RegisterActivity(&activities.FetchConfigActivity{}) w.RegisterActivity(&activities.FlowableActivity{ EnableMetrics: opts.EnableMetrics, CatalogMirrorMonitor: &catalogMirrorMonitor, diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 72acb12230..649f59ede9 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -95,12 +95,12 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { e2e.RegisterWorkflowsAndActivities(env) // TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores. - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, MaxBatchSize: 1, } - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, nil, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, nil, &limits, nil) // Verify workflow completes s.True(env.IsWorkflowCompleted()) @@ -139,12 +139,12 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, MaxBatchSize: 1, } - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -183,12 +183,12 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, MaxBatchSize: 1, } - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -230,7 +230,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -238,7 +238,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -251,7 +251,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { fmt.Println("Inserted 10 rows into the source table") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -301,7 +301,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, MaxBatchSize: 100, } @@ -309,7 +309,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* Executing a transaction which 1. changes both toast column @@ -328,7 +328,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { fmt.Println("Executed a transaction touching toast columns") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -372,7 +372,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, MaxBatchSize: 100, } @@ -380,7 +380,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* transaction updating no rows */ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; @@ -392,7 +392,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { fmt.Println("Executed a transaction touching toast columns") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -436,7 +436,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, MaxBatchSize: 100, } @@ -444,7 +444,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) //complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; @@ -469,7 +469,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { fmt.Println("Executed a transaction touching toast columns") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -512,7 +512,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, MaxBatchSize: 100, } @@ -520,7 +520,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) //complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; @@ -539,7 +539,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { fmt.Println("Executed a transaction touching toast columns") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -583,7 +583,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, MaxBatchSize: 100, } @@ -591,7 +591,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* transaction updating a single row multiple times with changed/unchanged toast columns @@ -609,7 +609,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { fmt.Println("Executed a transaction touching toast columns") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -658,7 +658,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, MaxBatchSize: 100, @@ -667,7 +667,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* test inserting various types*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s SELECT 2,2,b'1',b'101', @@ -687,7 +687,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { fmt.Println("Executed an insert with all types") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -747,7 +747,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, MaxBatchSize: 100, @@ -756,7 +756,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* test inserting various types*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s SELECT 2,2,b'1',b'101', @@ -776,7 +776,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { fmt.Println("Executed an insert with all types") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -826,13 +826,13 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, MaxBatchSize: 100, } go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) for i := 0; i < 10; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) @@ -844,7 +844,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { fmt.Println("Inserted 10 rows into the source table") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -889,7 +889,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, MaxBatchSize: 100, } @@ -897,7 +897,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* inserting across multiple tables*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); @@ -907,7 +907,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { fmt.Println("Executed an insert on two tables") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error require.True(s.T(), env.IsWorkflowCompleted()) @@ -949,7 +949,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 10, MaxBatchSize: 100, } @@ -958,7 +958,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // and then insert and mutate schema repeatedly. go func() { // insert first row. - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) s.NoError(err) @@ -1015,7 +1015,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { s.compareTableContentsBQ("test_simple_schema_changes", "id,c1") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) diff --git a/flow/e2e/eventhub/peer_flow_eh_test.go b/flow/e2e/eventhub/peer_flow_eh_test.go index 687f17face..5462975508 100644 --- a/flow/e2e/eventhub/peer_flow_eh_test.go +++ b/flow/e2e/eventhub/peer_flow_eh_test.go @@ -117,7 +117,7 @@ func (s *PeerFlowE2ETestSuiteEH) Test_Complete_Simple_Flow_EH() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - peerFlowInput := peerflow.PeerFlowLimits{ + peerFlowInput := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -125,7 +125,7 @@ func (s *PeerFlowE2ETestSuiteEH) Test_Complete_Simple_Flow_EH() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -138,7 +138,7 @@ func (s *PeerFlowE2ETestSuiteEH) Test_Complete_Simple_Flow_EH() { fmt.Println("Inserted 10 rows into the source table") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &peerFlowInput, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &peerFlowInput, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index ec120a1e8c..8ee321ee95 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -42,7 +42,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -50,7 +50,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -63,7 +63,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { fmt.Println("Inserted 10 rows into the source table") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -104,7 +104,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 10, MaxBatchSize: 100, } @@ -113,7 +113,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { // and then insert and mutate schema repeatedly. go func() { // insert first row. - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) s.NoError(err) @@ -170,7 +170,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { s.NoError(err) }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) diff --git a/flow/e2e/postgres/timescale.sql b/flow/e2e/postgres/timescale.sql new file mode 100644 index 0000000000..e635cba839 --- /dev/null +++ b/flow/e2e/postgres/timescale.sql @@ -0,0 +1,53 @@ +CREATE PEER source_pg_2 FROM POSTGRES WITH +( + host = 'kevin-test-cluster.ctwiqpycdrx0.us-east-2.rds.amazonaws.com', + port = '5432', + user = 'postgres', + password = 'SUMM3RN!GHTS', + database = 'ts2' +); + +CREATE PEER target_ts_2 FROM POSTGRES WITH +( + host = '3.19.228.194', + port = '5432', + user = 'postgres', + password = 'T1mesc@l3', + database = 'dst2' +); + +CREATE TABLE public.diagnostics ( + id bigint, + "time" timestamp with time zone, + tags_id integer, + fuel_state double precision, + current_load double precision, + status double precision, + additional_tags jsonb, + primary key(id, "time") +); + +SELECT create_hypertable('diagnostics', 'time', chunk_time_interval => INTERVAL '12 hours'); + +CREATE TABLE public.readings ( + id bigint, + "time" timestamp with time zone, + tags_id integer, + latitude double precision, + longitude double precision, + elevation double precision, + velocity double precision, + heading double precision, + grade double precision, + fuel_consumption double precision, + additional_tags jsonb, + primary key(id, "time") +); + +SELECT create_hypertable('readings', 'time', chunk_time_interval => INTERVAL '12 hours'); + +CREATE MIRROR tstsv4 FROM source_pg_2 TO target_ts_2 WITH TABLE MAPPING(public.diagnostics:public.diagnostics,public.readings:public.readings); + +flow_worker1 | time="2023-08-30T06:47:18Z" level=info msg="RelationMessage => RelationID: 16747, Namespace: public, RelationName: fss1, Columns: [0x400175e360 0x400175e380]" +flow_worker1 | time="2023-08-30T06:47:18Z" level=info msg="23 1 id -1\n" +flow_worker1 | time="2023-08-30T06:47:18Z" level=info msg="20 0 c1 -1\n" diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index b6090a9df0..796402d033 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -111,7 +111,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -119,7 +119,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -132,7 +132,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { fmt.Println("Inserted 10 rows into the source table") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -179,7 +179,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -187,7 +187,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -200,7 +200,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { fmt.Println("Inserted 10 rows into the source table") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -250,7 +250,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, MaxBatchSize: 100, } @@ -258,7 +258,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* Executing a transaction which 1. changes both toast column @@ -277,7 +277,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { fmt.Println("Executed a transaction touching toast columns") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -321,7 +321,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, MaxBatchSize: 100, } @@ -329,7 +329,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* transaction updating no rows */ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; @@ -341,7 +341,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { fmt.Println("Executed a transaction touching toast columns") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -385,7 +385,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -393,7 +393,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) //complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; @@ -418,7 +418,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { fmt.Println("Executed a transaction touching toast columns") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -461,7 +461,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, MaxBatchSize: 100, } @@ -469,7 +469,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) //complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; @@ -488,7 +488,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { fmt.Println("Executed a transaction touching toast columns") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -532,7 +532,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, MaxBatchSize: 100, } @@ -540,7 +540,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* transaction updating a single row multiple times with changed/unchanged toast columns @@ -558,7 +558,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { fmt.Println("Executed a transaction touching toast columns") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -607,7 +607,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, MaxBatchSize: 100, } @@ -615,7 +615,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* test inserting various types*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s SELECT 2,2,b'1',b'101', @@ -632,7 +632,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { fmt.Println("Executed an insert with all types") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -691,7 +691,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, MaxBatchSize: 100, } @@ -699,7 +699,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* test inserting various types*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s SELECT 2,2,b'1',b'101', @@ -716,7 +716,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { fmt.Println("Executed an insert with all types") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -764,7 +764,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, MaxBatchSize: 100, } @@ -772,7 +772,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* inserting across multiple tables*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); @@ -782,7 +782,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { fmt.Println("Executed an insert with all types") }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -824,7 +824,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() s.NoError(err) - limits := peerflow.PeerFlowLimits{ + limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 10, MaxBatchSize: 100, } @@ -833,7 +833,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { // and then insert and mutate schema repeatedly. go func() { // insert first row. - e2e.SetupPeerFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) s.NoError(err) @@ -890,7 +890,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) }() - env.ExecuteWorkflow(peerflow.PeerFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 4a072f2011..aad017c01a 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -29,7 +29,7 @@ type PeerFlowE2ETestSuiteSQLServer struct { sqlsHelper *SQLServerHelper } -func TestPeerFlowE2ETestSuiteSQLServer(t *testing.T) { +func TestCDCFlowE2ETestSuiteSQLServer(t *testing.T) { suite.Run(t, new(PeerFlowE2ETestSuiteSQLServer)) } diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 2d4b005dff..f3b3b00fa3 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -43,31 +43,29 @@ func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment) { // set a 300 second timeout for the workflow to execute a few runs. env.SetTestTimeout(300 * time.Second) - env.RegisterWorkflow(peerflow.PeerFlowWorkflow) - env.RegisterWorkflow(peerflow.PeerFlowWorkflowWithConfig) + env.RegisterWorkflow(peerflow.CDCFlowWorkflowWithConfig) env.RegisterWorkflow(peerflow.SyncFlowWorkflow) env.RegisterWorkflow(peerflow.SetupFlowWorkflow) env.RegisterWorkflow(peerflow.SnapshotFlowWorkflow) env.RegisterWorkflow(peerflow.NormalizeFlowWorkflow) env.RegisterWorkflow(peerflow.QRepFlowWorkflow) env.RegisterWorkflow(peerflow.QRepPartitionWorkflow) - env.RegisterActivity(&activities.FetchConfigActivity{}) env.RegisterActivity(&activities.FlowableActivity{}) env.RegisterActivity(&activities.SnapshotActivity{}) } -func SetupPeerFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, +func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, connectionGen FlowConnectionGenerationConfig) { // wait for PeerFlowStatusQuery to finish setup // sleep for 5 second to allow the workflow to start time.Sleep(5 * time.Second) for { response, err := env.QueryWorkflow( - peerflow.PeerFlowStatusQuery, + peerflow.CDCFlowStatusQuery, connectionGen.FlowJobName, ) if err == nil { - var state peerflow.PeerFlowState + var state peerflow.CDCFlowState err = response.Get(&state) if err != nil { log.Errorln(err) @@ -93,11 +91,11 @@ func NormalizeFlowCountQuery(env *testsuite.TestWorkflowEnvironment, time.Sleep(5 * time.Second) for { response, err := env.QueryWorkflow( - peerflow.PeerFlowStatusQuery, + peerflow.CDCFlowStatusQuery, connectionGen.FlowJobName, ) if err == nil { - var state peerflow.PeerFlowState + var state peerflow.CDCFlowState err = response.Get(&state) if err != nil { log.Errorln(err) diff --git a/flow/generated/protos/route.pb.go b/flow/generated/protos/route.pb.go index 9a4fda5b15..b6317e19d3 100644 --- a/flow/generated/protos/route.pb.go +++ b/flow/generated/protos/route.pb.go @@ -21,7 +21,7 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -type CreatePeerFlowRequest struct { +type CreateCDCFlowRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -29,8 +29,8 @@ type CreatePeerFlowRequest struct { ConnectionConfigs *FlowConnectionConfigs `protobuf:"bytes,1,opt,name=connection_configs,json=connectionConfigs,proto3" json:"connection_configs,omitempty"` } -func (x *CreatePeerFlowRequest) Reset() { - *x = CreatePeerFlowRequest{} +func (x *CreateCDCFlowRequest) Reset() { + *x = CreateCDCFlowRequest{} if protoimpl.UnsafeEnabled { mi := &file_route_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -38,13 +38,13 @@ func (x *CreatePeerFlowRequest) Reset() { } } -func (x *CreatePeerFlowRequest) String() string { +func (x *CreateCDCFlowRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*CreatePeerFlowRequest) ProtoMessage() {} +func (*CreateCDCFlowRequest) ProtoMessage() {} -func (x *CreatePeerFlowRequest) ProtoReflect() protoreflect.Message { +func (x *CreateCDCFlowRequest) ProtoReflect() protoreflect.Message { mi := &file_route_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -56,19 +56,19 @@ func (x *CreatePeerFlowRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use CreatePeerFlowRequest.ProtoReflect.Descriptor instead. -func (*CreatePeerFlowRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use CreateCDCFlowRequest.ProtoReflect.Descriptor instead. +func (*CreateCDCFlowRequest) Descriptor() ([]byte, []int) { return file_route_proto_rawDescGZIP(), []int{0} } -func (x *CreatePeerFlowRequest) GetConnectionConfigs() *FlowConnectionConfigs { +func (x *CreateCDCFlowRequest) GetConnectionConfigs() *FlowConnectionConfigs { if x != nil { return x.ConnectionConfigs } return nil } -type CreatePeerFlowResponse struct { +type CreateCDCFlowResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -76,8 +76,8 @@ type CreatePeerFlowResponse struct { WorflowId string `protobuf:"bytes,1,opt,name=worflow_id,json=worflowId,proto3" json:"worflow_id,omitempty"` } -func (x *CreatePeerFlowResponse) Reset() { - *x = CreatePeerFlowResponse{} +func (x *CreateCDCFlowResponse) Reset() { + *x = CreateCDCFlowResponse{} if protoimpl.UnsafeEnabled { mi := &file_route_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -85,13 +85,13 @@ func (x *CreatePeerFlowResponse) Reset() { } } -func (x *CreatePeerFlowResponse) String() string { +func (x *CreateCDCFlowResponse) String() string { return protoimpl.X.MessageStringOf(x) } -func (*CreatePeerFlowResponse) ProtoMessage() {} +func (*CreateCDCFlowResponse) ProtoMessage() {} -func (x *CreatePeerFlowResponse) ProtoReflect() protoreflect.Message { +func (x *CreateCDCFlowResponse) ProtoReflect() protoreflect.Message { mi := &file_route_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -103,12 +103,12 @@ func (x *CreatePeerFlowResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use CreatePeerFlowResponse.ProtoReflect.Descriptor instead. -func (*CreatePeerFlowResponse) Descriptor() ([]byte, []int) { +// Deprecated: Use CreateCDCFlowResponse.ProtoReflect.Descriptor instead. +func (*CreateCDCFlowResponse) Descriptor() ([]byte, []int) { return file_route_proto_rawDescGZIP(), []int{1} } -func (x *CreatePeerFlowResponse) GetWorflowId() string { +func (x *CreateCDCFlowResponse) GetWorflowId() string { if x != nil { return x.WorflowId } @@ -343,70 +343,69 @@ var file_route_proto_rawDesc = []byte{ 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x0b, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x0a, 0x66, 0x6c, 0x6f, 0x77, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x6a, 0x0a, 0x15, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, - 0x65, 0x65, 0x72, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x51, - 0x0a, 0x12, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x6e, - 0x66, 0x69, 0x67, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x70, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x43, 0x6f, 0x6e, - 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x52, 0x11, - 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x73, 0x22, 0x37, 0x0a, 0x16, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x46, - 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x77, - 0x6f, 0x72, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x09, 0x77, 0x6f, 0x72, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x22, 0x51, 0x0a, 0x15, 0x43, 0x72, - 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0b, 0x71, 0x72, 0x65, 0x70, 0x5f, 0x63, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, - 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x51, 0x52, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x52, 0x0a, 0x71, 0x72, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x37, 0x0a, - 0x16, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x77, 0x6f, 0x72, 0x66, 0x6c, - 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x77, 0x6f, 0x72, - 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x22, 0xca, 0x01, 0x0a, 0x0f, 0x53, 0x68, 0x75, 0x74, 0x64, - 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x77, 0x6f, - 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0a, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x12, 0x22, 0x0a, 0x0d, 0x66, - 0x6c, 0x6f, 0x77, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x0b, 0x66, 0x6c, 0x6f, 0x77, 0x4a, 0x6f, 0x62, 0x4e, 0x61, 0x6d, 0x65, 0x12, - 0x33, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, - 0x65, 0x72, 0x73, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x50, 0x65, 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x10, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, - 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, 0x65, - 0x65, 0x72, 0x52, 0x0f, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x50, - 0x65, 0x65, 0x72, 0x22, 0x47, 0x0a, 0x10, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x08, 0x52, 0x02, 0x6f, 0x6b, 0x12, 0x23, 0x0a, 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, - 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, - 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x9c, 0x02, 0x0a, - 0x0b, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x5d, 0x0a, 0x0e, - 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x23, - 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, - 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, - 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x46, 0x6c, 0x6f, - 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x0e, 0x43, - 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x23, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x69, 0x0a, 0x14, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, + 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x51, 0x0a, + 0x12, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x43, 0x6f, 0x6e, 0x6e, + 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x52, 0x11, 0x63, + 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, + 0x22, 0x36, 0x0a, 0x15, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, + 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x77, 0x6f, 0x72, + 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x77, + 0x6f, 0x72, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x22, 0x51, 0x0a, 0x15, 0x43, 0x72, 0x65, 0x61, + 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x38, 0x0a, 0x0b, 0x71, 0x72, 0x65, 0x70, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, + 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x51, 0x52, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, + 0x0a, 0x71, 0x72, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x37, 0x0a, 0x16, 0x43, + 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x77, 0x6f, 0x72, 0x66, 0x6c, 0x6f, 0x77, + 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x77, 0x6f, 0x72, 0x66, 0x6c, + 0x6f, 0x77, 0x49, 0x64, 0x22, 0xca, 0x01, 0x0a, 0x0f, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, + 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x77, 0x6f, 0x72, 0x6b, + 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x77, + 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x12, 0x22, 0x0a, 0x0d, 0x66, 0x6c, 0x6f, + 0x77, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0b, 0x66, 0x6c, 0x6f, 0x77, 0x4a, 0x6f, 0x62, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x33, 0x0a, + 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, + 0x73, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50, 0x65, + 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x10, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, + 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, 0x65, 0x65, 0x72, + 0x52, 0x0f, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x65, 0x65, + 0x72, 0x22, 0x47, 0x0a, 0x10, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x02, 0x6f, 0x6b, 0x12, 0x23, 0x0a, 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65, 0x72, + 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x99, 0x02, 0x0a, 0x0b, 0x46, + 0x6c, 0x6f, 0x77, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x5a, 0x0a, 0x0d, 0x43, 0x72, + 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x22, 0x2e, 0x70, 0x65, + 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, + 0x65, 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x23, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, + 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x0e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, + 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x23, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, + 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, - 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, - 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4f, 0x0a, 0x0c, 0x53, 0x68, - 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x1d, 0x2e, 0x70, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, - 0x77, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, - 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, - 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x7c, 0x0a, 0x10, 0x63, - 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x42, - 0x0a, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, - 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, - 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, - 0x75, 0x74, 0x65, 0xca, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, - 0x65, 0xe2, 0x02, 0x17, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x5c, - 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x50, 0x65, - 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4f, 0x0a, 0x0c, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, + 0x6e, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, + 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, + 0x75, 0x74, 0x65, 0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x7c, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, + 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x42, 0x0a, 0x52, 0x6f, 0x75, 0x74, + 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, + 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, + 0xaa, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0xca, 0x02, + 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0xe2, 0x02, 0x17, 0x50, + 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, + 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, + 0x6f, 0x75, 0x74, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -423,8 +422,8 @@ func file_route_proto_rawDescGZIP() []byte { var file_route_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_route_proto_goTypes = []interface{}{ - (*CreatePeerFlowRequest)(nil), // 0: peerdb_route.CreatePeerFlowRequest - (*CreatePeerFlowResponse)(nil), // 1: peerdb_route.CreatePeerFlowResponse + (*CreateCDCFlowRequest)(nil), // 0: peerdb_route.CreateCDCFlowRequest + (*CreateCDCFlowResponse)(nil), // 1: peerdb_route.CreateCDCFlowResponse (*CreateQRepFlowRequest)(nil), // 2: peerdb_route.CreateQRepFlowRequest (*CreateQRepFlowResponse)(nil), // 3: peerdb_route.CreateQRepFlowResponse (*ShutdownRequest)(nil), // 4: peerdb_route.ShutdownRequest @@ -434,14 +433,14 @@ var file_route_proto_goTypes = []interface{}{ (*Peer)(nil), // 8: peerdb_peers.Peer } var file_route_proto_depIdxs = []int32{ - 6, // 0: peerdb_route.CreatePeerFlowRequest.connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs + 6, // 0: peerdb_route.CreateCDCFlowRequest.connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs 7, // 1: peerdb_route.CreateQRepFlowRequest.qrep_config:type_name -> peerdb_flow.QRepConfig 8, // 2: peerdb_route.ShutdownRequest.source_peer:type_name -> peerdb_peers.Peer 8, // 3: peerdb_route.ShutdownRequest.destination_peer:type_name -> peerdb_peers.Peer - 0, // 4: peerdb_route.FlowService.CreatePeerFlow:input_type -> peerdb_route.CreatePeerFlowRequest + 0, // 4: peerdb_route.FlowService.CreateCDCFlow:input_type -> peerdb_route.CreateCDCFlowRequest 2, // 5: peerdb_route.FlowService.CreateQRepFlow:input_type -> peerdb_route.CreateQRepFlowRequest 4, // 6: peerdb_route.FlowService.ShutdownFlow:input_type -> peerdb_route.ShutdownRequest - 1, // 7: peerdb_route.FlowService.CreatePeerFlow:output_type -> peerdb_route.CreatePeerFlowResponse + 1, // 7: peerdb_route.FlowService.CreateCDCFlow:output_type -> peerdb_route.CreateCDCFlowResponse 3, // 8: peerdb_route.FlowService.CreateQRepFlow:output_type -> peerdb_route.CreateQRepFlowResponse 5, // 9: peerdb_route.FlowService.ShutdownFlow:output_type -> peerdb_route.ShutdownResponse 7, // [7:10] is the sub-list for method output_type @@ -460,7 +459,7 @@ func file_route_proto_init() { file_flow_proto_init() if !protoimpl.UnsafeEnabled { file_route_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CreatePeerFlowRequest); i { + switch v := v.(*CreateCDCFlowRequest); i { case 0: return &v.state case 1: @@ -472,7 +471,7 @@ func file_route_proto_init() { } } file_route_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CreatePeerFlowResponse); i { + switch v := v.(*CreateCDCFlowResponse); i { case 0: return &v.state case 1: diff --git a/flow/generated/protos/route_grpc.pb.go b/flow/generated/protos/route_grpc.pb.go index ec54177470..3aa87fed33 100644 --- a/flow/generated/protos/route_grpc.pb.go +++ b/flow/generated/protos/route_grpc.pb.go @@ -19,7 +19,7 @@ import ( const _ = grpc.SupportPackageIsVersion7 const ( - FlowService_CreatePeerFlow_FullMethodName = "/peerdb_route.FlowService/CreatePeerFlow" + FlowService_CreateCDCFlow_FullMethodName = "/peerdb_route.FlowService/CreateCDCFlow" FlowService_CreateQRepFlow_FullMethodName = "/peerdb_route.FlowService/CreateQRepFlow" FlowService_ShutdownFlow_FullMethodName = "/peerdb_route.FlowService/ShutdownFlow" ) @@ -28,7 +28,7 @@ const ( // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type FlowServiceClient interface { - CreatePeerFlow(ctx context.Context, in *CreatePeerFlowRequest, opts ...grpc.CallOption) (*CreatePeerFlowResponse, error) + CreateCDCFlow(ctx context.Context, in *CreateCDCFlowRequest, opts ...grpc.CallOption) (*CreateCDCFlowResponse, error) CreateQRepFlow(ctx context.Context, in *CreateQRepFlowRequest, opts ...grpc.CallOption) (*CreateQRepFlowResponse, error) ShutdownFlow(ctx context.Context, in *ShutdownRequest, opts ...grpc.CallOption) (*ShutdownResponse, error) } @@ -41,9 +41,9 @@ func NewFlowServiceClient(cc grpc.ClientConnInterface) FlowServiceClient { return &flowServiceClient{cc} } -func (c *flowServiceClient) CreatePeerFlow(ctx context.Context, in *CreatePeerFlowRequest, opts ...grpc.CallOption) (*CreatePeerFlowResponse, error) { - out := new(CreatePeerFlowResponse) - err := c.cc.Invoke(ctx, FlowService_CreatePeerFlow_FullMethodName, in, out, opts...) +func (c *flowServiceClient) CreateCDCFlow(ctx context.Context, in *CreateCDCFlowRequest, opts ...grpc.CallOption) (*CreateCDCFlowResponse, error) { + out := new(CreateCDCFlowResponse) + err := c.cc.Invoke(ctx, FlowService_CreateCDCFlow_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -72,7 +72,7 @@ func (c *flowServiceClient) ShutdownFlow(ctx context.Context, in *ShutdownReques // All implementations must embed UnimplementedFlowServiceServer // for forward compatibility type FlowServiceServer interface { - CreatePeerFlow(context.Context, *CreatePeerFlowRequest) (*CreatePeerFlowResponse, error) + CreateCDCFlow(context.Context, *CreateCDCFlowRequest) (*CreateCDCFlowResponse, error) CreateQRepFlow(context.Context, *CreateQRepFlowRequest) (*CreateQRepFlowResponse, error) ShutdownFlow(context.Context, *ShutdownRequest) (*ShutdownResponse, error) mustEmbedUnimplementedFlowServiceServer() @@ -82,8 +82,8 @@ type FlowServiceServer interface { type UnimplementedFlowServiceServer struct { } -func (UnimplementedFlowServiceServer) CreatePeerFlow(context.Context, *CreatePeerFlowRequest) (*CreatePeerFlowResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method CreatePeerFlow not implemented") +func (UnimplementedFlowServiceServer) CreateCDCFlow(context.Context, *CreateCDCFlowRequest) (*CreateCDCFlowResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method CreateCDCFlow not implemented") } func (UnimplementedFlowServiceServer) CreateQRepFlow(context.Context, *CreateQRepFlowRequest) (*CreateQRepFlowResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method CreateQRepFlow not implemented") @@ -104,20 +104,20 @@ func RegisterFlowServiceServer(s grpc.ServiceRegistrar, srv FlowServiceServer) { s.RegisterService(&FlowService_ServiceDesc, srv) } -func _FlowService_CreatePeerFlow_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(CreatePeerFlowRequest) +func _FlowService_CreateCDCFlow_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CreateCDCFlowRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(FlowServiceServer).CreatePeerFlow(ctx, in) + return srv.(FlowServiceServer).CreateCDCFlow(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: FlowService_CreatePeerFlow_FullMethodName, + FullMethod: FlowService_CreateCDCFlow_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(FlowServiceServer).CreatePeerFlow(ctx, req.(*CreatePeerFlowRequest)) + return srv.(FlowServiceServer).CreateCDCFlow(ctx, req.(*CreateCDCFlowRequest)) } return interceptor(ctx, in, info, handler) } @@ -166,8 +166,8 @@ var FlowService_ServiceDesc = grpc.ServiceDesc{ HandlerType: (*FlowServiceServer)(nil), Methods: []grpc.MethodDesc{ { - MethodName: "CreatePeerFlow", - Handler: _FlowService_CreatePeerFlow_Handler, + MethodName: "CreateCDCFlow", + Handler: _FlowService_CreateCDCFlow_Handler, }, { MethodName: "CreateQRepFlow", diff --git a/flow/shared/constants.go b/flow/shared/constants.go index 5334243e59..17efcfc452 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -3,14 +3,14 @@ package shared const ( PeerFlowTaskQueue = "peer-flow-task-queue" SnapshotFlowTaskQueue = "snapshot-flow-task-queue" - PeerFlowSignalName = "peer-flow-signal" + CDCFlowSignalName = "peer-flow-signal" ) -type PeerFlowSignal int64 +type CDCFlowSignal int64 type ContextKey string const ( - NoopSignal PeerFlowSignal = iota + NoopSignal CDCFlowSignal = iota ShutdownSignal EnableMetricsKey ContextKey = "enableMetrics" CDCMirrorMonitorKey ContextKey = "cdcMirrorMonitor" diff --git a/flow/workflows/activities.go b/flow/workflows/activities.go index 5b318031af..0b23d10dd1 100644 --- a/flow/workflows/activities.go +++ b/flow/workflows/activities.go @@ -3,7 +3,6 @@ package peerflow import "github.com/PeerDB-io/peer-flow/activities" var ( - fetchConfig *activities.FetchConfigActivity - flowable *activities.FlowableActivity - snapshot *activities.SnapshotActivity + flowable *activities.FlowableActivity + snapshot *activities.SnapshotActivity ) diff --git a/flow/workflows/peer_flow.go b/flow/workflows/cdc_flow.go similarity index 71% rename from flow/workflows/peer_flow.go rename to flow/workflows/cdc_flow.go index e509e59e52..46d44ad51b 100644 --- a/flow/workflows/peer_flow.go +++ b/flow/workflows/cdc_flow.go @@ -4,7 +4,6 @@ import ( "fmt" "time" - "github.com/PeerDB-io/peer-flow/activities" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" @@ -17,11 +16,11 @@ import ( ) const ( - PeerFlowStatusQuery = "q-peer-flow-status" - maxSyncFlowsPerPeerFlow = 32 + CDCFlowStatusQuery = "q-cdc-flow-status" + maxSyncFlowsPerCDCFlow = 32 ) -type PeerFlowLimits struct { +type CDCFlowLimits struct { // Number of sync flows to execute in total. // If 0, the number of sync flows will be continuously executed until the peer flow is cancelled. // This is typically non-zero for testing purposes. @@ -34,8 +33,8 @@ type PeerFlowLimits struct { MaxBatchSize int } -type PeerFlowWorkflowInput struct { - PeerFlowLimits +type CDCFlowWorkflowInput struct { + CDCFlowLimits // The JDBC URL for the catalog database. CatalogJdbcURL string // The name of the peer flow to execute. @@ -52,7 +51,7 @@ type PeerFlowWorkflowInput struct { MaxBatchSize int } -type PeerFlowState struct { +type CDCFlowState struct { // Progress events for the peer flow. Progress []string // Accumulates status for sync flows spawned. @@ -60,7 +59,7 @@ type PeerFlowState struct { // Accumulates status for sync flows spawned. NormalizeFlowStatuses []*model.NormalizeResponse // Current signalled state of the peer flow. - ActiveSignal shared.PeerFlowSignal + ActiveSignal shared.CDCFlowSignal // SetupComplete indicates whether the peer flow setup has completed. SetupComplete bool // Errors encountered during child sync flow executions. @@ -73,8 +72,8 @@ type PeerFlowState struct { } // returns a new empty PeerFlowState -func NewStartedPeerFlowState() *PeerFlowState { - return &PeerFlowState{ +func NewCDCFlowState() *CDCFlowState { + return &CDCFlowState{ Progress: []string{"started"}, SyncFlowStatuses: nil, NormalizeFlowStatuses: nil, @@ -93,7 +92,7 @@ func NewStartedPeerFlowState() *PeerFlowState { } // truncate the progress and other arrays to a max of 10 elements -func (s *PeerFlowState) TruncateProgress() { +func (s *CDCFlowState) TruncateProgress() { if len(s.Progress) > 10 { s.Progress = s.Progress[len(s.Progress)-10:] } @@ -105,54 +104,20 @@ func (s *PeerFlowState) TruncateProgress() { } } -// PeerFlowWorkflowExecution represents the state for execution of a peer flow. -type PeerFlowWorkflowExecution struct { +// CDCFlowWorkflowExecution represents the state for execution of a peer flow. +type CDCFlowWorkflowExecution struct { flowExecutionID string logger log.Logger } -// NewPeerFlowWorkflowExecution creates a new instance of PeerFlowWorkflowExecution. -func NewPeerFlowWorkflowExecution(ctx workflow.Context) *PeerFlowWorkflowExecution { - return &PeerFlowWorkflowExecution{ +// NewCDCFlowWorkflowExecution creates a new instance of PeerFlowWorkflowExecution. +func NewCDCFlowWorkflowExecution(ctx workflow.Context) *CDCFlowWorkflowExecution { + return &CDCFlowWorkflowExecution{ flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID, logger: workflow.GetLogger(ctx), } } -// fetchConnectionConfigs fetches the connection configs for source and destination peers. -func fetchConnectionConfigs( - ctx workflow.Context, - logger log.Logger, - input *PeerFlowWorkflowInput, -) (*protos.FlowConnectionConfigs, error) { - logger.Info("fetching connection configs for peer flow - ", input.PeerFlowName) - - ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 1 * time.Minute, - }) - - fetchConfigActivityInput := &activities.FetchConfigActivityInput{ - CatalogJdbcURL: input.CatalogJdbcURL, - PeerFlowName: input.PeerFlowName, - } - - configsFuture := workflow.ExecuteActivity(ctx, fetchConfig.FetchConfig, fetchConfigActivityInput) - - flowConnectionConfigs := &protos.FlowConnectionConfigs{} - if err := configsFuture.Get(ctx, &flowConnectionConfigs); err != nil { - return nil, fmt.Errorf("failed to fetch connection configs: %w", err) - } - - if flowConnectionConfigs == nil || - flowConnectionConfigs.Source == nil || - flowConnectionConfigs.Destination == nil { - return nil, fmt.Errorf("invalid connection configs") - } - - logger.Info("fetched connection configs for peer flow - ", input.PeerFlowName) - return flowConnectionConfigs, nil -} - func GetChildWorkflowID( ctx workflow.Context, prefix string, @@ -170,81 +135,39 @@ func GetChildWorkflowID( return childWorkflowID, nil } -// PeerFlowWorkflowResult is the result of the PeerFlowWorkflow. -type PeerFlowWorkflowResult = PeerFlowState - -// PeerFlowWorkflow is the workflow that executes the specified peer flow. -// This is the main entry point for the application. -func PeerFlowWorkflow(ctx workflow.Context, input *PeerFlowWorkflowInput) (*PeerFlowWorkflowResult, error) { - fconn, err := fetchConnectionConfigs(ctx, workflow.GetLogger(ctx), input) - if err != nil { - return nil, err - } - - fconn.FlowJobName = input.PeerFlowName - - peerflowWithConfigID, err := GetChildWorkflowID(ctx, "peer-flow-with-config", input.PeerFlowName) - if err != nil { - return nil, err - } - - peerflowWithConfigOpts := workflow.ChildWorkflowOptions{ - WorkflowID: peerflowWithConfigID, - ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, - RetryPolicy: &temporal.RetryPolicy{ - MaximumAttempts: 20, - }, - } - - limits := &PeerFlowLimits{ - TotalSyncFlows: input.TotalSyncFlows, - TotalNormalizeFlows: input.TotalNormalizeFlows, - MaxBatchSize: input.MaxBatchSize, - } - - state := NewStartedPeerFlowState() - peerflowWithConfigCtx := workflow.WithChildOptions(ctx, peerflowWithConfigOpts) - peerFlowWithConfigFuture := workflow.ExecuteChildWorkflow( - peerflowWithConfigCtx, PeerFlowWorkflowWithConfig, fconn, &limits, state) - - var res PeerFlowWorkflowResult - if err := peerFlowWithConfigFuture.Get(peerflowWithConfigCtx, &res); err != nil { - return nil, fmt.Errorf("failed to execute child workflow: %w", err) - } - - return &res, nil -} +// CDCFlowWorkflowResult is the result of the PeerFlowWorkflow. +type CDCFlowWorkflowResult = CDCFlowState -func PeerFlowWorkflowWithConfig( +func CDCFlowWorkflowWithConfig( ctx workflow.Context, cfg *protos.FlowConnectionConfigs, - limits *PeerFlowLimits, - state *PeerFlowState, -) (*PeerFlowWorkflowResult, error) { + limits *CDCFlowLimits, + state *CDCFlowState, +) (*CDCFlowWorkflowResult, error) { if state == nil { - state = NewStartedPeerFlowState() + state = NewCDCFlowState() } if cfg == nil { return nil, fmt.Errorf("invalid connection configs") } - w := NewPeerFlowWorkflowExecution(ctx) + w := NewCDCFlowWorkflowExecution(ctx) if limits.TotalSyncFlows == 0 { - limits.TotalSyncFlows = maxSyncFlowsPerPeerFlow + limits.TotalSyncFlows = maxSyncFlowsPerCDCFlow } // Support a Query for the current state of the peer flow. - err := workflow.SetQueryHandler(ctx, PeerFlowStatusQuery, func(jobName string) (PeerFlowState, error) { + err := workflow.SetQueryHandler(ctx, CDCFlowStatusQuery, func(jobName string) (CDCFlowState, error) { return *state, nil }) if err != nil { - return state, fmt.Errorf("failed to set `%s` query handler: %w", PeerFlowStatusQuery, err) + return state, fmt.Errorf("failed to set `%s` query handler: %w", CDCFlowStatusQuery, err) } - signalChan := workflow.GetSignalChannel(ctx, shared.PeerFlowSignalName) - signalHandler := func(_ workflow.Context, v shared.PeerFlowSignal) { + signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) + signalHandler := func(_ workflow.Context, v shared.CDCFlowSignal) { w.logger.Info("received signal - ", v) state.ActiveSignal = v } @@ -252,7 +175,7 @@ func PeerFlowWorkflowWithConfig( // Support a signal to pause the peer flow. selector := workflow.NewSelector(ctx) selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) { - var signalVal shared.PeerFlowSignal + var signalVal shared.CDCFlowSignal c.Receive(ctx, &signalVal) signalHandler(ctx, signalVal) }) @@ -414,5 +337,5 @@ func PeerFlowWorkflowWithConfig( } state.TruncateProgress() - return nil, workflow.NewContinueAsNewError(ctx, PeerFlowWorkflowWithConfig, cfg, limits, state) + return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state) } diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index c5a47d6d38..cf5d395698 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -30,8 +30,8 @@ import ( // - creating the normalized table on the destination peer type SetupFlowState struct { - PeerFlowName string - Progress []string + CDCFlowName string + Progress []string } type SetupFlowExecution struct { @@ -55,7 +55,7 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables( ctx workflow.Context, config *protos.FlowConnectionConfigs, ) error { - s.logger.Info("checking connections for peer flow - ", s.PeerFlowName) + s.logger.Info("checking connections for CDC flow - ", s.CDCFlowName) ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 2 * time.Minute, @@ -75,7 +75,7 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables( return fmt.Errorf("failed to check destination peer connection: %w", err) } - s.logger.Info("ensuring metadata table exists - ", s.PeerFlowName) + s.logger.Info("ensuring metadata table exists - ", s.CDCFlowName) // then setup the destination peer metadata tables if destConnStatus.NeedsSetupMetadataTables { @@ -95,7 +95,7 @@ func (s *SetupFlowExecution) ensurePullability( ctx workflow.Context, config *protos.FlowConnectionConfigs, ) error { - s.logger.Info("ensuring pullability for peer flow - ", s.PeerFlowName) + s.logger.Info("ensuring pullability for peer flow - ", s.CDCFlowName) ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 15 * time.Minute, @@ -108,7 +108,7 @@ func (s *SetupFlowExecution) ensurePullability( // create EnsurePullabilityInput for the srcTableName ensurePullabilityInput := &protos.EnsurePullabilityBatchInput{ PeerConnectionConfig: config.Source, - FlowJobName: s.PeerFlowName, + FlowJobName: s.CDCFlowName, SourceTableIdentifiers: srcTblIdentifiers, } @@ -139,7 +139,7 @@ func (s *SetupFlowExecution) createRawTable( ctx workflow.Context, config *protos.FlowConnectionConfigs, ) error { - s.logger.Info("creating raw table on destination - ", s.PeerFlowName) + s.logger.Info("creating raw table on destination - ", s.CDCFlowName) ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, }) @@ -147,7 +147,7 @@ func (s *SetupFlowExecution) createRawTable( // attempt to create the tables. createRawTblInput := &protos.CreateRawTableInput{ PeerConnectionConfig: config.Destination, - FlowJobName: s.PeerFlowName, + FlowJobName: s.CDCFlowName, TableNameMapping: config.TableNameMapping, CdcSyncMode: config.CdcSyncMode, } @@ -164,7 +164,7 @@ func (s *SetupFlowExecution) createRawTable( // sets up the normalized tables on the destination peer. func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( ctx workflow.Context, flowConnectionConfigs *protos.FlowConnectionConfigs) (map[string]*protos.TableSchema, error) { - s.logger.Info("fetching table schema for peer flow - ", s.PeerFlowName) + s.logger.Info("fetching table schema for peer flow - ", s.CDCFlowName) ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 1 * time.Hour, @@ -191,7 +191,7 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( sortedSourceTables := maps.Keys(tableNameSchemaMapping) sort.Strings(sortedSourceTables) - s.logger.Info("setting up normalized tables for peer flow - ", s.PeerFlowName) + s.logger.Info("setting up normalized tables for peer flow - ", s.CDCFlowName) normalizedTableMapping := make(map[string]*protos.TableSchema) for _, srcTableName := range sortedSourceTables { tableSchema := tableNameSchemaMapping[srcTableName] @@ -213,7 +213,7 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( return nil, fmt.Errorf("failed to create normalized tables: %w", err) } - s.logger.Info("finished setting up normalized tables for peer flow - ", s.PeerFlowName) + s.logger.Info("finished setting up normalized tables for peer flow - ", s.CDCFlowName) return normalizedTableMapping, nil } @@ -222,7 +222,7 @@ func (s *SetupFlowExecution) executeSetupFlow( ctx workflow.Context, config *protos.FlowConnectionConfigs, ) (map[string]*protos.TableSchema, error) { - s.logger.Info("executing setup flow - ", s.PeerFlowName) + s.logger.Info("executing setup flow - ", s.CDCFlowName) // first check the connectionsAndSetupMetadataTables if err := s.checkConnectionsAndSetupMetadataTables(ctx, config); err != nil { @@ -252,8 +252,8 @@ func (s *SetupFlowExecution) executeSetupFlow( func SetupFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs) (*protos.FlowConnectionConfigs, error) { setupFlowState := &SetupFlowState{ - PeerFlowName: config.FlowJobName, - Progress: []string{}, + CDCFlowName: config.FlowJobName, + Progress: []string{}, } // create the setup flow execution diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index a2be62af9e..6661d6ecc4 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -11,8 +11,8 @@ import ( ) type SyncFlowState struct { - PeerFlowName string - Progress []string + CDCFlowName string + Progress []string } type SyncFlowExecution struct { @@ -22,8 +22,8 @@ type SyncFlowExecution struct { } type NormalizeFlowState struct { - PeerFlowName string - Progress []string + CDCFlowName string + Progress []string } type NormalizeFlowExecution struct { @@ -56,7 +56,7 @@ func (s *SyncFlowExecution) executeSyncFlow( opts *protos.SyncFlowOptions, relationMessageMapping model.RelationMessageMapping, ) (*model.SyncResponse, error) { - s.logger.Info("executing sync flow - ", s.PeerFlowName) + s.logger.Info("executing sync flow - ", s.CDCFlowName) syncMetaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 1 * time.Minute, @@ -65,7 +65,7 @@ func (s *SyncFlowExecution) executeSyncFlow( // execute GetLastSyncedID on destination peer lastSyncInput := &protos.GetLastSyncedIDInput{ PeerConnectionConfig: config.Destination, - FlowJobName: s.PeerFlowName, + FlowJobName: s.CDCFlowName, } lastSyncFuture := workflow.ExecuteActivity(syncMetaCtx, flowable.GetLastSyncedID, lastSyncInput) @@ -113,8 +113,8 @@ func SyncFlowWorkflow(ctx workflow.Context, options *protos.SyncFlowOptions, ) (*model.SyncResponse, error) { s := NewSyncFlowExecution(ctx, &SyncFlowState{ - PeerFlowName: config.FlowJobName, - Progress: []string{}, + CDCFlowName: config.FlowJobName, + Progress: []string{}, }) return s.executeSyncFlow(ctx, config, options, options.RelationMessageMapping) @@ -125,8 +125,8 @@ func NormalizeFlowWorkflow(ctx workflow.Context, tableSchemaDelta *protos.TableSchemaDelta, ) (*model.NormalizeResponse, error) { s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{ - PeerFlowName: config.FlowJobName, - Progress: []string{}, + CDCFlowName: config.FlowJobName, + Progress: []string{}, }) return s.executeNormalizeFlow(ctx, config, tableSchemaDelta) @@ -137,7 +137,7 @@ func (s *NormalizeFlowExecution) executeNormalizeFlow( config *protos.FlowConnectionConfigs, tableSchemaDelta *protos.TableSchemaDelta, ) (*model.NormalizeResponse, error) { - s.logger.Info("executing normalize flow - ", s.PeerFlowName) + s.logger.Info("executing normalize flow - ", s.CDCFlowName) normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 7 * 24 * time.Hour, diff --git a/nexus/pt/src/peerdb_route.rs b/nexus/pt/src/peerdb_route.rs index 2e3386a5a5..f48a298f6f 100644 --- a/nexus/pt/src/peerdb_route.rs +++ b/nexus/pt/src/peerdb_route.rs @@ -1,13 +1,13 @@ // @generated #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct CreatePeerFlowRequest { +pub struct CreateCdcFlowRequest { #[prost(message, optional, tag="1")] pub connection_configs: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct CreatePeerFlowResponse { +pub struct CreateCdcFlowResponse { #[prost(string, tag="1")] pub worflow_id: ::prost::alloc::string::String, } diff --git a/nexus/pt/src/peerdb_route.serde.rs b/nexus/pt/src/peerdb_route.serde.rs index 1de74cb8f3..a6e73933f3 100644 --- a/nexus/pt/src/peerdb_route.serde.rs +++ b/nexus/pt/src/peerdb_route.serde.rs @@ -1,5 +1,5 @@ // @generated -impl serde::Serialize for CreatePeerFlowRequest { +impl serde::Serialize for CreateCdcFlowRequest { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result where @@ -10,14 +10,14 @@ impl serde::Serialize for CreatePeerFlowRequest { if self.connection_configs.is_some() { len += 1; } - let mut struct_ser = serializer.serialize_struct("peerdb_route.CreatePeerFlowRequest", len)?; + let mut struct_ser = serializer.serialize_struct("peerdb_route.CreateCDCFlowRequest", len)?; if let Some(v) = self.connection_configs.as_ref() { struct_ser.serialize_field("connectionConfigs", v)?; } struct_ser.end() } } -impl<'de> serde::Deserialize<'de> for CreatePeerFlowRequest { +impl<'de> serde::Deserialize<'de> for CreateCdcFlowRequest { #[allow(deprecated)] fn deserialize(deserializer: D) -> std::result::Result where @@ -63,13 +63,13 @@ impl<'de> serde::Deserialize<'de> for CreatePeerFlowRequest { } struct GeneratedVisitor; impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = CreatePeerFlowRequest; + type Value = CreateCdcFlowRequest; fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct peerdb_route.CreatePeerFlowRequest") + formatter.write_str("struct peerdb_route.CreateCDCFlowRequest") } - fn visit_map(self, mut map: V) -> std::result::Result + fn visit_map(self, mut map: V) -> std::result::Result where V: serde::de::MapAccess<'de>, { @@ -87,15 +87,15 @@ impl<'de> serde::Deserialize<'de> for CreatePeerFlowRequest { } } } - Ok(CreatePeerFlowRequest { + Ok(CreateCdcFlowRequest { connection_configs: connection_configs__, }) } } - deserializer.deserialize_struct("peerdb_route.CreatePeerFlowRequest", FIELDS, GeneratedVisitor) + deserializer.deserialize_struct("peerdb_route.CreateCDCFlowRequest", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for CreatePeerFlowResponse { +impl serde::Serialize for CreateCdcFlowResponse { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result where @@ -106,14 +106,14 @@ impl serde::Serialize for CreatePeerFlowResponse { if !self.worflow_id.is_empty() { len += 1; } - let mut struct_ser = serializer.serialize_struct("peerdb_route.CreatePeerFlowResponse", len)?; + let mut struct_ser = serializer.serialize_struct("peerdb_route.CreateCDCFlowResponse", len)?; if !self.worflow_id.is_empty() { struct_ser.serialize_field("worflowId", &self.worflow_id)?; } struct_ser.end() } } -impl<'de> serde::Deserialize<'de> for CreatePeerFlowResponse { +impl<'de> serde::Deserialize<'de> for CreateCdcFlowResponse { #[allow(deprecated)] fn deserialize(deserializer: D) -> std::result::Result where @@ -159,13 +159,13 @@ impl<'de> serde::Deserialize<'de> for CreatePeerFlowResponse { } struct GeneratedVisitor; impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = CreatePeerFlowResponse; + type Value = CreateCdcFlowResponse; fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct peerdb_route.CreatePeerFlowResponse") + formatter.write_str("struct peerdb_route.CreateCDCFlowResponse") } - fn visit_map(self, mut map: V) -> std::result::Result + fn visit_map(self, mut map: V) -> std::result::Result where V: serde::de::MapAccess<'de>, { @@ -183,12 +183,12 @@ impl<'de> serde::Deserialize<'de> for CreatePeerFlowResponse { } } } - Ok(CreatePeerFlowResponse { + Ok(CreateCdcFlowResponse { worflow_id: worflow_id__.unwrap_or_default(), }) } } - deserializer.deserialize_struct("peerdb_route.CreatePeerFlowResponse", FIELDS, GeneratedVisitor) + deserializer.deserialize_struct("peerdb_route.CreateCDCFlowResponse", FIELDS, GeneratedVisitor) } } impl serde::Serialize for CreateQRepFlowRequest { diff --git a/nexus/pt/src/peerdb_route.tonic.rs b/nexus/pt/src/peerdb_route.tonic.rs index 571965ea8c..af0614b97d 100644 --- a/nexus/pt/src/peerdb_route.tonic.rs +++ b/nexus/pt/src/peerdb_route.tonic.rs @@ -86,11 +86,11 @@ pub mod flow_service_client { self } /// - pub async fn create_peer_flow( + pub async fn create_cdc_flow( &mut self, - request: impl tonic::IntoRequest, + request: impl tonic::IntoRequest, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, > { self.inner @@ -104,11 +104,11 @@ pub mod flow_service_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/peerdb_route.FlowService/CreatePeerFlow", + "/peerdb_route.FlowService/CreateCDCFlow", ); let mut req = request.into_request(); req.extensions_mut() - .insert(GrpcMethod::new("peerdb_route.FlowService", "CreatePeerFlow")); + .insert(GrpcMethod::new("peerdb_route.FlowService", "CreateCDCFlow")); self.inner.unary(req, path, codec).await } /// @@ -173,11 +173,11 @@ pub mod flow_service_server { #[async_trait] pub trait FlowService: Send + Sync + 'static { /// - async fn create_peer_flow( + async fn create_cdc_flow( &self, - request: tonic::Request, + request: tonic::Request, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, >; /// @@ -277,25 +277,25 @@ pub mod flow_service_server { fn call(&mut self, req: http::Request) -> Self::Future { let inner = self.inner.clone(); match req.uri().path() { - "/peerdb_route.FlowService/CreatePeerFlow" => { + "/peerdb_route.FlowService/CreateCDCFlow" => { #[allow(non_camel_case_types)] - struct CreatePeerFlowSvc(pub Arc); + struct CreateCDCFlowSvc(pub Arc); impl< T: FlowService, - > tonic::server::UnaryService - for CreatePeerFlowSvc { - type Response = super::CreatePeerFlowResponse; + > tonic::server::UnaryService + for CreateCDCFlowSvc { + type Response = super::CreateCdcFlowResponse; type Future = BoxFuture< tonic::Response, tonic::Status, >; fn call( &mut self, - request: tonic::Request, + request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).create_peer_flow(request).await + (*inner).create_cdc_flow(request).await }; Box::pin(fut) } @@ -307,7 +307,7 @@ pub mod flow_service_server { let inner = self.inner.clone(); let fut = async move { let inner = inner.0; - let method = CreatePeerFlowSvc(inner); + let method = CreateCDCFlowSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( diff --git a/protos/route.proto b/protos/route.proto index 4665ee64fb..debb2a0390 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -6,11 +6,11 @@ import "flow.proto"; package peerdb_route; -message CreatePeerFlowRequest { +message CreateCDCFlowRequest { peerdb_flow.FlowConnectionConfigs connection_configs = 1; } -message CreatePeerFlowResponse { +message CreateCDCFlowResponse { string worflow_id = 1; } @@ -35,7 +35,7 @@ message ShutdownResponse { } service FlowService { - rpc CreatePeerFlow(CreatePeerFlowRequest) returns (CreatePeerFlowResponse) {} + rpc CreateCDCFlow(CreateCDCFlowRequest) returns (CreateCDCFlowResponse) {} rpc CreateQRepFlow(CreateQRepFlowRequest) returns (CreateQRepFlowResponse) {} rpc ShutdownFlow(ShutdownRequest) returns (ShutdownResponse) {} }