Skip to content

Commit

Permalink
Merge pull request #185 from CrazyHZM/fix/local_topic_clean
Browse files Browse the repository at this point in the history
Fix/local topic clean
  • Loading branch information
CrazyHZM authored Nov 17, 2022
2 parents c9d8c6b + cff3621 commit f148d51
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 7 deletions.
1 change: 1 addition & 0 deletions apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ func (p *program) Start() error {
}

nsqd.LoadMetadata(initDisabled)
nsqd.LoadLocalCacheTopic()
nsqd.NotifyPersistMetadata()

err = nsqdServer.Main()
Expand Down
58 changes: 53 additions & 5 deletions consistence/nsqd_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,18 +417,36 @@ func doLogQClean(tcData *coordData, localTopic *nsqd.Topic, retentionSize int64,
}

func (ncoord *NsqdCoordinator) GreedyCleanTopicOldData(localTopic *nsqd.Topic) error {
tcData, err := ncoord.getTopicCoordData(localTopic.GetTopicName(), localTopic.GetTopicPart())
topicName := localTopic.GetTopicName()
partition := localTopic.GetTopicPart()
topicMetaInfoForPartition, err := ncoord.GetTopicInfo(topicName, partition)
if err != nil {
return err.ToErrorType()
coordLog.Errorf("Cleaning: failed to get topic info, %v-%v: %v", topicName, partition, err)
return nil
}

retentionDay := tcData.topicInfo.RetentionDay
topicMetaInfo := topicMetaInfoForPartition.TopicMetaInfo

retentionDay := topicMetaInfo.RetentionDay
if retentionDay == 0 {
retentionDay = int32(nsqd.DEFAULT_RETENTION_DAYS)
}
retentionSize := (MaxTopicRetentionSizePerDay / 16) * int64(retentionDay)
doLogQClean(tcData, localTopic, retentionSize, false)
doLogQClean(tcData, localTopic, retentionSize, true)

basePath := GetTopicPartitionBasePath(ncoord.dataRootPath, topicName, partition)
topicLeaderSession, err := ncoord.leadership.GetTopicLeaderSession(topicName, partition)
if err != nil {
coordLog.Errorf("Cleaning: failed to get topic leader info:%v-%v, err:%v", topicName, partition, err)
return nil
}

tc, localTopic, loadErr := ncoord.initLocalTopicCoord(topicMetaInfoForPartition, topicLeaderSession, basePath, true, false)
if loadErr != nil {
coordLog.Errorf("Cleaning: topic %v coord init error: %v", topicMetaInfoForPartition.GetTopicDesp(), loadErr.Error())
return nil
}
doLogQClean(tc.GetData(), localTopic, retentionSize, false)
doLogQClean(tc.GetData(), localTopic, retentionSize, true)
return nil
}

Expand All @@ -452,6 +470,30 @@ func (ncoord *NsqdCoordinator) checkAndCleanOldData() {
ticker := time.NewTicker(time.Minute * 10)
defer ticker.Stop()

for topicName, data := range ncoord.localNsqd.GetLocalCacheTopicMapRef() {
for partition := range data {
topicMetaInfoForPartition, err := ncoord.GetTopicInfo(topicName, partition)
if err != nil {
coordLog.Errorf("Cleaning: failed to get topic info, %v-%v: %v", topicName, partition, err)
continue
}

basePath := GetTopicPartitionBasePath(ncoord.dataRootPath, topicName, partition)
topicLeaderSession, err := ncoord.leadership.GetTopicLeaderSession(topicName, partition)
if err != nil {
coordLog.Errorf("Cleaning: failed to get topic leader info:%v-%v, err:%v", topicName, partition, err)
continue
}

tc, _, loadErr := ncoord.initLocalTopicCoord(topicMetaInfoForPartition, topicLeaderSession, basePath, true, false)
if loadErr != nil {
coordLog.Errorf("Cleaning: topic %v coord init error: %v", topicMetaInfoForPartition.GetTopicDesp(), loadErr.Error())
continue
}
coordLog.Debugf("Load local cache topic from cache file, topic: %v", tc.GetData().topicInfo.GetTopicDesp())
}
}

doCheckAndCleanOld := func(checkRetentionDay bool) {
tmpCoords := make(map[string]map[int]*TopicCoordinator)
ncoord.getAllCoords(tmpCoords)
Expand All @@ -472,6 +514,7 @@ func (ncoord *NsqdCoordinator) checkAndCleanOldData() {
if checkRetentionDay {
retentionSize = 0
}
coordLog.Debugf("start clean local topic: %v", tcData.topicInfo.GetTopicDesp())
doLogQClean(tcData, localTopic, retentionSize, false)
doLogQClean(tcData, localTopic, retentionSize, true)
}
Expand Down Expand Up @@ -2616,6 +2659,11 @@ func (ncoord *NsqdCoordinator) GetTopicMetaInfo(topic string) (TopicMetaInfo, er
return meta, err
}

func (ncoord *NsqdCoordinator) GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error) {
meta, err := ncoord.leadership.GetTopicInfo(topic, partition)
return meta, err
}

func (ncoord *NsqdCoordinator) TryCleanUnusedTopicOnLocal(topic string, partition int, dryRun bool) error {
if ncoord.leadership == nil {
return nil
Expand Down
41 changes: 39 additions & 2 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"path"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -75,8 +76,9 @@ type NSQD struct {
errValue atomic.Value
startTime time.Time

topicMap map[string]map[int]*Topic
magicCodeMutex sync.Mutex
topicMap map[string]map[int]*Topic
localCacheTopicMap map[string]map[int]*Topic
magicCodeMutex sync.Mutex

poolSize int
topicJobPoolSize int
Expand Down Expand Up @@ -128,6 +130,7 @@ func New(opts *Options) (*NSQD, error) {
n := &NSQD{
startTime: time.Now(),
topicMap: make(map[string]map[int]*Topic),
localCacheTopicMap: make(map[string]map[int]*Topic),
exitChan: make(chan int),
MetaNotifyChan: make(chan interface{}, 128),
OptsNotificationChan: make(chan struct{}, 1),
Expand Down Expand Up @@ -298,6 +301,10 @@ func (n *NSQD) GetTopicMapRef() map[string]map[int]*Topic {
return n.topicMap
}

func (n *NSQD) GetLocalCacheTopicMapRef() map[string]map[int]*Topic {
return n.localCacheTopicMap
}

func (n *NSQD) GetTopicPartitions(topicName string) map[int]*Topic {
tmpMap := make(map[int]*Topic)
n.RLock()
Expand Down Expand Up @@ -436,6 +443,36 @@ func (n *NSQD) LoadMetadata(disabled int32) {
}
}

func (n *NSQD) LoadLocalCacheTopic() {
fn := fmt.Sprintf(path.Join(n.GetOpts().DataPath))
data, _ := ioutil.ReadDir(fn)
for _, topicFile := range data {
topicFilePath := fmt.Sprintf(path.Join(n.GetOpts().DataPath, topicFile.Name()))

cacheFiles, _ := ioutil.ReadDir(topicFilePath)
for _, file := range cacheFiles {
if find := strings.Contains(file.Name(), ".diskqueue"); find {
str := strings.Split(file.Name(), ".diskqueue")
topicDes := strings.Split(str[0], "-")
topicName := topicDes[0]
partition, _ := strconv.Atoi(topicDes[1])

topics, ok := n.localCacheTopicMap[topicName]
if ok {
_, ok1 := topics[partition]
if ok1 {
continue
}
} else {
topics = make(map[int]*Topic)
n.localCacheTopicMap[topicName] = topics
}
topics[partition] = nil
}
}
}
}

func (n *NSQD) persistLoop() {
for {
select {
Expand Down

0 comments on commit f148d51

Please sign in to comment.