Skip to content
This repository has been archived by the owner on Sep 6, 2018. It is now read-only.

Commit

Permalink
when the leader fails in during the collecting response phase, let it…
Browse files Browse the repository at this point in the history
… step down
  • Loading branch information
xiang90 committed Jun 13, 2013
1 parent f294f3c commit 5bef656
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 55 deletions.
5 changes: 0 additions & 5 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ func (l *Log) Open(path string) error {
}

file.Close()
fmt.Println("do recovery")
}

// Open the file for appending.
Expand Down Expand Up @@ -229,15 +228,13 @@ func (l *Log) ContainsEntry(index uint64, term uint64) bool {
// Retrieves a list of entries after a given index. This function also returns
// the term of the index provided.
func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) {
fmt.Println("[GET Entries After] ", index)
// Return an error if the index doesn't exist.
if index > (uint64(len(l.entries)) + l.startIndex) {
panic(fmt.Sprintf("raft: Index is beyond end of log: %v", index))
}

// If we're going from the beginning of the log then return the whole log.
if index == l.startIndex {
fmt.Println("[GET Entries 0] ")
return l.entries, l.startTerm
}
// Determine the term at the given entry and return a subslice.
Expand Down Expand Up @@ -334,8 +331,6 @@ func (l *Log) SetCommitIndex(index uint64) error {
func (l *Log) Truncate(index uint64, term uint64) error {
l.mutex.Lock()
defer l.mutex.Unlock()
fmt.Println("[Truncate] got log lock")
fmt.Println("[Truncate] index ", index, " term ", term)
// Do not allow committed entries to be truncated.
if index < l.CommitIndex() {
return fmt.Errorf("raft.Log: Index is already committed (%v): (IDX=%v, TERM=%v)", l.CommitIndex(), index, term)
Expand Down
27 changes: 13 additions & 14 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) (uint64, bool, error) {
func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error) {
// Ignore any null requests.
if req == nil {
fmt.Println("send nil...")
return 0, false, errors.New("raft.Peer: Request required")
}

Expand All @@ -213,7 +212,6 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error)
// problem.
if p.prevLogIndex > 0 {
p.prevLogIndex--
fmt.Println("decrease the previous index of peer ", p.Name(), " to ", p.prevLogIndex)
}
if resp.CommitIndex > p.prevLogIndex {
p.prevLogIndex = resp.CommitIndex
Expand Down Expand Up @@ -249,30 +247,31 @@ func (p *Peer) heartbeatTimeoutFunc(startChannel chan bool) {
// Flush the peer when we get a heartbeat timeout. If the channel is
// closed then the peer is getting cleaned up and we should exit.


// defer func() {
// if r := recover(); r != nil {
// fmt.Println("Recovered in f", r)
// go heartbeatTimeoutFunc(startChannel)
// }
// }()

if _, ok := <-c; ok {
collecting := p.collecting

if collecting == false {
fmt.Println("begin flush to peer ", p.Name())
p.flush(false)
fmt.Println("finish flush to peer ", p.Name())

} else {
var f FlushResponse
// already holding lock
f.peer = p
fmt.Println("Do begin flush to peer ", p.Name())
f.term, f.success, f.err = p.flush(true)
fmt.Println("Do finish flush to peer ", p.Name())
if f.success {
p.server.response <- f
p.collecting = false
} else {
// when we doing collecting, we will not receive
// appendentries request since we lock the server
// we need to check here

// if we receive a response with higher term
// then step down
if f.term > p.server.currentTerm {
p.server.response <- f
}
p.collecting = false
}
}
} else {
Expand Down
50 changes: 20 additions & 30 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,14 @@ type Server struct {
leader string
peers map[string]*Peer
mutex sync.Mutex

electionTimer *Timer
heartbeatTimeout time.Duration
response chan FlushResponse

currentSnapshot *Snapshot
lastSnapshot *Snapshot
stateMachine StateMachine
response chan FlushResponse
stateMachine StateMachine
}

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -295,13 +297,8 @@ func (s *Server) Initialize() error {
return fmt.Errorf("raft.Server: %v", err)
}


//fmt.Println("curr ", s.currentTerm)
// Update the term to the last term in the log.
s.currentTerm = s.log.CurrentTerm()
//fmt.Println("curr ", s.currentTerm)



for _, peer := range s.peers {
peer.pause()
Expand All @@ -323,7 +320,6 @@ func (s *Server) Stop() {
s.mutex.Lock()
defer s.mutex.Unlock()
s.unload()
//fmt.Println("stop")
}

// Unloads the server.
Expand Down Expand Up @@ -403,27 +399,28 @@ func (s *Server) do(command Command) ([]byte, error) {
if s.state != Leader {
return nil, NotLeaderError
}
fmt.Println("do")
// Capture the term that this command is executing within.
//currentTerm := s.currentTerm

// Pause the heart beat before we begin to collect
// the response

for _, peer := range s.peers {
peer.pause()
}
//fmt.Println("curr term", s.currentTerm)

// Add a new entry to the log.
entry := s.log.CreateEntry(s.currentTerm, command)
if err := s.log.AppendEntry(entry); err != nil {
return nil, err
}

// begin to collecting data
s.response = make(chan FlushResponse, len(s.peers))
s.response = make(chan FlushResponse, 2 * len(s.peers))

for _, peer := range s.peers {
peer.collecting = true
}

// resume the peer heartbeat and fire at it
for _, peer := range s.peers {
peer.resume()
peer.heartbeatTimer.fire()
Expand All @@ -440,6 +437,10 @@ func (s *Server) do(command Command) ([]byte, error) {
response := <-s.response
if response.success {
responseCount++
} else {
// step down
s.setCurrentTerm(response.term)
return nil, fmt.Errorf("raft: Unable to flush the entry: %d", entry.Index)
}

}
Expand All @@ -461,51 +462,41 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons
s.mutex.Lock()
defer s.mutex.Unlock()

fmt.Println("[AppendEntries] got lock")

// If the server is stopped then reject it.
if !s.Running() {
return NewAppendEntriesResponse(s.currentTerm, false, 0), fmt.Errorf("raft.Server: Server stopped")
}

fmt.Println("[AppendEntries] server is running")
// If the request is coming from an old term then reject it.
if req.Term < s.currentTerm {
return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), fmt.Errorf("raft.Server: Stale request term")
}
//fmt.Println("my term ", s.currentTerm, " req ", req.Term)
fmt.Println("[AppendEntries] term is good")

s.setCurrentTerm(req.Term)


fmt.Println("[AppendEntries] set current term is good")
// Update the current leader.
s.leader = req.LeaderName

//fmt.Println("leader is ", req.LeaderName)
// Reset election timeout.
if s.electionTimer != nil {
s.electionTimer.Reset()
}

fmt.Println("[AppendEntries] reset the timer is good")

// Reject if log doesn't contain a matching previous entry.
if err := s.log.Truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err
}
fmt.Println("[AppendEntries] truncate is good")

// Append entries to the log.
if err := s.log.AppendEntries(req.Entries); err != nil {
return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err
}
fmt.Println("[AppendEntries] append is good")

// Commit up to the commit index.
if err := s.log.SetCommitIndex(req.CommitIndex); err != nil {
return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err
}
fmt.Println("[AppendEntries] commit is good")

return NewAppendEntriesResponse(s.currentTerm, true, s.log.CommitIndex()), nil
}

Expand Down Expand Up @@ -742,7 +733,6 @@ func (s *Server) electionTimeoutFunc(startChannel chan bool) {
// If an election times out then promote this server. If the channel
// closes then that means the server has stopped so kill the function.
if _, ok := <-c; ok {
fmt.Println("timeout")
s.promote()
} else {
break
Expand Down Expand Up @@ -878,7 +868,7 @@ func (s *Server) saveSnapshot() error {
if s.currentSnapshot == nil {
return errors.New("no snapshot to save")
}
fmt.Println("saveSnapshot")

err := s.currentSnapshot.Save()

if err != nil {
Expand Down Expand Up @@ -934,7 +924,7 @@ func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, erro
func (s *Server) LoadSnapshot() error {
dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
if err != nil {
fmt.Println("snapshot dir not exist")

return err
}

Expand All @@ -960,7 +950,7 @@ func (s *Server) LoadSnapshot() error {
if err != nil {
panic(err)
}
fmt.Println("snapshot opened")

// TODO check checksum first

var snapshotBytes []byte
Expand Down
6 changes: 0 additions & 6 deletions timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"math/rand"
"sync"
"time"
"fmt"
)

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -148,8 +147,6 @@ func (t *Timer) Reset() {
t.mutex.Lock()
defer t.mutex.Unlock()

fmt.Println("[TimerReset] got the lock")

// Stop the timer if it's already running.
if t.internalTimer != nil {
t.stopInternalTimer()
Expand All @@ -170,15 +167,12 @@ func (t *Timer) Reset() {
case v, ok := <-internalTimer.C:
if ok {
t.mutex.Lock()
fmt.Println("[TimerReset Go Func] got the lock")
if t.c != nil {
t.c <- v
}
t.mutex.Unlock()
fmt.Println("[TimerReset Go Func] release the lock")
}
case <-resetChannel:
}
}()
fmt.Println("[TimerReset] release the lock")
}

0 comments on commit 5bef656

Please sign in to comment.