diff --git a/api/v1beta2/mysqlcluster_types.go b/api/v1beta2/mysqlcluster_types.go index d4315638e..3d3de8a88 100644 --- a/api/v1beta2/mysqlcluster_types.go +++ b/api/v1beta2/mysqlcluster_types.go @@ -566,6 +566,15 @@ type RestoreSpec struct { // Specifies parameters for restore Pod. JobConfig `json:"jobConfig"` + + // Schema is the name of the schema to restore. + // If empty, all schemas are restored. + // This is used for `mysqlbinlog` option `--database`. + // Thus, this option changes behavior depending on binlog_format. + // For more information, please read the following documentation. + // https://dev.mysql.com/doc/refman/8.0/en/mysqlbinlog.html#option_mysqlbinlog_database + // +optional + Schema string `json:"schema,omitempty"` } // MySQLClusterStatus defines the observed state of MySQLCluster diff --git a/api/v1beta2/mysqlcluster_webhook_test.go b/api/v1beta2/mysqlcluster_webhook_test.go index 489aaaff7..bd110c6f2 100644 --- a/api/v1beta2/mysqlcluster_webhook_test.go +++ b/api/v1beta2/mysqlcluster_webhook_test.go @@ -423,6 +423,7 @@ var _ = Describe("MySQLCluster Webhook", func() { EndpointURL: "https://foo.bar.svc:9000", }, }, + Schema: "db1", } err := k8sClient.Create(ctx, r) Expect(err).NotTo(HaveOccurred()) diff --git a/backup/backup_test.go b/backup/backup_test.go index b85b29fb5..b11acaa63 100644 --- a/backup/backup_test.go +++ b/backup/backup_test.go @@ -49,11 +49,11 @@ func (o *getUUIDSetMockOp) PrepareRestore(_ context.Context) error { panic("not implemented") } -func (o *getUUIDSetMockOp) LoadDump(ctx context.Context, dir string) error { +func (o *getUUIDSetMockOp) LoadDump(ctx context.Context, dir string, schema string) error { panic("not implemented") } -func (o *getUUIDSetMockOp) LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time) error { +func (o *getUUIDSetMockOp) LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time, schema string) error { panic("not implemented") } diff --git a/backup/integration_test.go b/backup/integration_test.go index d113b0042..0dfdcc40c 100644 --- a/backup/integration_test.go +++ b/backup/integration_test.go @@ -151,7 +151,7 @@ var _ = Describe("Backup/Restore", func() { Expect(bs.WorkDirUsage).To(BeNumerically(">", 0)) Expect(bs.Warnings).To(BeEmpty()) - rm, err := NewRestoreManager(cfg, bc, workDir2, "test", "single", "restore", "target", "", 3, bs.Time.Time) + rm, err := NewRestoreManager(cfg, bc, workDir2, "test", "single", "restore", "target", "", 3, bs.Time.Time, "") Expect(err).NotTo(HaveOccurred()) ctx2, cancel := context.WithTimeout(ctx, 3*time.Second) @@ -240,7 +240,7 @@ var _ = Describe("Backup/Restore", func() { Expect(bs.WorkDirUsage).To(BeNumerically(">", 0)) Expect(bs.Warnings).To(BeEmpty()) - rm, err := NewRestoreManager(cfg, bc, workDir2, "test", "single", "restore", "target", "", 3, restorePoint) + rm, err := NewRestoreManager(cfg, bc, workDir2, "test", "single", "restore", "target", "", 3, restorePoint, "") Expect(err).NotTo(HaveOccurred()) err = rm.Restore(ctx) @@ -292,7 +292,7 @@ var _ = Describe("Backup/Restore", func() { Expect(err).NotTo(HaveOccurred()) Expect(bc.contents).To(HaveLen(3)) - rm, err := NewRestoreManager(cfg, bc, workDir2, "test", "single", "restore", "target", "", 3, bt) + rm, err := NewRestoreManager(cfg, bc, workDir2, "test", "single", "restore", "target", "", 3, bt, "") Expect(err).NotTo(HaveOccurred()) err = rm.Restore(ctx) diff --git a/backup/mock_test.go b/backup/mock_test.go index c07b5c9c2..585a6402a 100644 --- a/backup/mock_test.go +++ b/backup/mock_test.go @@ -97,7 +97,7 @@ func (o *mockOperator) PrepareRestore(_ context.Context) error { return nil } -func (o *mockOperator) LoadDump(ctx context.Context, dir string) error { +func (o *mockOperator) LoadDump(ctx context.Context, dir string, schema string) error { if !o.prepared { return errors.New("not prepared") } @@ -105,7 +105,7 @@ func (o *mockOperator) LoadDump(ctx context.Context, dir string) error { return err } -func (o *mockOperator) LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time) error { +func (o *mockOperator) LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time, schema string) error { if !o.prepared { return errors.New("not prepared") } diff --git a/backup/restore.go b/backup/restore.go index 441ff299e..ca49f168d 100644 --- a/backup/restore.go +++ b/backup/restore.go @@ -42,11 +42,12 @@ type RestoreManager struct { keyPrefix string restorePoint time.Time workDir string + schema string } var ErrBadConnection = errors.New("the connection hasn't reflected the latest user's privileges") -func NewRestoreManager(cfg *rest.Config, bc bucket.Bucket, dir, srcNS, srcName, ns, name, password string, threads int, restorePoint time.Time) (*RestoreManager, error) { +func NewRestoreManager(cfg *rest.Config, bc bucket.Bucket, dir, srcNS, srcName, ns, name, password string, threads int, restorePoint time.Time, schema string) (*RestoreManager, error) { log := zap.New(zap.WriteTo(os.Stderr), zap.StacktraceLevel(zapcore.DPanicLevel)) scheme := runtime.NewScheme() if err := clientgoscheme.AddToScheme(scheme); err != nil { @@ -74,6 +75,7 @@ func NewRestoreManager(cfg *rest.Config, bc bucket.Bucket, dir, srcNS, srcName, keyPrefix: prefix, restorePoint: restorePoint, workDir: dir, + schema: schema, }, nil } @@ -254,7 +256,7 @@ func (rm *RestoreManager) loadDump(ctx context.Context, op bkop.Operator, key st return fmt.Errorf("failed to untar dump file: %w", err) } - return op.LoadDump(ctx, dumpDir) + return op.LoadDump(ctx, dumpDir, rm.schema) } func (rm *RestoreManager) applyBinlog(ctx context.Context, op bkop.Operator, key string) error { @@ -317,5 +319,5 @@ func (rm *RestoreManager) applyBinlog(ctx context.Context, op bkop.Operator, key os.RemoveAll(tmpDir) }() - return op.LoadBinlog(ctx, binlogDir, tmpDir, rm.restorePoint) + return op.LoadBinlog(ctx, binlogDir, tmpDir, rm.restorePoint, rm.schema) } diff --git a/charts/moco/templates/generated/crds/moco_crds.yaml b/charts/moco/templates/generated/crds/moco_crds.yaml index 1ab4e5a4b..2a2ea8766 100644 --- a/charts/moco/templates/generated/crds/moco_crds.yaml +++ b/charts/moco/templates/generated/crds/moco_crds.yaml @@ -7880,6 +7880,9 @@ spec: description: RestorePoint is the target date and time to restor format: date-time type: string + schema: + description: Schema is the name of the schema to restore. + type: string sourceName: description: SourceName is the name of the source `MySQLCluster minLength: 1 diff --git a/cmd/moco-backup/cmd/restore.go b/cmd/moco-backup/cmd/restore.go index d70558704..886168d23 100644 --- a/cmd/moco-backup/cmd/restore.go +++ b/cmd/moco-backup/cmd/restore.go @@ -12,7 +12,7 @@ import ( ) var restoreCmd = &cobra.Command{ - Use: "restore BUCKET SOURCE_NAMESPACE SOURCE_NAME NAMESPACE NAME YYYYMMDD-hhmmss", + Use: "restore BUCKET SOURCE_NAMESPACE SOURCE_NAME NAMESPACE NAME YYYYMMDD-hhmmss SCHEMA", Short: "restore MySQL data from a backup", Long: `Restore MySQL data from a backup. @@ -21,8 +21,10 @@ SOURCE_NAMESPACE: The source MySQLCluster's namespace. SOURCE_NAME: The source MySQLCluster's name. NAMESPACE: The target MySQLCluster's namespace. NAME: The target MySQLCluster's name. -YYYYMMDD-hhmmss: The point-in-time to restore data. e.g. 20210523-150423`, - Args: cobra.ExactArgs(6), +YYYYMMDD-hhmmss: The point-in-time to restore data. e.g. 20210523-150423 +SCHEMA: The target schema to restore. If SCHEMA is empty, all schemas are restored.`, + + Args: cobra.ExactArgs(7), RunE: func(cmd *cobra.Command, args []string) error { maxRetry := 3 for i := 0; i < maxRetry; i++ { @@ -54,6 +56,7 @@ func runRestore(cmd *cobra.Command, args []string) (e error) { srcName := args[2] namespace := args[3] name := args[4] + schema := args[6] restorePoint, err := time.Parse(constants.BackupTimeFormat, args[5]) if err != nil { @@ -75,7 +78,8 @@ func runRestore(cmd *cobra.Command, args []string) (e error) { namespace, name, mysqlPassword, commonArgs.threads, - restorePoint) + restorePoint, + schema) if err != nil { return fmt.Errorf("failed to create a restore manager: %w", err) } diff --git a/config/crd/bases/moco.cybozu.com_mysqlclusters.yaml b/config/crd/bases/moco.cybozu.com_mysqlclusters.yaml index 1f91f55ac..a050f225f 100644 --- a/config/crd/bases/moco.cybozu.com_mysqlclusters.yaml +++ b/config/crd/bases/moco.cybozu.com_mysqlclusters.yaml @@ -6268,6 +6268,9 @@ spec: description: RestorePoint is the target date and time to restor format: date-time type: string + schema: + description: Schema is the name of the schema to restore. + type: string sourceName: description: SourceName is the name of the source `MySQLCluster minLength: 1 diff --git a/config/crd/tests/apiextensions.k8s.io_v1_customresourcedefinition_mysqlclusters.moco.cybozu.com.yaml b/config/crd/tests/apiextensions.k8s.io_v1_customresourcedefinition_mysqlclusters.moco.cybozu.com.yaml index 3351a2738..726582aa0 100644 --- a/config/crd/tests/apiextensions.k8s.io_v1_customresourcedefinition_mysqlclusters.moco.cybozu.com.yaml +++ b/config/crd/tests/apiextensions.k8s.io_v1_customresourcedefinition_mysqlclusters.moco.cybozu.com.yaml @@ -6268,6 +6268,9 @@ spec: description: RestorePoint is the target date and time to restor format: date-time type: string + schema: + description: Schema is the name of the schema to restore. + type: string sourceName: description: SourceName is the name of the source `MySQLCluster minLength: 1 diff --git a/controllers/mysqlcluster_controller.go b/controllers/mysqlcluster_controller.go index 07a5cbba5..5f29d5510 100644 --- a/controllers/mysqlcluster_controller.go +++ b/controllers/mysqlcluster_controller.go @@ -1448,6 +1448,7 @@ func (r *MySQLClusterReconciler) reconcileV1RestoreJob(ctx context.Context, req args = append(args, cluster.Spec.Restore.SourceNamespace, cluster.Spec.Restore.SourceName) args = append(args, cluster.Namespace, cluster.Name) args = append(args, cluster.Spec.Restore.RestorePoint.UTC().Format(constants.BackupTimeFormat)) + args = append(args, cluster.Spec.Restore.Schema) resources := corev1ac.ResourceRequirements() if !noJobResource { diff --git a/controllers/mysqlcluster_controller_test.go b/controllers/mysqlcluster_controller_test.go index 1efffdfaf..eeb56590c 100644 --- a/controllers/mysqlcluster_controller_test.go +++ b/controllers/mysqlcluster_controller_test.go @@ -1560,6 +1560,7 @@ var _ = Describe("MySQLCluster reconciler", func() { "test", "test", now.UTC().Format(constants.BackupTimeFormat), + "", })) Expect(c.EnvFrom).To(HaveLen(1)) Expect(c.Env).To(HaveLen(2)) diff --git a/docs/crd_mysqlcluster_v1beta2.md b/docs/crd_mysqlcluster_v1beta2.md index 0647fdaa8..a42743e30 100644 --- a/docs/crd_mysqlcluster_v1beta2.md +++ b/docs/crd_mysqlcluster_v1beta2.md @@ -173,6 +173,7 @@ RestoreSpec represents a set of parameters for Point-in-Time Recovery. | sourceNamespace | SourceNamespace is the namespace of the source `MySQLCluster`. | string | true | | restorePoint | RestorePoint is the target date and time to restore data. The format is RFC3339. e.g. \"2006-01-02T15:04:05Z\" | [metav1.Time](https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Time) | true | | jobConfig | Specifies parameters for restore Pod. | [JobConfig](#jobconfig) | true | +| schema | Schema is the name of the schema to restore. If empty, all schemas are restored. This is used for `mysqlbinlog` option `--database`. Thus, this option changes behavior depending on binlog_format. For more information, please read the following documentation. https://dev.mysql.com/doc/refman/8.0/en/mysqlbinlog.html#option_mysqlbinlog_database | string | false | [Back to Custom Resources](#custom-resources) diff --git a/e2e/backup_test.go b/e2e/backup_test.go index aa0772e50..5ce649237 100644 --- a/e2e/backup_test.go +++ b/e2e/backup_test.go @@ -23,8 +23,11 @@ var makeBucketYAML string //go:embed testdata/backup.yaml var backupYAML string -//go:embed testdata/restore.yaml -var restoreYAML string +//go:embed testdata/restore1.yaml +var restore1YAML string + +//go:embed testdata/restore2.yaml +var restore2YAML string var _ = Context("backup", func() { if doUpgrade { @@ -58,11 +61,17 @@ var _ = Context("backup", func() { }).Should(Succeed()) kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--", - "-e", "CREATE DATABASE test") + "-e", "CREATE DATABASE test1") + kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--", + "-D", "test1", "-e", "CREATE TABLE t (id INT NOT NULL AUTO_INCREMENT, data VARCHAR(32) NOT NULL, PRIMARY KEY (id), KEY key1 (data), KEY key2 (data, id)) ENGINE=InnoDB") + kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--", + "-D", "test1", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('aaa')") + kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--", + "-e", "CREATE DATABASE test2") kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--", - "-D", "test", "-e", "CREATE TABLE t (id INT NOT NULL AUTO_INCREMENT, data VARCHAR(32) NOT NULL, PRIMARY KEY (id), KEY key1 (data), KEY key2 (data, id)) ENGINE=InnoDB") + "-D", "test2", "-e", "CREATE TABLE t (id INT NOT NULL AUTO_INCREMENT, data VARCHAR(32) NOT NULL, PRIMARY KEY (id), KEY key1 (data), KEY key2 (data, id)) ENGINE=InnoDB") kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--", - "-D", "test", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('aaa')") + "-D", "test2", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('aaa')") }) It("should take a full dump", func() { @@ -81,14 +90,20 @@ var _ = Context("backup", func() { It("should take an incremental backup", func() { kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--", - "-D", "test", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('bbb')") + "-D", "test1", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('bbb')") + kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--", + "-D", "test2", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('bbb')") time.Sleep(1100 * time.Millisecond) restorePoint = time.Now().UTC() time.Sleep(1100 * time.Millisecond) kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-admin", "source", "--", - "-D", "test", "--init_command=SET autocommit=1", "-e", "FLUSH LOCAL BINARY LOGS") + "-D", "test1", "--init_command=SET autocommit=1", "-e", "FLUSH LOCAL BINARY LOGS") + kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--", + "-D", "test1", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('ccc')") + kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-admin", "source", "--", + "-D", "test2", "--init_command=SET autocommit=1", "-e", "FLUSH LOCAL BINARY LOGS") kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--", - "-D", "test", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('ccc')") + "-D", "test2", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('ccc')") time.Sleep(100 * time.Millisecond) kubectlSafe(nil, "-n", "backup", "create", "job", "--from=cronjob/moco-backup-source", "backup-2") @@ -111,7 +126,41 @@ var _ = Context("backup", func() { It("should destroy the source then restore the backup data", func() { kubectlSafe(nil, "-n", "backup", "delete", "mysqlclusters", "source") - tmpl, err := template.New("").Parse(restoreYAML) + tmpl, err := template.New("").Parse(restore1YAML) + Expect(err).NotTo(HaveOccurred()) + buf := new(bytes.Buffer) + err = tmpl.Execute(buf, struct { + MySQLVersion string + RestorePoint string + }{ + mysqlVersion, + restorePoint.Format(time.RFC3339), + }) + Expect(err).NotTo(HaveOccurred()) + + kubectlSafe(buf.Bytes(), "apply", "-f", "-") + Eventually(func(g Gomega) { + cluster, err := getCluster("backup", "target1") + g.Expect(err).NotTo(HaveOccurred()) + condHealthy, err := getClusterCondition(cluster, mocov1beta2.ConditionHealthy) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(condHealthy.Status).To(Equal(metav1.ConditionTrue), "target1 is not healthy") + }).Should(Succeed()) + + out := kubectlSafe(nil, "moco", "-n", "backup", "mysql", "target1", "--", + "-N", "-D", "test1", "-e", "SELECT COUNT(*) FROM t") + count, err := strconv.Atoi(strings.TrimSpace(string(out))) + Expect(err).NotTo(HaveOccurred()) + Expect(count).To(Equal(2)) + + out = kubectlSafe(nil, "moco", "-n", "backup", "mysql", "target1", "--", + "-N", "-e", "SHOW DATABASES LIKE 'test%'") + databases := strings.Fields(string(out)) + Expect(databases).Should(ConsistOf("test1", "test2")) + }) + + It("should restore only test2 schema", func() { + tmpl, err := template.New("").Parse(restore2YAML) Expect(err).NotTo(HaveOccurred()) buf := new(bytes.Buffer) err = tmpl.Execute(buf, struct { @@ -125,18 +174,23 @@ var _ = Context("backup", func() { kubectlSafe(buf.Bytes(), "apply", "-f", "-") Eventually(func(g Gomega) { - cluster, err := getCluster("backup", "target") + cluster, err := getCluster("backup", "target2") g.Expect(err).NotTo(HaveOccurred()) condHealthy, err := getClusterCondition(cluster, mocov1beta2.ConditionHealthy) g.Expect(err).NotTo(HaveOccurred()) - g.Expect(condHealthy.Status).To(Equal(metav1.ConditionTrue), "target is not healthy") + g.Expect(condHealthy.Status).To(Equal(metav1.ConditionTrue), "target2 is not healthy") }).Should(Succeed()) - out := kubectlSafe(nil, "moco", "-n", "backup", "mysql", "target", "--", - "-N", "-D", "test", "-e", "SELECT COUNT(*) FROM t") + out := kubectlSafe(nil, "moco", "-n", "backup", "mysql", "target2", "--", + "-N", "-D", "test2", "-e", "SELECT COUNT(*) FROM t") count, err := strconv.Atoi(strings.TrimSpace(string(out))) Expect(err).NotTo(HaveOccurred()) Expect(count).To(Equal(2)) + + out = kubectlSafe(nil, "moco", "-n", "backup", "mysql", "target2", "--", + "-N", "-e", "SHOW DATABASES LIKE 'test%'") + databases := strings.Fields(string(out)) + Expect(databases).Should(ConsistOf("test2")) }) It("should delete clusters", func() { diff --git a/e2e/testdata/restore.yaml b/e2e/testdata/restore1.yaml similarity index 98% rename from e2e/testdata/restore.yaml rename to e2e/testdata/restore1.yaml index 79835305b..d77c0182a 100644 --- a/e2e/testdata/restore.yaml +++ b/e2e/testdata/restore1.yaml @@ -2,7 +2,7 @@ apiVersion: moco.cybozu.com/v1beta2 kind: MySQLCluster metadata: namespace: backup - name: target + name: target1 spec: mysqlConfigMapName: mycnf replicas: 1 diff --git a/e2e/testdata/restore2.yaml b/e2e/testdata/restore2.yaml new file mode 100644 index 000000000..c46423958 --- /dev/null +++ b/e2e/testdata/restore2.yaml @@ -0,0 +1,41 @@ +apiVersion: moco.cybozu.com/v1beta2 +kind: MySQLCluster +metadata: + namespace: backup + name: target2 +spec: + mysqlConfigMapName: mycnf + replicas: 1 + restore: + sourceName: source + sourceNamespace: backup + restorePoint: "{{ .RestorePoint }}" + schema: "test2" + jobConfig: + serviceAccountName: backup-owner + env: + - name: AWS_ACCESS_KEY_ID + value: minioadmin + - name: AWS_SECRET_ACCESS_KEY + value: minioadmin + - name: AWS_REGION + value: us-east-1 + bucketConfig: + bucketName: moco + endpointURL: http://minio.default.svc:9000 + usePathStyle: true + workVolume: + emptyDir: {} + podTemplate: + spec: + containers: + - name: mysqld + image: ghcr.io/cybozu-go/moco/mysql:{{ .MySQLVersion }} + volumeClaimTemplates: + - metadata: + name: mysql-data + spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 1Gi diff --git a/pkg/bkop/operator.go b/pkg/bkop/operator.go index bc7e8d99d..6e71fc69c 100644 --- a/pkg/bkop/operator.go +++ b/pkg/bkop/operator.go @@ -37,10 +37,10 @@ type Operator interface { PrepareRestore(context.Context) error // LoadDump loads data dumped by `DumpFull`. - LoadDump(ctx context.Context, dir string) error + LoadDump(ctx context.Context, dir string, schema string) error // LoadBinLog applies binary logs up to `restorePoint`. - LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time) error + LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time, schema string) error // FinishRestore sets global variables of the database instance after restoration. FinishRestore(context.Context) error diff --git a/pkg/bkop/operator_test.go b/pkg/bkop/operator_test.go index d173355dc..535686352 100644 --- a/pkg/bkop/operator_test.go +++ b/pkg/bkop/operator_test.go @@ -168,7 +168,7 @@ var _ = Describe("Operator", func() { err = opRe.PrepareRestore(ctx) Expect(err).NotTo(HaveOccurred()) - err = opRe.LoadDump(ctx, dumpDir) + err = opRe.LoadDump(ctx, dumpDir, "") Expect(err).NotTo(HaveOccurred()) var restoredGTID string @@ -180,7 +180,7 @@ var _ = Describe("Operator", func() { err = os.MkdirAll(tmpDir, 0755) Expect(err).NotTo(HaveOccurred()) - err = opRe.LoadBinlog(ctx, binlogDir, tmpDir, restorePoint) + err = opRe.LoadBinlog(ctx, binlogDir, tmpDir, restorePoint, "") Expect(err).NotTo(HaveOccurred()) Expect(restoredGTID).To(Equal(dumpGTID)) var maxID int diff --git a/pkg/bkop/restore.go b/pkg/bkop/restore.go index 6d011616a..f9907f298 100644 --- a/pkg/bkop/restore.go +++ b/pkg/bkop/restore.go @@ -21,7 +21,7 @@ func (o operator) PrepareRestore(ctx context.Context) error { return nil } -func (o operator) LoadDump(ctx context.Context, dir string) error { +func (o operator) LoadDump(ctx context.Context, dir string, schema string) error { args := []string{ fmt.Sprintf("mysql://%s@%s", o.user, net.JoinHostPort(o.host, fmt.Sprint(o.port))), "-p" + o.password, @@ -38,6 +38,9 @@ func (o operator) LoadDump(ctx context.Context, dir string) error { "--deferTableIndexes=all", "--updateGtidSet=replace", } + if schema != "" { + args = append(args, "--includeSchemas="+schema) + } cmd := exec.CommandContext(ctx, "mysqlsh", args...) cmd.Stdout = os.Stdout @@ -45,7 +48,7 @@ func (o operator) LoadDump(ctx context.Context, dir string) error { return cmd.Run() } -func (o operator) LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time) error { +func (o operator) LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time, schema string) error { dirents, err := os.ReadDir(binlogDir) if err != nil { return err @@ -88,6 +91,9 @@ func (o operator) LoadBinlog(ctx context.Context, binlogDir, tmpDir string, rest //mysqlbinlog --stop-datetime="2021-05-13 10:45:00" log/binlog.000001 log/binlog.000002 // | mysql --binary-mode -h moco-single-primary.bar.svc -u moco-admin -p binlogArgs := append([]string{"--stop-datetime=" + restorePoint.Format("2006-01-02 15:04:05")}, binlogFiles...) + if schema != "" { + binlogArgs = append(binlogArgs, "--database="+schema) + } binlogCmd := exec.CommandContext(ctx, "mysqlbinlog", binlogArgs...) binlogCmd.Stdout = pw binlogCmd.Stderr = os.Stderr