From 6f23f60cc32ffbd066dea80182d2d96de1b6cd02 Mon Sep 17 00:00:00 2001 From: Brendan Dougherty Date: Mon, 8 Jan 2024 18:22:08 +0000 Subject: [PATCH] Vindexes: Pass context in consistent lookup handleDup (#14653) Signed-off-by: Brendan Dougherty --- go/vt/vtgate/vindexes/consistent_lookup.go | 3 +- .../vtgate/vindexes/consistent_lookup_test.go | 90 ++++++++++++++----- 2 files changed, 67 insertions(+), 26 deletions(-) diff --git a/go/vt/vtgate/vindexes/consistent_lookup.go b/go/vt/vtgate/vindexes/consistent_lookup.go index cc74966c197..d51d9d2e386 100644 --- a/go/vt/vtgate/vindexes/consistent_lookup.go +++ b/go/vt/vtgate/vindexes/consistent_lookup.go @@ -384,8 +384,7 @@ func (lu *clCommon) handleDup(ctx context.Context, vcursor VCursor, values []sql return err } // Lock the target row using normal transaction priority. - // TODO: context needs to be passed on. - qr, err = vcursor.ExecuteKeyspaceID(context.Background(), lu.keyspace, existingksid, lu.lockOwnerQuery, bindVars, false /* rollbackOnError */, false /* autocommit */) + qr, err = vcursor.ExecuteKeyspaceID(ctx, lu.keyspace, existingksid, lu.lockOwnerQuery, bindVars, false /* rollbackOnError */, false /* autocommit */) if err != nil { return err } diff --git a/go/vt/vtgate/vindexes/consistent_lookup_test.go b/go/vt/vtgate/vindexes/consistent_lookup_test.go index 832a16fae9f..a6f1046e201 100644 --- a/go/vt/vtgate/vindexes/consistent_lookup_test.go +++ b/go/vt/vtgate/vindexes/consistent_lookup_test.go @@ -116,8 +116,9 @@ func TestConsistentLookupMap(t *testing.T) { lookup := createConsistentLookup(t, "consistent_lookup", false) vc := &loggingVCursor{} vc.AddResult(makeTestResultLookup([]int{2, 2}), nil) + ctx := newTestContext() - got, err := lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}) + got, err := lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}) require.NoError(t, err) want := []key.Destination{ key.DestinationKeyspaceIDs([][]byte{ @@ -135,10 +136,11 @@ func TestConsistentLookupMap(t *testing.T) { vc.verifyLog(t, []string{ "ExecutePre select fromc1, toc from t where fromc1 in ::fromc1 [{fromc1 }] false", }) + vc.verifyContext(t, ctx) // Test query fail. vc.AddResult(nil, fmt.Errorf("execute failed")) - _, err = lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1)}) + _, err = lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1)}) wantErr := "lookup.Map: execute failed" if err == nil || err.Error() != wantErr { t.Errorf("lookup(query fail) err: %v, want %s", err, wantErr) @@ -167,8 +169,9 @@ func TestConsistentLookupUniqueMap(t *testing.T) { lookup := createConsistentLookup(t, "consistent_lookup_unique", false) vc := &loggingVCursor{} vc.AddResult(makeTestResultLookup([]int{0, 1}), nil) + ctx := newTestContext() - got, err := lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}) + got, err := lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}) require.NoError(t, err) want := []key.Destination{ key.DestinationNone{}, @@ -180,10 +183,11 @@ func TestConsistentLookupUniqueMap(t *testing.T) { vc.verifyLog(t, []string{ "ExecutePre select fromc1, toc from t where fromc1 in ::fromc1 [{fromc1 }] false", }) + vc.verifyContext(t, ctx) // More than one result is invalid vc.AddResult(makeTestResultLookup([]int{2}), nil) - _, err = lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1)}) + _, err = lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1)}) wanterr := "Lookup.Map: unexpected multiple results from vindex t: INT64(1)" if err == nil || err.Error() != wanterr { t.Errorf("lookup(query fail) err: %v, want %s", err, wanterr) @@ -212,8 +216,9 @@ func TestConsistentLookupMapAbsent(t *testing.T) { lookup := createConsistentLookup(t, "consistent_lookup", false) vc := &loggingVCursor{} vc.AddResult(makeTestResultLookup([]int{0, 0}), nil) + ctx := newTestContext() - got, err := lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}) + got, err := lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}) require.NoError(t, err) want := []key.Destination{ key.DestinationNone{}, @@ -225,6 +230,7 @@ func TestConsistentLookupMapAbsent(t *testing.T) { vc.verifyLog(t, []string{ "ExecutePre select fromc1, toc from t where fromc1 in ::fromc1 [{fromc1 }] false", }) + vc.verifyContext(t, ctx) } func TestConsistentLookupVerify(t *testing.T) { @@ -232,17 +238,19 @@ func TestConsistentLookupVerify(t *testing.T) { vc := &loggingVCursor{} vc.AddResult(makeTestResult(1), nil) vc.AddResult(makeTestResult(1), nil) + ctx := newTestContext() - _, err := lookup.Verify(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte("test1"), []byte("test2")}) + _, err := lookup.Verify(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte("test1"), []byte("test2")}) require.NoError(t, err) vc.verifyLog(t, []string{ "ExecutePre select fromc1 from t where fromc1 = :fromc1 and toc = :toc [{fromc1 1} {toc test1}] false", "ExecutePre select fromc1 from t where fromc1 = :fromc1 and toc = :toc [{fromc1 2} {toc test2}] false", }) + vc.verifyContext(t, ctx) // Test query fail. vc.AddResult(nil, fmt.Errorf("execute failed")) - _, err = lookup.Verify(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1)}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")}) + _, err = lookup.Verify(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1)}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")}) want := "lookup.Verify: execute failed" if err == nil || err.Error() != want { t.Errorf("lookup(query fail) err: %v, want %s", err, want) @@ -250,7 +258,7 @@ func TestConsistentLookupVerify(t *testing.T) { // Test write_only. lookup = createConsistentLookup(t, "consistent_lookup", true) - got, err := lookup.Verify(context.Background(), nil, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte(""), []byte("")}) + got, err := lookup.Verify(ctx, nil, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte(""), []byte("")}) require.NoError(t, err) wantBools := []bool{true, true} if !reflect.DeepEqual(got, wantBools) { @@ -262,8 +270,9 @@ func TestConsistentLookupCreateSimple(t *testing.T) { lookup := createConsistentLookup(t, "consistent_lookup", false) vc := &loggingVCursor{} vc.AddResult(&sqltypes.Result{}, nil) + ctx := newTestContext() - if err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{ + if err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{ sqltypes.NewInt64(1), sqltypes.NewInt64(2), }, { @@ -275,6 +284,7 @@ func TestConsistentLookupCreateSimple(t *testing.T) { vc.verifyLog(t, []string{ "ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1_0, :fromc2_0, :toc_0), (:fromc1_1, :fromc2_1, :toc_1) [{fromc1_0 1} {fromc1_1 3} {fromc2_0 2} {fromc2_1 4} {toc_0 test1} {toc_1 test2}] true", }) + vc.verifyContext(t, ctx) } func TestConsistentLookupCreateThenRecreate(t *testing.T) { @@ -283,8 +293,9 @@ func TestConsistentLookupCreateThenRecreate(t *testing.T) { vc.AddResult(nil, sqlerror.NewSQLError(sqlerror.ERDupEntry, sqlerror.SSConstraintViolation, "Duplicate entry")) vc.AddResult(&sqltypes.Result{}, nil) vc.AddResult(&sqltypes.Result{}, nil) + ctx := newTestContext() - if err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{ + if err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{ sqltypes.NewInt64(1), sqltypes.NewInt64(2), }}, [][]byte{[]byte("test1")}, false); err != nil { @@ -295,6 +306,7 @@ func TestConsistentLookupCreateThenRecreate(t *testing.T) { "ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc test1}] false", "ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1, :fromc2, :toc) [{fromc1 1} {fromc2 2} {toc test1}] true", }) + vc.verifyContext(t, ctx) } func TestConsistentLookupCreateThenUpdate(t *testing.T) { @@ -304,8 +316,9 @@ func TestConsistentLookupCreateThenUpdate(t *testing.T) { vc.AddResult(makeTestResult(1), nil) vc.AddResult(&sqltypes.Result{}, nil) vc.AddResult(&sqltypes.Result{}, nil) + ctx := newTestContext() - if err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{ + if err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{ sqltypes.NewInt64(1), sqltypes.NewInt64(2), }}, [][]byte{[]byte("test1")}, false); err != nil { @@ -317,6 +330,7 @@ func TestConsistentLookupCreateThenUpdate(t *testing.T) { "ExecuteKeyspaceID select fc1 from `dot.t1` where fc1 = :fromc1 and fc2 = :fromc2 lock in share mode [{fromc1 1} {fromc2 2} {toc test1}] false", "ExecutePre update t set toc=:toc where fromc1 = :fromc1 and fromc2 = :fromc2 [{fromc1 1} {fromc2 2} {toc test1}] true", }) + vc.verifyContext(t, ctx) } func TestConsistentLookupCreateThenSkipUpdate(t *testing.T) { @@ -326,8 +340,9 @@ func TestConsistentLookupCreateThenSkipUpdate(t *testing.T) { vc.AddResult(makeTestResult(1), nil) vc.AddResult(&sqltypes.Result{}, nil) vc.AddResult(&sqltypes.Result{}, nil) + ctx := newTestContext() - if err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{ + if err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{ sqltypes.NewInt64(1), sqltypes.NewInt64(2), }}, [][]byte{[]byte("1")}, false); err != nil { @@ -338,6 +353,7 @@ func TestConsistentLookupCreateThenSkipUpdate(t *testing.T) { "ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc 1}] false", "ExecuteKeyspaceID select fc1 from `dot.t1` where fc1 = :fromc1 and fc2 = :fromc2 lock in share mode [{fromc1 1} {fromc2 2} {toc 1}] false", }) + vc.verifyContext(t, ctx) } func TestConsistentLookupCreateThenDupkey(t *testing.T) { @@ -347,8 +363,9 @@ func TestConsistentLookupCreateThenDupkey(t *testing.T) { vc.AddResult(makeTestResult(1), nil) vc.AddResult(makeTestResult(1), nil) vc.AddResult(&sqltypes.Result{}, nil) + ctx := newTestContext() - err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{ + err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{ sqltypes.NewInt64(1), sqltypes.NewInt64(2), }}, [][]byte{[]byte("test1")}, false) @@ -359,14 +376,16 @@ func TestConsistentLookupCreateThenDupkey(t *testing.T) { "ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc test1}] false", "ExecuteKeyspaceID select fc1 from `dot.t1` where fc1 = :fromc1 and fc2 = :fromc2 lock in share mode [{fromc1 1} {fromc2 2} {toc test1}] false", }) + vc.verifyContext(t, ctx) } func TestConsistentLookupCreateNonDupError(t *testing.T) { lookup := createConsistentLookup(t, "consistent_lookup", false) vc := &loggingVCursor{} vc.AddResult(nil, errors.New("general error")) + ctx := newTestContext() - err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{ + err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{ sqltypes.NewInt64(1), sqltypes.NewInt64(2), }}, [][]byte{[]byte("test1")}, false) @@ -377,6 +396,7 @@ func TestConsistentLookupCreateNonDupError(t *testing.T) { vc.verifyLog(t, []string{ "ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1_0, :fromc2_0, :toc_0) [{fromc1_0 1} {fromc2_0 2} {toc_0 test1}] true", }) + vc.verifyContext(t, ctx) } func TestConsistentLookupCreateThenBadRows(t *testing.T) { @@ -384,8 +404,9 @@ func TestConsistentLookupCreateThenBadRows(t *testing.T) { vc := &loggingVCursor{} vc.AddResult(nil, vterrors.New(vtrpcpb.Code_ALREADY_EXISTS, "(errno 1062) (sqlstate 23000) Duplicate entry")) vc.AddResult(makeTestResult(2), nil) + ctx := newTestContext() - err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{ + err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{ sqltypes.NewInt64(1), sqltypes.NewInt64(2), }}, [][]byte{[]byte("test1")}, false) @@ -397,14 +418,16 @@ func TestConsistentLookupCreateThenBadRows(t *testing.T) { "ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1_0, :fromc2_0, :toc_0) [{fromc1_0 1} {fromc2_0 2} {toc_0 test1}] true", "ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc test1}] false", }) + vc.verifyContext(t, ctx) } func TestConsistentLookupDelete(t *testing.T) { lookup := createConsistentLookup(t, "consistent_lookup", false) vc := &loggingVCursor{} vc.AddResult(&sqltypes.Result{}, nil) + ctx := newTestContext() - if err := lookup.(Lookup).Delete(context.Background(), vc, [][]sqltypes.Value{{ + if err := lookup.(Lookup).Delete(ctx, vc, [][]sqltypes.Value{{ sqltypes.NewInt64(1), sqltypes.NewInt64(2), }}, []byte("test")); err != nil { @@ -413,6 +436,7 @@ func TestConsistentLookupDelete(t *testing.T) { vc.verifyLog(t, []string{ "ExecutePost delete from t where fromc1 = :fromc1 and fromc2 = :fromc2 and toc = :toc [{fromc1 1} {fromc2 2} {toc test}] true", }) + vc.verifyContext(t, ctx) } func TestConsistentLookupUpdate(t *testing.T) { @@ -420,8 +444,9 @@ func TestConsistentLookupUpdate(t *testing.T) { vc := &loggingVCursor{} vc.AddResult(&sqltypes.Result{}, nil) vc.AddResult(&sqltypes.Result{}, nil) + ctx := newTestContext() - if err := lookup.(Lookup).Update(context.Background(), vc, []sqltypes.Value{ + if err := lookup.(Lookup).Update(ctx, vc, []sqltypes.Value{ sqltypes.NewInt64(1), sqltypes.NewInt64(2), }, []byte("test"), []sqltypes.Value{ @@ -434,6 +459,7 @@ func TestConsistentLookupUpdate(t *testing.T) { "ExecutePost delete from t where fromc1 = :fromc1 and fromc2 = :fromc2 and toc = :toc [{fromc1 1} {fromc2 2} {toc test}] true", "ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1_0, :fromc2_0, :toc_0) [{fromc1_0 3} {fromc2_0 4} {toc_0 test}] true", }) + vc.verifyContext(t, ctx) } func TestConsistentLookupNoUpdate(t *testing.T) { @@ -510,13 +536,19 @@ func createConsistentLookup(t *testing.T, name string, writeOnly bool) SingleCol return l.(SingleColumn) } +func newTestContext() context.Context { + type testContextKey string // keep static checks from complaining about built-in types as context keys + return context.WithValue(context.Background(), (testContextKey)("test"), "foo") +} + var _ VCursor = (*loggingVCursor)(nil) type loggingVCursor struct { - results []*sqltypes.Result - errors []error - index int - log []string + results []*sqltypes.Result + errors []error + index int + log []string + contexts []context.Context } func (vc *loggingVCursor) LookupRowLockShardSession() vtgatepb.CommitOrder { @@ -557,14 +589,14 @@ func (vc *loggingVCursor) Execute(ctx context.Context, method string, query stri case vtgatepb.CommitOrder_AUTOCOMMIT: name = "ExecuteAutocommit" } - return vc.execute(name, query, bindvars, rollbackOnError) + return vc.execute(ctx, name, query, bindvars, rollbackOnError) } func (vc *loggingVCursor) ExecuteKeyspaceID(ctx context.Context, keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error) { - return vc.execute("ExecuteKeyspaceID", query, bindVars, rollbackOnError) + return vc.execute(ctx, "ExecuteKeyspaceID", query, bindVars, rollbackOnError) } -func (vc *loggingVCursor) execute(method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool) (*sqltypes.Result, error) { +func (vc *loggingVCursor) execute(ctx context.Context, method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool) (*sqltypes.Result, error) { if vc.index >= len(vc.results) { return nil, fmt.Errorf("ran out of results to return: %s", query) } @@ -574,6 +606,7 @@ func (vc *loggingVCursor) execute(method string, query string, bindvars map[stri } sort.Slice(bvl, func(i, j int) bool { return bvl[i].Name < bvl[j].Name }) vc.log = append(vc.log, fmt.Sprintf("%s %s %v %v", method, query, bvl, rollbackOnError)) + vc.contexts = append(vc.contexts, ctx) idx := vc.index vc.index++ if vc.errors[idx] != nil { @@ -597,6 +630,15 @@ func (vc *loggingVCursor) verifyLog(t *testing.T, want []string) { } } +func (vc *loggingVCursor) verifyContext(t *testing.T, want context.Context) { + t.Helper() + for i, got := range vc.contexts { + if got != want { + t.Errorf("context(%d):\ngot: %v\nwant: %v", i, got, want) + } + } +} + // create lookup result with one to one mapping func makeTestResult(numRows int) *sqltypes.Result { result := &sqltypes.Result{