Skip to content

Commit b8bb5c9

Browse files
brendarshanth96
authored andcommitted
Vindexes: Pass context in consistent lookup handleDup (vitessio#14653)
Signed-off-by: Brendan Dougherty <[email protected]>
1 parent 5f2bf56 commit b8bb5c9

File tree

2 files changed

+67
-26
lines changed

2 files changed

+67
-26
lines changed

go/vt/vtgate/vindexes/consistent_lookup.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -360,8 +360,7 @@ func (lu *clCommon) handleDup(ctx context.Context, vcursor VCursor, values []sql
360360
return err
361361
}
362362
// Lock the target row using normal transaction priority.
363-
// TODO: context needs to be passed on.
364-
qr, err = vcursor.ExecuteKeyspaceID(context.Background(), lu.keyspace, existingksid, lu.lockOwnerQuery, bindVars, false /* rollbackOnError */, false /* autocommit */)
363+
qr, err = vcursor.ExecuteKeyspaceID(ctx, lu.keyspace, existingksid, lu.lockOwnerQuery, bindVars, false /* rollbackOnError */, false /* autocommit */)
365364
if err != nil {
366365
return err
367366
}

go/vt/vtgate/vindexes/consistent_lookup_test.go

+66-24
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,9 @@ func TestConsistentLookupMap(t *testing.T) {
7575
lookup := createConsistentLookup(t, "consistent_lookup", false)
7676
vc := &loggingVCursor{}
7777
vc.AddResult(makeTestResultLookup([]int{2, 2}), nil)
78+
ctx := newTestContext()
7879

79-
got, err := lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
80+
got, err := lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
8081
require.NoError(t, err)
8182
want := []key.Destination{
8283
key.DestinationKeyspaceIDs([][]byte{
@@ -94,10 +95,11 @@ func TestConsistentLookupMap(t *testing.T) {
9495
vc.verifyLog(t, []string{
9596
"ExecutePre select fromc1, toc from t where fromc1 in ::fromc1 [{fromc1 }] false",
9697
})
98+
vc.verifyContext(t, ctx)
9799

98100
// Test query fail.
99101
vc.AddResult(nil, fmt.Errorf("execute failed"))
100-
_, err = lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1)})
102+
_, err = lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1)})
101103
wantErr := "lookup.Map: execute failed"
102104
if err == nil || err.Error() != wantErr {
103105
t.Errorf("lookup(query fail) err: %v, want %s", err, wantErr)
@@ -126,8 +128,9 @@ func TestConsistentLookupUniqueMap(t *testing.T) {
126128
lookup := createConsistentLookup(t, "consistent_lookup_unique", false)
127129
vc := &loggingVCursor{}
128130
vc.AddResult(makeTestResultLookup([]int{0, 1}), nil)
131+
ctx := newTestContext()
129132

130-
got, err := lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
133+
got, err := lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
131134
require.NoError(t, err)
132135
want := []key.Destination{
133136
key.DestinationNone{},
@@ -139,10 +142,11 @@ func TestConsistentLookupUniqueMap(t *testing.T) {
139142
vc.verifyLog(t, []string{
140143
"ExecutePre select fromc1, toc from t where fromc1 in ::fromc1 [{fromc1 }] false",
141144
})
145+
vc.verifyContext(t, ctx)
142146

143147
// More than one result is invalid
144148
vc.AddResult(makeTestResultLookup([]int{2}), nil)
145-
_, err = lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1)})
149+
_, err = lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1)})
146150
wanterr := "Lookup.Map: unexpected multiple results from vindex t: INT64(1)"
147151
if err == nil || err.Error() != wanterr {
148152
t.Errorf("lookup(query fail) err: %v, want %s", err, wanterr)
@@ -171,8 +175,9 @@ func TestConsistentLookupMapAbsent(t *testing.T) {
171175
lookup := createConsistentLookup(t, "consistent_lookup", false)
172176
vc := &loggingVCursor{}
173177
vc.AddResult(makeTestResultLookup([]int{0, 0}), nil)
178+
ctx := newTestContext()
174179

175-
got, err := lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
180+
got, err := lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
176181
require.NoError(t, err)
177182
want := []key.Destination{
178183
key.DestinationNone{},
@@ -184,32 +189,35 @@ func TestConsistentLookupMapAbsent(t *testing.T) {
184189
vc.verifyLog(t, []string{
185190
"ExecutePre select fromc1, toc from t where fromc1 in ::fromc1 [{fromc1 }] false",
186191
})
192+
vc.verifyContext(t, ctx)
187193
}
188194

189195
func TestConsistentLookupVerify(t *testing.T) {
190196
lookup := createConsistentLookup(t, "consistent_lookup", false)
191197
vc := &loggingVCursor{}
192198
vc.AddResult(makeTestResult(1), nil)
193199
vc.AddResult(makeTestResult(1), nil)
200+
ctx := newTestContext()
194201

195-
_, err := lookup.Verify(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte("test1"), []byte("test2")})
202+
_, err := lookup.Verify(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte("test1"), []byte("test2")})
196203
require.NoError(t, err)
197204
vc.verifyLog(t, []string{
198205
"ExecutePre select fromc1 from t where fromc1 = :fromc1 and toc = :toc [{fromc1 1} {toc test1}] false",
199206
"ExecutePre select fromc1 from t where fromc1 = :fromc1 and toc = :toc [{fromc1 2} {toc test2}] false",
200207
})
208+
vc.verifyContext(t, ctx)
201209

202210
// Test query fail.
203211
vc.AddResult(nil, fmt.Errorf("execute failed"))
204-
_, err = lookup.Verify(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1)}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")})
212+
_, err = lookup.Verify(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1)}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")})
205213
want := "lookup.Verify: execute failed"
206214
if err == nil || err.Error() != want {
207215
t.Errorf("lookup(query fail) err: %v, want %s", err, want)
208216
}
209217

210218
// Test write_only.
211219
lookup = createConsistentLookup(t, "consistent_lookup", true)
212-
got, err := lookup.Verify(context.Background(), nil, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte(""), []byte("")})
220+
got, err := lookup.Verify(ctx, nil, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte(""), []byte("")})
213221
require.NoError(t, err)
214222
wantBools := []bool{true, true}
215223
if !reflect.DeepEqual(got, wantBools) {
@@ -221,8 +229,9 @@ func TestConsistentLookupCreateSimple(t *testing.T) {
221229
lookup := createConsistentLookup(t, "consistent_lookup", false)
222230
vc := &loggingVCursor{}
223231
vc.AddResult(&sqltypes.Result{}, nil)
232+
ctx := newTestContext()
224233

225-
if err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
234+
if err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
226235
sqltypes.NewInt64(1),
227236
sqltypes.NewInt64(2),
228237
}, {
@@ -234,6 +243,7 @@ func TestConsistentLookupCreateSimple(t *testing.T) {
234243
vc.verifyLog(t, []string{
235244
"ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1_0, :fromc2_0, :toc_0), (:fromc1_1, :fromc2_1, :toc_1) [{fromc1_0 1} {fromc1_1 3} {fromc2_0 2} {fromc2_1 4} {toc_0 test1} {toc_1 test2}] true",
236245
})
246+
vc.verifyContext(t, ctx)
237247
}
238248

239249
func TestConsistentLookupCreateThenRecreate(t *testing.T) {
@@ -242,8 +252,9 @@ func TestConsistentLookupCreateThenRecreate(t *testing.T) {
242252
vc.AddResult(nil, mysql.NewSQLError(mysql.ERDupEntry, mysql.SSConstraintViolation, "Duplicate entry"))
243253
vc.AddResult(&sqltypes.Result{}, nil)
244254
vc.AddResult(&sqltypes.Result{}, nil)
255+
ctx := newTestContext()
245256

246-
if err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
257+
if err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
247258
sqltypes.NewInt64(1),
248259
sqltypes.NewInt64(2),
249260
}}, [][]byte{[]byte("test1")}, false); err != nil {
@@ -254,6 +265,7 @@ func TestConsistentLookupCreateThenRecreate(t *testing.T) {
254265
"ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc test1}] false",
255266
"ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1, :fromc2, :toc) [{fromc1 1} {fromc2 2} {toc test1}] true",
256267
})
268+
vc.verifyContext(t, ctx)
257269
}
258270

259271
func TestConsistentLookupCreateThenUpdate(t *testing.T) {
@@ -263,8 +275,9 @@ func TestConsistentLookupCreateThenUpdate(t *testing.T) {
263275
vc.AddResult(makeTestResult(1), nil)
264276
vc.AddResult(&sqltypes.Result{}, nil)
265277
vc.AddResult(&sqltypes.Result{}, nil)
278+
ctx := newTestContext()
266279

267-
if err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
280+
if err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
268281
sqltypes.NewInt64(1),
269282
sqltypes.NewInt64(2),
270283
}}, [][]byte{[]byte("test1")}, false); err != nil {
@@ -276,6 +289,7 @@ func TestConsistentLookupCreateThenUpdate(t *testing.T) {
276289
"ExecuteKeyspaceID select fc1 from `dot.t1` where fc1 = :fromc1 and fc2 = :fromc2 lock in share mode [{fromc1 1} {fromc2 2} {toc test1}] false",
277290
"ExecutePre update t set toc=:toc where fromc1 = :fromc1 and fromc2 = :fromc2 [{fromc1 1} {fromc2 2} {toc test1}] true",
278291
})
292+
vc.verifyContext(t, ctx)
279293
}
280294

281295
func TestConsistentLookupCreateThenSkipUpdate(t *testing.T) {
@@ -285,8 +299,9 @@ func TestConsistentLookupCreateThenSkipUpdate(t *testing.T) {
285299
vc.AddResult(makeTestResult(1), nil)
286300
vc.AddResult(&sqltypes.Result{}, nil)
287301
vc.AddResult(&sqltypes.Result{}, nil)
302+
ctx := newTestContext()
288303

289-
if err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
304+
if err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
290305
sqltypes.NewInt64(1),
291306
sqltypes.NewInt64(2),
292307
}}, [][]byte{[]byte("1")}, false); err != nil {
@@ -297,6 +312,7 @@ func TestConsistentLookupCreateThenSkipUpdate(t *testing.T) {
297312
"ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc 1}] false",
298313
"ExecuteKeyspaceID select fc1 from `dot.t1` where fc1 = :fromc1 and fc2 = :fromc2 lock in share mode [{fromc1 1} {fromc2 2} {toc 1}] false",
299314
})
315+
vc.verifyContext(t, ctx)
300316
}
301317

302318
func TestConsistentLookupCreateThenDupkey(t *testing.T) {
@@ -306,8 +322,9 @@ func TestConsistentLookupCreateThenDupkey(t *testing.T) {
306322
vc.AddResult(makeTestResult(1), nil)
307323
vc.AddResult(makeTestResult(1), nil)
308324
vc.AddResult(&sqltypes.Result{}, nil)
325+
ctx := newTestContext()
309326

310-
err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
327+
err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
311328
sqltypes.NewInt64(1),
312329
sqltypes.NewInt64(2),
313330
}}, [][]byte{[]byte("test1")}, false)
@@ -318,14 +335,16 @@ func TestConsistentLookupCreateThenDupkey(t *testing.T) {
318335
"ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc test1}] false",
319336
"ExecuteKeyspaceID select fc1 from `dot.t1` where fc1 = :fromc1 and fc2 = :fromc2 lock in share mode [{fromc1 1} {fromc2 2} {toc test1}] false",
320337
})
338+
vc.verifyContext(t, ctx)
321339
}
322340

323341
func TestConsistentLookupCreateNonDupError(t *testing.T) {
324342
lookup := createConsistentLookup(t, "consistent_lookup", false)
325343
vc := &loggingVCursor{}
326344
vc.AddResult(nil, errors.New("general error"))
345+
ctx := newTestContext()
327346

328-
err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
347+
err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
329348
sqltypes.NewInt64(1),
330349
sqltypes.NewInt64(2),
331350
}}, [][]byte{[]byte("test1")}, false)
@@ -336,15 +355,17 @@ func TestConsistentLookupCreateNonDupError(t *testing.T) {
336355
vc.verifyLog(t, []string{
337356
"ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1_0, :fromc2_0, :toc_0) [{fromc1_0 1} {fromc2_0 2} {toc_0 test1}] true",
338357
})
358+
vc.verifyContext(t, ctx)
339359
}
340360

341361
func TestConsistentLookupCreateThenBadRows(t *testing.T) {
342362
lookup := createConsistentLookup(t, "consistent_lookup", false)
343363
vc := &loggingVCursor{}
344364
vc.AddResult(nil, vterrors.New(vtrpcpb.Code_ALREADY_EXISTS, "(errno 1062) (sqlstate 23000) Duplicate entry"))
345365
vc.AddResult(makeTestResult(2), nil)
366+
ctx := newTestContext()
346367

347-
err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
368+
err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
348369
sqltypes.NewInt64(1),
349370
sqltypes.NewInt64(2),
350371
}}, [][]byte{[]byte("test1")}, false)
@@ -356,14 +377,16 @@ func TestConsistentLookupCreateThenBadRows(t *testing.T) {
356377
"ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1_0, :fromc2_0, :toc_0) [{fromc1_0 1} {fromc2_0 2} {toc_0 test1}] true",
357378
"ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc test1}] false",
358379
})
380+
vc.verifyContext(t, ctx)
359381
}
360382

361383
func TestConsistentLookupDelete(t *testing.T) {
362384
lookup := createConsistentLookup(t, "consistent_lookup", false)
363385
vc := &loggingVCursor{}
364386
vc.AddResult(&sqltypes.Result{}, nil)
387+
ctx := newTestContext()
365388

366-
if err := lookup.(Lookup).Delete(context.Background(), vc, [][]sqltypes.Value{{
389+
if err := lookup.(Lookup).Delete(ctx, vc, [][]sqltypes.Value{{
367390
sqltypes.NewInt64(1),
368391
sqltypes.NewInt64(2),
369392
}}, []byte("test")); err != nil {
@@ -372,15 +395,17 @@ func TestConsistentLookupDelete(t *testing.T) {
372395
vc.verifyLog(t, []string{
373396
"ExecutePost delete from t where fromc1 = :fromc1 and fromc2 = :fromc2 and toc = :toc [{fromc1 1} {fromc2 2} {toc test}] true",
374397
})
398+
vc.verifyContext(t, ctx)
375399
}
376400

377401
func TestConsistentLookupUpdate(t *testing.T) {
378402
lookup := createConsistentLookup(t, "consistent_lookup", false)
379403
vc := &loggingVCursor{}
380404
vc.AddResult(&sqltypes.Result{}, nil)
381405
vc.AddResult(&sqltypes.Result{}, nil)
406+
ctx := newTestContext()
382407

383-
if err := lookup.(Lookup).Update(context.Background(), vc, []sqltypes.Value{
408+
if err := lookup.(Lookup).Update(ctx, vc, []sqltypes.Value{
384409
sqltypes.NewInt64(1),
385410
sqltypes.NewInt64(2),
386411
}, []byte("test"), []sqltypes.Value{
@@ -393,6 +418,7 @@ func TestConsistentLookupUpdate(t *testing.T) {
393418
"ExecutePost delete from t where fromc1 = :fromc1 and fromc2 = :fromc2 and toc = :toc [{fromc1 1} {fromc2 2} {toc test}] true",
394419
"ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1_0, :fromc2_0, :toc_0) [{fromc1_0 3} {fromc2_0 4} {toc_0 test}] true",
395420
})
421+
vc.verifyContext(t, ctx)
396422
}
397423

398424
func TestConsistentLookupNoUpdate(t *testing.T) {
@@ -469,13 +495,19 @@ func createConsistentLookup(t *testing.T, name string, writeOnly bool) SingleCol
469495
return l.(SingleColumn)
470496
}
471497

498+
func newTestContext() context.Context {
499+
type testContextKey string // keep static checks from complaining about built-in types as context keys
500+
return context.WithValue(context.Background(), (testContextKey)("test"), "foo")
501+
}
502+
472503
var _ VCursor = (*loggingVCursor)(nil)
473504

474505
type loggingVCursor struct {
475-
results []*sqltypes.Result
476-
errors []error
477-
index int
478-
log []string
506+
results []*sqltypes.Result
507+
errors []error
508+
index int
509+
log []string
510+
contexts []context.Context
479511
}
480512

481513
func (vc *loggingVCursor) LookupRowLockShardSession() vtgatepb.CommitOrder {
@@ -508,14 +540,14 @@ func (vc *loggingVCursor) Execute(ctx context.Context, method string, query stri
508540
case vtgatepb.CommitOrder_AUTOCOMMIT:
509541
name = "ExecuteAutocommit"
510542
}
511-
return vc.execute(name, query, bindvars, rollbackOnError)
543+
return vc.execute(ctx, name, query, bindvars, rollbackOnError)
512544
}
513545

514546
func (vc *loggingVCursor) ExecuteKeyspaceID(ctx context.Context, keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error) {
515-
return vc.execute("ExecuteKeyspaceID", query, bindVars, rollbackOnError)
547+
return vc.execute(ctx, "ExecuteKeyspaceID", query, bindVars, rollbackOnError)
516548
}
517549

518-
func (vc *loggingVCursor) execute(method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool) (*sqltypes.Result, error) {
550+
func (vc *loggingVCursor) execute(ctx context.Context, method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool) (*sqltypes.Result, error) {
519551
if vc.index >= len(vc.results) {
520552
return nil, fmt.Errorf("ran out of results to return: %s", query)
521553
}
@@ -525,6 +557,7 @@ func (vc *loggingVCursor) execute(method string, query string, bindvars map[stri
525557
}
526558
sort.Slice(bvl, func(i, j int) bool { return bvl[i].Name < bvl[j].Name })
527559
vc.log = append(vc.log, fmt.Sprintf("%s %s %v %v", method, query, bvl, rollbackOnError))
560+
vc.contexts = append(vc.contexts, ctx)
528561
idx := vc.index
529562
vc.index++
530563
if vc.errors[idx] != nil {
@@ -548,6 +581,15 @@ func (vc *loggingVCursor) verifyLog(t *testing.T, want []string) {
548581
}
549582
}
550583

584+
func (vc *loggingVCursor) verifyContext(t *testing.T, want context.Context) {
585+
t.Helper()
586+
for i, got := range vc.contexts {
587+
if got != want {
588+
t.Errorf("context(%d):\ngot: %v\nwant: %v", i, got, want)
589+
}
590+
}
591+
}
592+
551593
// create lookup result with one to one mapping
552594
func makeTestResult(numRows int) *sqltypes.Result {
553595
result := &sqltypes.Result{

0 commit comments

Comments
 (0)