Commit f15cade: add snapshot unit test
xiang90 committed Jun 6, 2013 (1 parent: 718ef79)
Showing 5 changed files with 62 additions and 69 deletions.
70 changes: 33 additions & 37 deletions peer.go
@@ -99,18 +99,42 @@ func (p *Peer) stop() {
// Flush
//--------------------------------------

// Sends an AppendEntries RPC but does not obtain a lock on the server. This
// method should only be called from the server.
func (p *Peer) internalFlush() (uint64, bool, error) {
// flush sends an AppendEntries RPC to the peer. If internal is true, it does
// not obtain a lock on the server.
func (p *Peer) flush(internal bool) (uint64, bool, error) {
// Retrieve the peer data within a lock that is separate from the
// server lock when creating the request. Otherwise a deadlock can
// occur.
p.mutex.Lock()
defer p.mutex.Unlock()
server, prevLogIndex := p.server, p.prevLogIndex
p.mutex.Unlock()

var req *AppendEntriesRequest
snapShotNeeded := false

// We need to hold the log lock while creating the AppendEntriesRequest,
// so a snapshot cannot compact away the needed entries before the request is built.
server.log.mutex.Lock()
if prevLogIndex >= server.log.StartIndex() {
if internal {
req = server.createInternalAppendEntriesRequest(prevLogIndex)
} else {
req = server.createAppendEntriesRequest(prevLogIndex)
}
} else {
snapShotNeeded = true
}
server.log.mutex.Unlock()

if p.prevLogIndex < p.server.log.StartIndex() {
req := p.server.createSnapshotRequest()
p.mutex.Lock()
defer p.mutex.Unlock()
if snapShotNeeded {
req := server.createSnapshotRequest()
return p.sendSnapshotRequest(req)
} else {
return p.sendFlushRequest(req)
}
req := p.server.createInternalAppendEntriesRequest(p.prevLogIndex)
return p.sendFlushRequest(req)

}

// send Snapshot Request
@@ -207,35 +231,7 @@ func (p *Peer) heartbeatTimeoutFunc(startChannel chan bool) {
// Flush the peer when we get a heartbeat timeout. If the channel is
// closed then the peer is getting cleaned up and we should exit.
if _, ok := <-c; ok {
// Retrieve the peer data within a lock that is separate from the
// server lock when creating the request. Otherwise a deadlock can
// occur.
p.mutex.Lock()
server, prevLogIndex := p.server, p.prevLogIndex
p.mutex.Unlock()

var req *AppendEntriesRequest
snapShotNeeded := false

// we need to hold the log lock to create AppendEntriesRequest
// avoid snapshot to delete the desired entries before AEQ()
server.log.mutex.Lock()
if prevLogIndex >= server.log.StartIndex() {
req = server.createAppendEntriesRequest(prevLogIndex)
} else {
snapShotNeeded = true
}
server.log.mutex.Unlock()

p.mutex.Lock()
if snapShotNeeded {
req := server.createSnapshotRequest()
p.sendSnapshotRequest(req)
} else {
p.sendFlushRequest(req)
}
p.mutex.Unlock()

p.flush(false)

} else {
break
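The behavioral core of this refactor is the snapshot fallback inside flush: if the peer's prevLogIndex has already been compacted out of the leader's log (prevLogIndex < StartIndex), the leader cannot replicate with AppendEntries and must send a snapshot instead. Below is a minimal, self-contained sketch of that decision using simplified stand-in types rather than the package's real Peer/Server/Log structs.

```go
package main

import "fmt"

// fakeLog is a stand-in for the package's Log type; only the start index
// (the first entry still present after compaction) matters here.
type fakeLog struct {
	startIndex uint64
}

func (l *fakeLog) StartIndex() uint64 { return l.startIndex }

// decideRequest mirrors the branch added in peer.flush: entries still in the
// log are replicated with AppendEntries; a peer that is too far behind gets
// a snapshot instead.
func decideRequest(log *fakeLog, prevLogIndex uint64) string {
	if prevLogIndex >= log.StartIndex() {
		return "AppendEntriesRequest"
	}
	return "SnapshotRequest"
}

func main() {
	log := &fakeLog{startIndex: 50}     // entries below index 50 were compacted away
	fmt.Println(decideRequest(log, 99)) // AppendEntriesRequest
	fmt.Println(decideRequest(log, 10)) // SnapshotRequest
}
```

Holding the log lock while the request is built, as the new code does, is what keeps a concurrent snapshot from compacting those entries between the check and the request creation.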
42 changes: 20 additions & 22 deletions server.go
@@ -369,10 +369,10 @@ func (s *Server) do(command Command) error {
// Capture the term that this command is executing within.
currentTerm := s.currentTerm

// TEMP to solve the issue 18
for _, peer := range s.peers {
peer.pause()
}
// // TEMP to solve the issue 18
// for _, peer := range s.peers {
// peer.pause()
// }

// Add a new entry to the log.
entry := s.log.CreateEntry(s.currentTerm, command)
@@ -386,7 +386,7 @@ func (s *Server) do(command Command) error {
peer := _peer
go func() {

term, success, err := peer.internalFlush()
term, success, err := peer.flush(true)

// Demote if we encounter a higher term.
if err != nil {
@@ -419,9 +419,9 @@ loop:
// If we received enough votes then stop waiting for more votes.
if responseCount >= s.QuorumSize() {
committed = true
for _, peer := range s.peers {
peer.resume()
}
// for _, peer := range s.peers {
// peer.resume()
// }
break
}

@@ -434,9 +434,9 @@ loop:
}
responseCount++
case <-afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2):
for _, peer := range s.peers {
peer.resume()
}
// for _, peer := range s.peers {
// peer.resume()
// }
break loop
}
}
@@ -515,6 +515,7 @@ func (s *Server) createInternalAppendEntriesRequest(prevLogIndex uint64) *Append
// server is elected then true is returned. If another server is elected then
// false is returned.
func (s *Server) promote() (bool, error) {

for {
// Start a new election.
term, lastLogIndex, lastLogTerm, err := s.promoteToCandidate()
@@ -610,7 +611,6 @@ func (s *Server) promoteToCandidate() (uint64, uint64, uint64, error) {
s.leader = ""
// Pause the election timer while we're a candidate.
s.electionTimer.Pause()

// Return server state so we can check for it during leader promotion.
lastLogIndex, lastLogTerm := s.log.CommitInfo()
return s.currentTerm, lastLogIndex, lastLogTerm, nil
@@ -655,7 +655,6 @@ func (s *Server) promoteToLeader(term uint64, lastLogIndex uint64, lastLogTerm u
func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()

// Fail if the server is not running.
if !s.Running() {
return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Server is stopped")
@@ -746,7 +745,6 @@ func (s *Server) AddPeer(name string) error {
peer.resume()
}
s.peers[peer.name] = peer
peer.resume()

}
return nil
@@ -827,24 +825,24 @@ func (s *Server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
}


func (s *Server) SnapshotRecovery(index uint64, term uint64, machineState int) (*SnapshotResponse, error){
func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, error){
//
s.mutex.Lock()
defer s.mutex.Unlock()

//recovery machine state
s.machineState = machineState
s.machineState = req.MachineState

//update term and index
s.currentTerm = term
s.log.UpdateCommitIndex(index)
snapshotPath := s.SnapshotPath(index, term)
s.currentSnapshot = &Snapshot{index, term, machineState, snapshotPath}
s.currentTerm = req.LastTerm
s.log.UpdateCommitIndex(req.LastIndex)
snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm)
s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.MachineState, snapshotPath}
s.saveSnapshot()
s.log.Compact(index, term)
s.log.Compact(req.LastIndex, req.LastTerm)


return NewSnapshotResponse(term, true, index), nil
return NewSnapshotResponse(req.LastTerm, true, req.LastIndex), nil

}

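SnapshotRecovery now takes the whole *SnapshotRequest instead of unpacked index/term/state arguments, which keeps its signature aligned with the other request handlers. A rough sketch of the kind of unit test this enables follows; newTestServer is a guessed helper name from test.go, and the actual test added by this commit is not shown in the hunks above.

```go
package raft

import "testing"

// TestSnapshotRecoverySketch is illustrative only. It assumes a helper like
// newTestServer exists in test.go and returns a *Server that is initialized
// and ready to accept requests; that helper is not part of this diff.
func TestSnapshotRecoverySketch(t *testing.T) {
	s := newTestServer("1", &testTransporter{}) // hypothetical helper
	defer s.Stop()

	// Pretend a leader sent a snapshot covering everything up to index 5.
	req := &SnapshotRequest{LastIndex: 5, LastTerm: 2, MachineState: 42}
	if _, err := s.SnapshotRecovery(req); err != nil {
		t.Fatalf("snapshot recovery failed: %v", err)
	}

	// Recovery should have restored the snapshot's machine state.
	if s.machineState != 42 {
		t.Fatalf("expected machine state 42, got %v", s.machineState)
	}
}
```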
11 changes: 3 additions & 8 deletions server_test.go
@@ -5,7 +5,6 @@ import (
"sync"
"testing"
"time"
"fmt"
)

//------------------------------------------------------------------------------
@@ -390,16 +389,12 @@ func TestServerMultiNode(t *testing.T) {
time.Sleep(100 * time.Millisecond)
leader.Stop()
time.Sleep(100 * time.Millisecond)

// Check that either server 2 or 3 is the leader now.
mutex.Lock()
if servers["2"].State() != Leader && servers["3"].State() != Leader {
t.Fatalf("Expected leader re-election: 2=%v, 3=%v", servers["2"].state, servers["3"].state)
t.Fatalf("Expected leader re-election: 2=%v, 3=%v\n", servers["2"].state, servers["3"].state)
}
mutex.Unlock()

// Stop the servers.
for _, server := range servers {
server.Stop()
}
}


2 changes: 1 addition & 1 deletion snapshot.go
@@ -38,7 +38,7 @@ func (ss *Snapshot) Save() error {

// open file
file, err := os.OpenFile(ss.path, os.O_CREATE|os.O_WRONLY, 0600)

if err != nil {
return err
}
6 changes: 5 additions & 1 deletion test.go
@@ -4,7 +4,7 @@ import (
"fmt"
"io/ioutil"
"os"
"time"
"time"
)

const (
@@ -98,6 +98,7 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]*
type testTransporter struct {
sendVoteRequestFunc func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error)
sendAppendEntriesRequestFunc func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error)
sendSnapshotRequestFunc func(server *Server, peer *Peer, req *SnapshotRequest) (*SnapshotResponse, error)
}

func (t *testTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
@@ -108,6 +109,9 @@ func (t *testTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r
return t.sendAppendEntriesRequestFunc(server, peer, req)
}

func (t *testTransporter) SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) (*SnapshotResponse, error) {
return t.sendSnapshotRequestFunc(server, peer, req)
}

//--------------------------------------
// Join Command
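The new sendSnapshotRequestFunc hook lets tests intercept outgoing snapshot RPCs exactly as the existing vote and append-entries hooks do. Below is a hedged sketch of a stub that always reports success; this helper does not exist in the repository and only shows how the hook is wired.

```go
package raft

// newSnapshotStubTransporter builds a testTransporter whose snapshot RPC
// always succeeds, echoing the request's term and last index back as a
// cooperative follower would. Illustrative only.
func newSnapshotStubTransporter() *testTransporter {
	tt := &testTransporter{}
	tt.sendSnapshotRequestFunc = func(server *Server, peer *Peer, req *SnapshotRequest) (*SnapshotResponse, error) {
		return NewSnapshotResponse(req.LastTerm, true, req.LastIndex), nil
	}
	return tt
}
```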
