Skip to content

Commit

Permalink
adding visibility into current mirror state
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Nov 14, 2023
1 parent 13e0e0f commit 123e1b4
Show file tree
Hide file tree
Showing 22 changed files with 2,457 additions and 768 deletions.
62 changes: 56 additions & 6 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,30 +350,80 @@ func (h *FlowRequestHandler) ShutdownFlow(
}, nil
}

func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID string) (*protos.FlowStatus, error) {
res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", peerflow.FlowStatusQuery)
if err != nil {
return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
}
var state *protos.FlowStatus
err = res.Get(&state)
if err != nil {
return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
}
return state, nil
}

func (h *FlowRequestHandler) updateWorkflowStatus(ctx context.Context,
workflowID string, state *protos.FlowStatus) error {
_, err := h.temporalClient.UpdateWorkflow(ctx, workflowID, "", peerflow.FlowStatusUpdate, state)
if err != nil {
return fmt.Errorf("failed to update state in workflow with ID %s: %w", workflowID, err)
}
return nil
}

func (h *FlowRequestHandler) FlowStateChange(
ctx context.Context,
req *protos.FlowStateChangeRequest,
) (*protos.FlowStateChangeResponse, error) {
var err error
if req.RequestedFlowState == protos.FlowState_STATE_PAUSED {
workflowID, err := h.getWorkflowID(ctx, req.FlowJobName)
if err != nil {
return nil, err
}
currState, err := h.getWorkflowStatus(ctx, workflowID)
if err != nil {
return nil, err
}
if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED &&
currState == protos.FlowStatus_STATUS_RUNNING.Enum() {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING.Enum())
if err != nil {
return nil, err
}
err = h.temporalClient.SignalWorkflow(
ctx,
req.WorkflowId,
workflowID,
"",
shared.CDCFlowSignalName,
shared.PauseSignal,
)
} else if req.RequestedFlowState == protos.FlowState_STATE_RUNNING {
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING &&
currState == protos.FlowStatus_STATUS_PAUSED.Enum() {
err = h.temporalClient.SignalWorkflow(
ctx,
req.WorkflowId,
workflowID,
"",
shared.CDCFlowSignalName,
shared.NoopSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED &&
(currState == protos.FlowStatus_STATUS_RUNNING.Enum() || currState == protos.FlowStatus_STATUS_PAUSED.Enum()) {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING.Enum())
if err != nil {
return nil, err
}
_, err = h.ShutdownFlow(ctx, &protos.ShutdownRequest{
WorkflowId: workflowID,
FlowJobName: req.FlowJobName,
SourcePeer: req.SourcePeer,
DestinationPeer: req.DestinationPeer,
RemoveFlowEntry: false,
})
} else {
return nil, fmt.Errorf("illegal state change requested: %v", req.RequestedFlowState)
}
if err != nil {
return nil, fmt.Errorf("unable to signal PeerFlow workflow: %w", err)
return nil, fmt.Errorf("unable to signal CDCFlow workflow: %w", err)
}

return &protos.FlowStateChangeResponse{
Expand Down
23 changes: 23 additions & 0 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ func (h *FlowRequestHandler) MirrorStatus(
ErrorMessage: fmt.Sprintf("unable to query flow: %s", err.Error()),
}, nil
}
workflowID, err := h.getWorkflowID(ctx, req.FlowJobName)
if err != nil {
return nil, err
}
currState, err := h.getWorkflowStatus(ctx, workflowID)
if err != nil {
return &protos.MirrorStatusResponse{
ErrorMessage: fmt.Sprintf("unable to get flow state: %s", err.Error()),
}, nil
}

if cdcFlow {
cdcStatus, err := h.CDCFlowStatus(ctx, req)
Expand All @@ -36,6 +46,7 @@ func (h *FlowRequestHandler) MirrorStatus(
Status: &protos.MirrorStatusResponse_CdcStatus{
CdcStatus: cdcStatus,
},
CurrentFlowState: *currState,
}, nil
} else {
qrepStatus, err := h.QRepFlowStatus(ctx, req)
Expand All @@ -50,6 +61,7 @@ func (h *FlowRequestHandler) MirrorStatus(
Status: &protos.MirrorStatusResponse_QrepStatus{
QrepStatus: qrepStatus,
},
CurrentFlowState: *currState,
}, nil
}
}
Expand Down Expand Up @@ -273,3 +285,14 @@ func (h *FlowRequestHandler) getCloneTableFlowNames(ctx context.Context, flowJob

return flowNames, nil
}

func (h *FlowRequestHandler) getWorkflowID(ctx context.Context, flowJobName string) (string, error) {
q := "SELECT workflow_id FROM flows WHERE flow_name ILIKE $1"
row := h.pool.QueryRow(ctx, q, flowJobName)
var workflowID string
if err := row.Scan(&workflowID); err != nil {
return "", fmt.Errorf("unable to get workflowID for flow job %s: %w", flowJobName, err)
}

return workflowID, nil
}
28 changes: 14 additions & 14 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ(t *testing.T) {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert 10 rows into the source table
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
// insert 10 rows into the source table
for i := 0; i < 10; i++ {
testKey := fmt.Sprintf("test_key_%d", i)
Expand Down Expand Up @@ -360,7 +360,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ(t *testing.T) {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and execute a transaction touching toast columns
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
/*
Executing a transaction which
1. changes both toast column
Expand Down Expand Up @@ -429,7 +429,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ(t *testing.T) {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and execute a transaction touching toast columns
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
/* transaction updating no rows */
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
BEGIN;
Expand Down Expand Up @@ -491,7 +491,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ(t *testing.T) {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and execute a transaction touching toast columns
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
//complex transaction with random DMLs on a table with toast columns
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
BEGIN;
Expand Down Expand Up @@ -565,7 +565,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ(t *testing.T) {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and execute a transaction touching toast columns
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
//complex transaction with random DMLs on a table with toast columns
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
BEGIN;
Expand Down Expand Up @@ -634,7 +634,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ(t *testing.T) {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and execute a transaction touching toast columns
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
/*
transaction updating a single row
multiple times with changed/unchanged toast columns
Expand Down Expand Up @@ -703,7 +703,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ(t *testing.T) {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and execute a transaction touching toast columns
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
/* test inserting various types*/
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s SELECT 2,2,b'1',b'101',
Expand Down Expand Up @@ -784,7 +784,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ(t *testing.T) {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and execute a transaction touching toast columns
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
/* test inserting various types*/
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s SELECT 2,2,b'1',b'101',
Expand Down Expand Up @@ -860,7 +860,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC(t *testing.T) {
}

go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
for i := 0; i < 10; i++ {
testKey := fmt.Sprintf("test_key_%d", i)
testValue := fmt.Sprintf("test_value_%d", i)
Expand Down Expand Up @@ -926,7 +926,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ(t *testing.T) {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and execute a transaction touching toast columns
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
/* inserting across multiple tables*/
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (c1,c2) VALUES (1,'dummy_1');
Expand Down Expand Up @@ -989,7 +989,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) {
// and then insert and mutate schema repeatedly.
go func() {
// insert first row.
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1)
require.NoError(t, err)
Expand Down Expand Up @@ -1092,7 +1092,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ(t *testing.T) {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
// insert 10 rows into the source table
for i := 0; i < 10; i++ {
testValue := fmt.Sprintf("test_value_%d", i)
Expand Down Expand Up @@ -1167,7 +1167,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ(t *testing.T) {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
rowsTx, err := s.pool.Begin(context.Background())
require.NoError(t, err)

Expand Down Expand Up @@ -1245,7 +1245,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ(t *testing.T) {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)

// insert 10 rows into the source table
for i := 0; i < 10; i++ {
Expand Down
10 changes: 5 additions & 5 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert 10 rows into the source table
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
// insert 10 rows into the source table
for i := 0; i < 10; i++ {
testKey := fmt.Sprintf("test_key_%d", i)
Expand Down Expand Up @@ -115,7 +115,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
// and then insert and mutate schema repeatedly.
go func() {
// insert first row.
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1)
s.NoError(err)
Expand Down Expand Up @@ -278,7 +278,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
// insert 10 rows into the source table
for i := 0; i < 10; i++ {
testValue := fmt.Sprintf("test_value_%d", i)
Expand Down Expand Up @@ -357,7 +357,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
rowsTx, err := s.pool.Begin(context.Background())
s.NoError(err)

Expand Down Expand Up @@ -438,7 +438,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)

// insert 10 rows into the source table
for i := 0; i < 10; i++ {
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/s3/cdc_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
}

go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
s.NoError(err)
//insert 20 rows
for i := 1; i <= 20; i++ {
Expand Down Expand Up @@ -120,7 +120,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
}

go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStateQuery(env, connectionGen)
s.NoError(err)
//insert 20 rows
for i := 1; i <= 20; i++ {
Expand Down
Loading

0 comments on commit 123e1b4

Please sign in to comment.