diff --git a/log.go b/log.go index 7c06cd7..9525d6e 100644 --- a/log.go +++ b/log.go @@ -19,7 +19,7 @@ import ( type Log struct { ApplyFunc func(Command) error file *os.File - path string + path string entries []*LogEntry commitIndex uint64 mutex sync.Mutex @@ -412,7 +412,7 @@ func (l *Log) appendEntry(entry *LogEntry) error { //-------------------------------------- // compaction the log before index -func (l *Log) Compaction(index uint64, term uint64) error { +func (l *Log) Compact(index uint64, term uint64) error { var entries []*LogEntry l.mutex.Lock() @@ -444,9 +444,16 @@ func (l *Log) Compaction(index uint64, term uint64) error { l.file.Close() // remove the current log file to .bak - os.Remove(l.path) + err = os.Remove(l.path) + if err != nil { + return err + } + // rename the new log file - os.Rename(l.path + ".new", l.path) + err = os.Rename(l.path + ".new", l.path) + if err != nil { + return err + } l.file = file // compaction the in memory log diff --git a/peer.go b/peer.go index 1901965..ab12744 100644 --- a/peer.go +++ b/peer.go @@ -213,27 +213,30 @@ func (p *Peer) heartbeatTimeoutFunc(startChannel chan bool) { p.mutex.Lock() server, prevLogIndex := p.server, p.prevLogIndex p.mutex.Unlock() - + + var req *AppendEntriesRequest + snapShotNeeded := false + + // we need to hold the log lock to create AppendEntriesRequest + // avoid snapshot to delete the desired entries before AEQ() server.log.mutex.Lock() - if prevLogIndex < server.log.StartIndex() { + if prevLogIndex >= server.log.StartIndex() { + req = server.createAppendEntriesRequest(prevLogIndex) + } else { + snapShotNeeded = true + } + server.log.mutex.Unlock() - // request the log before the latest snapshot - // send out the snapshot - server.log.mutex.Unlock() + p.mutex.Lock() + if snapShotNeeded { req := server.createSnapshotRequest() - - p.mutex.Lock() p.sendSnapshotRequest(req) - p.mutex.Unlock() } else { - - // Lock the server to create a request. - req := server.createAppendEntriesRequest(prevLogIndex) - server.log.mutex.Unlock() - p.mutex.Lock() p.sendFlushRequest(req) - p.mutex.Unlock() } + p.mutex.Unlock() + + } else { break } diff --git a/server.go b/server.go index 4fac4ff..40b2afc 100644 --- a/server.go +++ b/server.go @@ -8,6 +8,7 @@ import ( "os" "sort" "bufio" + "path" ) //------------------------------------------------------------------------------ @@ -791,7 +792,7 @@ func (s *Server) takeSnapshot() error { s.saveSnapshot() - s.log.Compaction(lastIndex, lastTerm) + s.log.Compact(lastIndex, lastTerm) return nil } @@ -822,7 +823,7 @@ func (s *Server) saveSnapshot() error { // Retrieves the log path for the server. func (s *Server) SnapshotPath(lastIndex uint64, lastTerm uint64) string { - return fmt.Sprintf("%s/snapshot/%v_%v.ss", s.path, lastTerm, lastIndex) + return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex)) } @@ -840,7 +841,7 @@ func (s *Server) SnapshotRecovery(index uint64, term uint64, machineState int) ( snapshotPath := s.SnapshotPath(index, term) s.currentSnapshot = &Snapshot{index, term, machineState, snapshotPath} s.saveSnapshot() - s.log.Compaction(index, term) + s.log.Compact(index, term) return NewSnapshotResponse(term, true, index), nil @@ -849,7 +850,7 @@ func (s *Server) SnapshotRecovery(index uint64, term uint64, machineState int) ( // Load a snapshot at restart func (s *Server) LoadSnapshot() error { - dir, err := os.OpenFile(s.path + "/snapshot", os.O_RDONLY, 0) + dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0) if err != nil { dir.Close() panic(err) @@ -869,7 +870,7 @@ func (s *Server) LoadSnapshot() error { // not sure how many snapshot we should keep sort.Strings(filenames) - snapshotPath := s.path + "/snapshot/" + filenames[len(filenames) - 1] + snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames) - 1]) // should not file file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0) diff --git a/snapshot.go b/snapshot.go index f2f7746..a948ac1 100644 --- a/snapshot.go +++ b/snapshot.go @@ -24,46 +24,6 @@ type Snapshot struct { path string } -// The request sent to a server to start from the snapshot. -type SnapshotRequest struct { - LeaderName string `json:"leaderName"` - LastIndex uint64 `json:"lastTerm"` - LastTerm uint64 `json:"lastIndex"` - MachineState int `json:"machineState"` -} - -// The response returned from a server appending entries to the log. -type SnapshotResponse struct { - Term uint64 `json:"term"` - Success bool `json:"success"` - CommitIndex uint64 `json:"commitIndex"` -} - -//------------------------------------------------------------------------------ -// -// Constructors -// -//------------------------------------------------------------------------------ - -// Creates a new Snapshot request. -func NewSnapshotRequest(leaderName string, snapshot *Snapshot) *SnapshotRequest { - return &SnapshotRequest{ - LeaderName: leaderName, - LastIndex: snapshot.lastIndex, - LastTerm: snapshot.lastTerm, - MachineState: snapshot.machineState, - } -} - -// Creates a new Snapshot response. -func NewSnapshotResponse(term uint64, success bool, commitIndex uint64) *SnapshotResponse { - return &SnapshotResponse{ - Term: term, - Success: success, - CommitIndex: commitIndex, - } -} - // Save the snapshot to a file func (ss *Snapshot) Save() error { // Write machine state to temporary buffer.