From d908287bd84cfee0db3c15e55832fe68d0da5d68 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 10 Dec 2024 12:15:30 +0200 Subject: [PATCH] Remove binlog server based PITR logic and tests Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../endtoend/recovery/pitr/binlog_server.go | 137 ---- .../recovery/pitr/shardedpitr_test.go | 603 ------------------ go/vt/vttablet/tabletmanager/restore.go | 233 ------- test/config.json | 11 - 4 files changed, 984 deletions(-) delete mode 100644 go/test/endtoend/recovery/pitr/binlog_server.go delete mode 100644 go/test/endtoend/recovery/pitr/shardedpitr_test.go diff --git a/go/test/endtoend/recovery/pitr/binlog_server.go b/go/test/endtoend/recovery/pitr/binlog_server.go deleted file mode 100644 index 3b78b0d4ad7..00000000000 --- a/go/test/endtoend/recovery/pitr/binlog_server.go +++ /dev/null @@ -1,137 +0,0 @@ -/* -Copyright 2020 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package pitr - -import ( - "fmt" - "os" - "os/exec" - "path" - "strings" - "syscall" - "time" - - "vitess.io/vitess/go/vt/log" -) - -const ( - binlogExecutableName = "rippled" - binlogDataDir = "binlog_dir" - binlogUser = "ripple" - binlogPassword = "ripplepassword" - binlogPasswordHash = "D4CDF66E273494CEA9592162BEBB6D62D94C4168" -) - -type binLogServer struct { - hostname string - port int - username string - password string - passwordHash string - dataDirectory string - executablePath string - - proc *exec.Cmd - exit chan error -} - -type mysqlSource struct { - hostname string - port int - username string - password string -} - -// newBinlogServer returns an instance of binlog server -func newBinlogServer(hostname string, port int) (*binLogServer, error) { - dataDir := path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("%s_%d", binlogDataDir, port)) - fmt.Println(dataDir) - if _, err := os.Stat(dataDir); os.IsNotExist(err) { - err := os.Mkdir(dataDir, 0700) - if err != nil { - log.Error(err) - return nil, err - } - } - return &binLogServer{ - executablePath: path.Join(os.Getenv("EXTRA_BIN"), binlogExecutableName), - dataDirectory: dataDir, - username: binlogUser, - password: binlogPassword, - passwordHash: binlogPasswordHash, - hostname: hostname, - port: port, - }, nil -} - -// start starts the binlog server points to running mysql port -func (bs *binLogServer) start(source mysqlSource) error { - bs.proc = exec.Command( - bs.executablePath, - fmt.Sprintf("-ripple_datadir=%s", bs.dataDirectory), - fmt.Sprintf("-ripple_server_password_hash=%s", bs.passwordHash), - fmt.Sprintf("-ripple_master_address=%s", source.hostname), - fmt.Sprintf("-ripple_master_port=%d", source.port), - fmt.Sprintf("-ripple_master_user=%s", source.username), - fmt.Sprintf("-ripple_server_ports=%d", bs.port), - ) - if source.password != "" { - bs.proc.Args = append(bs.proc.Args, fmt.Sprintf("-ripple_master_password=%s", source.password)) - } - - errFile, err := os.Create(path.Join(bs.dataDirectory, "log.txt")) - if err != nil { - log.Errorf("cannot create error log file for binlog server: %v", err) - return err - } - bs.proc.Stderr = errFile - - bs.proc.Env = append(bs.proc.Env, os.Environ()...) - - log.Infof("Running binlog server with command: %v", strings.Join(bs.proc.Args, " ")) - - err = bs.proc.Start() - if err != nil { - return err - } - bs.exit = make(chan error) - go func() { - if bs.proc != nil { - bs.exit <- bs.proc.Wait() - } - }() - return nil -} - -func (bs *binLogServer) stop() error { - if bs.proc == nil || bs.exit == nil { - return nil - } - // Attempt graceful shutdown with SIGTERM first - bs.proc.Process.Signal(syscall.SIGTERM) - - select { - case err := <-bs.exit: - bs.proc = nil - return err - - case <-time.After(10 * time.Second): - bs.proc.Process.Kill() - bs.proc = nil - return <-bs.exit - } -} diff --git a/go/test/endtoend/recovery/pitr/shardedpitr_test.go b/go/test/endtoend/recovery/pitr/shardedpitr_test.go deleted file mode 100644 index f2a76662918..00000000000 --- a/go/test/endtoend/recovery/pitr/shardedpitr_test.go +++ /dev/null @@ -1,603 +0,0 @@ -/* -Copyright 2020 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package pitr - -import ( - "context" - "fmt" - "os" - "os/exec" - "path" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "vitess.io/vitess/go/constants/sidecar" - "vitess.io/vitess/go/json2" - "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/test/endtoend/cluster" - "vitess.io/vitess/go/test/endtoend/utils" - "vitess.io/vitess/go/vt/log" - - vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" -) - -var ( - createTable = `create table product (id bigint(20) primary key, name char(10), created bigint(20));` - insertTable = `insert into product (id, name, created) values(%d, '%s', unix_timestamp());` - getCountID = `select count(*) from product` -) - -var ( - clusterInstance *cluster.LocalProcessCluster - - primary *cluster.Vttablet - replica1 *cluster.Vttablet - replica2 *cluster.Vttablet - shard0Primary *cluster.Vttablet - shard0Replica1 *cluster.Vttablet - shard0Replica2 *cluster.Vttablet - shard1Primary *cluster.Vttablet - shard1Replica1 *cluster.Vttablet - shard1Replica2 *cluster.Vttablet - - cell = "zone1" - hostname = "localhost" - binlogHost = "127.0.0.1" - keyspaceName = "ks" - restoreKS1Name = "restoreks1" - restoreKS2Name = "restoreks2" - restoreKS3Name = "restoreks3" - shardName = "0" - shard0Name = "-80" - shard1Name = "80-" - dbName = "vt_ks" - mysqlUserName = "vt_dba" - mysqlPassword = "VtDbaPass" - dbCredentialFile = "" - initDBFileWithPassword = "" - vSchema = `{ - "sharded": true, - "vindexes": { - "hash_index": { - "type": "hash" - } - }, - "tables": { - "product": { - "column_vindexes": [ - { - "column": "id", - "name": "hash_index" - } - ] - } - } - }` - commonTabletArg = []string{ - "--vreplication_retry_delay", "1s", - "--degraded_threshold", "5s", - "--lock_tables_timeout", "5s", - "--watch_replication_stream", - "--serving_state_grace_period", "1s"} - - defaultTimeout = 30 * time.Second - defaultTick = 1 * time.Second -) - -// Test pitr (Point in time recovery). -// ------------------------------------------- -// The following test will: -// - create a shard with primary and replica -// - run InitShardPrimary -// - point binlog server to primary -// - insert some data using vtgate (e.g. here we have inserted rows 1,2) -// - verify the replication -// - take backup of replica -// - insert some data using vtgate (e.g. we inserted rows 3 4 5 6), while inserting row-4, note down the time (restoreTime1) -// - perform a resharding to create 2 shards (-80, 80-), and delete the old shard -// - point binlog server to primary of both shards -// - insert some data using vtgate (e.g. we will insert 7 8 9 10) and verify we get required number of rows in -80, 80- shard -// - take backup of both shards -// - insert some more data using vtgate (e.g. we will insert 11 12 13 14 15), while inserting row-13, note down the time (restoreTime2) -// - note down the current time (restoreTime3) - -// - Till now we did all the presetup for assertions - -// - asserting that restoring to restoreTime1 (going from 2 shards to 1 shard) is working, i.e. we should get 4 rows. -// - asserting that while restoring if we give small timeout value, it will restore upto to the last available backup (asserting only -80 shard) -// - asserting that restoring to restoreTime2 (going from 2 shards to 2 shards with past time) is working, it will assert for both shards -// - asserting that restoring to restoreTime3 is working, we should get complete data after restoring, as we have in existing shards. -func TestPITRRecovery(t *testing.T) { - defer cluster.PanicHandler(nil) - initializeCluster(t) - defer clusterInstance.Teardown() - - // start the binlog server and point it to primary - bs := startBinlogServer(t, primary) - defer bs.stop() - - // Creating the table - _, err := primary.VttabletProcess.QueryTablet(createTable, keyspaceName, true) - require.NoError(t, err) - - insertRow(t, 1, "prd-1", false) - insertRow(t, 2, "prd-2", false) - - cluster.VerifyRowsInTabletForTable(t, replica1, keyspaceName, 2, "product") - - // backup the replica - err = clusterInstance.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias) - require.NoError(t, err) - - // check that the backup shows up in the listing - output, err := clusterInstance.ListBackups("ks/0") - require.NoError(t, err) - assert.Equal(t, 1, len(output)) - - // now insert some more data to simulate the changes after regular backup - // every insert has some time lag/difference to simulate the time gap between rows - // and when we recover to certain time, this time gap will be able to identify the exact eligible row - var restoreTime1 string - for counter := 3; counter <= 6; counter++ { - if counter == 4 { // we want to recovery till this, so noting the time - tm := time.Now().Add(1 * time.Second).UTC() - restoreTime1 = tm.Format(time.RFC3339) - } - insertRow(t, counter, fmt.Sprintf("prd-%d", counter), true) - } - - // starting resharding process - performResharding(t) - - // start the binlog server and point it to shard0Primary - bs0 := startBinlogServer(t, shard0Primary) - defer bs0.stop() - - // start the binlog server and point it to shard1Primary - bs1 := startBinlogServer(t, shard1Primary) - defer bs1.stop() - - for counter := 7; counter <= 10; counter++ { - insertRow(t, counter, fmt.Sprintf("prd-%d", counter), false) - } - - // wait till all the shards have required data - cluster.VerifyRowsInTabletForTable(t, shard0Replica1, keyspaceName, 6, "product") - cluster.VerifyRowsInTabletForTable(t, shard1Replica1, keyspaceName, 4, "product") - - // take the backup (to simulate the regular backup) - err = clusterInstance.VtctldClientProcess.ExecuteCommand("Backup", shard0Replica1.Alias) - require.NoError(t, err) - // take the backup (to simulate the regular backup) - err = clusterInstance.VtctldClientProcess.ExecuteCommand("Backup", shard1Replica1.Alias) - require.NoError(t, err) - - backups, err := clusterInstance.ListBackups(keyspaceName + "/-80") - require.NoError(t, err) - require.Equal(t, len(backups), 1) - - backups, err = clusterInstance.ListBackups(keyspaceName + "/80-") - require.NoError(t, err) - require.Equal(t, len(backups), 1) - - // now insert some more data to simulate the changes after regular backup - // every insert has some time lag/difference to simulate the time gap between rows - // and when we recover to certain time, this time gap will be able to identify the exact eligible row - var restoreTime2 string - for counter := 11; counter <= 15; counter++ { - if counter == 13 { // we want to recovery till this, so noting the time - tm := time.Now().Add(1 * time.Second).UTC() - restoreTime2 = tm.Format(time.RFC3339) - } - insertRow(t, counter, fmt.Sprintf("prd-%d", counter), true) - } - restoreTime3 := time.Now().UTC().Format(time.RFC3339) - - // creating restore keyspace with snapshot time as restoreTime1 - createRestoreKeyspace(t, restoreTime1, restoreKS1Name) - - // Launching a recovery tablet which recovers data from the primary till the restoreTime1 - testTabletRecovery(t, bs, "2m", restoreKS1Name, "0", "INT64(4)") - - // create restoreKeyspace with snapshot time as restoreTime2 - createRestoreKeyspace(t, restoreTime2, restoreKS2Name) - - // test the recovery with smaller binlog_lookup_timeout for shard0 - // since we have small lookup timeout, it will just get whatever available in the backup - // mysql> select * from product; - // +----+--------+------------+ - // | id | name | created | - // +----+--------+------------+ - // | 1 | prd-1 | 1597219030 | - // | 2 | prd-2 | 1597219030 | - // | 3 | prd-3 | 1597219043 | - // | 5 | prd-5 | 1597219045 | - // | 9 | prd-9 | 1597219130 | - // | 10 | prd-10 | 1597219130 | - // +----+--------+------------+ - testTabletRecovery(t, bs0, "1ms", restoreKS2Name, "-80", "INT64(6)") - - // test the recovery with valid binlog_lookup_timeout for shard0 and getting the data till the restoreTime2 - // mysql> select * from product; - // +----+--------+------------+ - // | id | name | created | - // +----+--------+------------+ - // | 1 | prd-1 | 1597219030 | - // | 2 | prd-2 | 1597219030 | - // | 3 | prd-3 | 1597219043 | - // | 5 | prd-5 | 1597219045 | - // | 9 | prd-9 | 1597219130 | - // | 10 | prd-10 | 1597219130 | - // | 13 | prd-13 | 1597219141 | - // +----+--------+------------+ - testTabletRecovery(t, bs0, "2m", restoreKS2Name, "-80", "INT64(7)") - - // test the recovery with valid binlog_lookup_timeout for shard1 and getting the data till the restoreTime2 - // mysql> select * from product; - // +----+--------+------------+ - // | id | name | created | - // +----+--------+------------+ - // | 4 | prd-4 | 1597219044 | - // | 6 | prd-6 | 1597219046 | - // | 7 | prd-7 | 1597219130 | - // | 8 | prd-8 | 1597219130 | - // | 11 | prd-11 | 1597219139 | - // | 12 | prd-12 | 1597219140 | - // +----+--------+------------+ - testTabletRecovery(t, bs1, "2m", restoreKS2Name, "80-", "INT64(6)") - - // test the recovery with timetorecover > (timestmap of last binlog event in binlog server) - createRestoreKeyspace(t, restoreTime3, restoreKS3Name) - - // mysql> select * from product; - // +----+--------+------------+ - // | id | name | created | - // +----+--------+------------+ - // | 1 | prd-1 | 1597219030 | - // | 2 | prd-2 | 1597219030 | - // | 3 | prd-3 | 1597219043 | - // | 5 | prd-5 | 1597219045 | - // | 9 | prd-9 | 1597219130 | - // | 10 | prd-10 | 1597219130 | - // | 13 | prd-13 | 1597219141 | - // | 15 | prd-15 | 1597219142 | - // +----+--------+------------+ - testTabletRecovery(t, bs0, "2m", restoreKS3Name, "-80", "INT64(8)") - - // mysql> select * from product; - // +----+--------+------------+ - // | id | name | created | - // +----+--------+------------+ - // | 4 | prd-4 | 1597219044 | - // | 6 | prd-6 | 1597219046 | - // | 7 | prd-7 | 1597219130 | - // | 8 | prd-8 | 1597219130 | - // | 11 | prd-11 | 1597219139 | - // | 12 | prd-12 | 1597219140 | - // | 14 | prd-14 | 1597219142 | - // +----+--------+------------+ - testTabletRecovery(t, bs1, "2m", restoreKS3Name, "80-", "INT64(7)") -} - -func performResharding(t *testing.T) { - err := clusterInstance.VtctldClientProcess.ApplyVSchema(keyspaceName, vSchema) - require.NoError(t, err) - - err = clusterInstance.VtctldClientProcess.ExecuteCommand("Reshard", "create", "--source-shards=0", "--target-shards=-80,80-", "--target-keyspace", "ks", "--workflow", "reshardWorkflow") - require.NoError(t, err) - - waitTimeout := 30 * time.Second - shard0Primary.VttabletProcess.WaitForVReplicationToCatchup(t, "ks.reshardWorkflow", dbName, sidecar.DefaultName, waitTimeout) - shard1Primary.VttabletProcess.WaitForVReplicationToCatchup(t, "ks.reshardWorkflow", dbName, sidecar.DefaultName, waitTimeout) - - waitForNoWorkflowLag(t, clusterInstance, "ks", "reshardWorkflow") - - err = clusterInstance.VtctldClientProcess.ExecuteCommand("Reshard", "SwitchTraffic", "--tablet-types=rdonly", "--target-keyspace", "ks", "--workflow", "reshardWorkflow") - require.NoError(t, err) - - err = clusterInstance.VtctldClientProcess.ExecuteCommand("Reshard", "SwitchTraffic", "--tablet-types=replica", "--target-keyspace", "ks", "--workflow", "reshardWorkflow") - require.NoError(t, err) - - // then serve primary from the split shards - err = clusterInstance.VtctldClientProcess.ExecuteCommand("Reshard", "SwitchTraffic", "--tablet-types=primary", "--target-keyspace", "ks", "--workflow", "reshardWorkflow") - require.NoError(t, err) - - // remove the original tablets in the original shard - removeTablets(t, []*cluster.Vttablet{primary, replica1, replica2}) - - for _, tablet := range []*cluster.Vttablet{replica1, replica2} { - err = clusterInstance.VtctldClientProcess.ExecuteCommand("DeleteTablets", tablet.Alias) - require.NoError(t, err) - } - err = clusterInstance.VtctldClientProcess.ExecuteCommand("DeleteTablets", "--allow-primary", primary.Alias) - require.NoError(t, err) - - // rebuild the serving graph, all mentions of the old shards should be gone - err = clusterInstance.VtctldClientProcess.ExecuteCommand("RebuildKeyspaceGraph", "ks") - require.NoError(t, err) - - // delete the original shard - err = clusterInstance.VtctldClientProcess.ExecuteCommand("DeleteShards", "ks/0") - require.NoError(t, err) - - // Restart vtgate process - err = clusterInstance.VtgateProcess.TearDown() - require.NoError(t, err) - - err = clusterInstance.VtgateProcess.Setup() - require.NoError(t, err) - - clusterInstance.WaitForTabletsToHealthyInVtgate() -} - -func startBinlogServer(t *testing.T, primaryTablet *cluster.Vttablet) *binLogServer { - bs, err := newBinlogServer(hostname, clusterInstance.GetAndReservePort()) - require.NoError(t, err) - - err = bs.start(mysqlSource{ - hostname: binlogHost, - port: primaryTablet.MysqlctlProcess.MySQLPort, - username: mysqlUserName, - password: mysqlPassword, - }) - require.NoError(t, err) - return bs -} - -func removeTablets(t *testing.T, tablets []*cluster.Vttablet) { - var mysqlProcs []*exec.Cmd - for _, tablet := range tablets { - proc, _ := tablet.MysqlctlProcess.StopProcess() - mysqlProcs = append(mysqlProcs, proc) - } - for _, proc := range mysqlProcs { - err := proc.Wait() - require.NoError(t, err) - } - for _, tablet := range tablets { - tablet.VttabletProcess.TearDown() - } -} - -func initializeCluster(t *testing.T) { - clusterInstance = cluster.NewCluster(cell, hostname) - - // Start topo server - err := clusterInstance.StartTopo() - require.NoError(t, err) - - // Start keyspace - keyspace := &cluster.Keyspace{ - Name: keyspaceName, - } - clusterInstance.Keyspaces = append(clusterInstance.Keyspaces, *keyspace) - - shard := &cluster.Shard{ - Name: shardName, - } - shard0 := &cluster.Shard{ - Name: shard0Name, - } - shard1 := &cluster.Shard{ - Name: shard1Name, - } - - // Defining all the tablets - primary = clusterInstance.NewVttabletInstance("replica", 0, "") - replica1 = clusterInstance.NewVttabletInstance("replica", 0, "") - replica2 = clusterInstance.NewVttabletInstance("replica", 0, "") - shard0Primary = clusterInstance.NewVttabletInstance("replica", 0, "") - shard0Replica1 = clusterInstance.NewVttabletInstance("replica", 0, "") - shard0Replica2 = clusterInstance.NewVttabletInstance("replica", 0, "") - shard1Primary = clusterInstance.NewVttabletInstance("replica", 0, "") - shard1Replica1 = clusterInstance.NewVttabletInstance("replica", 0, "") - shard1Replica2 = clusterInstance.NewVttabletInstance("replica", 0, "") - - shard.Vttablets = []*cluster.Vttablet{primary, replica1, replica2} - shard0.Vttablets = []*cluster.Vttablet{shard0Primary, shard0Replica1, shard0Replica2} - shard1.Vttablets = []*cluster.Vttablet{shard1Primary, shard1Replica1, shard1Replica2} - - dbCredentialFile = cluster.WriteDbCredentialToTmp(clusterInstance.TmpDirectory) - extraArgs := []string{"--db-credentials-file", dbCredentialFile} - commonTabletArg = append(commonTabletArg, "--db-credentials-file", dbCredentialFile) - - clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, commonTabletArg...) - clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--restore_from_backup") - - err = clusterInstance.SetupCluster(keyspace, []cluster.Shard{*shard, *shard0, *shard1}) - require.NoError(t, err) - vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", clusterInstance.VtctldProcess.GrpcPort, clusterInstance.TmpDirectory) - out, err := vtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync") - require.NoError(t, err, out) - - initDb, _ := os.ReadFile(path.Join(os.Getenv("VTROOT"), "/config/init_db.sql")) - sql := string(initDb) - // The original init_db.sql does not have any passwords. Here we update the init file with passwords - sql, err = utils.GetInitDBSQL(sql, cluster.GetPasswordUpdateSQL(clusterInstance), "") - require.NoError(t, err, "expected to load init_db file") - initDBFileWithPassword = path.Join(clusterInstance.TmpDirectory, "init_db_with_passwords.sql") - err = os.WriteFile(initDBFileWithPassword, []byte(sql), 0660) - require.NoError(t, err, "expected to load init_db file") - - // Start MySql - var mysqlCtlProcessList []*exec.Cmd - for _, shard := range clusterInstance.Keyspaces[0].Shards { - for _, tablet := range shard.Vttablets { - tablet.MysqlctlProcess.InitDBFile = initDBFileWithPassword - tablet.VttabletProcess.DbPassword = mysqlPassword - tablet.MysqlctlProcess.ExtraArgs = extraArgs - proc, err := tablet.MysqlctlProcess.StartProcess() - require.NoError(t, err) - mysqlCtlProcessList = append(mysqlCtlProcessList, proc) - } - } - - // Wait for mysql processes to start - for _, proc := range mysqlCtlProcessList { - err = proc.Wait() - require.NoError(t, err) - } - - for _, shard := range clusterInstance.Keyspaces[0].Shards { - for _, tablet := range shard.Vttablets { - err = tablet.VttabletProcess.Setup() - require.NoError(t, err) - } - } - - err = clusterInstance.VtctldClientProcess.InitShardPrimary(keyspaceName, shard.Name, cell, primary.TabletUID) - require.NoError(t, err) - - err = clusterInstance.VtctldClientProcess.InitShardPrimary(keyspaceName, shard0.Name, cell, shard0Primary.TabletUID) - require.NoError(t, err) - - err = clusterInstance.VtctldClientProcess.InitShardPrimary(keyspaceName, shard1.Name, cell, shard1Primary.TabletUID) - require.NoError(t, err) - - err = clusterInstance.StartVTOrc(keyspaceName) - require.NoError(t, err) - - // Start vtgate - err = clusterInstance.StartVtgate() - require.NoError(t, err) -} - -func insertRow(t *testing.T, id int, productName string, isSlow bool) { - ctx := context.Background() - vtParams := mysql.ConnParams{ - Host: clusterInstance.Hostname, - Port: clusterInstance.VtgateMySQLPort, - } - conn, err := mysql.Connect(ctx, &vtParams) - require.NoError(t, err) - defer conn.Close() - - insertSmt := fmt.Sprintf(insertTable, id, productName) - _, err = conn.ExecuteFetch(insertSmt, 1000, true) - require.NoError(t, err) - - if isSlow { - time.Sleep(1 * time.Second) - } -} - -func createRestoreKeyspace(t *testing.T, timeToRecover, restoreKeyspaceName string) { - output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("CreateKeyspace", - "--type=SNAPSHOT", "--base-keyspace="+keyspaceName, - "--snapshot-timestamp", timeToRecover, restoreKeyspaceName) - log.Info(output) - require.NoError(t, err) -} - -func testTabletRecovery(t *testing.T, binlogServer *binLogServer, lookupTimeout, restoreKeyspaceName, shardName, expectedRows string) { - recoveryTablet := clusterInstance.NewVttabletInstance("replica", 0, cell) - launchRecoveryTablet(t, recoveryTablet, binlogServer, lookupTimeout, restoreKeyspaceName, shardName) - - sqlRes, err := recoveryTablet.VttabletProcess.QueryTablet(getCountID, keyspaceName, true) - require.NoError(t, err) - assert.Equal(t, expectedRows, sqlRes.Rows[0][0].String()) - - defer recoveryTablet.MysqlctlProcess.Stop() - defer recoveryTablet.VttabletProcess.TearDown() -} - -func launchRecoveryTablet(t *testing.T, tablet *cluster.Vttablet, binlogServer *binLogServer, lookupTimeout, restoreKeyspaceName, shardName string) { - mysqlctlProcess, err := cluster.MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, clusterInstance.TmpDirectory) - require.NoError(t, err) - tablet.MysqlctlProcess = *mysqlctlProcess - extraArgs := []string{"--db-credentials-file", dbCredentialFile} - tablet.MysqlctlProcess.InitDBFile = initDBFileWithPassword - tablet.VttabletProcess.DbPassword = mysqlPassword - tablet.MysqlctlProcess.ExtraArgs = extraArgs - err = tablet.MysqlctlProcess.Start() - require.NoError(t, err) - - tablet.VttabletProcess = cluster.VttabletProcessInstance( - tablet.HTTPPort, - tablet.GrpcPort, - tablet.TabletUID, - clusterInstance.Cell, - shardName, - keyspaceName, - clusterInstance.VtctldProcess.Port, - tablet.Type, - clusterInstance.TopoProcess.Port, - clusterInstance.Hostname, - clusterInstance.TmpDirectory, - clusterInstance.VtTabletExtraArgs, - clusterInstance.DefaultCharset) - tablet.Alias = tablet.VttabletProcess.TabletPath - tablet.VttabletProcess.SupportsBackup = true - tablet.VttabletProcess.Keyspace = restoreKeyspaceName - tablet.VttabletProcess.ExtraArgs = []string{ - "--disable_active_reparents", - "--enable_replication_reporter=false", - "--init_db_name_override", dbName, - "--init_tablet_type", "replica", - "--init_keyspace", restoreKeyspaceName, - "--init_shard", shardName, - "--binlog_host", binlogServer.hostname, - "--binlog_port", fmt.Sprintf("%d", binlogServer.port), - "--binlog_user", binlogServer.username, - "--binlog_password", binlogServer.password, - "--pitr_gtid_lookup_timeout", lookupTimeout, - "--vreplication_retry_delay", "1s", - "--degraded_threshold", "5s", - "--lock_tables_timeout", "5s", - "--watch_replication_stream", - "--serving_state_grace_period", "1s", - "--db-credentials-file", dbCredentialFile, - } - tablet.VttabletProcess.ServingStatus = "" - - err = tablet.VttabletProcess.Setup() - require.NoError(t, err) - - tablet.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 20*time.Second) -} - -// waitForNoWorkflowLag waits for the VReplication workflow's MaxVReplicationTransactionLag -// value to be 0. -func waitForNoWorkflowLag(t *testing.T, vc *cluster.LocalProcessCluster, ks string, workflow string) { - var lag int64 - timer := time.NewTimer(defaultTimeout) - defer timer.Stop() - for { - output, err := vc.VtctldClientProcess.ExecuteCommandWithOutput("Workflow", "--keyspace", ks, "show", "--workflow", workflow) - require.NoError(t, err) - - var resp vtctldatapb.GetWorkflowsResponse - err = json2.UnmarshalPB([]byte(output), &resp) - require.NoError(t, err) - require.GreaterOrEqual(t, len(resp.Workflows), 1, "responce should have at least one workflow") - lag = resp.Workflows[0].MaxVReplicationTransactionLag - if lag == 0 { - return - } - select { - case <-timer.C: - require.FailNow(t, fmt.Sprintf("workflow %q did not eliminate VReplication lag before the timeout of %s; last seen MaxVReplicationTransactionLag: %d", - strings.Join([]string{ks, workflow}, "."), defaultTimeout, lag)) - default: - time.Sleep(defaultTick) - } - } -} diff --git a/go/vt/vttablet/tabletmanager/restore.go b/go/vt/vttablet/tabletmanager/restore.go index 35587124108..54813e11bf3 100644 --- a/go/vt/vttablet/tabletmanager/restore.go +++ b/go/vt/vttablet/tabletmanager/restore.go @@ -19,7 +19,6 @@ package tabletmanager import ( "context" "fmt" - "io" "time" "github.com/spf13/pflag" @@ -29,22 +28,17 @@ import ( "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/hook" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/mysqlctl/backupstats" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/proto/vttime" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" ) // This file handles the initial backup restore upon startup. @@ -80,31 +74,6 @@ func registerIncrementalRestoreFlags(fs *pflag.FlagSet) { fs.StringVar(&restoreToPos, "restore-to-pos", restoreToPos, "(init incremental restore parameter) if set, run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups") } -var ( - // Flags for PITR - old iteration - binlogHost string - binlogPort int - binlogUser string - binlogPwd string - timeoutForGTIDLookup = 60 * time.Second - binlogSslCa string - binlogSslCert string - binlogSslKey string - binlogSslServerName string -) - -func registerPointInTimeRestoreFlags(fs *pflag.FlagSet) { - fs.StringVar(&binlogHost, "binlog_host", binlogHost, "PITR restore parameter: hostname/IP of binlog server.") - fs.IntVar(&binlogPort, "binlog_port", binlogPort, "PITR restore parameter: port of binlog server.") - fs.StringVar(&binlogUser, "binlog_user", binlogUser, "PITR restore parameter: username of binlog server.") - fs.StringVar(&binlogPwd, "binlog_password", binlogPwd, "PITR restore parameter: password of binlog server.") - fs.DurationVar(&timeoutForGTIDLookup, "pitr_gtid_lookup_timeout", timeoutForGTIDLookup, "PITR restore parameter: timeout for fetching gtid from timestamp.") - fs.StringVar(&binlogSslCa, "binlog_ssl_ca", binlogSslCa, "PITR restore parameter: Filename containing TLS CA certificate to verify binlog server TLS certificate against.") - fs.StringVar(&binlogSslCert, "binlog_ssl_cert", binlogSslCert, "PITR restore parameter: Filename containing mTLS client certificate to present to binlog server as authentication.") - fs.StringVar(&binlogSslKey, "binlog_ssl_key", binlogSslKey, "PITR restore parameter: Filename containing mTLS client private key for use in binlog server authentication.") - fs.StringVar(&binlogSslServerName, "binlog_ssl_server_name", binlogSslServerName, "PITR restore parameter: TLS server name (common name) to verify against for the binlog server we are connecting to (If not set: use the hostname or IP supplied in --binlog_host).") -} - func init() { servenv.OnParseFor("vtcombo", registerRestoreFlags) servenv.OnParseFor("vttablet", registerRestoreFlags) @@ -112,9 +81,6 @@ func init() { servenv.OnParseFor("vtcombo", registerIncrementalRestoreFlags) servenv.OnParseFor("vttablet", registerIncrementalRestoreFlags) - servenv.OnParseFor("vtcombo", registerPointInTimeRestoreFlags) - servenv.OnParseFor("vttablet", registerPointInTimeRestoreFlags) - statsRestoreBackupTime = stats.NewString("RestoredBackupTime") statsRestoreBackupPosition = stats.NewString("RestorePosition") } @@ -299,15 +265,6 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L pos = backupManifest.Position params.Logger.Infof("Restore: pos=%v", replication.EncodePosition(pos)) } - // If SnapshotTime is set , then apply the incremental change - if keyspaceInfo.SnapshotTime != nil { - params.Logger.Infof("Restore: Restoring to time %v from binlog", keyspaceInfo.SnapshotTime) - err = tm.restoreToTimeFromBinlog(ctx, pos, keyspaceInfo.SnapshotTime) - if err != nil { - log.Errorf("unable to restore to the specified time %s, error : %v", keyspaceInfo.SnapshotTime.String(), err) - return nil - } - } switch { case err == nil && backupManifest != nil: // Starting from here we won't be able to recover if we get stopped by a cancelled @@ -365,196 +322,6 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L return tm.tmState.ChangeTabletType(bgCtx, originalType, DBActionNone) } -// restoreToTimeFromBinlog restores to the snapshot time of the keyspace -// currently this works with mysql based database only (as it uses mysql specific queries for restoring) -func (tm *TabletManager) restoreToTimeFromBinlog(ctx context.Context, pos replication.Position, restoreTime *vttime.Time) error { - // validate the minimal settings necessary for connecting to binlog server - if binlogHost == "" || binlogPort <= 0 || binlogUser == "" { - log.Warning("invalid binlog server setting, restoring to last available backup.") - return nil - } - - timeoutCtx, cancelFnc := context.WithTimeout(ctx, timeoutForGTIDLookup) - defer cancelFnc() - - afterGTIDPos, beforeGTIDPos, err := tm.getGTIDFromTimestamp(timeoutCtx, pos, restoreTime.Seconds) - if err != nil { - return err - } - - if afterGTIDPos == "" && beforeGTIDPos == "" { - return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, fmt.Sprintf("unable to fetch the GTID for the specified time - %s", restoreTime.String())) - } else if afterGTIDPos == "" && beforeGTIDPos != "" { - log.Info("no afterGTIDPos found, which implies we reached the end of all GTID events") - } - - log.Infof("going to restore upto the GTID - %s", afterGTIDPos) - // when we don't have before GTID, we will take it as current backup pos's last GTID - // this is case where someone tries to restore just to the 1st event after backup - if beforeGTIDPos == "" { - beforeGTIDPos = pos.GTIDSet.Last() - } - err = tm.catchupToGTID(timeoutCtx, afterGTIDPos, beforeGTIDPos) - if err != nil { - return vterrors.Wrapf(err, "unable to replicate upto desired GTID : %s", afterGTIDPos) - } - - return nil -} - -// getGTIDFromTimestamp computes 2 GTIDs based on restoreTime -// afterPos is the GTID of the first event at or after restoreTime. -// beforePos is the GTID of the last event before restoreTime. This is the GTID upto which replication will be applied -// afterPos can be used directly in the query `START SLAVE UNTIL SQL_BEFORE_GTIDS = ”` -// beforePos will be used to check if replication was able to catch up from the binlog server -func (tm *TabletManager) getGTIDFromTimestamp(ctx context.Context, pos replication.Position, restoreTime int64) (afterPos string, beforePos string, err error) { - connParams := &mysql.ConnParams{ - Host: binlogHost, - Port: binlogPort, - Uname: binlogUser, - SslCa: binlogSslCa, - SslCert: binlogSslCert, - SslKey: binlogSslKey, - ServerName: binlogSslServerName, - } - if binlogPwd != "" { - connParams.Pass = binlogPwd - } - if binlogSslCa != "" || binlogSslCert != "" { - connParams.EnableSSL() - } - dbCfgs := &dbconfigs.DBConfigs{ - Host: connParams.Host, - Port: connParams.Port, - } - dbCfgs.SetDbParams(*connParams, *connParams, *connParams) - vsClient := vreplication.NewReplicaConnector(tm.Env, connParams) - - filter := &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "/.*", - }}, - } - - // get current lastPos of binlog server, so that if we hit that in vstream, we'll return from there - binlogConn, err := mysql.Connect(ctx, connParams) - if err != nil { - return "", "", err - } - defer binlogConn.Close() - lastPos, err := binlogConn.PrimaryPosition() - if err != nil { - return "", "", err - } - - gtidsChan := make(chan []string, 1) - - go func() { - err := vsClient.VStream(ctx, replication.EncodePosition(pos), filter, func(events []*binlogdatapb.VEvent) error { - for _, event := range events { - if event.Gtid != "" { - // check if we reached the lastPos then return - eventPos, err := replication.DecodePosition(event.Gtid) - if err != nil { - return err - } - - if event.Timestamp >= restoreTime { - afterPos = event.Gtid - gtidsChan <- []string{event.Gtid, beforePos} - return io.EOF - } - - if eventPos.AtLeast(lastPos) { - gtidsChan <- []string{"", beforePos} - return io.EOF - } - beforePos = event.Gtid - } - } - return nil - }) - if err != nil && err != io.EOF { - log.Warningf("Error using VStream to find timestamp for GTID position: %v error: %v", pos, err) - gtidsChan <- []string{"", ""} - } - }() - defer vsClient.Close() - select { - case val := <-gtidsChan: - return val[0], val[1], nil - case <-ctx.Done(): - log.Warningf("Can't find the GTID from restore time stamp, exiting.") - return "", beforePos, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "unable to find GTID from the snapshot time as context timed out") - } -} - -// catchupToGTID replicates upto specified GTID from binlog server -// -// copies the data from binlog server by pointing to as replica -// waits till all events to GTID replicated -// once done, it will reset the replication -func (tm *TabletManager) catchupToGTID(ctx context.Context, afterGTIDPos string, beforeGTIDPos string) error { - var afterGTID replication.Position - if afterGTIDPos != "" { - var err error - afterGTID, err = replication.DecodePosition(afterGTIDPos) - if err != nil { - return err - } - } - - beforeGTIDPosParsed, err := replication.DecodePosition(beforeGTIDPos) - if err != nil { - return err - } - - if err := tm.MysqlDaemon.CatchupToGTID(ctx, afterGTID); err != nil { - return vterrors.Wrap(err, fmt.Sprintf("failed to restart the replication until %s GTID", afterGTID.GTIDSet.Last())) - } - log.Infof("Waiting for position to reach", beforeGTIDPosParsed.GTIDSet.Last()) - // Could not use `agent.MysqlDaemon.WaitSourcePos` as replication is stopped with `START REPLICA UNTIL SQL_BEFORE_GTIDS` - // this is as per https://dev.mysql.com/doc/refman/8.0/en/start-replica.html - // We need to wait until replication catches upto the specified afterGTIDPos - chGTIDCaughtup := make(chan bool) - go func() { - timeToWait := time.Now().Add(timeoutForGTIDLookup) - for time.Now().Before(timeToWait) { - pos, err := tm.MysqlDaemon.PrimaryPosition(ctx) - if err != nil { - chGTIDCaughtup <- false - } - - if pos.AtLeast(beforeGTIDPosParsed) { - chGTIDCaughtup <- true - } - select { - case <-ctx.Done(): - chGTIDCaughtup <- false - default: - time.Sleep(300 * time.Millisecond) - } - } - }() - select { - case resp := <-chGTIDCaughtup: - if resp { - if err := tm.MysqlDaemon.StopReplication(ctx, nil); err != nil { - return vterrors.Wrap(err, "failed to stop replication") - } - if err := tm.MysqlDaemon.ResetReplicationParameters(ctx); err != nil { - return vterrors.Wrap(err, "failed to reset replication") - } - - return nil - } - return vterrors.Wrap(err, "error while fetching the current GTID position") - case <-ctx.Done(): - log.Warningf("Could not copy up to GTID.") - return vterrors.Wrapf(err, "context timeout while restoring up to specified GTID - %s", beforeGTIDPos) - } -} - // disableReplication stops and resets replication on the mysql server. It moreover sets impossible replication // source params, so that the replica can't possibly reconnect. It would take a `CHANGE [MASTER|REPLICATION SOURCE] TO ...` to // make the mysql server replicate again (available via tm.MysqlDaemon.SetReplicationPosition) diff --git a/test/config.json b/test/config.json index 4bcf616aa3b..da0026f0125 100644 --- a/test/config.json +++ b/test/config.json @@ -349,17 +349,6 @@ "RetryMax": 1, "Tags": [] }, - "pitr": { - "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/recovery/pitr"], - "Command": [], - "Manual": false, - "Shard": "10", - "RetryMax": 1, - "Tags": [ - "site_test" - ] - }, "recovery": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/recovery/unshardedrecovery"],