Skip to content

Commit

Permalink
Distributed Transaction Resolver (#16381)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
Signed-off-by: Manan Gupta <[email protected]>
Co-authored-by: Manan Gupta <[email protected]>
  • Loading branch information
harshit-gangal and GuptaManan100 authored Jul 23, 2024
1 parent 2e847cd commit ae7214d
Show file tree
Hide file tree
Showing 43 changed files with 911 additions and 377 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cluster_endtoend_vtgate_transaction.yml
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ jobs:
set -exo pipefail
# run the tests however you normally do, then produce a JUnit XML file
eatmydata -- go run test.go -docker=false -follow -shard vtgate_transaction | tee -a output.txt | go-junit-report -set-exit-code > report.xml
eatmydata -- go run test.go -docker=false -follow -shard vtgate_transaction -build-tag=debug2PC | tee -a output.txt | go-junit-report -set-exit-code > report.xml
- name: Print test output and Record test result in launchable if PR is not a draft
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && always()
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ endif
bash ./build.env
go build -trimpath $(EXTRA_BUILD_FLAGS) $(VT_GO_PARALLEL) \
-ldflags "$(EXTRA_BUILD_LDFLAGS) $(shell tools/build_version_flags.sh)" \
-tags "$(EXTRA_BUILD_TAGS)" \
-o ${VTROOTBIN} ./go/...

# build the vitess binaries statically
Expand All @@ -94,6 +95,7 @@ endif
CGO_ENABLED=0 go build \
-trimpath $(EXTRA_BUILD_FLAGS) $(VT_GO_PARALLEL) \
-ldflags "$(EXTRA_BUILD_LDFLAGS) $(shell tools/build_version_flags.sh)" \
-tags "$(EXTRA_BUILD_TAGS)" \
-o ${VTROOTBIN} ./go/...
ifndef NOVTADMINBUILD
echo "Building VTAdmin Web, disable VTAdmin build by setting 'NOVTADMINBUILD'"
Expand All @@ -116,6 +118,7 @@ endif
CGO_ENABLED=0 GOOS=${GOOS} GOARCH=${GOARCH} go build \
-trimpath $(EXTRA_BUILD_FLAGS) $(VT_GO_PARALLEL) \
-ldflags "$(EXTRA_BUILD_LDFLAGS) $(shell tools/build_version_flags.sh)" \
-tags "$(EXTRA_BUILD_TAGS)" \
-o ${VTROOTBIN}/${GOOS}_${GOARCH} ./go/...

@if [ ! -x "${VTROOTBIN}/${GOOS}_${GOARCH}/vttablet" ]; then \
Expand All @@ -130,6 +133,7 @@ endif
go build -trimpath \
$(EXTRA_BUILD_FLAGS) $(VT_GO_PARALLEL) \
-ldflags "$(EXTRA_BUILD_LDFLAGS) $(shell tools/build_version_flags.sh)" \
-tags "$(EXTRA_BUILD_TAGS)" \
-gcflags -'N -l' \
-o ${VTROOTBIN} ./go/...

Expand Down
4 changes: 0 additions & 4 deletions go/cmd/vtgateclienttest/services/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ func (c fallbackClient) CloseSession(ctx context.Context, session *vtgatepb.Sess
return c.fallback.CloseSession(ctx, session)
}

func (c fallbackClient) ResolveTransaction(ctx context.Context, dtid string) error {
return c.fallback.ResolveTransaction(ctx, dtid)
}

func (c fallbackClient) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func([]*binlogdatapb.VEvent) error) error {
return c.fallback.VStream(ctx, tabletType, vgtid, filter, flags, send)
}
Expand Down
4 changes: 0 additions & 4 deletions go/cmd/vtgateclienttest/services/terminal.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ func (c *terminalClient) CloseSession(ctx context.Context, session *vtgatepb.Ses
return errTerminal
}

func (c *terminalClient) ResolveTransaction(ctx context.Context, dtid string) error {
return errTerminal
}

func (c *terminalClient) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func([]*binlogdatapb.VEvent) error) error {
return errTerminal
}
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ func TestMain(m *testing.M) {
// Set extra args for twopc
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
"--transaction_mode", "TWOPC",
"--grpc_use_effective_callerid",
)
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--twopc_enable",
"--twopc_abandon_age", "3600",
"--twopc_abandon_age", "1",
"--queryserver-config-transaction-cap", "3",
)

Expand All @@ -89,7 +90,7 @@ func TestMain(m *testing.M) {
VSchema: VSchema,
SidecarDBName: sidecarDBName,
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil {
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 0, false); err != nil {
return 1
}

Expand All @@ -110,22 +111,23 @@ func start(t *testing.T) (*mysql.Conn, func()) {
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)

deleteAll := func() {
tables := []string{"twopc_user"}
for _, table := range tables {
_, _ = utils.ExecAllowError(t, conn, "delete from "+table)
}
}

deleteAll()

return conn, func() {
deleteAll()
conn.Close()
cluster.PanicHandler(t)
cleanup(t)
}
}

func cleanup(t *testing.T) {
cluster.PanicHandler(t)

ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

_, _ = utils.ExecAllowError(t, conn, "delete from twopc_user")
}

type extractInterestingValues func(dtidMap map[string]string, vals []sqltypes.Value) []sqltypes.Value

var tables = map[string]extractInterestingValues{
Expand Down Expand Up @@ -171,7 +173,8 @@ func getDTID(dtidMap map[string]string, dtKey string) string {
func runVStream(t *testing.T, ctx context.Context, ch chan *binlogdatapb.VEvent, vtgateConn *vtgateconn.VTGateConn) {
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{
{Keyspace: keyspaceName, Shard: "-80", Gtid: "current"},
{Keyspace: keyspaceName, Shard: "-40", Gtid: "current"},
{Keyspace: keyspaceName, Shard: "40-80", Gtid: "current"},
{Keyspace: keyspaceName, Shard: "80-", Gtid: "current"},
}}
filter := &binlogdatapb.Filter{
Expand Down Expand Up @@ -211,6 +214,10 @@ func runVStream(t *testing.T, ctx context.Context, ch chan *binlogdatapb.VEvent,
}

func retrieveTransitions(t *testing.T, ch chan *binlogdatapb.VEvent, tableMap map[string][]*querypb.Field, dtMap map[string]string) map[string][]string {
return retrieveTransitionsWithTimeout(t, ch, tableMap, dtMap, 1*time.Second)
}

func retrieveTransitionsWithTimeout(t *testing.T, ch chan *binlogdatapb.VEvent, tableMap map[string][]*querypb.Field, dtMap map[string]string, timeout time.Duration) map[string][]string {
logTable := make(map[string][]string)

keepWaiting := true
Expand All @@ -229,7 +236,7 @@ func retrieveTransitions(t *testing.T, ch chan *binlogdatapb.VEvent, tableMap ma
if re.FieldEvent != nil {
tableMap[re.FieldEvent.TableName] = re.FieldEvent.Fields
}
case <-time.After(1 * time.Second):
case <-time.After(timeout):
keepWaiting = false
}
}
Expand Down
Loading

0 comments on commit ae7214d

Please sign in to comment.