From 9d63fc8cf4c14be8a4b9b4fe483a0f86a7def666 Mon Sep 17 00:00:00 2001
From: Raphanus Lo
Date: Mon, 30 Dec 2024 17:25:39 +0800
Subject: [PATCH] fix(env-check): remove monitor initial pending to fix disk
 mismatch problem

longhorn/longhorn-10098

Previously, each monitor was started implicitly from its constructor and ran
its first data collection asynchronously (for the disk monitor, only after the
first poll interval elapsed). The node controller could therefore read a
monitor's collected data before anything had been collected, which surfaced as
"mismatching disks in node resource object and monitor collected data".

Start() now performs the initial collection synchronously (guarded by a
sync.Once) before launching the background polling goroutine, and the node
controller calls Start() explicitly after creating a monitor. The Monitor
interface is also made generic over its collected data type, so callers
receive typed data without type assertions.

Signed-off-by: Raphanus Lo
---
 controller/monitor/disk_monitor.go            | 70 +++++++++-------
 .../monitor/environment_check_monitor.go      | 80 +++++++++++--------
 controller/monitor/fake_disk_monitor.go       |  4 +-
 .../monitor/fake_environment_check_monitor.go | 10 +--
 controller/monitor/monitor.go                 |  4 +-
 controller/monitor/snapshot_monitor.go        | 71 ++++++++--------
 controller/node_controller.go                 | 46 +++++------
 7 files changed, 153 insertions(+), 132 deletions(-)

diff --git a/controller/monitor/disk_monitor.go b/controller/monitor/disk_monitor.go
index c6511f016d..0c7f4330b6 100644
--- a/controller/monitor/disk_monitor.go
+++ b/controller/monitor/disk_monitor.go
@@ -35,12 +35,17 @@ type DiskServiceClient struct {
 	err error
 }
 
-type DiskMonitor struct {
+type DiskMonitor Monitor[map[string]*CollectedDiskInfo]
+
+var _ DiskMonitor = &diskMonitorImpl{}
+
+type diskMonitorImpl struct {
 	*baseMonitor
 
 	nodeName        string
 	checkVolumeMeta bool
 
+	startOnce         sync.Once
 	collectedDataLock sync.RWMutex
 	collectedData     map[string]*CollectedDiskInfo
@@ -69,10 +74,10 @@ type GetDiskConfigHandler func(longhorn.DiskType, string, string, longhorn.DiskD
 type GenerateDiskConfigHandler func(longhorn.DiskType, string, string, string, string, *DiskServiceClient) (*util.DiskConfig, error)
 type GetReplicaDataStoresHandler func(longhorn.DiskType, *longhorn.Node, string, string, string, string, *DiskServiceClient) (map[string]string, error)
 
-func NewDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*DiskMonitor, error) {
+func NewDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (DiskMonitor, error) {
 	ctx, quit := context.WithCancel(context.Background())
 
-	m := &DiskMonitor{
+	m := &diskMonitorImpl{
 		baseMonitor: newBaseMonitor(ctx, quit, logger, ds, DiskMonitorSyncPeriod),
 
 		nodeName: nodeName,
@@ -89,39 +94,44 @@ func NewDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName
 		getReplicaDataStoresHandler: getReplicaDataStores,
 	}
 
-	go m.Start()
-
 	return m, nil
 }
 
-func (m *DiskMonitor) Start() {
-	if err := wait.PollUntilContextCancel(m.ctx, m.syncPeriod, false, func(context.Context) (bool, error) {
-		if err := m.run(struct{}{}); err != nil {
-			m.logger.WithError(err).Error("Stopped monitoring disks")
+func (m *diskMonitorImpl) Start() {
+	m.startOnce.Do(func() {
+		if err := m.run(); err != nil {
+			m.logger.WithError(err).Error("Initial disk monitoring failed")
 		}
-		return false, nil
-	}); err != nil {
-		if errors.Is(err, context.Canceled) {
-			m.logger.WithError(err).Warning("Disk monitor is stopped")
-		} else {
-			m.logger.WithError(err).Error("Failed to start disk monitor")
-		}
-	}
+		go func() {
+			if err := wait.PollUntilContextCancel(m.ctx, m.syncPeriod, false, func(context.Context) (bool, error) {
+				if err := m.run(); err != nil {
+					m.logger.WithError(err).Error("Stopped monitoring disks")
+				}
+				return false, nil
+			}); err != nil {
+				if errors.Is(err, context.Canceled) {
+					m.logger.WithError(err).Warning("Disk monitor is stopped")
+				} else {
+					m.logger.WithError(err).Error("Failed to start disk monitor")
+				}
+			}
+		}()
+	})
 }
 
-func (m *DiskMonitor) Stop() {
+func (m *diskMonitorImpl) Stop() {
 	m.quit()
 }
 
-func (m *DiskMonitor) RunOnce() error {
-	return
m.run(struct{}{}) +func (m *diskMonitorImpl) RunOnce() error { + return m.run() } -func (m *DiskMonitor) UpdateConfiguration(map[string]interface{}) error { +func (m *diskMonitorImpl) UpdateConfiguration(map[string]interface{}) error { return nil } -func (m *DiskMonitor) GetCollectedData() (interface{}, error) { +func (m *diskMonitorImpl) GetCollectedData() (map[string]*CollectedDiskInfo, error) { m.collectedDataLock.RLock() defer m.collectedDataLock.RUnlock() @@ -133,7 +143,7 @@ func (m *DiskMonitor) GetCollectedData() (interface{}, error) { return data, nil } -func (m *DiskMonitor) run(value interface{}) error { +func (m *diskMonitorImpl) run() error { node, err := m.ds.GetNode(m.nodeName) if err != nil { return errors.Wrapf(err, "failed to get longhorn node %v", m.nodeName) @@ -154,7 +164,7 @@ func (m *DiskMonitor) run(value interface{}) error { return nil } -func (m *DiskMonitor) getRunningInstanceManagerRO(dataEngine longhorn.DataEngineType) (*longhorn.InstanceManager, error) { +func (m *diskMonitorImpl) getRunningInstanceManagerRO(dataEngine longhorn.DataEngineType) (*longhorn.InstanceManager, error) { switch dataEngine { case longhorn.DataEngineTypeV1: return m.ds.GetDefaultInstanceManagerByNodeRO(m.nodeName, dataEngine) @@ -180,7 +190,7 @@ func (m *DiskMonitor) getRunningInstanceManagerRO(dataEngine longhorn.DataEngine return nil, fmt.Errorf("unknown data engine %v", dataEngine) } -func (m *DiskMonitor) newDiskServiceClients() map[longhorn.DataEngineType]*DiskServiceClient { +func (m *diskMonitorImpl) newDiskServiceClients() map[longhorn.DataEngineType]*DiskServiceClient { clients := map[longhorn.DataEngineType]*DiskServiceClient{} dataEngines := m.ds.GetDataEngines() @@ -207,7 +217,7 @@ func (m *DiskMonitor) newDiskServiceClients() map[longhorn.DataEngineType]*DiskS return clients } -func (m *DiskMonitor) closeDiskServiceClients(clients map[longhorn.DataEngineType]*DiskServiceClient) { +func (m *diskMonitorImpl) closeDiskServiceClients(clients map[longhorn.DataEngineType]*DiskServiceClient) { for _, client := range clients { if client.c != nil { client.c.Close() @@ -217,7 +227,7 @@ func (m *DiskMonitor) closeDiskServiceClients(clients map[longhorn.DataEngineTyp } // Collect disk data and generate disk UUID blindly. 
-func (m *DiskMonitor) collectDiskData(node *longhorn.Node) map[string]*CollectedDiskInfo { +func (m *diskMonitorImpl) collectDiskData(node *longhorn.Node) map[string]*CollectedDiskInfo { diskInfoMap := make(map[string]*CollectedDiskInfo, 0) diskServiceClients := m.newDiskServiceClients() @@ -392,7 +402,7 @@ func NewDiskInfo(diskName, diskUUID, diskPath string, diskDriver longhorn.DiskDr return diskInfo } -func (m *DiskMonitor) getOrphanedReplicaDataStores(diskType longhorn.DiskType, diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) { +func (m *diskMonitorImpl) getOrphanedReplicaDataStores(diskType longhorn.DiskType, diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) { switch diskType { case longhorn.DiskTypeFilesystem: return m.getOrphanedReplicaDirectoryNames(diskUUID, diskPath, replicaDataStores) @@ -403,7 +413,7 @@ func (m *DiskMonitor) getOrphanedReplicaDataStores(diskType longhorn.DiskType, d } } -func (m *DiskMonitor) getOrphanedReplicaLvolNames(replicaDataStores map[string]string) (map[string]string, error) { +func (m *diskMonitorImpl) getOrphanedReplicaLvolNames(replicaDataStores map[string]string) (map[string]string, error) { if len(replicaDataStores) == 0 { return map[string]string{}, nil } @@ -418,7 +428,7 @@ func (m *DiskMonitor) getOrphanedReplicaLvolNames(replicaDataStores map[string]s return replicaDataStores, nil } -func (m *DiskMonitor) getOrphanedReplicaDirectoryNames(diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) { +func (m *diskMonitorImpl) getOrphanedReplicaDirectoryNames(diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) { if len(replicaDataStores) == 0 { return map[string]string{}, nil } diff --git a/controller/monitor/environment_check_monitor.go b/controller/monitor/environment_check_monitor.go index 538d2dd32b..28ea2f6f5d 100644 --- a/controller/monitor/environment_check_monitor.go +++ b/controller/monitor/environment_check_monitor.go @@ -45,11 +45,16 @@ var ( nfsProtocolVersions = map[string]bool{"4.0": true, "4.1": true, "4.2": true} ) -type EnvironmentCheckMonitor struct { +type EnvironmentCheckMonitor Monitor[[]longhorn.Condition] + +var _ EnvironmentCheckMonitor = &environmentCheckMonitorImpl{} + +type environmentCheckMonitorImpl struct { *baseMonitor nodeName string + startOnce sync.Once collectedDataLock sync.RWMutex collectedData *CollectedEnvironmentCheckInfo @@ -60,10 +65,10 @@ type CollectedEnvironmentCheckInfo struct { conditions []longhorn.Condition } -func NewEnvironmentCheckMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*EnvironmentCheckMonitor, error) { +func NewEnvironmentCheckMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (EnvironmentCheckMonitor, error) { ctx, quit := context.WithCancel(context.Background()) - m := &EnvironmentCheckMonitor{ + m := &environmentCheckMonitorImpl{ baseMonitor: newBaseMonitor(ctx, quit, logger, ds, EnvironmentCheckMonitorSyncPeriod), nodeName: nodeName, @@ -74,39 +79,44 @@ func NewEnvironmentCheckMonitor(logger logrus.FieldLogger, ds *datastore.DataSto syncCallback: syncCallback, } - go m.Start() - return m, nil } -func (m *EnvironmentCheckMonitor) Start() { - if err := wait.PollUntilContextCancel(m.ctx, m.syncPeriod, true, func(context.Context) (bool, error) { - if err := m.run(struct{}{}); err != nil { - 
m.logger.WithError(err).Error("Stopped monitoring environment check") +func (m *environmentCheckMonitorImpl) Start() { + m.startOnce.Do(func() { + if err := m.run(); err != nil { + m.logger.WithError(err).Error("Initial environment monitoring failed") } - return false, nil - }); err != nil { - if errors.Is(err, context.Canceled) { - m.logger.WithError(err).Warning("Environment check monitor is stopped") - } else { - m.logger.WithError(err).Error("Failed to start environment check monitor") - } - } + go func() { + if err := wait.PollUntilContextCancel(m.ctx, m.syncPeriod, false, func(context.Context) (bool, error) { + if err := m.run(); err != nil { + m.logger.WithError(err).Error("Stopped monitoring environment check") + } + return false, nil + }); err != nil { + if errors.Is(err, context.Canceled) { + m.logger.WithError(err).Warning("Environment check monitor is stopped") + } else { + m.logger.WithError(err).Error("Failed to start environment check monitor") + } + } + }() + }) } -func (m *EnvironmentCheckMonitor) Stop() { +func (m *environmentCheckMonitorImpl) Stop() { m.quit() } -func (m *EnvironmentCheckMonitor) RunOnce() error { - return m.run(struct{}{}) +func (m *environmentCheckMonitorImpl) RunOnce() error { + return m.run() } -func (m *EnvironmentCheckMonitor) UpdateConfiguration(map[string]interface{}) error { +func (m *environmentCheckMonitorImpl) UpdateConfiguration(map[string]interface{}) error { return nil } -func (m *EnvironmentCheckMonitor) GetCollectedData() (interface{}, error) { +func (m *environmentCheckMonitorImpl) GetCollectedData() ([]longhorn.Condition, error) { m.collectedDataLock.RLock() defer m.collectedDataLock.RUnlock() @@ -118,7 +128,7 @@ func (m *EnvironmentCheckMonitor) GetCollectedData() (interface{}, error) { return data, nil } -func (m *EnvironmentCheckMonitor) run(value interface{}) error { +func (m *environmentCheckMonitorImpl) run() error { node, err := m.ds.GetNode(m.nodeName) if err != nil { return errors.Wrapf(err, "failed to get longhorn node %v", m.nodeName) @@ -139,7 +149,7 @@ func (m *EnvironmentCheckMonitor) run(value interface{}) error { return nil } -func (m *EnvironmentCheckMonitor) collectEnvironmentCheckData(node *longhorn.Node) *CollectedEnvironmentCheckInfo { +func (m *environmentCheckMonitorImpl) collectEnvironmentCheckData(node *longhorn.Node) *CollectedEnvironmentCheckInfo { kubeNode, err := m.ds.GetKubernetesNodeRO(node.Name) if err != nil { return &CollectedEnvironmentCheckInfo{ @@ -150,7 +160,7 @@ func (m *EnvironmentCheckMonitor) collectEnvironmentCheckData(node *longhorn.Nod return m.environmentCheck(kubeNode) } -func (m *EnvironmentCheckMonitor) environmentCheck(kubeNode *corev1.Node) *CollectedEnvironmentCheckInfo { +func (m *environmentCheckMonitorImpl) environmentCheck(kubeNode *corev1.Node) *CollectedEnvironmentCheckInfo { collectedData := &CollectedEnvironmentCheckInfo{ conditions: []longhorn.Condition{}, } @@ -176,7 +186,7 @@ func (m *EnvironmentCheckMonitor) environmentCheck(kubeNode *corev1.Node) *Colle return collectedData } -func (m *EnvironmentCheckMonitor) syncPackagesInstalled(kubeNode *corev1.Node, namespaces []lhtypes.Namespace, collectedData *CollectedEnvironmentCheckInfo) { +func (m *environmentCheckMonitorImpl) syncPackagesInstalled(kubeNode *corev1.Node, namespaces []lhtypes.Namespace, collectedData *CollectedEnvironmentCheckInfo) { osImage := strings.ToLower(kubeNode.Status.NodeInfo.OSImage) packageProbeExecutables := make(map[string]string) @@ -244,7 +254,7 @@ func (m *EnvironmentCheckMonitor) 
syncPackagesInstalled(kubeNode *corev1.Node, n fmt.Sprintf("All required packages %v are installed", installedPackages)) } -func (m *EnvironmentCheckMonitor) syncPackagesInstalledTalosLinux(namespaces []lhtypes.Namespace, collectedData *CollectedEnvironmentCheckInfo) { +func (m *environmentCheckMonitorImpl) syncPackagesInstalledTalosLinux(namespaces []lhtypes.Namespace, collectedData *CollectedEnvironmentCheckInfo) { type validateCommand struct { binary string args []string @@ -320,7 +330,7 @@ func (m *EnvironmentCheckMonitor) syncPackagesInstalledTalosLinux(namespaces []l } } -func (m *EnvironmentCheckMonitor) syncMultipathd(namespaces []lhtypes.Namespace, collectedData *CollectedEnvironmentCheckInfo) { +func (m *environmentCheckMonitorImpl) syncMultipathd(namespaces []lhtypes.Namespace, collectedData *CollectedEnvironmentCheckInfo) { nsexec, err := lhns.NewNamespaceExecutor(lhtypes.ProcessNone, lhtypes.HostProcDirectory, namespaces) if err != nil { collectedData.conditions = types.SetCondition(collectedData.conditions, longhorn.NodeConditionTypeMultipathd, longhorn.ConditionStatusFalse, @@ -339,7 +349,7 @@ func (m *EnvironmentCheckMonitor) syncMultipathd(namespaces []lhtypes.Namespace, collectedData.conditions = types.SetCondition(collectedData.conditions, longhorn.NodeConditionTypeMultipathd, longhorn.ConditionStatusTrue, "", "") } -func (m *EnvironmentCheckMonitor) checkPackageInstalled(packageProbeExecutables map[string]string, namespaces []lhtypes.Namespace) (installed, notInstalled []string, err error) { +func (m *environmentCheckMonitorImpl) checkPackageInstalled(packageProbeExecutables map[string]string, namespaces []lhtypes.Namespace) (installed, notInstalled []string, err error) { nsexec, err := lhns.NewNamespaceExecutor(lhtypes.ProcessNone, lhtypes.HostProcDirectory, namespaces) if err != nil { return nil, nil, err @@ -357,7 +367,7 @@ func (m *EnvironmentCheckMonitor) checkPackageInstalled(packageProbeExecutables return installed, notInstalled, nil } -func (m *EnvironmentCheckMonitor) checkHugePages(kubeNode *corev1.Node, collectedData *CollectedEnvironmentCheckInfo) { +func (m *environmentCheckMonitorImpl) checkHugePages(kubeNode *corev1.Node, collectedData *CollectedEnvironmentCheckInfo) { hugePageLimitInMiB, err := m.ds.GetSettingAsInt(types.SettingNameV2DataEngineHugepageLimit) if err != nil { m.logger.Debugf("Failed to fetch v2-data-engine-hugepage-limit setting, using default value: %d", 2048) @@ -391,7 +401,7 @@ func (m *EnvironmentCheckMonitor) checkHugePages(kubeNode *corev1.Node, collecte ) } -func (m *EnvironmentCheckMonitor) checkKernelModulesLoaded(kubeNode *corev1.Node, isV2DataEngine bool, collectedData *CollectedEnvironmentCheckInfo) { +func (m *environmentCheckMonitorImpl) checkKernelModulesLoaded(kubeNode *corev1.Node, isV2DataEngine bool, collectedData *CollectedEnvironmentCheckInfo) { modulesToCheck := make(map[string]string) for k, v := range kernelModules { modulesToCheck[k] = v @@ -452,7 +462,7 @@ func checkModulesLoadedUsingkmod(modules map[string]string) (map[string]string, return notFoundModules, nil } -func (m *EnvironmentCheckMonitor) checkModulesLoadedByConfigFile(modules map[string]string, kernelVersion string) ([]string, error) { +func (m *environmentCheckMonitorImpl) checkModulesLoadedByConfigFile(modules map[string]string, kernelVersion string) ([]string, error) { kernelConfigMap, err := lhsys.GetBootKernelConfigMap(kernelConfigDir, kernelVersion) if err != nil { if kernelConfigMap, err = lhsys.GetProcKernelConfigMap(lhtypes.HostProcDirectory); 
err != nil { @@ -474,7 +484,7 @@ func (m *EnvironmentCheckMonitor) checkModulesLoadedByConfigFile(modules map[str return notLoadedModules, nil } -func (m *EnvironmentCheckMonitor) checkNFSMountConfigFile(supported map[string]bool, configFilePathPrefix string) (actualDefaultVer string, isAllowed bool, err error) { +func (m *environmentCheckMonitorImpl) checkNFSMountConfigFile(supported map[string]bool, configFilePathPrefix string) (actualDefaultVer string, isAllowed bool, err error) { var nfsVer string nfsMajor, nfsMinor, err := lhnfs.GetSystemDefaultNFSVersion(configFilePathPrefix) if err == nil { @@ -490,7 +500,7 @@ func (m *EnvironmentCheckMonitor) checkNFSMountConfigFile(supported map[string]b return actualDefaultVer, supported[nfsVer], nil } -func (m *EnvironmentCheckMonitor) checkKernelModuleEnabled(module, kmodName string, kernelConfigMap map[string]string) (bool, error) { +func (m *environmentCheckMonitorImpl) checkKernelModuleEnabled(module, kmodName string, kernelConfigMap map[string]string) (bool, error) { enabled, exists := kernelConfigMap[module] if !exists { return false, nil @@ -526,7 +536,7 @@ func getModulesConfigsList(modulesMap map[string]string, needModules bool) []str return modulesConfigs } -func (m *EnvironmentCheckMonitor) syncNFSClientVersion(kubeNode *corev1.Node, collectedData *CollectedEnvironmentCheckInfo) { +func (m *environmentCheckMonitorImpl) syncNFSClientVersion(kubeNode *corev1.Node, collectedData *CollectedEnvironmentCheckInfo) { notLoadedModules, err := m.checkModulesLoadedByConfigFile(nfsClientVersions, kubeNode.Status.NodeInfo.KernelVersion) if err != nil { collectedData.conditions = types.SetCondition(collectedData.conditions, longhorn.NodeConditionTypeNFSClientInstalled, longhorn.ConditionStatusFalse, diff --git a/controller/monitor/fake_disk_monitor.go b/controller/monitor/fake_disk_monitor.go index 6f55c51c03..4061cc35e9 100644 --- a/controller/monitor/fake_disk_monitor.go +++ b/controller/monitor/fake_disk_monitor.go @@ -21,10 +21,10 @@ const ( TestOrphanedReplicaDirectoryName = "test-volume-r-000000000" ) -func NewFakeDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*DiskMonitor, error) { +func NewFakeDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (DiskMonitor, error) { ctx, quit := context.WithCancel(context.Background()) - m := &DiskMonitor{ + m := &diskMonitorImpl{ baseMonitor: newBaseMonitor(ctx, quit, logger, ds, DiskMonitorSyncPeriod), nodeName: nodeName, diff --git a/controller/monitor/fake_environment_check_monitor.go b/controller/monitor/fake_environment_check_monitor.go index 75656b90ff..2adc298aa1 100644 --- a/controller/monitor/fake_environment_check_monitor.go +++ b/controller/monitor/fake_environment_check_monitor.go @@ -26,7 +26,7 @@ type FakeEnvironmentCheckMonitor struct { syncCallback func(key string) } -func NewFakeEnvironmentCheckMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*FakeEnvironmentCheckMonitor, error) { +func NewFakeEnvironmentCheckMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (EnvironmentCheckMonitor, error) { ctx, quit := context.WithCancel(context.Background()) m := &FakeEnvironmentCheckMonitor{ @@ -45,7 +45,7 @@ func NewFakeEnvironmentCheckMonitor(logger logrus.FieldLogger, ds *datastore.Dat func (m *FakeEnvironmentCheckMonitor) Start() { if err := 
wait.PollUntilContextCancel(m.ctx, m.syncPeriod, true, func(context.Context) (bool, error) { - if err := m.run(struct{}{}); err != nil { + if err := m.run(); err != nil { m.logger.WithError(err).Error("Stopped monitoring environment check") } return false, nil @@ -63,14 +63,14 @@ func (m *FakeEnvironmentCheckMonitor) Stop() { } func (m *FakeEnvironmentCheckMonitor) RunOnce() error { - return m.run(struct{}{}) + return m.run() } func (m *FakeEnvironmentCheckMonitor) UpdateConfiguration(map[string]interface{}) error { return nil } -func (m *FakeEnvironmentCheckMonitor) GetCollectedData() (interface{}, error) { +func (m *FakeEnvironmentCheckMonitor) GetCollectedData() ([]longhorn.Condition, error) { m.collectedDataLock.RLock() defer m.collectedDataLock.RUnlock() @@ -82,7 +82,7 @@ func (m *FakeEnvironmentCheckMonitor) GetCollectedData() (interface{}, error) { return data, nil } -func (m *FakeEnvironmentCheckMonitor) run(value interface{}) error { +func (m *FakeEnvironmentCheckMonitor) run() error { node, err := m.ds.GetNode(m.nodeName) if err != nil { return errors.Wrapf(err, "failed to get longhorn node %v", m.nodeName) diff --git a/controller/monitor/monitor.go b/controller/monitor/monitor.go index fbcc2df34d..f4f14d6e6a 100644 --- a/controller/monitor/monitor.go +++ b/controller/monitor/monitor.go @@ -9,11 +9,11 @@ import ( "github.com/longhorn/longhorn-manager/datastore" ) -type Monitor interface { +type Monitor[CollectedDataType any] interface { Start() Stop() UpdateConfiguration(map[string]interface{}) error - GetCollectedData() (interface{}, error) + GetCollectedData() (CollectedDataType, error) RunOnce() error } diff --git a/controller/monitor/snapshot_monitor.go b/controller/monitor/snapshot_monitor.go index c1e99b6fc8..08d36b3d8a 100644 --- a/controller/monitor/snapshot_monitor.go +++ b/controller/monitor/snapshot_monitor.go @@ -61,7 +61,11 @@ type SnapshotMonitorStatus struct { LastSnapshotPeriodicCheckedAt metav1.Time } -type SnapshotMonitor struct { +type SnapshotMonitor Monitor[SnapshotMonitorStatus] + +var _ SnapshotMonitor = &snapshotMonitorImpl{} + +type snapshotMonitorImpl struct { sync.RWMutex *baseMonitor @@ -73,6 +77,7 @@ type SnapshotMonitor struct { snapshotChangeEventQueue workqueue.TypedInterface[any] snapshotCheckTaskQueue workqueue.TypedRateLimitingInterface[any] + startOnce sync.Once inProgressSnapshotCheckTasks map[string]struct{} inProgressSnapshotCheckTasksLock sync.RWMutex @@ -86,11 +91,11 @@ type SnapshotMonitor struct { } func NewSnapshotMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, eventRecorder record.EventRecorder, - snapshotChangeEventQueue workqueue.TypedInterface[any], syncCallback func(key string)) (*SnapshotMonitor, error) { + snapshotChangeEventQueue workqueue.TypedInterface[any], syncCallback func(key string)) (*snapshotMonitorImpl, error) { ctx, quit := context.WithCancel(context.Background()) - m := &SnapshotMonitor{ + m := &snapshotMonitorImpl{ baseMonitor: newBaseMonitor(ctx, quit, logger, ds, 0), nodeName: nodeName, @@ -114,20 +119,20 @@ func NewSnapshotMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, node m.checkScheduler.SingletonModeAll() - go m.Start() - return m, nil } -func (m *SnapshotMonitor) Start() { - for i := 0; i < snapshotCheckWorkerMax; i++ { - go m.snapshotCheckWorker(i) - } +func (m *snapshotMonitorImpl) Start() { + m.startOnce.Do(func() { + for i := 0; i < snapshotCheckWorkerMax; i++ { + go m.snapshotCheckWorker(i) + } - go m.processSnapshotChangeEvent() + go 
m.processSnapshotChangeEvent() + }) } -func (m *SnapshotMonitor) processNextEvent() bool { +func (m *snapshotMonitorImpl) processNextEvent() bool { key, quit := m.snapshotChangeEventQueue.Get() if quit { return false @@ -145,12 +150,12 @@ func (m *SnapshotMonitor) processNextEvent() bool { return true } -func (m *SnapshotMonitor) processSnapshotChangeEvent() { +func (m *snapshotMonitorImpl) processSnapshotChangeEvent() { for m.processNextEvent() { } } -func (m *SnapshotMonitor) checkSnapshots() { +func (m *snapshotMonitorImpl) checkSnapshots() { m.logger.WithField("monitor", monitorName).Info("Starting checking snapshots") defer m.logger.WithField("monitor", monitorName).Infof("Finished checking snapshots") @@ -172,7 +177,7 @@ func (m *SnapshotMonitor) checkSnapshots() { } } -func (m *SnapshotMonitor) populateEngineSnapshots(engine *longhorn.Engine) { +func (m *snapshotMonitorImpl) populateEngineSnapshots(engine *longhorn.Engine) { snapshots := engine.Status.Snapshots for _, snapshot := range snapshots { // Skip volume-head because it is not a real snapshot. @@ -190,7 +195,7 @@ func (m *SnapshotMonitor) populateEngineSnapshots(engine *longhorn.Engine) { } } -func (m *SnapshotMonitor) processNextWorkItem(id int) bool { +func (m *snapshotMonitorImpl) processNextWorkItem(id int) bool { key, quit := m.snapshotCheckTaskQueue.Get() if quit { return false @@ -214,7 +219,7 @@ func (m *SnapshotMonitor) processNextWorkItem(id int) bool { return true } -func (m *SnapshotMonitor) handleErr(err error, key interface{}) { +func (m *snapshotMonitorImpl) handleErr(err error, key interface{}) { if err == nil { m.snapshotCheckTaskQueue.Forget(key) return @@ -237,12 +242,12 @@ func (m *SnapshotMonitor) handleErr(err error, key interface{}) { m.snapshotCheckTaskQueue.Forget(key) } -func (m *SnapshotMonitor) snapshotCheckWorker(id int) { +func (m *snapshotMonitorImpl) snapshotCheckWorker(id int) { for m.processNextWorkItem(id) { } } -func (m *SnapshotMonitor) Stop() { +func (m *snapshotMonitorImpl) Stop() { m.logger.WithField("monitor", monitorName).Info("Closing snapshot monitor") m.snapshotCheckTaskQueue.ShutDown() @@ -250,11 +255,11 @@ func (m *SnapshotMonitor) Stop() { m.quit() } -func (m *SnapshotMonitor) RunOnce() error { +func (m *snapshotMonitorImpl) RunOnce() error { return fmt.Errorf("RunOnce is not implemented") } -func (m *SnapshotMonitor) UpdateConfiguration(map[string]interface{}) error { +func (m *snapshotMonitorImpl) UpdateConfiguration(map[string]interface{}) error { dataIntegrityCronJob, err := m.ds.GetSettingValueExisted(types.SettingNameSnapshotDataIntegrityCronJob) if err != nil { return errors.Wrapf(err, "failed to get %v setting", types.SettingNameSnapshotDataIntegrityCronJob) @@ -289,13 +294,13 @@ func (m *SnapshotMonitor) UpdateConfiguration(map[string]interface{}) error { return nil } -func (m *SnapshotMonitor) GetCollectedData() (interface{}, error) { +func (m *snapshotMonitorImpl) GetCollectedData() (SnapshotMonitorStatus, error) { m.RLock() defer m.RUnlock() return m.SnapshotMonitorStatus, nil } -func (m *SnapshotMonitor) shouldAddToInProgressSnapshotCheckTasks(snapshotName string) bool { +func (m *snapshotMonitorImpl) shouldAddToInProgressSnapshotCheckTasks(snapshotName string) bool { m.inProgressSnapshotCheckTasksLock.Lock() defer m.inProgressSnapshotCheckTasksLock.Unlock() @@ -309,14 +314,14 @@ func (m *SnapshotMonitor) shouldAddToInProgressSnapshotCheckTasks(snapshotName s return true } -func (m *SnapshotMonitor) deleteFromInProgressSnapshotCheckTasks(snapshotName string) { 
+func (m *snapshotMonitorImpl) deleteFromInProgressSnapshotCheckTasks(snapshotName string) { m.inProgressSnapshotCheckTasksLock.Lock() defer m.inProgressSnapshotCheckTasksLock.Unlock() delete(m.inProgressSnapshotCheckTasks, snapshotName) } -func (m *SnapshotMonitor) run(arg interface{}) error { +func (m *snapshotMonitorImpl) run(arg interface{}) error { task, ok := arg.(snapshotCheckTask) if !ok { return fmt.Errorf("failed to assert value: %v", arg) @@ -355,7 +360,7 @@ func (m *SnapshotMonitor) run(arg interface{}) error { return m.waitAndHandleSnapshotHashing(engine, engineClientProxy, task.snapshotName) } -func (m *SnapshotMonitor) canRequestSnapshotHash(engine *longhorn.Engine) error { +func (m *snapshotMonitorImpl) canRequestSnapshotHash(engine *longhorn.Engine) error { if err := m.checkVolumeIsNotPurging(engine); err != nil { return err } @@ -375,7 +380,7 @@ func (m *SnapshotMonitor) canRequestSnapshotHash(engine *longhorn.Engine) error return nil } -func (m *SnapshotMonitor) requestSnapshotHashing(engine *longhorn.Engine, engineClientProxy engineapi.EngineClientProxy, +func (m *snapshotMonitorImpl) requestSnapshotHashing(engine *longhorn.Engine, engineClientProxy engineapi.EngineClientProxy, snapshotName string, changeEvent bool) error { // One snapshot CR might be updated many times in a short period. // The checksum calculation is expected to run once if it is triggered by snapshot update event. @@ -395,7 +400,7 @@ func (m *SnapshotMonitor) requestSnapshotHashing(engine *longhorn.Engine, engine return engineClientProxy.SnapshotHash(engine, snapshotName, rehash) } -func (m *SnapshotMonitor) waitAndHandleSnapshotHashing(engine *longhorn.Engine, engineClientProxy engineapi.EngineClientProxy, +func (m *snapshotMonitorImpl) waitAndHandleSnapshotHashing(engine *longhorn.Engine, engineClientProxy engineapi.EngineClientProxy, snapshotName string) error { opts := []retry.Option{ retry.Context(m.ctx), @@ -421,7 +426,7 @@ func (m *SnapshotMonitor) waitAndHandleSnapshotHashing(engine *longhorn.Engine, return nil } -func (m *SnapshotMonitor) checkVolumeNotInMigration(volumeName string) error { +func (m *snapshotMonitorImpl) checkVolumeNotInMigration(volumeName string) error { v, err := m.ds.GetVolume(volumeName) if err != nil { return err @@ -432,7 +437,7 @@ func (m *SnapshotMonitor) checkVolumeNotInMigration(volumeName string) error { return nil } -func (m *SnapshotMonitor) checkVolumeIsNotPurging(engine *longhorn.Engine) error { +func (m *snapshotMonitorImpl) checkVolumeIsNotPurging(engine *longhorn.Engine) error { for _, status := range engine.Status.PurgeStatus { if status.IsPurging { return fmt.Errorf("cannot hash snapshot during purging") @@ -441,7 +446,7 @@ func (m *SnapshotMonitor) checkVolumeIsNotPurging(engine *longhorn.Engine) error return nil } -func (m *SnapshotMonitor) checkVolumeIsNotRestoring(engine *longhorn.Engine) error { +func (m *snapshotMonitorImpl) checkVolumeIsNotRestoring(engine *longhorn.Engine) error { for _, status := range engine.Status.RestoreStatus { if status.IsRestoring { return fmt.Errorf("cannot hash snapshot during restoring") @@ -450,7 +455,7 @@ func (m *SnapshotMonitor) checkVolumeIsNotRestoring(engine *longhorn.Engine) err return nil } -func (m *SnapshotMonitor) syncHashStatusFromEngineReplicas(engine *longhorn.Engine, engineClientProxy engineapi.EngineClientProxy, +func (m *snapshotMonitorImpl) syncHashStatusFromEngineReplicas(engine *longhorn.Engine, engineClientProxy engineapi.EngineClientProxy, snapshotName string) error { hashStatus, err := 
engineClientProxy.SnapshotHashStatus(engine, snapshotName) if err != nil { @@ -493,7 +498,7 @@ func (m *SnapshotMonitor) syncHashStatusFromEngineReplicas(engine *longhorn.Engi return nil } -func (m *SnapshotMonitor) kickOutCorruptedReplicas(engine *longhorn.Engine, engineClientProxy engineapi.EngineClientProxy, +func (m *snapshotMonitorImpl) kickOutCorruptedReplicas(engine *longhorn.Engine, engineClientProxy engineapi.EngineClientProxy, checksum string, hashStatus map[string]*longhorn.HashStatus) { for address, status := range hashStatus { if status.Checksum == checksum { @@ -572,7 +577,7 @@ func determineChecksum(checksums map[string][]string) (bool, string, int) { return found, checksum, maxVotes } -func (m *SnapshotMonitor) getSnapshotDataIntegrity(volumeName string) (longhorn.SnapshotDataIntegrity, error) { +func (m *snapshotMonitorImpl) getSnapshotDataIntegrity(volumeName string) (longhorn.SnapshotDataIntegrity, error) { volume, err := m.ds.GetVolumeRO(volumeName) if err != nil { return "", err diff --git a/controller/node_controller.go b/controller/node_controller.go index 61e2ab91e8..0572dd898f 100644 --- a/controller/node_controller.go +++ b/controller/node_controller.go @@ -56,10 +56,10 @@ type NodeController struct { kubeClient clientset.Interface eventRecorder record.EventRecorder - diskMonitor monitor.Monitor - environmentCheckMonitor monitor.Monitor + diskMonitor monitor.DiskMonitor + environmentCheckMonitor monitor.EnvironmentCheckMonitor - snapshotMonitor monitor.Monitor + snapshotMonitor monitor.SnapshotMonitor snapshotChangeEventQueue workqueue.TypedInterface[any] snapshotChangeEventQueueLock sync.Mutex @@ -474,10 +474,9 @@ func (nc *NodeController) syncNode(key string) (err error) { } if nc.snapshotMonitor != nil { - data, _ := nc.snapshotMonitor.GetCollectedData() - status, ok := data.(monitor.SnapshotMonitorStatus) - if !ok { - log.Errorf("Failed to assert value from snapshot monitor: %v", data) + status, err := nc.snapshotMonitor.GetCollectedData() + if err != nil { + log.WithError(err).Errorf("Failed to collect status from snapshot monitor") } else { node.Status.SnapshotCheckStatus.LastPeriodicCheckedAt = status.LastSnapshotPeriodicCheckedAt } @@ -1274,34 +1273,36 @@ func BackingImageDiskFileCleanup(node *longhorn.Node, bi *longhorn.BackingImage, } } -func (nc *NodeController) createDiskMonitor() (monitor.Monitor, error) { +func (nc *NodeController) createDiskMonitor() (monitor.DiskMonitor, error) { if nc.diskMonitor != nil { return nc.diskMonitor, nil } - monitor, err := monitor.NewDiskMonitor(nc.logger, nc.ds, nc.controllerID, nc.enqueueNodeForMonitor) + mon, err := monitor.NewDiskMonitor(nc.logger, nc.ds, nc.controllerID, nc.enqueueNodeForMonitor) if err != nil { return nil, err } + mon.Start() - nc.diskMonitor = monitor + nc.diskMonitor = mon - return monitor, nil + return mon, nil } -func (nc *NodeController) createEnvironmentCheckMonitor() (monitor.Monitor, error) { +func (nc *NodeController) createEnvironmentCheckMonitor() (monitor.EnvironmentCheckMonitor, error) { if nc.environmentCheckMonitor != nil { return nc.environmentCheckMonitor, nil } - monitor, err := monitor.NewEnvironmentCheckMonitor(nc.logger, nc.ds, nc.controllerID, nc.enqueueNodeForMonitor) + mon, err := monitor.NewEnvironmentCheckMonitor(nc.logger, nc.ds, nc.controllerID, nc.enqueueNodeForMonitor) if err != nil { return nil, err } + mon.Start() - nc.environmentCheckMonitor = monitor + nc.environmentCheckMonitor = mon - return monitor, nil + return mon, nil } func (nc *NodeController) 
enqueueNodeForMonitor(key string) {
@@ -1440,12 +1441,11 @@ func (nc *NodeController) createOrphan(node *longhorn.Node, diskName, replicaDat
 }
 
 func (nc *NodeController) syncWithDiskMonitor(node *longhorn.Node) (map[string]*monitor.CollectedDiskInfo, error) {
-	v, err := nc.diskMonitor.GetCollectedData()
+	collectedDiskInfo, err := nc.diskMonitor.GetCollectedData()
 	if err != nil {
 		return map[string]*monitor.CollectedDiskInfo{}, err
 	}
 
-	collectedDiskInfo := v.(map[string]*monitor.CollectedDiskInfo)
 	if matched := isDiskMatched(node, collectedDiskInfo); !matched {
 		return map[string]*monitor.CollectedDiskInfo{}, errors.New("mismatching disks in node resource object and monitor collected data")
@@ -1455,14 +1455,9 @@ func (nc *NodeController) syncWithDiskMonitor(node *longhorn.Node) (map[string]*
 }
 
 func (nc *NodeController) syncWithEnvironmentCheckMonitor() ([]longhorn.Condition, error) {
-	v, err := nc.environmentCheckMonitor.GetCollectedData()
+	conditions, err := nc.environmentCheckMonitor.GetCollectedData()
 	if err != nil {
-		return []longhorn.Condition{}, err
-	}
-
-	conditions, ok := v.([]longhorn.Condition)
-	if !ok {
-		return []longhorn.Condition{}, errors.New("failed to convert the collected data to conditions")
+		return nil, errors.Wrap(err, "failed to get collected data from the environment check monitor")
 	}
 
 	return conditions, nil
@@ -1582,7 +1577,7 @@ func isDiskMatched(node *longhorn.Node, collectedDiskInfo map[string]*monitor.Co
 	return true
 }
 
-func (nc *NodeController) createSnapshotMonitor() (mon monitor.Monitor, err error) {
+func (nc *NodeController) createSnapshotMonitor() (mon monitor.SnapshotMonitor, err error) {
 	defer func() {
 		if err == nil {
 			err = nc.snapshotMonitor.UpdateConfiguration(map[string]interface{}{})
@@ -1597,6 +1592,7 @@ func (nc *NodeController) createSnapshotMonitor() (mon monitor.SnapshotMonitor, err erro
 	if err != nil {
 		return nil, err
 	}
+	mon.Start()
 
 	nc.snapshotMonitor = mon