diff --git a/pkg/api/types.go b/pkg/api/types.go index cb30a325..1f600307 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -129,6 +129,7 @@ type Engine struct { Port int32 `json:"port"` TargetIP string `json:"target_ip"` TargetPort int32 `json:"target_port"` + StandbyTargetPort int32 `json:"standby_target_port"` ReplicaAddressMap map[string]string `json:"replica_address_map"` ReplicaModeMap map[string]types.Mode `json:"replica_mode_map"` Head *Lvol `json:"head"` @@ -149,6 +150,7 @@ func ProtoEngineToEngine(e *spdkrpc.Engine) *Engine { Port: e.Port, TargetIP: e.TargetIp, TargetPort: e.TargetPort, + StandbyTargetPort: e.StandbyTargetPort, ReplicaAddressMap: e.ReplicaAddressMap, ReplicaModeMap: map[string]types.Mode{}, Head: ProtoLvolToLvol(e.Head), diff --git a/pkg/client/client.go b/pkg/client/client.go index b67f20ea..47a554c5 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -471,7 +471,7 @@ func (c *SPDKClient) ReplicaRebuildingDstSnapshotRevert(name, snapshotName strin } func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize uint64, replicaAddressMap map[string]string, portCount int32, - initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (*api.Engine, error) { + initiatorAddress, targetAddress string, salvageRequested bool) (*api.Engine, error) { if name == "" || volumeName == "" || len(replicaAddressMap) == 0 { return nil, fmt.Errorf("failed to start SPDK engine: missing required parameters") } @@ -487,7 +487,6 @@ func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize ui ReplicaAddressMap: replicaAddressMap, Frontend: frontend, PortCount: portCount, - UpgradeRequired: upgradeRequired, TargetAddress: targetAddress, InitiatorAddress: initiatorAddress, SalvageRequested: salvageRequested, diff --git a/pkg/spdk/engine.go b/pkg/spdk/engine.go index 6c79c7ff..3b281508 100644 --- a/pkg/spdk/engine.go +++ b/pkg/spdk/engine.go @@ -34,18 +34,19 @@ import ( type Engine struct { sync.RWMutex - Name string - VolumeName string - SpecSize uint64 - ActualSize uint64 - IP string - Port int32 - TargetIP string - TargetPort int32 - Frontend string - Endpoint string - Nqn string - Nguid string + Name string + VolumeName string + SpecSize uint64 + ActualSize uint64 + IP string + Port int32 // Port that initiator is connecting to + TargetIP string + TargetPort int32 // Port of the target that is used for letting initiator connect to + StandbyTargetPort int32 + Frontend string + Endpoint string + Nqn string + Nguid string ReplicaStatusMap map[string]*EngineReplicaStatus @@ -104,6 +105,42 @@ func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineU } } +func isNewEngine(e *Engine) bool { + return e.IP == "" && e.TargetIP == "" && e.StandbyTargetPort == 0 +} + +func (e *Engine) checkInitiatorAndTargetCreationRequirements(podIP, initiatorIP, targetIP string) (bool, bool, error) { + initiatorCreationRequired, targetCreationRequired := false, false + var err error + + if podIP == initiatorIP && podIP == targetIP { + if e.Port == 0 && e.TargetPort == 0 { + e.log.Info("Creating both initiator and target instances") + initiatorCreationRequired = true + targetCreationRequired = true + } else if e.Port != 0 && e.TargetPort == 0 { + e.log.Info("Creating a target instance") + if e.StandbyTargetPort != 0 { + e.log.Warnf("Standby target instance with port %v is already created, will skip the target creation", e.StandbyTargetPort) + } else { + targetCreationRequired = true + } + } else { + err = fmt.Errorf("invalid initiator and target address for engine %s creation", e.Name) + } + } else if podIP == initiatorIP { + e.log.Info("Creating an initiator instance") + initiatorCreationRequired = true + } else if podIP == targetIP { + e.log.Info("Creating a target instance") + targetCreationRequired = true + } else { + err = fmt.Errorf("invalid initiator and target address for engine %s creation", e.Name) + } + + return initiatorCreationRequired, targetCreationRequired, err +} + func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (ret *spdkrpc.Engine, err error) { logrus.WithFields(logrus.Fields{ "portCount": portCount, @@ -119,7 +156,6 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.Lock() defer func() { e.Unlock() - if requireUpdate { e.UpdateCh <- nil } @@ -134,6 +170,7 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str if err != nil { return nil, errors.Wrapf(err, "failed to split initiator address %v", initiatorAddress) } + targetIP, _, err := splitHostPort(targetAddress) if err != nil { return nil, errors.Wrapf(err, "failed to split target address %v", targetAddress) @@ -168,29 +205,30 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str replicaBdevList := []string{} - initiatorCreationRequired := true - if !upgradeRequired { - if e.IP == "" { - if initiatorIP != targetIP { - // For creating target on another node - initiatorCreationRequired = false - e.log.Info("Creating an target engine") - e.TargetIP = podIP - } else { - // For newly creating engine - e.log.Info("Creating an new engine") - e.IP = podIP - e.TargetIP = podIP - } + initiatorCreationRequired, targetCreationRequired, err := e.checkInitiatorAndTargetCreationRequirements(podIP, initiatorIP, targetIP) + if err != nil { + return nil, err + } + if !initiatorCreationRequired && !targetCreationRequired { + return e.getWithoutLock(), nil + } - e.log = e.log.WithField("ip", e.IP) - } else { - if initiatorIP != targetIP { - return nil, errors.Errorf("unsupported operation: engine ip=%v, initiator address=%v, target address=%v", e.IP, initiatorAddress, targetAddress) - } + if isNewEngine(e) { + if initiatorCreationRequired { + e.IP = initiatorIP + } + e.TargetIP = targetIP + } + + e.log = e.log.WithFields(logrus.Fields{ + "initiatorIP": e.IP, + "targetIP": e.TargetIP, + }) - // For creating target on attached node - initiatorCreationRequired = false + if targetCreationRequired { + _, err := spdkClient.BdevRaidGet(e.Name, 0) + if err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { + return nil, errors.Wrapf(err, "failed to get raid bdev %v during engine creation", e.Name) } if salvageRequested { @@ -222,18 +260,15 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.checkAndUpdateInfoFromReplicaNoLock() - e.log.Infof("Tried to connected all replicas %+v, then launching raid during engine creation", e.ReplicaStatusMap) + // TODO: improve the log message + e.log.Infof("Connecting all available replicas %+v, then launching raid during engine creation", e.ReplicaStatusMap) if _, err := spdkClient.BdevRaidCreate(e.Name, spdktypes.BdevRaidLevel1, 0, replicaBdevList); err != nil { return nil, err } } else { - // For reconstructing engine after switching over target to another node - initiatorCreationRequired = false + e.log.Info("Skipping target creation during engine creation") - e.IP = targetIP - - // Get ReplicaModeMap and ReplicaBdevNameMap - targetSPDKServiceAddress := net.JoinHostPort(e.IP, strconv.Itoa(types.SPDKServicePort)) + targetSPDKServiceAddress := net.JoinHostPort(e.TargetIP, strconv.Itoa(types.SPDKServicePort)) targetSPDKClient, err := GetServiceClient(targetSPDKServiceAddress) if err != nil { return nil, err @@ -244,21 +279,17 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str } }() - var engineWithTarget *api.Engine - if initiatorIP != targetIP { - engineWithTarget, err = targetSPDKClient.EngineGet(e.Name) - if err != nil { - return nil, errors.Wrapf(err, "failed to get engine %v from %v", e.Name, targetAddress) - } - } else { - engineWithTarget = api.ProtoEngineToEngine(e.getWithoutLock()) + e.log.Info("Fetching replica list from target engine") + targetEngine, err := targetSPDKClient.EngineGet(e.Name) + if err != nil { + return nil, errors.Wrapf(err, "failed to get engine %v from %v", e.Name, targetAddress) } for replicaName, replicaAddr := range replicaAddressMap { e.ReplicaStatusMap[replicaName] = &EngineReplicaStatus{ Address: replicaAddr, } - if _, ok := engineWithTarget.ReplicaAddressMap[replicaName]; !ok { + if _, ok := targetEngine.ReplicaAddressMap[replicaName]; !ok { e.log.WithError(err).Warnf("Failed to get bdev from replica %s with address %s during creation, will mark the mode to ERR and continue", replicaName, replicaAddr) e.ReplicaStatusMap[replicaName].Mode = types.ModeERR } else { @@ -267,12 +298,21 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.ReplicaStatusMap[replicaName].BdevName = replicaName } } - e.log = e.log.WithField("replicaStatusMap", e.ReplicaStatusMap) - e.log.Infof("Tried to re-connected all replicas %+v for engine reconstruction during upgrade", e.ReplicaStatusMap) + + e.log = e.log.WithField("replicaAddressMap", replicaAddressMap) + e.log.Infof("Connected all available replicas %+v for engine reconstruction during upgrade", e.ReplicaStatusMap) } - e.log.Info("Launching frontend during engine creation") - if err := e.handleFrontend(spdkClient, portCount, superiorPortAllocator, initiatorCreationRequired, upgradeRequired, initiatorAddress, targetAddress); err != nil { + log := e.log.WithFields(logrus.Fields{ + "initiatorCreationRequired": initiatorCreationRequired, + "targetCreationRequired": targetCreationRequired, + "upgradeRequired": upgradeRequired, + "initiatorAddress": initiatorAddress, + "targetAddress": targetAddress, + }) + + log.Info("Handling frontend during engine creation") + if err := e.handleFrontend(spdkClient, superiorPortAllocator, portCount, targetAddress, initiatorCreationRequired, targetCreationRequired); err != nil { return nil, err } @@ -357,7 +397,8 @@ func (e *Engine) filterSalvageCandidates(replicaAddressMap map[string]string) (m return filteredCandidates, nil } -func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorCreationRequired, upgradeRequired bool, initiatorAddress, targetAddress string) (err error) { +func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, superiorPortAllocator *commonbitmap.Bitmap, portCount int32, targetAddress string, + initiatorCreationRequired, targetCreationRequired bool) (err error) { if !types.IsFrontendSupported(e.Frontend) { return fmt.Errorf("unknown frontend type %s", e.Frontend) } @@ -367,9 +408,9 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, return nil } - initiatorIP, _, err := splitHostPort(initiatorAddress) - if err != nil { - return errors.Wrapf(err, "failed to split initiator address %v", initiatorAddress) + standbyTargetCreationRequired := false + if e.Port != 0 && e.TargetPort == 0 { + standbyTargetCreationRequired = true } targetIP, targetPort, err := splitHostPort(targetAddress) @@ -378,34 +419,87 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, } e.Nqn = helpertypes.GetNQN(e.Name) + e.Nguid = commonutils.RandomID(nvmeNguidLength) + + dmDeviceBusy := false + port := int32(0) + initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) + if err != nil { + return errors.Wrapf(err, "failed to create initiator for engine %v", e.Name) + } - var port int32 - if !upgradeRequired { - e.Nguid = commonutils.RandomID(nvmeNguidLength) + defer func() { + if err == nil { + if !standbyTargetCreationRequired { + e.initiator = initiator + e.dmDeviceBusy = dmDeviceBusy + e.Endpoint = initiator.GetEndpoint() + e.log = e.log.WithFields(logrus.Fields{ + "endpoint": e.Endpoint, + "port": e.Port, + "targetPort": e.TargetPort, + }) + } - e.log.Info("Blindly stopping expose bdev for engine") - if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { - return errors.Wrapf(err, "failed to blindly stop expose bdev for engine %v", e.Name) + e.log.Infof("Finished handling frontend for engine: %+v", e) } + }() - port, _, err = superiorPortAllocator.AllocateRange(portCount) - if err != nil { - return err + if initiatorCreationRequired && !targetCreationRequired { + initiator.TransportAddress = targetIP + initiator.TransportServiceID = strconv.Itoa(int(targetPort)) + + e.log.Infof("Target instance already exists on %v, no need to create target instance", targetAddress) + e.Port = targetPort + + // TODO: + // "nvme list -o json" might be empty devices for a while instance manager pod is just started. + // The root cause is not clear, so we need to retry to load NVMe device info. + for r := 0; r < maxNumRetries; r++ { + err = initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN) + if err != nil && strings.Contains(err.Error(), "failed to get devices") { + time.Sleep(retryInterval) + continue + } + if err == nil { + e.log.Infof("Loaded NVMe device info for engine") + break + } + return errors.Wrapf(err, "failed to load NVMe device info for engine %v", e.Name) } - e.log.Infof("Allocated port %v", port) - if err := spdkClient.StartExposeBdev(e.Nqn, e.Name, e.Nguid, targetIP, strconv.Itoa(int(port))); err != nil { - return err + err = initiator.LoadEndpoint(false) + if err != nil { + return errors.Wrapf(err, "failed to load endpoint for engine %v", e.Name) } - if initiatorCreationRequired { - e.Port = port - e.TargetPort = port + return nil + } + + e.log.Info("Blindly stopping expose bdev for engine") + if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { + return errors.Wrapf(err, "failed to blindly stop expose bdev for engine %v", e.Name) + } + + port, _, err = superiorPortAllocator.AllocateRange(portCount) + if err != nil { + return errors.Wrapf(err, "failed to allocate port for engine %v", e.Name) + } + e.log.Infof("Allocated port %v for engine", port) + + if initiatorCreationRequired { + e.Port = port + } + if targetCreationRequired { + if standbyTargetCreationRequired { + e.StandbyTargetPort = port } else { e.TargetPort = port } - } else { - e.Port = targetPort + } + + if err := spdkClient.StartExposeBdev(e.Nqn, e.Name, e.Nguid, targetIP, strconv.Itoa(int(port))); err != nil { + return err } if e.Frontend == types.FrontendSPDKTCPNvmf { @@ -413,50 +507,17 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, return nil } - if initiatorIP != targetIP && !upgradeRequired { - e.log.Infof("Initiator IP %v is different from target IP %s, will not start initiator for engine", initiatorIP, targetIP) + if !initiatorCreationRequired && targetCreationRequired { + e.log.Infof("Only creating target instance for engine, no need to start initiator") return nil } - initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) - if err != nil { - return errors.Wrapf(err, "failed to create initiator for engine %v", e.Name) - } + e.log.Info("Starting initiator for engine") - dmDeviceBusy := false - if initiatorCreationRequired { - e.log.Info("Starting initiator for engine") - dmDeviceBusy, err = initiator.Start(targetIP, strconv.Itoa(int(port)), true) - if err != nil { - return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name) - } - } else { - e.log.Info("Loading NVMe device info for engine") - err = initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN) - if err != nil { - if nvme.IsValidNvmeDeviceNotFound(err) { - dmDeviceBusy, err = initiator.Start(targetIP, strconv.Itoa(int(targetPort)), true) - if err != nil { - return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name) - } - } else { - return errors.Wrapf(err, "failed to load NVMe device info for engine %v", e.Name) - } - } - err = initiator.LoadEndpoint(false) - if err != nil { - return errors.Wrapf(err, "failed to load endpoint for engine %v", e.Name) - } - //dmDeviceBusy = true + dmDeviceBusy, err = initiator.Start(targetIP, strconv.Itoa(int(port)), true) + if err != nil { + return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name) } - e.initiator = initiator - e.dmDeviceBusy = dmDeviceBusy - e.Endpoint = initiator.GetEndpoint() - - e.log = e.log.WithFields(logrus.Fields{ - "endpoint": e.Endpoint, - "port": port, - }) return nil } @@ -564,6 +625,7 @@ func (e *Engine) getWithoutLock() (res *spdkrpc.Engine) { Port: e.Port, TargetIp: e.TargetIP, TargetPort: e.TargetPort, + StandbyTargetPort: e.StandbyTargetPort, Snapshots: map[string]*spdkrpc.Lvol{}, Frontend: e.Frontend, Endpoint: e.Endpoint, @@ -630,16 +692,12 @@ func (e *Engine) ValidateAndUpdate(spdkClient *spdkclient.Client) (err error) { } }() - podIP, err := commonnet.GetIPForPod() - if err != nil { - return err - } - if e.IP != podIP { - // Skip the validation if the engine is being upgraded - if engineOnlyContainsInitiator(e) || engineOnlyContainsTarget(e) { - return nil - } - return fmt.Errorf("found mismatching between engine IP %s and pod IP %s for engine %v", e.IP, podIP, e.Name) + // podIP, err := commonnet.GetIPForPod() + // if err != nil { + // return err + // } + if e.IP != e.TargetIP { + return nil } if err := e.validateAndUpdateFrontend(subsystemMap); err != nil { @@ -2122,6 +2180,11 @@ func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress s e.UpdateCh <- nil }() + podIP, err := commonnet.GetIPForPod() + if err != nil { + return err + } + initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) if err != nil { return errors.Wrapf(err, "failed to create initiator for engine %s target switchover", e.Name) @@ -2164,6 +2227,11 @@ func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress s e.IP = targetIP e.Port = targetPort + if targetIP == podIP { + e.TargetPort = targetPort + e.StandbyTargetPort = 0 + } + e.log.Info("Reloading device mapper after target switchover") if err := e.reloadDevice(); err != nil { return err @@ -2333,11 +2401,3 @@ func (e *Engine) DeleteTarget(spdkClient *spdkclient.Client, superiorPortAllocat func (e *Engine) isSwitchOverTargetRequired(oldTargetAddress, newTargetAddress string) bool { return oldTargetAddress != newTargetAddress } - -func engineOnlyContainsInitiator(e *Engine) bool { - return e.Port != 0 && e.TargetPort == 0 -} - -func engineOnlyContainsTarget(e *Engine) bool { - return e.Port == 0 && e.TargetPort != 0 -} diff --git a/pkg/spdk/engine_test.go b/pkg/spdk/engine_test.go new file mode 100644 index 00000000..c9e303aa --- /dev/null +++ b/pkg/spdk/engine_test.go @@ -0,0 +1,115 @@ +package spdk + +import ( + "fmt" + + "github.com/sirupsen/logrus" + + . "gopkg.in/check.v1" +) + +func (s *TestSuite) TestCheckInitiatorAndTargetCreationRequirements(c *C) { + testCases := []struct { + name string + podIP string + initiatorIP string + targetIP string + port int32 + targetPort int32 + standbyTargetPort int32 + expectedInitiatorCreationRequired bool + expectedTargetCreationRequired bool + expectedError error + }{ + { + name: "Create both initiator and target instances", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.1", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: true, + expectedTargetCreationRequired: true, + expectedError: nil, + }, + { + name: "Create local target instance only", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.1", + port: 8080, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: true, + expectedError: nil, + }, + { + name: "Create local initiator instance only", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.2", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: true, + expectedTargetCreationRequired: false, + expectedError: nil, + }, + { + name: "Create remote target instance only", + podIP: "192.168.1.2", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.2", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: true, + expectedError: nil, + }, + { + name: "Invalid initiator and target address", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.2", + targetIP: "192.168.1.3", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: false, + expectedError: fmt.Errorf("invalid initiator and target address for engine %s creation", "test-engine"), + }, + { + name: "Standby target instance is already created", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.1", + port: 100, + targetPort: 0, + standbyTargetPort: 105, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: false, + expectedError: nil, + }, + } + + for testName, testCase := range testCases { + c.Logf("testing TestCheckInitiatorAndTargetCreationRequirements.%v", testName) + + engine := &Engine{ + Port: testCase.port, + TargetPort: testCase.targetPort, + StandbyTargetPort: testCase.standbyTargetPort, + Name: "test-engine", + log: logrus.New(), + } + + initiatorCreationRequired, targetCreationRequired, err := engine.checkInitiatorAndTargetCreationRequirements(testCase.podIP, testCase.initiatorIP, testCase.targetIP) + + c.Assert(initiatorCreationRequired, Equals, testCase.expectedInitiatorCreationRequired) + c.Assert(targetCreationRequired, Equals, testCase.expectedTargetCreationRequired) + c.Assert(err, DeepEquals, testCase.expectedError) + } +} diff --git a/pkg/spdk_test.go b/pkg/spdk_test.go index 1123fc6f..7997692e 100644 --- a/pkg/spdk_test.go +++ b/pkg/spdk_test.go @@ -8,7 +8,6 @@ import ( "path/filepath" "strconv" "strings" - "sync" "testing" "time" @@ -27,10 +26,8 @@ import ( server "github.com/longhorn/longhorn-spdk-engine/pkg/spdk" - "github.com/longhorn/longhorn-spdk-engine/pkg/api" "github.com/longhorn/longhorn-spdk-engine/pkg/client" "github.com/longhorn/longhorn-spdk-engine/pkg/types" - "github.com/longhorn/longhorn-spdk-engine/pkg/util" . "gopkg.in/check.v1" ) @@ -49,9 +46,6 @@ var ( defaultTestReplicaPortCount = int32(5) defaultTestExecuteTimeout = 10 * time.Second - - defaultTestRebuildingWaitInterval = 3 * time.Second - defaultTestRebuildingWaitCount = 60 ) func Test(t *testing.T) { TestingT(t) } @@ -179,1129 +173,6 @@ func CleanupDiskFile(c *C, loopDevicePath string) { c.Assert(err, IsNil) } -func (s *TestSuite) TestSPDKMultipleThread(c *C) { - fmt.Println("Testing SPDK basic operations with multiple threads") - - diskDriverName := "aio" - - ip, err := commonnet.GetAnyExternalIP() - c.Assert(err, IsNil) - os.Setenv(commonnet.EnvPodIP, ip) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ne, err := helperutil.NewExecutor(commontypes.ProcDirectory) - c.Assert(err, IsNil) - LaunchTestSPDKGRPCServer(ctx, c, ip, ne.Execute) - - loopDevicePath := PrepareDiskFile(c) - defer func() { - CleanupDiskFile(c, loopDevicePath) - }() - - spdkCli, err := client.NewSPDKClient(net.JoinHostPort(ip, strconv.Itoa(types.SPDKServicePort))) - c.Assert(err, IsNil) - - disk, err := spdkCli.DiskCreate(defaultTestDiskName, "", loopDevicePath, diskDriverName, int64(defaultTestBlockSize)) - c.Assert(err, IsNil) - c.Assert(disk.Path, Equals, loopDevicePath) - c.Assert(disk.Uuid, Not(Equals), "") - - defer func() { - err := spdkCli.DiskDelete(defaultTestDiskName, disk.Uuid, disk.Path, diskDriverName) - c.Assert(err, IsNil) - }() - - concurrentCount := 10 - dataCountInMB := 100 - wg := sync.WaitGroup{} - wg.Add(concurrentCount) - for i := 0; i < concurrentCount; i++ { - volumeName := fmt.Sprintf("test-vol-%d", i) - engineName := fmt.Sprintf("%s-engine", volumeName) - replicaName1 := fmt.Sprintf("%s-replica-1", volumeName) - replicaName2 := fmt.Sprintf("%s-replica-2", volumeName) - replicaName3 := fmt.Sprintf("%s-replica-3", volumeName) - - go func() { - defer func() { - // Do cleanup - // TODO: Check why there is a race here - // err = spdkCli.EngineDelete(engineName) - // c.Assert(err, IsNil) - // err = spdkCli.ReplicaDelete(replicaName1, true) - // c.Assert(err, IsNil) - // err = spdkCli.ReplicaDelete(replicaName2, true) - // c.Assert(err, IsNil) - // err = spdkCli.ReplicaDelete(replicaName3, true) - // c.Assert(err, IsNil) - - wg.Done() - }() - - replica1, err := spdkCli.ReplicaCreate(replicaName1, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - c.Assert(replica1.LvsName, Equals, defaultTestDiskName) - c.Assert(replica1.LvsUUID, Equals, disk.Uuid) - c.Assert(replica1.State, Equals, types.InstanceStateRunning) - c.Assert(replica1.PortStart, Not(Equals), int32(0)) - c.Assert(replica1.Head, NotNil) - c.Assert(replica1.Head.CreationTime, Not(Equals), "") - c.Assert(replica1.Head.Parent, Equals, "") - replica2, err := spdkCli.ReplicaCreate(replicaName2, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - c.Assert(replica2.LvsName, Equals, defaultTestDiskName) - c.Assert(replica2.LvsUUID, Equals, disk.Uuid) - c.Assert(replica2.State, Equals, types.InstanceStateRunning) - c.Assert(replica2.PortStart, Not(Equals), int32(0)) - c.Assert(replica2.Head, NotNil) - c.Assert(replica2.Head.CreationTime, Not(Equals), "") - c.Assert(replica2.Head.Parent, Equals, "") - - replicaAddressMap := map[string]string{ - replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))), - replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), - } - replicaModeMap := map[string]types.Mode{ - replica1.Name: types.ModeRW, - replica2.Name: types.ModeRW, - } - endpoint := helperutil.GetLonghornDevicePath(volumeName) - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) - c.Assert(err, IsNil) - c.Assert(engine.State, Equals, types.InstanceStateRunning) - c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) - c.Assert(engine.ReplicaModeMap, DeepEquals, replicaModeMap) - c.Assert(engine.Endpoint, Equals, endpoint) - // initiator and target are on the same node - c.Assert(engine.IP, Not(Equals), "") - c.Assert(engine.TargetIP, Not(Equals), "") - c.Assert(engine.IP, Equals, engine.TargetIP) - c.Assert(engine.Port, Not(Equals), int32(0)) - c.Assert(engine.TargetPort, Not(Equals), int32(0)) - c.Assert(engine.Port, Equals, engine.TargetPort) - - _, err = ne.Execute(nil, "dd", []string{"if=/dev/urandom", fmt.Sprintf("of=%s", endpoint), "bs=1M", fmt.Sprintf("count=%d", dataCountInMB), "seek=0", "status=none"}, defaultTestExecuteTimeout) - c.Assert(err, IsNil) - cksumBefore1, err := util.GetFileChunkChecksum(endpoint, 0, 100*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumBefore1, Not(Equals), "") - - snapshotName1 := "snap1" - _, err = spdkCli.EngineSnapshotCreate(engineName, snapshotName1) - c.Assert(err, IsNil) - - _, err = ne.Execute(nil, "dd", []string{"if=/dev/urandom", fmt.Sprintf("of=%s", endpoint), "bs=1M", fmt.Sprintf("count=%d", dataCountInMB), "seek=200", "status=none"}, defaultTestExecuteTimeout) - c.Assert(err, IsNil) - cksumBefore2, err := util.GetFileChunkChecksum(endpoint, 200*helpertypes.MiB, 100*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumBefore2, Not(Equals), "") - - snapshotName2 := "snap2" - _, err = spdkCli.EngineSnapshotCreate(engineName, snapshotName2) - c.Assert(err, IsNil) - - // Check both replica snapshot map after the snapshot operations - checkReplicaSnapshots(c, spdkCli, engineName, []string{replicaName1, replicaName2}, - map[string][]string{ - snapshotName1: {snapshotName2}, - snapshotName2: {types.VolumeHead}, - }, - map[string]api.SnapshotOptions{ - snapshotName1: {UserCreated: true}, - snapshotName2: {UserCreated: true}, - }) - - err = spdkCli.EngineSnapshotDelete(engineName, snapshotName1) - c.Assert(err, IsNil) - - // Detach and re-attach the volume - err = spdkCli.EngineDelete(engineName) - c.Assert(err, IsNil) - err = spdkCli.ReplicaDelete(replicaName1, false) - c.Assert(err, IsNil) - err = spdkCli.ReplicaDelete(replicaName2, false) - c.Assert(err, IsNil) - - replica1, err = spdkCli.ReplicaGet(replicaName1) - c.Assert(err, IsNil) - c.Assert(replica1.LvsName, Equals, defaultTestDiskName) - c.Assert(replica1.LvsUUID, Equals, disk.Uuid) - c.Assert(replica1.State, Equals, types.InstanceStateStopped) - c.Assert(replica1.PortStart, Equals, int32(0)) - c.Assert(replica1.PortEnd, Equals, int32(0)) - - replica2, err = spdkCli.ReplicaGet(replicaName1) - c.Assert(err, IsNil) - c.Assert(replica2.LvsName, Equals, defaultTestDiskName) - c.Assert(replica2.LvsUUID, Equals, disk.Uuid) - c.Assert(replica2.State, Equals, types.InstanceStateStopped) - c.Assert(replica2.PortStart, Equals, int32(0)) - c.Assert(replica2.PortEnd, Equals, int32(0)) - - replica1, err = spdkCli.ReplicaCreate(replicaName1, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - c.Assert(replica1.State, Equals, types.InstanceStateRunning) - replica2, err = spdkCli.ReplicaCreate(replicaName2, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - c.Assert(replica2.State, Equals, types.InstanceStateRunning) - - replicaAddressMap = map[string]string{ - replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))), - replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), - } - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) - c.Assert(err, IsNil) - c.Assert(engine.State, Equals, types.InstanceStateRunning) - c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) - c.Assert(engine.Port, Not(Equals), int32(0)) - c.Assert(engine.Endpoint, Equals, endpoint) - - // Check both replica snapshot map after the snapshot deletion and volume re-attachment - checkReplicaSnapshots(c, spdkCli, engineName, []string{replicaName1, replicaName2}, - map[string][]string{ - snapshotName2: {types.VolumeHead}, - }, - nil) - - // Data keeps intact after the snapshot deletion and volume re-attachment - cksumAfterSnap1, err := util.GetFileChunkChecksum(endpoint, 0, 100*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfterSnap1, Equals, cksumBefore1) - cksumAfterSnap2, err := util.GetFileChunkChecksum(endpoint, 200*helpertypes.MiB, 100*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfterSnap2, Equals, cksumBefore2) - - // Before testing online rebuilding - // Crash replica2 and remove it from the engine - delete(replicaAddressMap, replicaName2) - err = spdkCli.ReplicaDelete(replicaName2, true) - c.Assert(err, IsNil) - err = spdkCli.EngineReplicaDelete(engineName, replicaName2, net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart)))) - c.Assert(err, IsNil) - engine, err = spdkCli.EngineGet(engineName) - c.Assert(err, IsNil) - c.Assert(engine.State, Equals, types.InstanceStateRunning) - c.Assert(engine.Frontend, Equals, types.FrontendSPDKTCPBlockdev) - c.Assert(engine.Endpoint, Equals, endpoint) - c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) - c.Assert(engine.ReplicaModeMap, DeepEquals, map[string]types.Mode{replicaName1: types.ModeRW}) - - // Start testing online rebuilding - // Launch a new replica then ask the engine to rebuild it - replica3, err := spdkCli.ReplicaCreate(replicaName3, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - c.Assert(replica3.LvsName, Equals, defaultTestDiskName) - c.Assert(replica3.LvsUUID, Equals, disk.Uuid) - c.Assert(replica3.State, Equals, types.InstanceStateRunning) - c.Assert(replica3.PortStart, Not(Equals), int32(0)) - c.Assert(replica3.Head, NotNil) - c.Assert(replica3.Head.CreationTime, Not(Equals), "") - c.Assert(replica3.Head.Parent, Equals, "") - - err = spdkCli.EngineReplicaAdd(engineName, replicaName3, net.JoinHostPort(ip, strconv.Itoa(int(replica3.PortStart)))) - c.Assert(err, IsNil) - - WaitForReplicaRebuildingComplete(c, spdkCli, engineName, replicaName3) - - // Verify the rebuilding result - replicaAddressMap = map[string]string{ - replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))), - replica3.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica3.PortStart))), - } - engine, err = spdkCli.EngineGet(engineName) - c.Assert(err, IsNil) - c.Assert(engine.State, Equals, types.InstanceStateRunning) - c.Assert(engine.Frontend, Equals, types.FrontendSPDKTCPBlockdev) - c.Assert(engine.Endpoint, Equals, endpoint) - c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) - c.Assert(engine.ReplicaModeMap, DeepEquals, map[string]types.Mode{replicaName1: types.ModeRW, replicaName3: types.ModeRW}) - - // The newly rebuilt replica should contain correct data - cksumAfterRebuilding1, err := util.GetFileChunkChecksum(endpoint, 0, 100*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfterRebuilding1, Equals, cksumBefore1) - cksumAfterRebuilding2, err := util.GetFileChunkChecksum(endpoint, 200*helpertypes.MiB, 100*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfterRebuilding2, Equals, cksumBefore2) - }() - } - - wg.Wait() - - engineList, err := spdkCli.EngineList() - c.Assert(err, IsNil) - for _, engine := range engineList { - err = spdkCli.EngineDelete(engine.Name) - c.Assert(err, IsNil) - } - replicaList, err := spdkCli.ReplicaList() - c.Assert(err, IsNil) - for _, replica := range replicaList { - err = spdkCli.ReplicaDelete(replica.Name, true) - c.Assert(err, IsNil) - } -} - -func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuilding(c *C) { - fmt.Println("Testing SPDK snapshot operations with multiple threads") - diskDriverName := "aio" - - ip, err := commonnet.GetAnyExternalIP() - c.Assert(err, IsNil) - os.Setenv(commonnet.EnvPodIP, ip) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ne, err := helperutil.NewExecutor(commontypes.ProcDirectory) - c.Assert(err, IsNil) - LaunchTestSPDKGRPCServer(ctx, c, ip, ne.Execute) - - loopDevicePath := PrepareDiskFile(c) - defer func() { - CleanupDiskFile(c, loopDevicePath) - }() - - spdkCli, err := client.NewSPDKClient(net.JoinHostPort(ip, strconv.Itoa(types.SPDKServicePort))) - c.Assert(err, IsNil) - - disk, err := spdkCli.DiskCreate(defaultTestDiskName, "", loopDevicePath, diskDriverName, int64(defaultTestBlockSize)) - c.Assert(err, IsNil) - c.Assert(disk.Path, Equals, loopDevicePath) - c.Assert(disk.Uuid, Not(Equals), "") - - defer func() { - err := spdkCli.DiskDelete(defaultTestDiskName, disk.Uuid, disk.Path, diskDriverName) - c.Assert(err, IsNil) - }() - - concurrentCount := 10 - dataCountInMB := int64(10) - wg := sync.WaitGroup{} - wg.Add(concurrentCount) - for i := 0; i < concurrentCount; i++ { - volumeName := fmt.Sprintf("test-vol-%d", i) - engineName := fmt.Sprintf("%s-engine", volumeName) - replicaName1 := fmt.Sprintf("%s-replica-1", volumeName) - replicaName2 := fmt.Sprintf("%s-replica-2", volumeName) - replicaName3 := fmt.Sprintf("%s-replica-3", volumeName) - replicaName4 := fmt.Sprintf("%s-replica-4", volumeName) - - go func() { - defer func() { - // Do cleanup - // TODO: Check why there is a race here - // err = spdkCli.EngineDelete(engineName) - // c.Assert(err, IsNil) - // err = spdkCli.ReplicaDelete(replicaName1, true) - // c.Assert(err, IsNil) - // err = spdkCli.ReplicaDelete(replicaName2, true) - // c.Assert(err, IsNil) - // err = spdkCli.ReplicaDelete(replicaName3, true) - // c.Assert(err, IsNil) - - wg.Done() - }() - - replica1, err := spdkCli.ReplicaCreate(replicaName1, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - c.Assert(replica1.LvsName, Equals, defaultTestDiskName) - c.Assert(replica1.LvsUUID, Equals, disk.Uuid) - c.Assert(replica1.State, Equals, types.InstanceStateRunning) - c.Assert(replica1.PortStart, Not(Equals), int32(0)) - replica2, err := spdkCli.ReplicaCreate(replicaName2, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - c.Assert(replica2.LvsName, Equals, defaultTestDiskName) - c.Assert(replica2.LvsUUID, Equals, disk.Uuid) - c.Assert(replica2.State, Equals, types.InstanceStateRunning) - c.Assert(replica2.PortStart, Not(Equals), int32(0)) - - replicaAddressMap := map[string]string{ - replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))), - replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), - } - replicaModeMap := map[string]types.Mode{ - replica1.Name: types.ModeRW, - replica2.Name: types.ModeRW, - } - endpoint := helperutil.GetLonghornDevicePath(volumeName) - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) - c.Assert(err, IsNil) - c.Assert(engine.State, Equals, types.InstanceStateRunning) - c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) - c.Assert(engine.ReplicaModeMap, DeepEquals, replicaModeMap) - c.Assert(engine.Port, Not(Equals), int32(0)) - c.Assert(engine.Endpoint, Equals, endpoint) - - offsetInMB := int64(0) - _, err = ne.Execute(nil, "dd", []string{"if=/dev/urandom", fmt.Sprintf("of=%s", endpoint), "bs=1M", fmt.Sprintf("count=%d", dataCountInMB), fmt.Sprintf("seek=%d", offsetInMB), "status=none"}, defaultTestExecuteTimeout) - c.Assert(err, IsNil) - cksumBefore11, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumBefore11, Not(Equals), "") - snapshotName11 := "snap11" - _, err = spdkCli.EngineSnapshotCreate(engineName, snapshotName11) - c.Assert(err, IsNil) - - offsetInMB = dataCountInMB - _, err = ne.Execute(nil, "dd", []string{"if=/dev/urandom", fmt.Sprintf("of=%s", endpoint), "bs=1M", fmt.Sprintf("count=%d", dataCountInMB), fmt.Sprintf("seek=%d", offsetInMB), "status=none"}, defaultTestExecuteTimeout) - c.Assert(err, IsNil) - cksumBefore12, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumBefore12, Not(Equals), "") - snapshotName12 := "snap12" - _, err = spdkCli.EngineSnapshotCreate(engineName, snapshotName12) - c.Assert(err, IsNil) - - offsetInMB = 2 * dataCountInMB - _, err = ne.Execute(nil, "dd", []string{"if=/dev/urandom", fmt.Sprintf("of=%s", endpoint), "bs=1M", fmt.Sprintf("count=%d", dataCountInMB), fmt.Sprintf("seek=%d", offsetInMB), "status=none"}, defaultTestExecuteTimeout) - c.Assert(err, IsNil) - cksumBefore13, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumBefore13, Not(Equals), "") - snapshotName13 := "snap13" - _, err = spdkCli.EngineSnapshotCreate(engineName, snapshotName13) - c.Assert(err, IsNil) - - offsetInMB = 3 * dataCountInMB - _, err = ne.Execute(nil, "dd", []string{"if=/dev/urandom", fmt.Sprintf("of=%s", endpoint), "bs=1M", fmt.Sprintf("count=%d", dataCountInMB), fmt.Sprintf("seek=%d", offsetInMB), "status=none"}, defaultTestExecuteTimeout) - c.Assert(err, IsNil) - cksumBefore14, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumBefore14, Not(Equals), "") - snapshotName14 := "snap14" - _, err = spdkCli.EngineSnapshotCreate(engineName, snapshotName14) - c.Assert(err, IsNil) - - offsetInMB = 4 * dataCountInMB - _, err = ne.Execute(nil, "dd", []string{"if=/dev/urandom", fmt.Sprintf("of=%s", endpoint), "bs=1M", fmt.Sprintf("count=%d", dataCountInMB), fmt.Sprintf("seek=%d", offsetInMB), "status=none"}, defaultTestExecuteTimeout) - c.Assert(err, IsNil) - cksumBefore15, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumBefore15, Not(Equals), "") - snapshotName15 := "snap15" - _, err = spdkCli.EngineSnapshotCreate(engineName, snapshotName15) - c.Assert(err, IsNil) - - // Current snapshot tree (with backing image): - // nil (backing image) -> snap11[0,10] -> snap12[10,20] -> snap13[20,30] -> snap14[30,40] -> snap15[40,50] -> head[50,60] - - // Write some extra data into the current head before reverting. This part of data will be discarded after revert - offsetInMB = 5 * dataCountInMB - _, err = ne.Execute(nil, "dd", []string{"if=/dev/urandom", fmt.Sprintf("of=%s", endpoint), "bs=1M", fmt.Sprintf("count=%d", dataCountInMB), fmt.Sprintf("seek=%d", offsetInMB), "status=none"}, defaultTestExecuteTimeout) - c.Assert(err, IsNil) - cksumBefore16, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - - checkReplicaSnapshots(c, spdkCli, engineName, []string{replicaName1, replicaName2}, - map[string][]string{ - snapshotName11: {snapshotName12}, - snapshotName12: {snapshotName13}, - snapshotName13: {snapshotName14}, - snapshotName14: {snapshotName15}, - snapshotName15: {types.VolumeHead}, - }, - nil) - - // Revert for a new chain (chain 2) - revertSnapshot(c, spdkCli, snapshotName13, volumeName, engineName, replicaAddressMap) - - // Only the data of snap11, snap12, and snap13 keeps intact after the snapshot deletion and volume re-attachment - offsetInMB = 0 - cksumAfter11, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter11, Equals, cksumBefore11) - offsetInMB = dataCountInMB - cksumAfter12, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter12, Equals, cksumBefore12) - offsetInMB = 2 * dataCountInMB - cksumAfter13, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter13, Equals, cksumBefore13) - // The data of snap14 is no longer there after reverting to snap13 - offsetInMB = 3 * dataCountInMB - cksumAfter14, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter14, Not(Equals), cksumBefore14) - - offsetInMB = 3 * dataCountInMB - _, err = ne.Execute(nil, "dd", []string{"if=/dev/urandom", fmt.Sprintf("of=%s", endpoint), "bs=1M", fmt.Sprintf("count=%d", dataCountInMB), fmt.Sprintf("seek=%d", offsetInMB), "status=none"}, defaultTestExecuteTimeout) - c.Assert(err, IsNil) - cksumBefore21, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumBefore21, Not(Equals), "") - snapshotName21 := "snap21" - _, err = spdkCli.EngineSnapshotCreate(engineName, snapshotName21) - c.Assert(err, IsNil) - - offsetInMB = 4 * dataCountInMB - _, err = ne.Execute(nil, "dd", []string{"if=/dev/urandom", fmt.Sprintf("of=%s", endpoint), "bs=1M", fmt.Sprintf("count=%d", dataCountInMB), fmt.Sprintf("seek=%d", offsetInMB), "status=none"}, defaultTestExecuteTimeout) - c.Assert(err, IsNil) - cksumBefore22, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumBefore22, Not(Equals), "") - snapshotName22 := "snap22" - _, err = spdkCli.EngineSnapshotCreate(engineName, snapshotName22) - c.Assert(err, IsNil) - - offsetInMB = 5 * dataCountInMB - _, err = ne.Execute(nil, "dd", []string{"if=/dev/urandom", fmt.Sprintf("of=%s", endpoint), "bs=1M", fmt.Sprintf("count=%d", dataCountInMB), fmt.Sprintf("seek=%d", offsetInMB), "status=none"}, defaultTestExecuteTimeout) - c.Assert(err, IsNil) - cksumBefore23, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumBefore23, Not(Equals), "") - snapshotName23 := "snap23" - _, err = spdkCli.EngineSnapshotCreate(engineName, snapshotName23) - c.Assert(err, IsNil) - - // Current snapshot tree (with backing image): - // nil (backing image) -> snap11[0,10] -> snap12[10,20] -> snap13[20,30] -> snap14[30,40] -> snap15[40,50] - // \ - // -> snap21[30,40] -> snap22[40,50] -> snap23[50,60] -> head[60,60] - - checkReplicaSnapshots(c, spdkCli, engineName, []string{replicaName1, replicaName2}, - map[string][]string{ - snapshotName11: {snapshotName12}, - snapshotName12: {snapshotName13}, - snapshotName13: {snapshotName14, snapshotName21}, - snapshotName14: {snapshotName15}, - snapshotName15: {}, - snapshotName21: {snapshotName22}, - snapshotName22: {snapshotName23}, - snapshotName23: {types.VolumeHead}, - }, - nil) - - // Delete some snapshots - err = spdkCli.EngineSnapshotDelete(engineName, snapshotName21) - c.Assert(err, IsNil) - err = spdkCli.EngineSnapshotDelete(engineName, snapshotName22) - c.Assert(err, IsNil) - err = spdkCli.EngineSnapshotDelete(engineName, snapshotName23) - c.Assert(strings.Contains(err.Error(), "since it is the parent of volume head"), Equals, true) - - err = spdkCli.EngineSnapshotDelete(engineName, snapshotName12) - c.Assert(err, IsNil) - err = spdkCli.EngineSnapshotDelete(engineName, snapshotName14) - c.Assert(err, IsNil) - err = spdkCli.EngineSnapshotDelete(engineName, snapshotName13) - c.Assert(strings.Contains(err.Error(), "since it contains multiple children"), Equals, true) - - // Current snapshot tree (with backing image): - // nil (backing image) -> snap11[0,10] -> snap13[10,30] -> snap15[30,50] - // \ - // -> snap23[30,60] -> head[60,60] - - // Verify the data for the current snapshot chain (chain 2) - offsetInMB = 0 - cksumAfter11, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter11, Equals, cksumBefore11) - offsetInMB = dataCountInMB - cksumAfter12, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter12, Equals, cksumBefore12) - offsetInMB = 2 * dataCountInMB - cksumAfter13, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter13, Equals, cksumBefore13) - - offsetInMB = 3 * dataCountInMB - cksumAfter21, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter21, Equals, cksumBefore21) - offsetInMB = 4 * dataCountInMB - cksumAfter22, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter22, Equals, cksumBefore22) - offsetInMB = 5 * dataCountInMB - cksumAfter23, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter23, Equals, cksumBefore23) - - checkReplicaSnapshots(c, spdkCli, engineName, []string{replicaName1, replicaName2}, - map[string][]string{ - snapshotName11: {snapshotName13}, - snapshotName13: {snapshotName15, snapshotName23}, - snapshotName15: {}, - snapshotName23: {types.VolumeHead}, - }, - nil) - - // TODO: Add replica rebuilding related test step - - // Revert for a new chain (chain 3) - revertSnapshot(c, spdkCli, snapshotName11, volumeName, engineName, replicaAddressMap) - - // Create and delete some snapshots for the new chain (chain 3) - offsetInMB = dataCountInMB - _, err = ne.Execute(nil, "dd", []string{"if=/dev/urandom", fmt.Sprintf("of=%s", endpoint), "bs=1M", fmt.Sprintf("count=%d", dataCountInMB), fmt.Sprintf("seek=%d", offsetInMB), "status=none"}, defaultTestExecuteTimeout) - c.Assert(err, IsNil) - cksumBefore31, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumBefore31, Not(Equals), "") - snapshotName31 := "snap31" - _, err = spdkCli.EngineSnapshotCreate(engineName, snapshotName31) - c.Assert(err, IsNil) - - offsetInMB = 2 * dataCountInMB - _, err = ne.Execute(nil, "dd", []string{"if=/dev/urandom", fmt.Sprintf("of=%s", endpoint), "bs=1M", fmt.Sprintf("count=%d", dataCountInMB), fmt.Sprintf("seek=%d", offsetInMB), "status=none"}, defaultTestExecuteTimeout) - c.Assert(err, IsNil) - cksumBefore32, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumBefore32, Not(Equals), "") - snapshotName32 := "snap32" - _, err = spdkCli.EngineSnapshotCreate(engineName, snapshotName32) - c.Assert(err, IsNil) - - err = spdkCli.EngineSnapshotDelete(engineName, snapshotName31) - c.Assert(err, IsNil) - - // Current snapshot tree (with backing image): - // nil (backing image) -> snap11[0,10] -> snap13[10,30] -> snap15[30,50] - // \ \ - // \ -> snap23[30,60] - // \ - // -> snap32[10,30] -> head[30,30] - - offsetInMB = 0 - cksumAfter11, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter11, Equals, cksumBefore11) - - offsetInMB = dataCountInMB - cksumAfter31, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter31, Equals, cksumBefore31) - offsetInMB = 2 * dataCountInMB - cksumAfter32, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter32, Equals, cksumBefore32) - - checkReplicaSnapshots(c, spdkCli, engineName, []string{replicaName1, replicaName2}, - map[string][]string{ - snapshotName11: {snapshotName13, snapshotName32}, - snapshotName13: {snapshotName15, snapshotName23}, - snapshotName15: {}, - snapshotName23: {}, - snapshotName32: {types.VolumeHead}, - }, - nil) - - // Revert for a new chain (chain 4) - revertSnapshot(c, spdkCli, snapshotName11, volumeName, engineName, replicaAddressMap) - - // Create some snapshots for the new chain (chain 4) - offsetInMB = dataCountInMB - _, err = ne.Execute(nil, "dd", []string{"if=/dev/urandom", fmt.Sprintf("of=%s", endpoint), "bs=1M", fmt.Sprintf("count=%d", dataCountInMB), fmt.Sprintf("seek=%d", offsetInMB), "status=none"}, defaultTestExecuteTimeout) - c.Assert(err, IsNil) - cksumBefore41, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumBefore41, Not(Equals), "") - snapshotName41 := "snap41" - _, err = spdkCli.EngineSnapshotCreate(engineName, snapshotName41) - c.Assert(err, IsNil) - offsetInMB = 2 * dataCountInMB - _, err = ne.Execute(nil, "dd", []string{"if=/dev/urandom", fmt.Sprintf("of=%s", endpoint), "bs=1M", fmt.Sprintf("count=%d", dataCountInMB), fmt.Sprintf("seek=%d", offsetInMB), "status=none"}, defaultTestExecuteTimeout) - c.Assert(err, IsNil) - cksumBefore42, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumBefore42, Not(Equals), "") - snapshotName42 := "snap42" - _, err = spdkCli.EngineSnapshotCreate(engineName, snapshotName42) - c.Assert(err, IsNil) - - // Current snapshot tree (with backing image): - // nil (backing image) -> snap11[0,10] -> snap13[10,30] -> snap15[30,50] - // |\ \ - // | \ -> snap23[30,60] - // | \ - // \ -> snap32[10,30] - // \ - // -> snap41[10,20] -> snap42[20,30] -> head[30,30] - - offsetInMB = 0 - cksumAfter11, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter11, Equals, cksumBefore11) - - checkReplicaSnapshots(c, spdkCli, engineName, []string{replicaName1, replicaName2}, - map[string][]string{ - snapshotName11: {snapshotName13, snapshotName32, snapshotName41}, - snapshotName13: {snapshotName15, snapshotName23}, - snapshotName15: {}, - snapshotName23: {}, - snapshotName32: {}, - snapshotName41: {snapshotName42}, - snapshotName42: {types.VolumeHead}, - }, - nil) - - // Revert back to chain 1 - revertSnapshot(c, spdkCli, snapshotName15, volumeName, engineName, replicaAddressMap) - - // Test online rebuilding twice - - // Write some data to the head before the 1st rebuilding - offsetInMB = 6 * dataCountInMB - _, err = ne.Execute(nil, "dd", []string{"if=/dev/urandom", fmt.Sprintf("of=%s", endpoint), "bs=1M", fmt.Sprintf("count=%d", dataCountInMB), fmt.Sprintf("seek=%d", offsetInMB), "status=none"}, defaultTestExecuteTimeout) - c.Assert(err, IsNil) - cksumBeforeRebuild11, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumBeforeRebuild11, Not(Equals), "") - - // Crash replica1 - delete(replicaAddressMap, replicaName1) - err = spdkCli.ReplicaDelete(replicaName1, true) - c.Assert(err, IsNil) - err = spdkCli.EngineReplicaDelete(engineName, replicaName1, net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart)))) - c.Assert(err, IsNil) - engine, err = spdkCli.EngineGet(engineName) - c.Assert(err, IsNil) - c.Assert(engine.State, Equals, types.InstanceStateRunning) - c.Assert(engine.Frontend, Equals, types.FrontendSPDKTCPBlockdev) - c.Assert(engine.Endpoint, Equals, endpoint) - c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) - c.Assert(engine.ReplicaModeMap, DeepEquals, map[string]types.Mode{replicaName2: types.ModeRW}) - // Launch the 1st rebuilding replica as the replacement of the crashed replica1 - replica3, err := spdkCli.ReplicaCreate(replicaName3, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - c.Assert(replica3.LvsName, Equals, defaultTestDiskName) - c.Assert(replica3.LvsUUID, Equals, disk.Uuid) - c.Assert(replica3.State, Equals, types.InstanceStateRunning) - c.Assert(replica3.PortStart, Not(Equals), int32(0)) - // Start the 1st rebuilding and wait for the completion - err = spdkCli.EngineReplicaAdd(engineName, replicaName3, net.JoinHostPort(ip, strconv.Itoa(int(replica3.PortStart)))) - c.Assert(err, IsNil) - WaitForReplicaRebuildingComplete(c, spdkCli, engineName, replicaName3) - // While the volume head data written before rebuilding remains - offsetInMB = 6 * dataCountInMB - cksumAfterRebuild11, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfterRebuild11, Equals, cksumBeforeRebuild11) - // Figure out the 1st rebuilding snapshot name - replicaAddressMap[replica3.Name] = net.JoinHostPort(ip, strconv.Itoa(int(replica3.PortStart))) - snapshotNameRebuild1 := "" - for replicaName := range replicaAddressMap { - replica, err := spdkCli.ReplicaGet(replicaName) - c.Assert(err, IsNil) - for snapName, snapLvol := range replica.Snapshots { - if strings.HasPrefix(snapName, server.RebuildingSnapshotNamePrefix) { - c.Assert(snapLvol.Children[types.VolumeHead], Equals, true) - c.Assert(snapLvol.Parent, Equals, snapshotName15) - if snapshotNameRebuild1 == "" { - snapshotNameRebuild1 = snapName - } else { - c.Assert(snapName, Equals, snapshotNameRebuild1) - } - break - } - } - } - c.Assert(snapshotNameRebuild1, Not(Equals), "") - - // Verify the 1st rebuilding result - engine, err = spdkCli.EngineGet(engineName) - c.Assert(err, IsNil) - c.Assert(engine.State, Equals, types.InstanceStateRunning) - c.Assert(engine.Frontend, Equals, types.FrontendSPDKTCPBlockdev) - c.Assert(engine.Endpoint, Equals, endpoint) - c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) - c.Assert(engine.ReplicaModeMap, DeepEquals, map[string]types.Mode{replicaName2: types.ModeRW, replicaName3: types.ModeRW}) - - // Rebuilding once leads to 1 snapshot creations (with random name) - // Current snapshot tree (with backing image): - // nil (backing image) -> snap11[0,10] -> snap13[10,30] -> snap15[30,50] -> rebuild11[60,70] -> head[70,70] - // |\ \ - // | \ -> snap23[30,60] - // | \ - // \ -> snap32[10,30] - // \ - // -> snap41[10,20] -> snap42[20,30] - - snapshotMap := map[string][]string{ - snapshotName11: {snapshotName13, snapshotName32, snapshotName41}, - snapshotName13: {snapshotName15, snapshotName23}, - snapshotName15: {snapshotNameRebuild1}, - snapshotNameRebuild1: {types.VolumeHead}, - snapshotName23: {}, - snapshotName32: {}, - snapshotName41: {snapshotName42}, - snapshotName42: {}, - } - snapshotOpts := map[string]api.SnapshotOptions{} - for snapName := range snapshotMap { - snapshotOpts[snapName] = api.SnapshotOptions{UserCreated: true} - } - snapshotOpts[snapshotNameRebuild1] = api.SnapshotOptions{UserCreated: false} - checkReplicaSnapshots(c, spdkCli, engineName, []string{replicaName2, replicaName3}, snapshotMap, snapshotOpts) - - // Write more data to the head before the 2nd rebuilding - offsetInMB = 7 * dataCountInMB - _, err = ne.Execute(nil, "dd", []string{"if=/dev/urandom", fmt.Sprintf("of=%s", endpoint), "bs=1M", fmt.Sprintf("count=%d", dataCountInMB), fmt.Sprintf("seek=%d", offsetInMB), "status=none"}, defaultTestExecuteTimeout) - c.Assert(err, IsNil) - cksumBeforeRebuild12, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumBeforeRebuild12, Not(Equals), "") - - // Crash replica2 - delete(replicaAddressMap, replicaName2) - err = spdkCli.ReplicaDelete(replicaName2, true) - c.Assert(err, IsNil) - err = spdkCli.EngineReplicaDelete(engineName, replicaName2, net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart)))) - c.Assert(err, IsNil) - engine, err = spdkCli.EngineGet(engineName) - c.Assert(err, IsNil) - c.Assert(engine.State, Equals, types.InstanceStateRunning) - c.Assert(engine.Frontend, Equals, types.FrontendSPDKTCPBlockdev) - c.Assert(engine.Endpoint, Equals, endpoint) - c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) - c.Assert(engine.ReplicaModeMap, DeepEquals, map[string]types.Mode{replicaName3: types.ModeRW}) - // Launch the 2nd rebuilding replica as the replacement of the crashed replica2 - replica4, err := spdkCli.ReplicaCreate(replicaName4, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - c.Assert(replica4.LvsName, Equals, defaultTestDiskName) - c.Assert(replica4.LvsUUID, Equals, disk.Uuid) - c.Assert(replica4.State, Equals, types.InstanceStateRunning) - c.Assert(replica4.PortStart, Not(Equals), int32(0)) - // Start the 2nd rebuilding and wait for the completion - err = spdkCli.EngineReplicaAdd(engineName, replicaName4, net.JoinHostPort(ip, strconv.Itoa(int(replica4.PortStart)))) - c.Assert(err, IsNil) - WaitForReplicaRebuildingComplete(c, spdkCli, engineName, replicaName4) - // While the volume head data written before rebuilding remains - offsetInMB = 6 * dataCountInMB - cksumAfterRebuild11, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfterRebuild11, Equals, cksumBeforeRebuild11) - offsetInMB = 7 * dataCountInMB - cksumAfterRebuild12, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfterRebuild12, Equals, cksumBeforeRebuild12) - // Figure out the 2nd rebuilding snapshot name - replicaAddressMap[replica4.Name] = net.JoinHostPort(ip, strconv.Itoa(int(replica4.PortStart))) - snapshotNameRebuild2 := "" - for replicaName := range replicaAddressMap { - replica, err := spdkCli.ReplicaGet(replicaName) - c.Assert(err, IsNil) - for snapName, snapLvol := range replica.Snapshots { - if snapName != snapshotNameRebuild1 && strings.HasPrefix(snapName, server.RebuildingSnapshotNamePrefix) { - c.Assert(snapLvol.Children[types.VolumeHead], Equals, true) - c.Assert(snapLvol.Parent, Equals, snapshotNameRebuild1) - if snapshotNameRebuild2 == "" { - snapshotNameRebuild2 = snapName - } else { - c.Assert(snapName, Equals, snapshotNameRebuild2) - } - break - } - } - } - c.Assert(snapshotNameRebuild2, Not(Equals), "") - - // Verify the 2nd rebuilding result - engine, err = spdkCli.EngineGet(engineName) - c.Assert(err, IsNil) - c.Assert(engine.State, Equals, types.InstanceStateRunning) - c.Assert(engine.Frontend, Equals, types.FrontendSPDKTCPBlockdev) - c.Assert(engine.Endpoint, Equals, endpoint) - c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) - c.Assert(engine.ReplicaModeMap, DeepEquals, map[string]types.Mode{replicaName3: types.ModeRW, replicaName4: types.ModeRW}) - - // Rebuilding twice leads to 2 snapshot creations (with random name) - // Current snapshot tree (with backing image): - // nil (backing image) -> snap11[0,10] -> snap13[10,30] -> snap15[30,50] -> rebuild11[60,70] -> rebuild12[70,80] -> head[80,80] - // |\ \ - // | \ -> snap23[30,60] - // | \ - // \ -> snap32[10,30] - // \ - // -> snap41[10,20] -> snap42[20,30] - snapshotMap = map[string][]string{ - snapshotName11: {snapshotName13, snapshotName32, snapshotName41}, - snapshotName13: {snapshotName15, snapshotName23}, - snapshotName15: {snapshotNameRebuild1}, - snapshotNameRebuild1: {snapshotNameRebuild2}, - snapshotNameRebuild2: {types.VolumeHead}, - snapshotName23: {}, - snapshotName32: {}, - snapshotName41: {snapshotName42}, - snapshotName42: {}, - } - snapshotOpts[snapshotNameRebuild2] = api.SnapshotOptions{UserCreated: false} - checkReplicaSnapshots(c, spdkCli, engineName, []string{replicaName3, replicaName4}, snapshotMap, snapshotOpts) - - // Purging snapshots would lead to rebuild11 cleanup - err = spdkCli.EngineSnapshotPurge(engineName) - c.Assert(err, IsNil) - - // Current snapshot tree (with backing image): - // nil (backing image) -> snap11[0,10] -> snap13[10,30] -> snap15[30,50] -> rebuild12[60,80] -> head[80,80] - // |\ \ - // | \ -> snap23[30,60] - // | \ - // \ -> snap32[10,30] - // \ - // -> snap41[10,20] -> snap42[20,30] - snapshotMap = map[string][]string{ - snapshotName11: {snapshotName13, snapshotName32, snapshotName41}, - snapshotName13: {snapshotName15, snapshotName23}, - snapshotName15: {snapshotNameRebuild2}, - snapshotNameRebuild2: {types.VolumeHead}, - snapshotName23: {}, - snapshotName32: {}, - snapshotName41: {snapshotName42}, - snapshotName42: {}, - } - delete(snapshotOpts, snapshotNameRebuild1) - checkReplicaSnapshots(c, spdkCli, engineName, []string{replicaName3, replicaName4}, snapshotMap, snapshotOpts) - for _, replicaName := range []string{replicaName3, replicaName4} { - replica, err := spdkCli.ReplicaGet(replicaName) - c.Assert(err, IsNil) - c.Assert(replica.Head.Parent, Equals, snapshotNameRebuild2) - c.Assert(replica.Head.ActualSize, Equals, uint64(0)) - } - - // The newly rebuilt replicas should contain correct/unchanged data - // Verify chain1 - offsetInMB = 0 - cksumAfter11, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter11, Equals, cksumBefore11) - offsetInMB = dataCountInMB - cksumAfter12, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter12, Equals, cksumBefore12) - offsetInMB = 2 * dataCountInMB - cksumAfter13, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter13, Equals, cksumBefore13) - offsetInMB = 3 * dataCountInMB - cksumAfter14, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter14, Equals, cksumBefore14) - offsetInMB = 4 * dataCountInMB - cksumAfter15, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter15, Equals, cksumBefore15) - // Notice that the head before the first revert is discarded - offsetInMB = 5 * dataCountInMB - cksumAfter16, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter16, Not(Equals), cksumBefore16) - // While the volume head data after rebuilding and purge still remains - offsetInMB = 6 * dataCountInMB - cksumAfterRebuild11, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfterRebuild11, Equals, cksumBeforeRebuild11) - offsetInMB = 7 * dataCountInMB - cksumAfterRebuild12, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfterRebuild12, Equals, cksumBeforeRebuild12) - // Verify chain2 - revertSnapshot(c, spdkCli, snapshotName23, volumeName, engineName, replicaAddressMap) - offsetInMB = 0 - cksumAfter11, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter11, Equals, cksumBefore11) - offsetInMB = dataCountInMB - cksumAfter12, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter12, Equals, cksumBefore12) - offsetInMB = 2 * dataCountInMB - cksumAfter13, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter13, Equals, cksumBefore13) - offsetInMB = 3 * dataCountInMB - cksumAfter21, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter21, Equals, cksumBefore21) - offsetInMB = 4 * dataCountInMB - cksumAfter22, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter22, Equals, cksumBefore22) - offsetInMB = 5 * dataCountInMB - cksumAfter23, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter23, Equals, cksumBefore23) - // Verify chain3 - revertSnapshot(c, spdkCli, snapshotName32, volumeName, engineName, replicaAddressMap) - offsetInMB = 0 - cksumAfter11, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter11, Equals, cksumBefore11) - offsetInMB = dataCountInMB - cksumAfter31, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter31, Equals, cksumBefore31) - offsetInMB = 2 * dataCountInMB - cksumAfter32, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter32, Equals, cksumBefore32) - // Verify chain4 - revertSnapshot(c, spdkCli, snapshotName42, volumeName, engineName, replicaAddressMap) - offsetInMB = 0 - cksumAfter11, err = util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter11, Equals, cksumBefore11) - offsetInMB = dataCountInMB - cksumAfter41, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter41, Equals, cksumBefore41) - offsetInMB = 2 * dataCountInMB - cksumAfter42, err := util.GetFileChunkChecksum(endpoint, offsetInMB*helpertypes.MiB, dataCountInMB*helpertypes.MiB) - c.Assert(err, IsNil) - c.Assert(cksumAfter42, Equals, cksumBefore42) - }() - } - - wg.Wait() - - engineList, err := spdkCli.EngineList() - c.Assert(err, IsNil) - for _, engine := range engineList { - err = spdkCli.EngineDelete(engine.Name) - c.Assert(err, IsNil) - } - replicaList, err := spdkCli.ReplicaList() - c.Assert(err, IsNil) - for _, replica := range replicaList { - err = spdkCli.ReplicaDelete(replica.Name, true) - c.Assert(err, IsNil) - } -} - -func checkReplicaSnapshots(c *C, spdkCli *client.SPDKClient, engineName string, replicaList []string, snapshotMap map[string][]string, snapshotOpts map[string]api.SnapshotOptions) { - engine, err := spdkCli.EngineGet(engineName) - c.Assert(err, IsNil) - - for _, replicaName := range replicaList { - replica, err := spdkCli.ReplicaGet(replicaName) - c.Assert(err, IsNil) - c.Assert(len(replica.Snapshots), Equals, len(snapshotMap)) - - for snapName, childrenList := range snapshotMap { - snap := replica.Snapshots[snapName] - c.Assert(snap, NotNil) - c.Assert(engine.Snapshots[snapName], NotNil) - c.Assert(engine.Snapshots[snapName].Children, DeepEquals, snap.Children) - - for _, childSnapName := range childrenList { - c.Assert(snap.Children[childSnapName], Equals, true) - if childSnapName != types.VolumeHead { - childSnap := replica.Snapshots[childSnapName] - c.Assert(childSnap, NotNil) - c.Assert(childSnap.Parent, Equals, snapName) - } - } - } - - for snapName, opts := range snapshotOpts { - snap := replica.Snapshots[snapName] - c.Assert(snap, NotNil) - c.Assert(engine.Snapshots[snapName].UserCreated, Equals, opts.UserCreated) - } - - } -} - -func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName, engineName string, replicaAddressMap map[string]string) { - ip, err := commonnet.GetAnyExternalIP() - c.Assert(err, IsNil) - os.Setenv(commonnet.EnvPodIP, ip) - - engine, err := spdkCli.EngineGet(engineName) - c.Assert(err, IsNil) - - if engine.State != types.InstanceStateRunning { - return - } - - prevFrontend := engine.Frontend - prevEndpoint := engine.Endpoint - if prevFrontend != types.FrontendEmpty { - // Restart the engine without the frontend - err = spdkCli.EngineDelete(engineName) - c.Assert(err, IsNil) - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) - c.Assert(err, IsNil) - c.Assert(engine.State, Equals, types.InstanceStateRunning) - c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) - c.Assert(engine.Port, Equals, int32(0)) - c.Assert(engine.Endpoint, Equals, "") - } - - err = spdkCli.EngineSnapshotRevert(engineName, snapshotName) - c.Assert(err, IsNil) - - if prevFrontend != types.FrontendEmpty { - // Restart the engine with the previous frontend - err = spdkCli.EngineDelete(engineName) - c.Assert(err, IsNil) - engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) - c.Assert(err, IsNil) - c.Assert(engine.State, Equals, types.InstanceStateRunning) - c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) - c.Assert(engine.Port, Not(Equals), int32(0)) - c.Assert(engine.Endpoint, Equals, prevEndpoint) - } -} - -func WaitForReplicaRebuildingComplete(c *C, spdkCli *client.SPDKClient, engineName, replicaName string) { - complete := false - - for cnt := 0; cnt < defaultTestRebuildingWaitCount; cnt++ { - rebuildingStatus, err := spdkCli.ReplicaRebuildingDstShallowCopyCheck(replicaName) - c.Assert(err, IsNil) - c.Assert(rebuildingStatus.Error, Equals, "") - switch rebuildingStatus.State { - case "": - c.Assert(rebuildingStatus.SnapshotName, Equals, "") - c.Assert(rebuildingStatus.TotalState, Equals, "") - c.Assert(rebuildingStatus.Progress, Equals, uint32(0)) - c.Assert(rebuildingStatus.TotalProgress, Equals, uint32(0)) - c.Assert(rebuildingStatus.TotalState, Equals, "") - case types.ProgressStateStarting: - c.Assert(rebuildingStatus.SnapshotName, Equals, "") - c.Assert(rebuildingStatus.TotalState, Equals, "") - c.Assert(rebuildingStatus.Progress, Equals, uint32(0)) - c.Assert(rebuildingStatus.TotalProgress, Equals, uint32(0)) - c.Assert(rebuildingStatus.TotalState, Equals, types.ProgressStateInProgress) - case types.ProgressStateInProgress: - c.Assert(rebuildingStatus.SnapshotName, Not(Equals), "") - c.Assert(rebuildingStatus.TotalState, Equals, types.ProgressStateInProgress) - c.Assert(rebuildingStatus.Progress <= 100, Equals, true) - c.Assert(rebuildingStatus.TotalProgress < 100, Equals, true) - case types.ProgressStateComplete: - c.Assert(rebuildingStatus.Progress, Equals, uint32(100)) - if rebuildingStatus.TotalState == types.ProgressStateInProgress { - c.Assert(rebuildingStatus.TotalProgress <= 100, Equals, true) - } else { - c.Assert(rebuildingStatus.TotalState, Equals, types.ProgressStateComplete) - c.Assert(rebuildingStatus.TotalProgress, Equals, uint32(100)) - } - default: - c.Fatalf("Unexpected rebuilding state %v", rebuildingStatus.State) - } - - if rebuildingStatus.TotalState == types.ProgressStateComplete { - engine, err := spdkCli.EngineGet(engineName) - c.Assert(err, IsNil) - c.Assert(engine.State, Equals, types.InstanceStateRunning) - if engine.ReplicaModeMap[replicaName] == types.ModeRW { - complete = true - break - } - } - - time.Sleep(defaultTestRebuildingWaitInterval) - } - - c.Assert(complete, Equals, true) -} - func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) { fmt.Println("Testing SPDK basic operations with engine only with target") @@ -1351,7 +222,7 @@ func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) { replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), } - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false) c.Assert(err, IsNil) c.Assert(engine.Endpoint, Equals, "") @@ -1376,76 +247,76 @@ func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) { c.Assert(err, IsNil) } -func (s *TestSuite) TestSPDKEngineCreateWithUpgradeRequired(c *C) { - fmt.Println("Testing SPDK Engine Creation with Upgrade Required") - - diskDriverName := "aio" - - ip, err := commonnet.GetAnyExternalIP() - c.Assert(err, IsNil) - os.Setenv(commonnet.EnvPodIP, ip) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ne, err := helperutil.NewExecutor(commontypes.ProcDirectory) - c.Assert(err, IsNil) - LaunchTestSPDKGRPCServer(ctx, c, ip, ne.Execute) - - loopDevicePath := PrepareDiskFile(c) - defer func() { - CleanupDiskFile(c, loopDevicePath) - }() - - spdkCli, err := client.NewSPDKClient(net.JoinHostPort(ip, strconv.Itoa(types.SPDKServicePort))) - c.Assert(err, IsNil) - - disk, err := spdkCli.DiskCreate(defaultTestDiskName, "", loopDevicePath, diskDriverName, int64(defaultTestBlockSize)) - c.Assert(err, IsNil) - c.Assert(disk.Path, Equals, loopDevicePath) - c.Assert(disk.Uuid, Not(Equals), "") - - defer func() { - err := spdkCli.DiskDelete(defaultTestDiskName, disk.Uuid, disk.Path, diskDriverName) - c.Assert(err, IsNil) - }() - - volumeName := "test-vol" - engineName := fmt.Sprintf("%s-engine", volumeName) - replicaName1 := fmt.Sprintf("%s-replica-1", volumeName) - replicaName2 := fmt.Sprintf("%s-replica-2", volumeName) - - replica1, err := spdkCli.ReplicaCreate(replicaName1, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - replica2, err := spdkCli.ReplicaCreate(replicaName2, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - - replicaAddressMap := map[string]string{ - replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))), - replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), - } - - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false) - c.Assert(err, IsNil) - - targetAddress := fmt.Sprintf("%s:%d", engine.TargetIP, engine.TargetPort) - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, targetAddress, true, false) - c.Assert(err, IsNil) - c.Assert(engine.Endpoint, Not(Equals), "") - // Initiator is not created, so the IP and Port should be empty - c.Assert(engine.IP, Equals, ip) - c.Assert(engine.Port, Not(Equals), int32(0)) - // Target is created and exposed - c.Assert(engine.TargetIP, Equals, ip) - c.Assert(engine.TargetPort, Not(Equals), int32(0)) - c.Assert(engine.Port, Equals, engine.TargetPort) - - // Tear down engine and replicas - err = spdkCli.EngineDelete(engineName) - c.Assert(err, IsNil) - - err = spdkCli.ReplicaDelete(replicaName1, false) - c.Assert(err, IsNil) - err = spdkCli.ReplicaDelete(replicaName2, false) - c.Assert(err, IsNil) -} +// func (s *TestSuite) TestSPDKEngineCreateWithUpgradeRequired(c *C) { +// fmt.Println("Testing SPDK Engine Creation with Upgrade Required") + +// diskDriverName := "aio" + +// ip, err := commonnet.GetAnyExternalIP() +// c.Assert(err, IsNil) +// os.Setenv(commonnet.EnvPodIP, ip) + +// ctx, cancel := context.WithCancel(context.Background()) +// defer cancel() + +// ne, err := helperutil.NewExecutor(commontypes.ProcDirectory) +// c.Assert(err, IsNil) +// LaunchTestSPDKGRPCServer(ctx, c, ip, ne.Execute) + +// loopDevicePath := PrepareDiskFile(c) +// defer func() { +// CleanupDiskFile(c, loopDevicePath) +// }() + +// spdkCli, err := client.NewSPDKClient(net.JoinHostPort(ip, strconv.Itoa(types.SPDKServicePort))) +// c.Assert(err, IsNil) + +// disk, err := spdkCli.DiskCreate(defaultTestDiskName, "", loopDevicePath, diskDriverName, int64(defaultTestBlockSize)) +// c.Assert(err, IsNil) +// c.Assert(disk.Path, Equals, loopDevicePath) +// c.Assert(disk.Uuid, Not(Equals), "") + +// defer func() { +// err := spdkCli.DiskDelete(defaultTestDiskName, disk.Uuid, disk.Path, diskDriverName) +// c.Assert(err, IsNil) +// }() + +// volumeName := "test-vol" +// engineName := fmt.Sprintf("%s-engine", volumeName) +// replicaName1 := fmt.Sprintf("%s-replica-1", volumeName) +// replicaName2 := fmt.Sprintf("%s-replica-2", volumeName) + +// replica1, err := spdkCli.ReplicaCreate(replicaName1, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) +// c.Assert(err, IsNil) +// replica2, err := spdkCli.ReplicaCreate(replicaName2, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) +// c.Assert(err, IsNil) + +// replicaAddressMap := map[string]string{ +// replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))), +// replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), +// } + +// engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false) +// c.Assert(err, IsNil) + +// targetAddress := fmt.Sprintf("%s:%d", engine.TargetIP, engine.TargetPort) +// engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, targetAddress, false) +// c.Assert(err, IsNil) +// c.Assert(engine.Endpoint, Not(Equals), "") +// // Initiator is not created, so the IP and Port should be empty +// c.Assert(engine.IP, Equals, ip) +// c.Assert(engine.Port, Not(Equals), int32(0)) +// // Target is created and exposed +// c.Assert(engine.TargetIP, Equals, ip) +// c.Assert(engine.TargetPort, Not(Equals), int32(0)) +// c.Assert(engine.Port, Equals, engine.TargetPort) + +// // Tear down engine and replicas +// err = spdkCli.EngineDelete(engineName) +// c.Assert(err, IsNil) + +// err = spdkCli.ReplicaDelete(replicaName1, false) +// c.Assert(err, IsNil) +// err = spdkCli.ReplicaDelete(replicaName2, false) +// c.Assert(err, IsNil) +// }