diff --git a/log.go b/log.go index 1288bac..8dea14b 100644 --- a/log.go +++ b/log.go @@ -180,7 +180,6 @@ func (l *Log) Open(path string) error { } file.Close() - fmt.Println("do recovery") } // Open the file for appending. @@ -229,7 +228,6 @@ func (l *Log) ContainsEntry(index uint64, term uint64) bool { // Retrieves a list of entries after a given index. This function also returns // the term of the index provided. func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) { - fmt.Println("[GET Entries After] ", index) // Return an error if the index doesn't exist. if index > (uint64(len(l.entries)) + l.startIndex) { panic(fmt.Sprintf("raft: Index is beyond end of log: %v", index)) @@ -237,7 +235,6 @@ func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) { // If we're going from the beginning of the log then return the whole log. if index == l.startIndex { - fmt.Println("[GET Entries 0] ") return l.entries, l.startTerm } // Determine the term at the given entry and return a subslice. @@ -334,8 +331,6 @@ func (l *Log) SetCommitIndex(index uint64) error { func (l *Log) Truncate(index uint64, term uint64) error { l.mutex.Lock() defer l.mutex.Unlock() - fmt.Println("[Truncate] got log lock") - fmt.Println("[Truncate] index ", index, " term ", term) // Do not allow committed entries to be truncated. if index < l.CommitIndex() { return fmt.Errorf("raft.Log: Index is already committed (%v): (IDX=%v, TERM=%v)", l.CommitIndex(), index, term) diff --git a/peer.go b/peer.go index d331d2b..2cc1d31 100644 --- a/peer.go +++ b/peer.go @@ -186,7 +186,6 @@ func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) (uint64, bool, error) { func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error) { // Ignore any null requests. if req == nil { - fmt.Println("send nil...") return 0, false, errors.New("raft.Peer: Request required") } @@ -213,7 +212,6 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error) // problem. if p.prevLogIndex > 0 { p.prevLogIndex-- - fmt.Println("decrease the previous index of peer ", p.Name(), " to ", p.prevLogIndex) } if resp.CommitIndex > p.prevLogIndex { p.prevLogIndex = resp.CommitIndex @@ -249,30 +247,31 @@ func (p *Peer) heartbeatTimeoutFunc(startChannel chan bool) { // Flush the peer when we get a heartbeat timeout. If the channel is // closed then the peer is getting cleaned up and we should exit. - - // defer func() { - // if r := recover(); r != nil { - // fmt.Println("Recovered in f", r) - // go heartbeatTimeoutFunc(startChannel) - // } - // }() - if _, ok := <-c; ok { collecting := p.collecting + if collecting == false { - fmt.Println("begin flush to peer ", p.Name()) p.flush(false) - fmt.Println("finish flush to peer ", p.Name()) + } else { var f FlushResponse // already holding lock f.peer = p - fmt.Println("Do begin flush to peer ", p.Name()) f.term, f.success, f.err = p.flush(true) - fmt.Println("Do finish flush to peer ", p.Name()) if f.success { p.server.response <- f p.collecting = false + } else { + // when we doing collecting, we will not receive + // appendentries request since we lock the server + // we need to check here + + // if we receive a response with higher term + // then step down + if f.term > p.server.currentTerm { + p.server.response <- f + } + p.collecting = false } } } else { diff --git a/server.go b/server.go index 4a38fe8..12639c8 100644 --- a/server.go +++ b/server.go @@ -59,12 +59,14 @@ type Server struct { leader string peers map[string]*Peer mutex sync.Mutex + electionTimer *Timer heartbeatTimeout time.Duration + response chan FlushResponse + currentSnapshot *Snapshot lastSnapshot *Snapshot - stateMachine StateMachine - response chan FlushResponse + stateMachine StateMachine } //------------------------------------------------------------------------------ @@ -295,13 +297,8 @@ func (s *Server) Initialize() error { return fmt.Errorf("raft.Server: %v", err) } - - //fmt.Println("curr ", s.currentTerm) // Update the term to the last term in the log. s.currentTerm = s.log.CurrentTerm() - //fmt.Println("curr ", s.currentTerm) - - for _, peer := range s.peers { peer.pause() @@ -323,7 +320,6 @@ func (s *Server) Stop() { s.mutex.Lock() defer s.mutex.Unlock() s.unload() - //fmt.Println("stop") } // Unloads the server. @@ -403,14 +399,14 @@ func (s *Server) do(command Command) ([]byte, error) { if s.state != Leader { return nil, NotLeaderError } - fmt.Println("do") - // Capture the term that this command is executing within. - //currentTerm := s.currentTerm + + // Pause the heart beat before we begin to collect + // the response for _, peer := range s.peers { peer.pause() } - //fmt.Println("curr term", s.currentTerm) + // Add a new entry to the log. entry := s.log.CreateEntry(s.currentTerm, command) if err := s.log.AppendEntry(entry); err != nil { @@ -418,12 +414,13 @@ func (s *Server) do(command Command) ([]byte, error) { } // begin to collecting data - s.response = make(chan FlushResponse, len(s.peers)) + s.response = make(chan FlushResponse, 2 * len(s.peers)) for _, peer := range s.peers { peer.collecting = true } + // resume the peer heartbeat and fire at it for _, peer := range s.peers { peer.resume() peer.heartbeatTimer.fire() @@ -440,6 +437,10 @@ func (s *Server) do(command Command) ([]byte, error) { response := <-s.response if response.success { responseCount++ + } else { + // step down + s.setCurrentTerm(response.term) + return nil, fmt.Errorf("raft: Unable to flush the entry: %d", entry.Index) } } @@ -461,51 +462,41 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons s.mutex.Lock() defer s.mutex.Unlock() - fmt.Println("[AppendEntries] got lock") - // If the server is stopped then reject it. if !s.Running() { return NewAppendEntriesResponse(s.currentTerm, false, 0), fmt.Errorf("raft.Server: Server stopped") } - fmt.Println("[AppendEntries] server is running") // If the request is coming from an old term then reject it. if req.Term < s.currentTerm { return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), fmt.Errorf("raft.Server: Stale request term") } - //fmt.Println("my term ", s.currentTerm, " req ", req.Term) - fmt.Println("[AppendEntries] term is good") s.setCurrentTerm(req.Term) - - fmt.Println("[AppendEntries] set current term is good") // Update the current leader. s.leader = req.LeaderName - //fmt.Println("leader is ", req.LeaderName) // Reset election timeout. if s.electionTimer != nil { s.electionTimer.Reset() } - fmt.Println("[AppendEntries] reset the timer is good") - // Reject if log doesn't contain a matching previous entry. if err := s.log.Truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil { return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err } - fmt.Println("[AppendEntries] truncate is good") + // Append entries to the log. if err := s.log.AppendEntries(req.Entries); err != nil { return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err } - fmt.Println("[AppendEntries] append is good") + // Commit up to the commit index. if err := s.log.SetCommitIndex(req.CommitIndex); err != nil { return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err } - fmt.Println("[AppendEntries] commit is good") + return NewAppendEntriesResponse(s.currentTerm, true, s.log.CommitIndex()), nil } @@ -742,7 +733,6 @@ func (s *Server) electionTimeoutFunc(startChannel chan bool) { // If an election times out then promote this server. If the channel // closes then that means the server has stopped so kill the function. if _, ok := <-c; ok { - fmt.Println("timeout") s.promote() } else { break @@ -878,7 +868,7 @@ func (s *Server) saveSnapshot() error { if s.currentSnapshot == nil { return errors.New("no snapshot to save") } - fmt.Println("saveSnapshot") + err := s.currentSnapshot.Save() if err != nil { @@ -934,7 +924,7 @@ func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, erro func (s *Server) LoadSnapshot() error { dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0) if err != nil { - fmt.Println("snapshot dir not exist") + return err } @@ -960,7 +950,7 @@ func (s *Server) LoadSnapshot() error { if err != nil { panic(err) } - fmt.Println("snapshot opened") + // TODO check checksum first var snapshotBytes []byte diff --git a/timer.go b/timer.go index de1a94f..0b783ea 100644 --- a/timer.go +++ b/timer.go @@ -4,7 +4,6 @@ import ( "math/rand" "sync" "time" - "fmt" ) //------------------------------------------------------------------------------ @@ -148,8 +147,6 @@ func (t *Timer) Reset() { t.mutex.Lock() defer t.mutex.Unlock() - fmt.Println("[TimerReset] got the lock") - // Stop the timer if it's already running. if t.internalTimer != nil { t.stopInternalTimer() @@ -170,15 +167,12 @@ func (t *Timer) Reset() { case v, ok := <-internalTimer.C: if ok { t.mutex.Lock() - fmt.Println("[TimerReset Go Func] got the lock") if t.c != nil { t.c <- v } t.mutex.Unlock() - fmt.Println("[TimerReset Go Func] release the lock") } case <-resetChannel: } }() - fmt.Println("[TimerReset] release the lock") }