diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 0102b9b5e2d..6bfce05119a 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -368,29 +368,33 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa require.NoError(t, err, output) done = true state := "" - streams := gjson.Get(output, "workflows.0.shard_streams.*.streams") - streams.ForEach(func(streamId, stream gjson.Result) bool { // For each stream - info := stream.Map() - // We need to wait for all streams to have the desired state. - state = info["state"].String() - if state == wantState { - for i := 0; i < len(fieldEqualityChecks); i++ { - if kvparts := strings.Split(fieldEqualityChecks[i], "=="); len(kvparts) == 2 { - key := kvparts[0] - val := kvparts[1] - res := info[key].String() - if !strings.EqualFold(res, val) { - done = false + shardStreams := gjson.Get(output, "workflows.0.shard_streams") + // We need to wait for all streams in all shard streams to have the desired state. + shardStreams.ForEach(func(shardStreamId, shardStream gjson.Result) bool { + streams := shardStream.Get("*") + streams.ForEach(func(streamId, stream gjson.Result) bool { + info := stream.Map() + state = info["state"].String() + if state == wantState { + for i := 0; i < len(fieldEqualityChecks); i++ { + if kvparts := strings.Split(fieldEqualityChecks[i], "=="); len(kvparts) == 2 { + key := kvparts[0] + val := kvparts[1] + res := info[key].String() + if !strings.EqualFold(res, val) { + done = false + } } } - } - if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() && - (info["position"].Exists() && info["position"].String() == "") { + if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() && + (info["position"].Exists() && info["position"].String() == "") { + done = false + } + } else { done = false } - } else { - done = false - } + return true + }) return true }) if done {