Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(env-check): remove monitor initial pending to fix disk mismatch problem #3426

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 40 additions & 30 deletions controller/monitor/disk_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@ type DiskServiceClient struct {
err error
}

type DiskMonitor struct {
type DiskMonitor Monitor[map[string]*CollectedDiskInfo]

var _ DiskMonitor = &diskMonitorImpl{}

type diskMonitorImpl struct {
*baseMonitor

nodeName string
checkVolumeMeta bool

startOnce sync.Once
collectedDataLock sync.RWMutex
collectedData map[string]*CollectedDiskInfo

Expand Down Expand Up @@ -69,10 +74,10 @@ type GetDiskConfigHandler func(longhorn.DiskType, string, string, longhorn.DiskD
type GenerateDiskConfigHandler func(longhorn.DiskType, string, string, string, string, *DiskServiceClient) (*util.DiskConfig, error)
type GetReplicaDataStoresHandler func(longhorn.DiskType, *longhorn.Node, string, string, string, string, *DiskServiceClient) (map[string]string, error)

func NewDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*DiskMonitor, error) {
func NewDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (DiskMonitor, error) {
ctx, quit := context.WithCancel(context.Background())

m := &DiskMonitor{
m := &diskMonitorImpl{
baseMonitor: newBaseMonitor(ctx, quit, logger, ds, DiskMonitorSyncPeriod),

nodeName: nodeName,
Expand All @@ -89,39 +94,44 @@ func NewDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName
getReplicaDataStoresHandler: getReplicaDataStores,
}

go m.Start()

return m, nil
}

func (m *DiskMonitor) Start() {
if err := wait.PollUntilContextCancel(m.ctx, m.syncPeriod, false, func(context.Context) (bool, error) {
if err := m.run(struct{}{}); err != nil {
m.logger.WithError(err).Error("Stopped monitoring disks")
func (m *diskMonitorImpl) Start() {
m.startOnce.Do(func() {
if err := m.run(); err != nil {
m.logger.WithError(err).Error("Initial disk monitoring failed")
}
return false, nil
}); err != nil {
if errors.Is(err, context.Canceled) {
m.logger.WithError(err).Warning("Disk monitor is stopped")
} else {
m.logger.WithError(err).Error("Failed to start disk monitor")
}
}
go func() {
if err := wait.PollUntilContextCancel(m.ctx, m.syncPeriod, false, func(context.Context) (bool, error) {
if err := m.run(); err != nil {
m.logger.WithError(err).Error("Stopped monitoring disks")
}
return false, nil
}); err != nil {
if errors.Is(err, context.Canceled) {
m.logger.WithError(err).Warning("Disk monitor is stopped")
} else {
m.logger.WithError(err).Error("Failed to start disk monitor")
}
}
}()
})
}

func (m *DiskMonitor) Stop() {
func (m *diskMonitorImpl) Stop() {
m.quit()
}

func (m *DiskMonitor) RunOnce() error {
return m.run(struct{}{})
func (m *diskMonitorImpl) RunOnce() error {
return m.run()
}

func (m *DiskMonitor) UpdateConfiguration(map[string]interface{}) error {
func (m *diskMonitorImpl) UpdateConfiguration(map[string]interface{}) error {
return nil
}

func (m *DiskMonitor) GetCollectedData() (interface{}, error) {
func (m *diskMonitorImpl) GetCollectedData() (map[string]*CollectedDiskInfo, error) {
m.collectedDataLock.RLock()
defer m.collectedDataLock.RUnlock()

Expand All @@ -133,7 +143,7 @@ func (m *DiskMonitor) GetCollectedData() (interface{}, error) {
return data, nil
}

func (m *DiskMonitor) run(value interface{}) error {
func (m *diskMonitorImpl) run() error {
node, err := m.ds.GetNode(m.nodeName)
if err != nil {
return errors.Wrapf(err, "failed to get longhorn node %v", m.nodeName)
Expand All @@ -154,7 +164,7 @@ func (m *DiskMonitor) run(value interface{}) error {
return nil
}

func (m *DiskMonitor) getRunningInstanceManagerRO(dataEngine longhorn.DataEngineType) (*longhorn.InstanceManager, error) {
func (m *diskMonitorImpl) getRunningInstanceManagerRO(dataEngine longhorn.DataEngineType) (*longhorn.InstanceManager, error) {
switch dataEngine {
case longhorn.DataEngineTypeV1:
return m.ds.GetDefaultInstanceManagerByNodeRO(m.nodeName, dataEngine)
Expand All @@ -180,7 +190,7 @@ func (m *DiskMonitor) getRunningInstanceManagerRO(dataEngine longhorn.DataEngine
return nil, fmt.Errorf("unknown data engine %v", dataEngine)
}

func (m *DiskMonitor) newDiskServiceClients() map[longhorn.DataEngineType]*DiskServiceClient {
func (m *diskMonitorImpl) newDiskServiceClients() map[longhorn.DataEngineType]*DiskServiceClient {
clients := map[longhorn.DataEngineType]*DiskServiceClient{}

dataEngines := m.ds.GetDataEngines()
Expand All @@ -207,7 +217,7 @@ func (m *DiskMonitor) newDiskServiceClients() map[longhorn.DataEngineType]*DiskS
return clients
}

func (m *DiskMonitor) closeDiskServiceClients(clients map[longhorn.DataEngineType]*DiskServiceClient) {
func (m *diskMonitorImpl) closeDiskServiceClients(clients map[longhorn.DataEngineType]*DiskServiceClient) {
for _, client := range clients {
if client.c != nil {
client.c.Close()
Expand All @@ -217,7 +227,7 @@ func (m *DiskMonitor) closeDiskServiceClients(clients map[longhorn.DataEngineTyp
}

// Collect disk data and generate disk UUID blindly.
func (m *DiskMonitor) collectDiskData(node *longhorn.Node) map[string]*CollectedDiskInfo {
func (m *diskMonitorImpl) collectDiskData(node *longhorn.Node) map[string]*CollectedDiskInfo {
diskInfoMap := make(map[string]*CollectedDiskInfo, 0)

diskServiceClients := m.newDiskServiceClients()
Expand Down Expand Up @@ -392,7 +402,7 @@ func NewDiskInfo(diskName, diskUUID, diskPath string, diskDriver longhorn.DiskDr
return diskInfo
}

func (m *DiskMonitor) getOrphanedReplicaDataStores(diskType longhorn.DiskType, diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) {
func (m *diskMonitorImpl) getOrphanedReplicaDataStores(diskType longhorn.DiskType, diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) {
switch diskType {
case longhorn.DiskTypeFilesystem:
return m.getOrphanedReplicaDirectoryNames(diskUUID, diskPath, replicaDataStores)
Expand All @@ -403,7 +413,7 @@ func (m *DiskMonitor) getOrphanedReplicaDataStores(diskType longhorn.DiskType, d
}
}

func (m *DiskMonitor) getOrphanedReplicaLvolNames(replicaDataStores map[string]string) (map[string]string, error) {
func (m *diskMonitorImpl) getOrphanedReplicaLvolNames(replicaDataStores map[string]string) (map[string]string, error) {
if len(replicaDataStores) == 0 {
return map[string]string{}, nil
}
Expand All @@ -418,7 +428,7 @@ func (m *DiskMonitor) getOrphanedReplicaLvolNames(replicaDataStores map[string]s
return replicaDataStores, nil
}

func (m *DiskMonitor) getOrphanedReplicaDirectoryNames(diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) {
func (m *diskMonitorImpl) getOrphanedReplicaDirectoryNames(diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) {
if len(replicaDataStores) == 0 {
return map[string]string{}, nil
}
Expand Down
80 changes: 45 additions & 35 deletions controller/monitor/environment_check_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,16 @@ var (
nfsProtocolVersions = map[string]bool{"4.0": true, "4.1": true, "4.2": true}
)

type EnvironmentCheckMonitor struct {
type EnvironmentCheckMonitor Monitor[[]longhorn.Condition]

var _ EnvironmentCheckMonitor = &environmentCheckMonitorImpl{}

type environmentCheckMonitorImpl struct {
*baseMonitor

nodeName string

startOnce sync.Once
collectedDataLock sync.RWMutex
collectedData *CollectedEnvironmentCheckInfo

Expand All @@ -60,10 +65,10 @@ type CollectedEnvironmentCheckInfo struct {
conditions []longhorn.Condition
}

func NewEnvironmentCheckMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*EnvironmentCheckMonitor, error) {
func NewEnvironmentCheckMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (EnvironmentCheckMonitor, error) {
ctx, quit := context.WithCancel(context.Background())

m := &EnvironmentCheckMonitor{
m := &environmentCheckMonitorImpl{
baseMonitor: newBaseMonitor(ctx, quit, logger, ds, EnvironmentCheckMonitorSyncPeriod),

nodeName: nodeName,
Expand All @@ -74,39 +79,44 @@ func NewEnvironmentCheckMonitor(logger logrus.FieldLogger, ds *datastore.DataSto
syncCallback: syncCallback,
}

go m.Start()

return m, nil
}

func (m *EnvironmentCheckMonitor) Start() {
if err := wait.PollUntilContextCancel(m.ctx, m.syncPeriod, true, func(context.Context) (bool, error) {
if err := m.run(struct{}{}); err != nil {
m.logger.WithError(err).Error("Stopped monitoring environment check")
func (m *environmentCheckMonitorImpl) Start() {
m.startOnce.Do(func() {
if err := m.run(); err != nil {
m.logger.WithError(err).Error("Initial environment monitoring failed")
}
return false, nil
}); err != nil {
if errors.Is(err, context.Canceled) {
m.logger.WithError(err).Warning("Environment check monitor is stopped")
} else {
m.logger.WithError(err).Error("Failed to start environment check monitor")
}
}
go func() {
if err := wait.PollUntilContextCancel(m.ctx, m.syncPeriod, false, func(context.Context) (bool, error) {
COLDTURNIP marked this conversation as resolved.
Show resolved Hide resolved
if err := m.run(); err != nil {
m.logger.WithError(err).Error("Stopped monitoring environment check")
}
return false, nil
}); err != nil {
if errors.Is(err, context.Canceled) {
m.logger.WithError(err).Warning("Environment check monitor is stopped")
} else {
m.logger.WithError(err).Error("Failed to start environment check monitor")
}
}
}()
})
}

func (m *EnvironmentCheckMonitor) Stop() {
func (m *environmentCheckMonitorImpl) Stop() {
m.quit()
}

func (m *EnvironmentCheckMonitor) RunOnce() error {
return m.run(struct{}{})
func (m *environmentCheckMonitorImpl) RunOnce() error {
return m.run()
}

func (m *EnvironmentCheckMonitor) UpdateConfiguration(map[string]interface{}) error {
func (m *environmentCheckMonitorImpl) UpdateConfiguration(map[string]interface{}) error {
return nil
}

func (m *EnvironmentCheckMonitor) GetCollectedData() (interface{}, error) {
func (m *environmentCheckMonitorImpl) GetCollectedData() ([]longhorn.Condition, error) {
m.collectedDataLock.RLock()
defer m.collectedDataLock.RUnlock()

Expand All @@ -118,7 +128,7 @@ func (m *EnvironmentCheckMonitor) GetCollectedData() (interface{}, error) {
return data, nil
}

func (m *EnvironmentCheckMonitor) run(value interface{}) error {
func (m *environmentCheckMonitorImpl) run() error {
node, err := m.ds.GetNode(m.nodeName)
if err != nil {
return errors.Wrapf(err, "failed to get longhorn node %v", m.nodeName)
Expand All @@ -139,7 +149,7 @@ func (m *EnvironmentCheckMonitor) run(value interface{}) error {
return nil
}

func (m *EnvironmentCheckMonitor) collectEnvironmentCheckData(node *longhorn.Node) *CollectedEnvironmentCheckInfo {
func (m *environmentCheckMonitorImpl) collectEnvironmentCheckData(node *longhorn.Node) *CollectedEnvironmentCheckInfo {
kubeNode, err := m.ds.GetKubernetesNodeRO(node.Name)
if err != nil {
return &CollectedEnvironmentCheckInfo{
Expand All @@ -150,7 +160,7 @@ func (m *EnvironmentCheckMonitor) collectEnvironmentCheckData(node *longhorn.Nod
return m.environmentCheck(kubeNode)
}

func (m *EnvironmentCheckMonitor) environmentCheck(kubeNode *corev1.Node) *CollectedEnvironmentCheckInfo {
func (m *environmentCheckMonitorImpl) environmentCheck(kubeNode *corev1.Node) *CollectedEnvironmentCheckInfo {
collectedData := &CollectedEnvironmentCheckInfo{
conditions: []longhorn.Condition{},
}
Expand All @@ -176,7 +186,7 @@ func (m *EnvironmentCheckMonitor) environmentCheck(kubeNode *corev1.Node) *Colle
return collectedData
}

func (m *EnvironmentCheckMonitor) syncPackagesInstalled(kubeNode *corev1.Node, namespaces []lhtypes.Namespace, collectedData *CollectedEnvironmentCheckInfo) {
func (m *environmentCheckMonitorImpl) syncPackagesInstalled(kubeNode *corev1.Node, namespaces []lhtypes.Namespace, collectedData *CollectedEnvironmentCheckInfo) {
osImage := strings.ToLower(kubeNode.Status.NodeInfo.OSImage)

packageProbeExecutables := make(map[string]string)
Expand Down Expand Up @@ -244,7 +254,7 @@ func (m *EnvironmentCheckMonitor) syncPackagesInstalled(kubeNode *corev1.Node, n
fmt.Sprintf("All required packages %v are installed", installedPackages))
}

func (m *EnvironmentCheckMonitor) syncPackagesInstalledTalosLinux(namespaces []lhtypes.Namespace, collectedData *CollectedEnvironmentCheckInfo) {
func (m *environmentCheckMonitorImpl) syncPackagesInstalledTalosLinux(namespaces []lhtypes.Namespace, collectedData *CollectedEnvironmentCheckInfo) {
type validateCommand struct {
binary string
args []string
Expand Down Expand Up @@ -320,7 +330,7 @@ func (m *EnvironmentCheckMonitor) syncPackagesInstalledTalosLinux(namespaces []l
}
}

func (m *EnvironmentCheckMonitor) syncMultipathd(namespaces []lhtypes.Namespace, collectedData *CollectedEnvironmentCheckInfo) {
func (m *environmentCheckMonitorImpl) syncMultipathd(namespaces []lhtypes.Namespace, collectedData *CollectedEnvironmentCheckInfo) {
nsexec, err := lhns.NewNamespaceExecutor(lhtypes.ProcessNone, lhtypes.HostProcDirectory, namespaces)
if err != nil {
collectedData.conditions = types.SetCondition(collectedData.conditions, longhorn.NodeConditionTypeMultipathd, longhorn.ConditionStatusFalse,
Expand All @@ -339,7 +349,7 @@ func (m *EnvironmentCheckMonitor) syncMultipathd(namespaces []lhtypes.Namespace,
collectedData.conditions = types.SetCondition(collectedData.conditions, longhorn.NodeConditionTypeMultipathd, longhorn.ConditionStatusTrue, "", "")
}

func (m *EnvironmentCheckMonitor) checkPackageInstalled(packageProbeExecutables map[string]string, namespaces []lhtypes.Namespace) (installed, notInstalled []string, err error) {
func (m *environmentCheckMonitorImpl) checkPackageInstalled(packageProbeExecutables map[string]string, namespaces []lhtypes.Namespace) (installed, notInstalled []string, err error) {
nsexec, err := lhns.NewNamespaceExecutor(lhtypes.ProcessNone, lhtypes.HostProcDirectory, namespaces)
if err != nil {
return nil, nil, err
Expand All @@ -357,7 +367,7 @@ func (m *EnvironmentCheckMonitor) checkPackageInstalled(packageProbeExecutables
return installed, notInstalled, nil
}

func (m *EnvironmentCheckMonitor) checkHugePages(kubeNode *corev1.Node, collectedData *CollectedEnvironmentCheckInfo) {
func (m *environmentCheckMonitorImpl) checkHugePages(kubeNode *corev1.Node, collectedData *CollectedEnvironmentCheckInfo) {
hugePageLimitInMiB, err := m.ds.GetSettingAsInt(types.SettingNameV2DataEngineHugepageLimit)
if err != nil {
m.logger.Debugf("Failed to fetch v2-data-engine-hugepage-limit setting, using default value: %d", 2048)
Expand Down Expand Up @@ -391,7 +401,7 @@ func (m *EnvironmentCheckMonitor) checkHugePages(kubeNode *corev1.Node, collecte
)
}

func (m *EnvironmentCheckMonitor) checkKernelModulesLoaded(kubeNode *corev1.Node, isV2DataEngine bool, collectedData *CollectedEnvironmentCheckInfo) {
func (m *environmentCheckMonitorImpl) checkKernelModulesLoaded(kubeNode *corev1.Node, isV2DataEngine bool, collectedData *CollectedEnvironmentCheckInfo) {
modulesToCheck := make(map[string]string)
for k, v := range kernelModules {
modulesToCheck[k] = v
Expand Down Expand Up @@ -452,7 +462,7 @@ func checkModulesLoadedUsingkmod(modules map[string]string) (map[string]string,
return notFoundModules, nil
}

func (m *EnvironmentCheckMonitor) checkModulesLoadedByConfigFile(modules map[string]string, kernelVersion string) ([]string, error) {
func (m *environmentCheckMonitorImpl) checkModulesLoadedByConfigFile(modules map[string]string, kernelVersion string) ([]string, error) {
kernelConfigMap, err := lhsys.GetBootKernelConfigMap(kernelConfigDir, kernelVersion)
if err != nil {
if kernelConfigMap, err = lhsys.GetProcKernelConfigMap(lhtypes.HostProcDirectory); err != nil {
Expand All @@ -474,7 +484,7 @@ func (m *EnvironmentCheckMonitor) checkModulesLoadedByConfigFile(modules map[str
return notLoadedModules, nil
}

func (m *EnvironmentCheckMonitor) checkNFSMountConfigFile(supported map[string]bool, configFilePathPrefix string) (actualDefaultVer string, isAllowed bool, err error) {
func (m *environmentCheckMonitorImpl) checkNFSMountConfigFile(supported map[string]bool, configFilePathPrefix string) (actualDefaultVer string, isAllowed bool, err error) {
var nfsVer string
nfsMajor, nfsMinor, err := lhnfs.GetSystemDefaultNFSVersion(configFilePathPrefix)
if err == nil {
Expand All @@ -490,7 +500,7 @@ func (m *EnvironmentCheckMonitor) checkNFSMountConfigFile(supported map[string]b
return actualDefaultVer, supported[nfsVer], nil
}

func (m *EnvironmentCheckMonitor) checkKernelModuleEnabled(module, kmodName string, kernelConfigMap map[string]string) (bool, error) {
func (m *environmentCheckMonitorImpl) checkKernelModuleEnabled(module, kmodName string, kernelConfigMap map[string]string) (bool, error) {
enabled, exists := kernelConfigMap[module]
if !exists {
return false, nil
Expand Down Expand Up @@ -526,7 +536,7 @@ func getModulesConfigsList(modulesMap map[string]string, needModules bool) []str
return modulesConfigs
}

func (m *EnvironmentCheckMonitor) syncNFSClientVersion(kubeNode *corev1.Node, collectedData *CollectedEnvironmentCheckInfo) {
func (m *environmentCheckMonitorImpl) syncNFSClientVersion(kubeNode *corev1.Node, collectedData *CollectedEnvironmentCheckInfo) {
notLoadedModules, err := m.checkModulesLoadedByConfigFile(nfsClientVersions, kubeNode.Status.NodeInfo.KernelVersion)
if err != nil {
collectedData.conditions = types.SetCondition(collectedData.conditions, longhorn.NodeConditionTypeNFSClientInstalled, longhorn.ConditionStatusFalse,
Expand Down
4 changes: 2 additions & 2 deletions controller/monitor/fake_disk_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ const (
TestOrphanedReplicaDirectoryName = "test-volume-r-000000000"
)

func NewFakeDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*DiskMonitor, error) {
func NewFakeDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (DiskMonitor, error) {
ctx, quit := context.WithCancel(context.Background())

m := &DiskMonitor{
m := &diskMonitorImpl{
baseMonitor: newBaseMonitor(ctx, quit, logger, ds, DiskMonitorSyncPeriod),

nodeName: nodeName,
Expand Down
Loading
Loading