diff --git a/log.go b/log.go index a8cd112..71f7ffb 100644 --- a/log.go +++ b/log.go @@ -18,7 +18,6 @@ import ( // A log is a collection of log entries that are persisted to durable storage. type Log struct { ApplyFunc func(Command) ([]byte, error) - callBackFrom uint64 file *os.File path string entries []*LogEntry @@ -140,7 +139,7 @@ func (l *Log) CurrentTerm() uint64 { func (l *Log) Open(path string) error { l.mutex.Lock() defer l.mutex.Unlock() - + // Read all the entries from the log if one exists. var lastIndex int = 0 if _, err := os.Stat(path); !os.IsNotExist(err) { @@ -168,11 +167,11 @@ func (l *Log) Open(path string) error { } break } - + // Append entry. l.entries = append(l.entries, entry) l.commitIndex = entry.Index - + // Apply the command. entry.result, err = l.ApplyFunc(entry.Command) @@ -183,7 +182,7 @@ func (l *Log) Open(path string) error { file.Close() } - + // Open the file for appending. var err error l.file, err = os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) @@ -240,7 +239,7 @@ func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) { return l.entries, l.startTerm } - fmt.Println("[GetEntries] index ", index, "lastIndex", l.entries[len(l.entries) - 1].Index) + fmt.Println("[GetEntries] index ", index, "lastIndex", l.entries[len(l.entries)-1].Index) // Determine the term at the given entry and return a subslice. term := l.entries[index-1-l.startIndex].Term @@ -264,7 +263,6 @@ func (l *Log) GetEntryError(entry *LogEntry) error { return nil } - //-------------------------------------- // Commit //-------------------------------------- @@ -299,13 +297,12 @@ func (l *Log) LastInfo() (index uint64, term uint64) { return l.startIndex, l.startTerm } - // Return the last index & term - lastEntry := l.entries[len(l.entries) - 1] + // Return the last index & term + lastEntry := l.entries[len(l.entries)-1] return lastEntry.Index, lastEntry.Term } - -// Updates the commit index +// Updates the commit index func (l *Log) UpdateCommitIndex(index uint64) { l.mutex.Lock() defer l.mutex.Unlock() @@ -452,7 +449,7 @@ func (l *Log) Compact(index uint64, term uint64) error { defer l.mutex.Unlock() // nothing to compaction - // the index may be greater than the current index if + // the index may be greater than the current index if // we just recovery from on snapshot if index >= l.internalCurrentIndex() { entries = make([]*LogEntry, 0) diff --git a/log_entry.go b/log_entry.go index f6beeed..7a715f5 100644 --- a/log_entry.go +++ b/log_entry.go @@ -19,10 +19,10 @@ import ( // A log entry stores a single item in the log. type LogEntry struct { log *Log - Index uint64 `json:"index"` - Term uint64 `json:"term"` - Command Command `json:"command"` - result []byte `json:"-"` + Index uint64 `json:"index"` + Term uint64 `json:"term"` + Command Command `json:"command"` + result []byte `json:"-"` commit chan bool `json:"-"` } @@ -47,7 +47,7 @@ func NewLogEntry(log *Log, index uint64, term uint64, command Command) *LogEntry Index: index, Term: term, Command: command, - commit: make(chan bool, 3), + commit: make(chan bool, 3), } } diff --git a/log_entry_test.go b/log_entry_test.go new file mode 100644 index 0000000..9113aae --- /dev/null +++ b/log_entry_test.go @@ -0,0 +1,37 @@ +package raft + +import ( + "encoding/json" + //"reflect" + "testing" +) + +//------------------------------------------------------------------------------ +// +// Tests +// +//------------------------------------------------------------------------------ + +//-------------------------------------- +// Encoding +//-------------------------------------- + +// Ensure that we can encode a log entry to JSON. +func TestLogEntryMarshal(t *testing.T) { + e := NewLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"}) + if b, err := json.Marshal(e); !(string(b) == `{"command":{"name":"localhost:1000"},"index":1,"name":"test:join","term":2}` && err == nil) { + t.Fatalf("Unexpected log entry marshalling: %v (%v)", string(b), err) + } +} + +// // Ensure that we can decode a log entry from JSON. +// func TestLogEntryUnmarshal(t *testing.T) { +// e := &LogEntry{} +// b := []byte(`{"command":{"name":"localhost:1000"},"index":1,"name":"test:join","term":2}`) +// if err := json.Unmarshal(b, e); err != nil { +// t.Fatalf("Log entry unmarshalling error: %v", err) +// } +// if !reflect.DeepEqual(e, NewLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"})) { +// t.Fatalf("Log entry unmarshaled incorrectly: %v", e) +// } +// } diff --git a/log_test.go b/log_test.go new file mode 100644 index 0000000..a4a217b --- /dev/null +++ b/log_test.go @@ -0,0 +1,225 @@ +package raft + +import ( + "io/ioutil" + "os" + "reflect" + "testing" +) + +//------------------------------------------------------------------------------ +// +// Tests +// +//------------------------------------------------------------------------------ + +//-------------------------------------- +// Append +//-------------------------------------- + +// Ensure that we can append to a new log. +func TestLogNewLog(t *testing.T) { + path := getLogPath() + log := NewLog() + log.ApplyFunc = func(c Command) ([]byte, error) { + return nil, nil + } + if err := log.Open(path); err != nil { + t.Fatalf("Unable to open log: %v", err) + } + defer log.Close() + defer os.Remove(path) + + if err := log.AppendEntry(NewLogEntry(log, 1, 1, &TestCommand1{"foo", 20})); err != nil { + t.Fatalf("Unable to append: %v", err) + } + if err := log.AppendEntry(NewLogEntry(log, 2, 1, &TestCommand2{100})); err != nil { + t.Fatalf("Unable to append: %v", err) + } + if err := log.AppendEntry(NewLogEntry(log, 3, 2, &TestCommand1{"bar", 0})); err != nil { + t.Fatalf("Unable to append: %v", err) + } + + // Partial commit. + if err := log.SetCommitIndex(2); err != nil { + t.Fatalf("Unable to partially commit: %v", err) + } + expected := `cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + + `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" + actual, _ := ioutil.ReadFile(path) + if string(actual) != expected { + t.Fatalf("Unexpected buffer:\nexp:\n%s\ngot:\n%s", expected, string(actual)) + } + if index, term := log.CommitInfo(); index != 2 || term != 1 { + t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) + } + + // Full commit. + if err := log.SetCommitIndex(3); err != nil { + t.Fatalf("Unable to commit: %v", err) + } + expected = `cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + + `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" + + `6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}` + "\n" + actual, _ = ioutil.ReadFile(path) + if string(actual) != expected { + t.Fatalf("Unexpected buffer:\nexp:\n%s\ngot:\n%s", expected, string(actual)) + } + if index, term := log.CommitInfo(); index != 3 || term != 2 { + t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) + } +} + +// Ensure that we can decode and encode to an existing log. +// func TestLogExistingLog(t *testing.T) { +// log, path := setupLog(`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + +// `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" + +// `6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}` + "\n") +// defer log.Close() +// defer os.Remove(path) + +// // Validate existing log entries. +// if len(log.entries) != 3 { +// t.Fatalf("Expected 3 entries, got %d", len(log.entries)) +// } +// if !reflect.DeepEqual(log.entries[0], NewLogEntry(log, 1, 1, &TestCommand1{"foo", 20})) { +// t.Fatalf("Unexpected entry[0]: %v", log.entries[0]) +// } +// if !reflect.DeepEqual(log.entries[1], NewLogEntry(log, 2, 1, &TestCommand2{100})) { +// t.Fatalf("Unexpected entry[1]: %v", log.entries[1]) +// } +// if !reflect.DeepEqual(log.entries[2], NewLogEntry(log, 3, 2, &TestCommand1{"bar", 0})) { +// t.Fatalf("Unexpected entry[2]: %v", log.entries[2]) +// } +// } + +// Ensure that we can check the contents of the log by index/term. +func TestLogContainsEntries(t *testing.T) { + log, path := setupLog(`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + + `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" + + `6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}` + "\n") + defer log.Close() + defer os.Remove(path) + + if log.ContainsEntry(0, 0) { + t.Fatalf("Zero-index entry should not exist in log.") + } + if log.ContainsEntry(1, 0) { + t.Fatalf("Entry with mismatched term should not exist") + } + if log.ContainsEntry(4, 0) { + t.Fatalf("Out-of-range entry should not exist") + } + if !log.ContainsEntry(2, 1) { + t.Fatalf("Entry 2/1 should exist") + } + if !log.ContainsEntry(3, 2) { + t.Fatalf("Entry 2/1 should exist") + } +} + +// Ensure that we can recover from an incomplete/corrupt log and continue logging. +// func TestLogRecovery(t *testing.T) { +// path := setupLogFile(`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + +// `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" + +// `6ac5807c 0000000000000003 00000000000`) +// log := NewLog() +// log.ApplyFunc = func(c Command) ([]byte, error) { +// return nil,nil +// } +// if err := log.Open(path); err != nil { +// t.Fatalf("Unable to open log: %v", err) +// } +// defer log.Close() +// defer os.Remove(path) + +// if err := log.AppendEntry(NewLogEntry(log, 3, 2, &TestCommand1{"bat", -5})); err != nil { +// t.Fatalf("Unable to append: %v", err) +// } + +// // Validate existing log entries. +// if len(log.entries) != 3 { +// t.Fatalf("Expected 2 entries, got %d", len(log.entries)) +// } +// if !reflect.DeepEqual(log.entries[0], NewLogEntry(log, 1, 1, &TestCommand1{"foo", 20})) { +// t.Fatalf("Unexpected entry[0]: %v", log.entries[0]) +// } +// if !reflect.DeepEqual(log.entries[1], NewLogEntry(log, 2, 1, &TestCommand2{100})) { +// t.Fatalf("Unexpected entry[1]: %v", log.entries[1]) +// } +// if !reflect.DeepEqual(log.entries[2], NewLogEntry(log, 3, 2, &TestCommand1{"bat", -5})) { +// t.Fatalf("Unexpected entry[2]: %v", log.entries[2]) +// } + +// // Validate precommit log contents. +// expected := `cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + +// `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" +// actual, _ := ioutil.ReadFile(path) +// if string(actual) != expected { +// t.Fatalf("Unexpected buffer:\nexp:\n%s\ngot:\n%s", expected, string(actual)) +// } + +// // Validate committed log contents. +// if err := log.SetCommitIndex(3); err != nil { +// t.Fatalf("Unable to partially commit: %v", err) +// } +// expected = `cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + +// `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" + +// `3f3f884c 0000000000000003 0000000000000002 cmd_1 {"val":"bat","i":-5}` + "\n" +// actual, _ = ioutil.ReadFile(path) +// if string(actual) != expected { +// t.Fatalf("Unexpected buffer:\nexp:\n%s\ngot:\n%s", expected, string(actual)) +// } +// } + +//-------------------------------------- +// Append +//-------------------------------------- + +// Ensure that we can truncate uncommitted entries in the log. +func TestLogTruncate(t *testing.T) { + log, path := setupLog("") + if err := log.Open(path); err != nil { + t.Fatalf("Unable to open log: %v", err) + } + defer log.Close() + defer os.Remove(path) + + entry1 := NewLogEntry(log, 1, 1, &TestCommand1{"foo", 20}) + if err := log.AppendEntry(entry1); err != nil { + t.Fatalf("Unable to append: %v", err) + } + entry2 := NewLogEntry(log, 2, 1, &TestCommand2{100}) + if err := log.AppendEntry(entry2); err != nil { + t.Fatalf("Unable to append: %v", err) + } + entry3 := NewLogEntry(log, 3, 2, &TestCommand1{"bar", 0}) + if err := log.AppendEntry(entry3); err != nil { + t.Fatalf("Unable to append: %v", err) + } + if err := log.SetCommitIndex(2); err != nil { + t.Fatalf("Unable to partially commit: %v", err) + } + + // Truncate committed entry. + if err := log.Truncate(1, 1); err == nil || err.Error() != "raft.Log: Index is already committed (2): (IDX=1, TERM=1)" { + t.Fatalf("Truncating committed entries shouldn't work: %v", err) + } + // Truncate past end of log. + if err := log.Truncate(4, 2); err == nil || err.Error() != "raft.Log: Entry index does not exist (MAX=3): (IDX=4, TERM=2)" { + t.Fatalf("Truncating past end-of-log shouldn't work: %v", err) + } + // Truncate entry with mismatched term. + if err := log.Truncate(2, 2); err == nil || err.Error() != "raft.Log: Entry at index does not have matching term (1): (IDX=2, TERM=2)" { + t.Fatalf("Truncating mismatched entries shouldn't work: %v", err) + } + // Truncate end of log. + if err := log.Truncate(3, 2); !(err == nil && reflect.DeepEqual(log.entries, []*LogEntry{entry1, entry2, entry3})) { + t.Fatalf("Truncating end of log should work: %v\n\nEntries:\nActual: %v\nExpected: %v", err, log.entries, []*LogEntry{entry1, entry2, entry3}) + } + // Truncate at last commit. + if err := log.Truncate(2, 1); !(err == nil && reflect.DeepEqual(log.entries, []*LogEntry{entry1, entry2})) { + t.Fatalf("Truncating at last commit should work: %v\n\nEntries:\nActual: %v\nExpected: %v", err, log.entries, []*LogEntry{entry1, entry2}) + } + +} diff --git a/peer.go b/peer.go index a16917d..6e2cd18 100644 --- a/peer.go +++ b/peer.go @@ -2,9 +2,9 @@ package raft import ( "errors" + "fmt" "sync" "time" - "fmt" ) //------------------------------------------------------------------------------ @@ -23,10 +23,10 @@ type Peer struct { } type FlushResponse struct { - term uint64 + term uint64 success bool - err error - peer *Peer + err error + peer *Peer } //------------------------------------------------------------------------------ @@ -95,12 +95,10 @@ func (p *Peer) resume() { // Pauses the peer to prevent heartbeating. func (p *Peer) pause() { - fmt.Println( p.server.Name() ," Pause try lock ", p.Name()) p.mutex.Lock() defer p.mutex.Unlock() - fmt.Println( p.server.Name() ," Pause timer of ", p.Name()) + p.heartbeatTimer.Pause() - fmt.Println( p.server.Name() ," Finish Pause timer of ", p.Name()) } // Stops the peer entirely. @@ -114,35 +112,26 @@ func (p *Peer) stop() { // Flush //-------------------------------------- -// if internal is set true, sends an AppendEntries RPC but does not obtain a lock -// on the server. -func (p *Peer) flush(internal bool) (uint64, bool, error) { - // Retrieve the peer data within a lock that is separate from the - // server lock when creating the request. Otherwise a deadlock can - // occur. - //p.mutex.Lock() +// Sends an AppendEntries RPC but does not obtain a lock +// on the server. +func (p *Peer) flush() (uint64, bool, error) { + server, prevLogIndex := p.server, p.prevLogIndex - //p.mutex.Unlock() var req *AppendEntriesRequest snapShotNeeded := false // we need to hold the log lock to create AppendEntriesRequest // avoid snapshot to delete the desired entries before AEQ() + server.log.mutex.Lock() if prevLogIndex >= server.log.StartIndex() { - if internal { - req = server.createInternalAppendEntriesRequest(prevLogIndex) - } else { - req = server.createAppendEntriesRequest(prevLogIndex) - } + req = server.createInternalAppendEntriesRequest(prevLogIndex) } else { snapShotNeeded = true } server.log.mutex.Unlock() - // p.mutex.Lock() - // defer p.mutex.Unlock() if snapShotNeeded { req := server.createSnapshotRequest() return p.sendSnapshotRequest(req) @@ -192,8 +181,8 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error) // log. Send the request through the user-provided handler and process the // result. //fmt.Println("flush to ", p.Name()) - fmt.Println("[HeartBeat] Leader ", p.server.Name(), " to ", - p.Name(), " ",len(req.Entries)," ", time.Now()) + fmt.Println("[HeartBeat] Leader ", p.server.Name(), " to ", + p.Name(), " ", len(req.Entries), " ", time.Now()) resp, err := p.server.transporter.SendAppendEntriesRequest(p.server, p, req) //fmt.Println("receive flush response from ", p.Name()) @@ -248,29 +237,32 @@ func (p *Peer) heartbeatTimeoutFunc(startChannel chan bool) { if c == nil { break } - + if _, ok := <-c; ok { - var f FlushResponse - + var f FlushResponse + f.peer = p - f.term, f.success, f.err = p.flush(true) + f.term, f.success, f.err = p.flush() + // if the peer successfully appended the log entry + // we will tell the commit center if f.success { if p.prevLogIndex > p.server.log.CommitIndex() { fmt.Println("[Heartbeat] Peer", p.Name(), "send to commit center") p.server.response <- f fmt.Println("[Heartbeat] Peer", p.Name(), "back from commit center") } + } else { if f.term > p.server.currentTerm { fmt.Println("[Heartbeat] SetpDown!") select { - case p.server.stepDown <- f.term: - p.pause() - default: - p.pause() + case p.server.stepDown <- f.term: + p.pause() + default: + p.pause() } } } diff --git a/request_vote.go b/request_vote.go index 49bea8b..2fd1f79 100644 --- a/request_vote.go +++ b/request_vote.go @@ -13,7 +13,6 @@ type RequestVoteRequest struct { CandidateName string `json:"candidateName"` LastLogIndex uint64 `json:"lastLogIndex"` LastLogTerm uint64 `json:"lastLogTerm"` - CommitIndex uint64 `json:"commitIndex"` } // The response returned from a server after a vote for a candidate to become a leader. @@ -30,13 +29,12 @@ type RequestVoteResponse struct { //------------------------------------------------------------------------------ // Creates a new RequestVote request. -func NewRequestVoteRequest(term uint64, candidateName string, lastLogIndex uint64, lastLogTerm uint64, commitIndex uint64) *RequestVoteRequest { +func NewRequestVoteRequest(term uint64, candidateName string, lastLogIndex uint64, lastLogTerm uint64) *RequestVoteRequest { return &RequestVoteRequest{ Term: term, CandidateName: candidateName, LastLogIndex: lastLogIndex, LastLogTerm: lastLogTerm, - CommitIndex: commitIndex, } } diff --git a/server.go b/server.go index 3d5857b..026c1cf 100644 --- a/server.go +++ b/server.go @@ -1,6 +1,7 @@ package raft import ( + "encoding/json" "errors" "fmt" "io/ioutil" @@ -9,7 +10,6 @@ import ( "sort" "sync" "time" - "encoding/json" ) //------------------------------------------------------------------------------ @@ -48,27 +48,27 @@ var DuplicatePeerError = errors.New("raft.Server: Duplicate peer") // A server is involved in the consensus protocol and can act as a follower, // candidate or a leader. type Server struct { - name string - path string - state string - transporter Transporter - context interface{} - currentTerm uint64 - - votedFor string - log *Log - leader string - peers map[string]*Peer - mutex sync.Mutex + name string + path string + state string + transporter Transporter + context interface{} + currentTerm uint64 + + votedFor string + log *Log + leader string + peers map[string]*Peer + mutex sync.Mutex electionTimer *Timer heartbeatTimeout time.Duration response chan FlushResponse - stepDown chan uint64 + stepDown chan uint64 - currentSnapshot *Snapshot - lastSnapshot *Snapshot - stateMachine StateMachine + currentSnapshot *Snapshot + lastSnapshot *Snapshot + stateMachine StateMachine } //------------------------------------------------------------------------------ @@ -134,7 +134,7 @@ func (s *Server) Leader() string { return s.leader } -// Retrieves the peers of the server +// Retrieves the peers of the server func (s *Server) Peers() map[string]*Peer { s.mutex.Lock() defer s.mutex.Unlock() @@ -214,11 +214,7 @@ func (s *Server) LastCommandName() string { // Retrieves the number of member servers in the consensus. func (s *Server) MemberCount() int { - count := 1 - for _, _ = range s.peers { - count++ - } - return count + return len(s.peers) + 1 } // Retrieves the number of servers required to make a quorum. @@ -278,7 +274,7 @@ func (s *Server) SetHeartbeatTimeout(duration time.Duration) { //------------------------------------------------------------------------------ //-------------------------------------- -// State +// Initialization //-------------------------------------- // Starts the server with a log at the given path. @@ -291,12 +287,10 @@ func (s *Server) Initialize() error { return errors.New("raft.Server: Server already running") } - // Update the state. - s.state = Follower + // Initialize response channel + s.response = make(chan FlushResponse, 128) - // - s.response = make(chan FlushResponse, 100) - // create snapshot dir if not exist + // Create snapshot directory if not exist os.Mkdir(s.path+"/snapshot", 0700) // Initialize the log and load it up. @@ -305,37 +299,68 @@ func (s *Server) Initialize() error { s.unload() return fmt.Errorf("raft.Server: %v", err) } - + // Update the term to the last term in the log. s.currentTerm = s.log.CurrentTerm() - // update the startIndex - s.log.callBackFrom = uint64(len(s.log.entries)) + s.log.startIndex + return nil +} - for _, peer := range s.peers { - peer.pause() - } +// Start the sever as a follower +func (s *Server) StartFollower() { + // Update the state. + s.state = Follower + + // Start the election timeout. + c := make(chan bool) + s.electionTimer.Reset() + go s.electionTimeoutFunc(c) + <-c +} + +// Start the sever as a leader +func (s *Server) StartLeader() error { + s.mutex.Lock() + defer s.mutex.Unlock() + + // Start as leader. + s.currentTerm++ + s.state = Leader + s.leader = s.name + s.electionTimer.Pause() + + // Leader need to collect appendLog response + go s.commitCenter() return nil } +// Collect response from followers. If more than the +// majority of the followers append a log entry, the +// leader will commit the log entry func (s *Server) commitCenter() { fmt.Println("collecting data") for { - var response FlushResponse + var response FlushResponse select { - case response = <- s.response: + case response = <-s.response: - case term := <-s.stepDown: - s.setCurrentTerm(term) - return + case term := <-s.stepDown: + s.setCurrentTerm(term) + return } if response.peer != nil { fmt.Println("[CommitCenter] Receive respone from ", response.peer.Name(), response.success) } + // TODO: UINT64 SORTING + // Convert uint64 to int, since go does not have a built in + // func to sort uint64 + + // when the leader is the only member in the cluster, it can commit + // the log immediately if s.QuorumSize() < 2 { fmt.Println("[CommitCenter] Commit ", s.log.CurrentIndex()) @@ -346,17 +371,21 @@ func (s *Server) commitCenter() { for i := commited; i < commit; i++ { select { - case s.log.entries[i - int(s.log.startIndex)].commit <- true: + case s.log.entries[i-int(s.log.startIndex)].commit <- true: fmt.Println("notify") continue + // we have a buffered commit channel, it should return immediately default: - continue + panic("Cannot send commit nofication") } } - continue } + // TODO: Current we use sort which is O(NlogN). + // we should record the previous infomation and + // find the index to commit in O(1) + var data []int data = append(data, int(s.log.CurrentIndex())) @@ -365,22 +394,20 @@ func (s *Server) commitCenter() { } sort.Ints(data) - commit := data[s.QuorumSize() - 1] + + // We can commit upto the index which the mojarity + // of the members have appended. + commit := data[s.QuorumSize()-1] commited := int(s.log.CommitIndex()) if commit > commited { fmt.Println("[CommitCenter] Going to Commit ", commit) s.log.SetCommitIndex(uint64(commit)) - // entry := s.log.entries[commit - 1- int(s.log.startIndex)] - - // if (commit > int(s.startIndex)) { - // fmt.Println("[CommitCenter] Wait join Commit ", entry.Index) - // } for i := commited; i < commit; i++ { select { - case s.log.entries[i - int(s.log.startIndex)].commit <- true: + case s.log.entries[i-int(s.log.startIndex)].commit <- true: fmt.Println("notify") continue default: @@ -395,13 +422,8 @@ func (s *Server) commitCenter() { } +func (s *Server) commitNotify() { -func (s *Server) StartFollower() { - // Start the election timeout. - c := make(chan bool) - s.electionTimer.Reset() - go s.electionTimeoutFunc(c) - <-c } // Shuts down the server. @@ -442,35 +464,6 @@ func (s *Server) Running() bool { return s.state != Stopped } -//-------------------------------------- -// Initialization -//-------------------------------------- - -// Initializes the server to become leader of a new cluster. This function -// will fail if there is an existing log or the server is already a member in -// an existing cluster. -func (s *Server) StartLeader() error { - s.mutex.Lock() - defer s.mutex.Unlock() - - // Exit if the server is not running. - if !s.Running() { - return errors.New("raft.Server: Cannot initialize while stopped") - } else if s.MemberCount() > 1 { - return errors.New("raft.Server: Cannot initialize; already in membership") - } - - // Promote to leader. - s.currentTerm++ - s.state = Leader - s.leader = s.name - s.electionTimer.Pause() - - go s.commitCenter() - - return nil -} - //-------------------------------------- // Commands //-------------------------------------- @@ -498,12 +491,12 @@ func (s *Server) Do(command Command) ([]byte, error) { // timeout here select { - case <-entry.commit: - fmt.Println("[Do] finish!") - return entry.result, nil - case <-time.After(time.Second): - fmt.Println("[Do] fail!") - return nil, errors.New("Command commit fails") + case <-entry.commit: + fmt.Println("[Do] finish!") + return entry.result, nil + case <-time.After(time.Second): + fmt.Println("[Do] fail!") + return nil, errors.New("Command commit fails") } } @@ -521,9 +514,9 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons if req.Term < s.currentTerm { return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), fmt.Errorf("raft.Server: Stale request term") } - - fmt.Println("Peer ", s.Name(), "received heartbeat from ", req.LeaderName, - " ",req.Term," ", s.currentTerm, " ",time.Now()) + + fmt.Println("Peer ", s.Name(), "received heartbeat from ", req.LeaderName, + " ", req.Term, " ", s.currentTerm, " ", time.Now()) s.setCurrentTerm(req.Term) @@ -534,7 +527,7 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons if s.electionTimer != nil { s.electionTimer.Reset() } - + // Reject if log doesn't contain a matching previous entry. if err := s.log.Truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil { return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err @@ -545,7 +538,7 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons // Append entries to the log. if err := s.log.AppendEntries(req.Entries); err != nil { return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err - } + } fmt.Println("Peer ", s.Name(), "after append ") // Commit up to the commit index. @@ -555,8 +548,8 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons fmt.Println("Peer ", s.Name(), "after commit ") - fmt.Println("Peer ", s.Name(), "reply heartbeat from ", req.LeaderName, - " ",req.Term," ", s.currentTerm, " ",time.Now()) + fmt.Println("Peer ", s.Name(), "reply heartbeat from ", req.LeaderName, + " ", req.Term, " ", s.currentTerm, " ", time.Now()) return NewAppendEntriesResponse(s.currentTerm, true, s.log.CommitIndex()), nil } @@ -599,7 +592,7 @@ func (s *Server) promote() (bool, error) { for _, _peer := range s.peers { peer := _peer go func() { - req := NewRequestVoteRequest(term, s.name, lastLogIndex, lastLogTerm, s.log.commitIndex) + req := NewRequestVoteRequest(term, s.name, lastLogIndex, lastLogTerm) req.peer = peer fmt.Println(s.Name(), "Send Vote Request to ", peer.Name()) if resp, _ := s.transporter.SendVoteRequest(s, peer, req); resp != nil { @@ -616,7 +609,7 @@ func (s *Server) promote() (bool, error) { timeout := false for { - // if timeout happened, restart the promotion + // if timeout happened, restart the promotion if timeout { break } @@ -696,7 +689,7 @@ func (s *Server) promoteToCandidate() (uint64, uint64, uint64, error) { // Return server state so we can check for it during leader promotion. lastLogIndex, lastLogTerm := s.log.LastInfo() - fmt.Println("[PromoteToCandidate] Follower ", s.Name(), + fmt.Println("[PromoteToCandidate] Follower ", s.Name(), "promote to candidate[", lastLogIndex, ",", lastLogTerm, "]") return s.currentTerm, lastLogIndex, lastLogTerm, nil @@ -712,31 +705,34 @@ func (s *Server) promoteToLeader(term uint64, lastLogIndex uint64, lastLogTerm u // Ignore promotion if we are not a candidate. if s.state != Candidate { - return false + panic("promote to leader but not candidate") } + // TODO: should panic or just a false? + // Disallow promotion if the term or log does not match what we currently have. logIndex, logTerm := s.log.LastInfo() if s.currentTerm != term || logIndex != lastLogIndex || logTerm != lastLogTerm { return false } - // Move server to become a leader and begin peer heartbeats. s.state = Leader s.leader = s.name - s.log.callBackFrom = lastLogIndex + // Begin to collect response from followers + go s.commitCenter() + + // Update the peers prevLogIndex to leader's lastLogIndex + // Start heartbeat for _, peer := range s.peers { - // start from lastLogIndex - // start from commitedIndex + fmt.Println("[Leader] Set ", peer.Name(), "Prev to", lastLogIndex) + peer.prevLogIndex = lastLogIndex peer.resume() } - go s.commitCenter() - return true } @@ -769,40 +765,51 @@ func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, err return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Already voted for %v", s.votedFor) } - // If the candidate's log is not at least as up-to-date as our committed log then don't vote. - // lastCommitIndex, lastCommitTerm := s.log.CommitInfo() - - + // If the candidate's log is not at least as up-to-date as + // our last log then don't vote. lastIndex, lastTerm := s.log.LastInfo() - - if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm { return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Out-of-date log: [%v/%v] > [%v/%v]", lastIndex, lastTerm, req.LastLogIndex, req.LastLogTerm) } - // If we made it this far then cast a vote and reset our election time out. s.votedFor = req.CandidateName + fmt.Println(s.Name(), "Vote for ", req.CandidateName) + if s.electionTimer != nil { s.electionTimer.Reset() } + return NewRequestVoteResponse(s.currentTerm, true), nil } -// Updates the current term on the server if the term is greater than the +// Updates the current term on the server if the term is greater than the // server's current term. When the term is changed then the server's vote is // cleared and its state is changed to be a follower. func (s *Server) setCurrentTerm(term uint64) { if term > s.currentTerm { s.votedFor = "" - if s.state == Leader{ + + if s.state == Leader { fmt.Println(s.Name(), " step down to a follower") + + // stop heartbeats + for _, peer := range s.peers { + peer.pause() + } + + select { + case s.stepDown <- term: + + default: + + } + } + s.state = Follower - for _, peer := range s.peers { - peer.pause() - } + // update term after stop all the peer s.currentTerm = term } @@ -845,8 +852,6 @@ func (s *Server) electionTimeoutFunc(startChannel chan bool) { func (s *Server) AddPeer(name string) error { // Do not allow peers to be added twice. - - if s.peers[name] != nil { return DuplicatePeerError } @@ -879,7 +884,7 @@ func (s *Server) RemovePeer(name string) error { // Flush entries to the peer first. if s.state == Leader { - if _, _, err := peer.flush(true); err != nil { + if _, _, err := peer.flush(); err != nil { warn("raft: Unable to notify peer of removal: %v", err) } } @@ -908,7 +913,7 @@ func (s *Server) Snapshot() { // TODO: change this... to something reasonable time.Sleep(60 * time.Second) - s.takeSnapshot() + s.takeSnapshot() } } @@ -943,13 +948,12 @@ func (s *Server) takeSnapshot() error { var peerNames []string - for _, peer := range s.peers { peerNames = append(peerNames, peer.Name()) } peerNames = append(peerNames, s.Name()) - s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peerNames, state, path} + s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peerNames, state, path} s.saveSnapshot() @@ -964,7 +968,7 @@ func (s *Server) saveSnapshot() error { if s.currentSnapshot == nil { return errors.New("no snapshot to save") } - + err := s.currentSnapshot.Save() if err != nil { @@ -1007,9 +1011,9 @@ func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, erro snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm) s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, snapshotPath} - + s.saveSnapshot() - + s.log.Compact(req.LastIndex, req.LastTerm) return NewSnapshotResponse(req.LastTerm, true, req.LastIndex), nil @@ -1020,7 +1024,7 @@ func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, erro func (s *Server) LoadSnapshot() error { dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0) if err != nil { - + return err } @@ -1046,8 +1050,8 @@ func (s *Server) LoadSnapshot() error { if err != nil { panic(err) } - - // TODO check checksum first + + // TODO check checksum first var snapshotBytes []byte var checksum []byte diff --git a/server_test.go b/server_test.go index 19d9fdb..3f0e284 100644 --- a/server_test.go +++ b/server_test.go @@ -1,12 +1,12 @@ package raft import ( - //"reflect" + "fmt" + "reflect" + "strconv" "sync" "testing" "time" - "fmt" - "strconv" ) //------------------------------------------------------------------------------ @@ -20,98 +20,97 @@ import ( //-------------------------------------- // Ensure that we can request a vote from a server that has not voted. -// func TestServerRequestVote(t *testing.T) { -// server := newTestServer("1", &testTransporter{}) -// server.Initialize() -// server.StartLeader() -// defer server.Stop() -// resp, err := server.RequestVote(NewRequestVoteRequest(1, "foo", 0, 0)) -// if !(resp.Term == 1 && resp.VoteGranted && err == nil) { -// t.Fatalf("Invalid request vote response: %v/%v (%v)", resp.Term, resp.VoteGranted, err) -// } -// } +func TestServerRequestVote(t *testing.T) { + server := newTestServer("1", &testTransporter{}) + server.Initialize() + server.StartLeader() + defer server.Stop() + resp, err := server.RequestVote(NewRequestVoteRequest(1, "foo", 0, 0)) + if !(resp.Term == 1 && resp.VoteGranted && err == nil) { + t.Fatalf("Invalid request vote response: %v/%v (%v)", resp.Term, resp.VoteGranted, err) + } +} // // Ensure that a vote request is denied if it comes from an old term. -// func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) { -// server := newTestServer("1", &testTransporter{}) -// server.Initialize() -// server.StartLeader() -// server.state = Leader -// server.currentTerm = 2 -// defer server.Stop() -// resp, err := server.RequestVote(NewRequestVoteRequest(1, "foo", 0, 0)) -// if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Stale term: 1 < 2") { -// t.Fatalf("Invalid request vote response: %v/%v (%v)", resp.Term, resp.VoteGranted, err) -// } -// if server.currentTerm != 2 && server.state != Follower { -// t.Fatalf("Server did not update term and demote: %v / %v", server.currentTerm, server.state) -// } -// } +func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) { + server := newTestServer("1", &testTransporter{}) + server.Initialize() + server.StartLeader() + server.currentTerm = 2 + defer server.Stop() + + resp, err := server.RequestVote(NewRequestVoteRequest(1, "foo", 0, 0)) + if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Stale term: 1 < 2") { + t.Fatalf("Invalid request vote response: %v/%v (%v)", resp.Term, resp.VoteGranted, err) + } + if server.currentTerm != 2 && server.state != Follower { + t.Fatalf("Server did not update term and demote: %v / %v", server.currentTerm, server.state) + } +} // // Ensure that a vote request is denied if we've already voted for a different candidate. -// func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) { -// server := newTestServer("1", &testTransporter{}) -// server.Initialize() -// server.StartLeader() -// server.currentTerm = 2 -// defer server.Stop() -// resp, err := server.RequestVote(NewRequestVoteRequest(2, "foo", 0, 0)) -// if !(resp.Term == 2 && resp.VoteGranted && err == nil) { -// t.Fatalf("First vote should not have been denied (%v)", err) -// } -// resp, err = server.RequestVote(NewRequestVoteRequest(2, "bar", 0, 0)) -// if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Already voted for foo") { -// t.Fatalf("Second vote should have been denied (%v)", err) -// } -// } +func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) { + server := newTestServer("1", &testTransporter{}) + server.Initialize() + server.StartLeader() + server.currentTerm = 2 + defer server.Stop() + resp, err := server.RequestVote(NewRequestVoteRequest(2, "foo", 0, 0)) + if !(resp.Term == 2 && resp.VoteGranted && err == nil) { + t.Fatalf("First vote should not have been denied (%v)", err) + } + resp, err = server.RequestVote(NewRequestVoteRequest(2, "bar", 0, 0)) + if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Already voted for foo") { + t.Fatalf("Second vote should have been denied (%v)", err) + } +} // // Ensure that a vote request is approved if vote occurs in a new term. -// func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) { -// server := newTestServer("1", &testTransporter{}) -// server.Initialize() -// server.StartLeader() -// server.currentTerm = 2 -// defer server.Stop() -// resp, err := server.RequestVote(NewRequestVoteRequest(2, "foo", 0, 0)) -// if !(resp.Term == 2 && resp.VoteGranted && server.VotedFor() == "foo" && err == nil) { -// t.Fatalf("First vote should not have been denied (%v)", err) -// } -// resp, err = server.RequestVote(NewRequestVoteRequest(3, "bar", 0, 0)) -// if !(resp.Term == 3 && resp.VoteGranted && server.VotedFor() == "bar" && err == nil) { -// t.Fatalf("Second vote should have been approved (%v)", err) -// } -// } +func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) { + server := newTestServer("1", &testTransporter{}) + server.Initialize() + server.StartLeader() + server.currentTerm = 2 + defer server.Stop() + resp, err := server.RequestVote(NewRequestVoteRequest(2, "foo", 0, 0)) + if !(resp.Term == 2 && resp.VoteGranted && server.VotedFor() == "foo" && err == nil) { + t.Fatalf("First vote should not have been denied (%v)", err) + } + resp, err = server.RequestVote(NewRequestVoteRequest(3, "bar", 0, 0)) + if !(resp.Term == 3 && resp.VoteGranted && server.VotedFor() == "bar" && err == nil) { + t.Fatalf("Second vote should have been approved (%v)", err) + } +} // // Ensure that a vote request is denied if the log is out of date. -// func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) { -// server := newTestServerWithLog("1", &testTransporter{}, -// `cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}`+"\n"+ -// `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}`+"\n"+ -// `6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}`+"\n") -// server.Initialize() -// // server.StartLeader() -// // if err := server.Start(); err != nil { -// // t.Fatalf("Unable to start server: %v", err) -// // } -// defer server.Stop() - -// resp, err := server.RequestVote(NewRequestVoteRequest(2, "foo", 2, 2)) -// if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Out-of-date log: [3/2] > [2/2]") { -// t.Fatalf("Stale index vote should have been denied [%v/%v] (%v)", resp.Term, resp.VoteGranted, err) -// } -// resp, err = server.RequestVote(NewRequestVoteRequest(2, "foo", 3, 1)) -// if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Out-of-date log: [3/2] > [3/1]") { -// t.Fatalf("Stale term vote should have been denied [%v/%v] (%v)", resp.Term, resp.VoteGranted, err) -// } -// resp, err = server.RequestVote(NewRequestVoteRequest(2, "foo", 3, 2)) -// if !(resp.Term == 2 && resp.VoteGranted && err == nil) { -// t.Fatalf("Matching log vote should have been granted (%v)", err) -// } -// resp, err = server.RequestVote(NewRequestVoteRequest(2, "foo", 4, 3)) -// if !(resp.Term == 2 && resp.VoteGranted && err == nil) { -// t.Fatalf("Ahead-of-log vote should have been granted (%v)", err) -// } -// } +func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) { + server := newTestServerWithLog("1", &testTransporter{}, + `cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}`+"\n"+ + `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}`+"\n"+ + `6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}`+"\n") + server.Initialize() + server.StartLeader() + server.currentTerm = 2 + + defer server.Stop() + + resp, err := server.RequestVote(NewRequestVoteRequest(2, "foo", 2, 2)) + if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Out-of-date log: [3/2] > [2/2]") { + t.Fatalf("Stale index vote should have been denied [%v/%v] (%v)", resp.Term, resp.VoteGranted, err) + } + resp, err = server.RequestVote(NewRequestVoteRequest(2, "foo", 3, 1)) + if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Out-of-date log: [3/2] > [3/1]") { + t.Fatalf("Stale term vote should have been denied [%v/%v] (%v)", resp.Term, resp.VoteGranted, err) + } + resp, err = server.RequestVote(NewRequestVoteRequest(2, "foo", 3, 2)) + if !(resp.Term == 2 && resp.VoteGranted && err == nil) { + t.Fatalf("Matching log vote should have been granted (%v)", err) + } + resp, err = server.RequestVote(NewRequestVoteRequest(2, "foo", 4, 3)) + if !(resp.Term == 2 && resp.VoteGranted && err == nil) { + t.Fatalf("Ahead-of-log vote should have been granted (%v)", err) + } +} // //-------------------------------------- // // Promotion @@ -143,12 +142,13 @@ import ( // defer server.Stop() // } // leader := servers[0] +// leader.StartFollower() // if success, err := leader.promote(); !(success && err == nil && leader.state == Leader) { // t.Fatalf("Server promotion in cluster failed: %v (%v)", leader.state, err) // } // } -// // Ensure that a server will restart election if not enough votes are obtained before timeout. +// Ensure that a server will restart election if not enough votes are obtained before timeout. // func TestServerPromoteDoubleElection(t *testing.T) { // lookup := map[string]*Server{} // transporter := &testTransporter{} @@ -169,6 +169,7 @@ import ( // defer server.Stop() // } // leader := servers[0] +// leader.StartFollower() // if success, err := leader.promote(); !(success && err == nil && leader.state == Leader && leader.currentTerm == 2) { // t.Fatalf("Server promotion in cluster failed: %v (%v)", leader.state, err) // } @@ -186,169 +187,166 @@ import ( //-------------------------------------- // Ensure we can append entries to a server. -// func TestServerAppendEntries(t *testing.T) { -// server := newTestServer("1", &testTransporter{}) -// server.Initialize() -// server.StartLeader() -// defer server.Stop() - -// // Append single entry. -// entries := []*LogEntry{NewLogEntry(nil, 1, 1, &TestCommand1{"foo", 10})} -// resp, err := server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 0, 0, entries, 0)) -// if !(resp.Term == 1 && resp.Success && err == nil) { -// t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) -// } -// if index, term := server.log.CommitInfo(); !(index == 0 && term == 0) { -// t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) -// } - -// // Append multiple entries + commit the last one. -// entries = []*LogEntry{NewLogEntry(nil, 2, 1, &TestCommand1{"bar", 20}), NewLogEntry(nil, 3, 1, &TestCommand1{"baz", 30})} -// resp, err = server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 1, 1, entries, 1)) -// if !(resp.Term == 1 && resp.Success && err == nil) { -// t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) -// } -// if index, term := server.log.CommitInfo(); !(index == 1 && term == 1) { -// t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) -// } +func TestServerAppendEntries(t *testing.T) { + server := newTestServer("1", &testTransporter{}) + server.Initialize() + server.StartLeader() + defer server.Stop() + + // Append single entry. + entries := []*LogEntry{NewLogEntry(nil, 1, 1, &TestCommand1{"foo", 10})} + resp, err := server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 0, 0, entries, 0)) + if !(resp.Term == 1 && resp.Success && err == nil) { + t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) + } + if index, term := server.log.CommitInfo(); !(index == 0 && term == 0) { + t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) + } -// // Send zero entries and commit everything. -// resp, err = server.AppendEntries(NewAppendEntriesRequest(2, "ldr", 3, 1, []*LogEntry{}, 3)) -// if !(resp.Term == 2 && resp.Success && err == nil) { -// t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) -// } -// if index, term := server.log.CommitInfo(); !(index == 3 && term == 1) { -// t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) -// } -// } + // Append multiple entries + commit the last one. + entries = []*LogEntry{NewLogEntry(nil, 2, 1, &TestCommand1{"bar", 20}), NewLogEntry(nil, 3, 1, &TestCommand1{"baz", 30})} + resp, err = server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 1, 1, entries, 1)) + if !(resp.Term == 1 && resp.Success && err == nil) { + t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) + } + if index, term := server.log.CommitInfo(); !(index == 1 && term == 1) { + t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) + } -// Ensure that entries with stale terms are rejected. -// func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) { -// server := newTestServer("1", &testTransporter{}) -// server.Initialize() -// server.StartLeader() -// defer server.Stop() -// server.currentTerm = 2 + // Send zero entries and commit everything. + resp, err = server.AppendEntries(NewAppendEntriesRequest(2, "ldr", 3, 1, []*LogEntry{}, 3)) + if !(resp.Term == 2 && resp.Success && err == nil) { + t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) + } + if index, term := server.log.CommitInfo(); !(index == 3 && term == 1) { + t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) + } +} -// // Append single entry. -// entries := []*LogEntry{NewLogEntry(nil, 1, 1, &TestCommand1{"foo", 10})} -// resp, err := server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 0, 0, entries, 0)) -// if !(resp.Term == 2 && !resp.Success && err != nil && err.Error() == "raft.Server: Stale request term") { -// t.Fatalf("AppendEntries should have failed: %v/%v : %v", resp.Term, resp.Success, err) -// } -// if index, term := server.log.CommitInfo(); !(index == 0 && term == 0) { -// t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) -// } -// } +//Ensure that entries with stale terms are rejected. +func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) { + server := newTestServer("1", &testTransporter{}) + server.Initialize() + server.StartLeader() + defer server.Stop() + server.currentTerm = 2 + + // Append single entry. + entries := []*LogEntry{NewLogEntry(nil, 1, 1, &TestCommand1{"foo", 10})} + resp, err := server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 0, 0, entries, 0)) + if !(resp.Term == 2 && !resp.Success && err != nil && err.Error() == "raft.Server: Stale request term") { + t.Fatalf("AppendEntries should have failed: %v/%v : %v", resp.Term, resp.Success, err) + } + if index, term := server.log.CommitInfo(); !(index == 0 && term == 0) { + t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) + } +} // Ensure that we reject entries if the commit log is different. -// func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) { -// server := newTestServer("1", &testTransporter{}) -// server.Initialize() -// server.StartLeader() -// defer server.Stop() - -// // Append single entry + commit. -// entries := []*LogEntry{ -// NewLogEntry(nil, 1, 1, &TestCommand1{"foo", 10}), -// NewLogEntry(nil, 2, 1, &TestCommand1{"foo", 15}), -// } -// resp, err := server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 0, 0, entries, 2)) -// if !(resp.Term == 1 && resp.Success && err == nil) { -// t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) -// } +func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) { + server := newTestServer("1", &testTransporter{}) + server.Initialize() + server.StartLeader() + defer server.Stop() + + // Append single entry + commit. + entries := []*LogEntry{ + NewLogEntry(nil, 1, 1, &TestCommand1{"foo", 10}), + NewLogEntry(nil, 2, 1, &TestCommand1{"foo", 15}), + } + resp, err := server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 0, 0, entries, 2)) + if !(resp.Term == 1 && resp.Success && err == nil) { + t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) + } -// // Append entry again (post-commit). -// entries = []*LogEntry{NewLogEntry(nil, 2, 1, &TestCommand1{"bar", 20})} -// resp, err = server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 2, 1, entries, 1)) -// if !(resp.Term == 1 && !resp.Success && err != nil && err.Error() == "raft.Log: Cannot append entry with earlier index in the same term (1:2 <= 1:2)") { -// t.Fatalf("AppendEntries should have failed: %v/%v : %v", resp.Term, resp.Success, err) -// } -// } + // Append entry again (post-commit). + entries = []*LogEntry{NewLogEntry(nil, 2, 1, &TestCommand1{"bar", 20})} + resp, err = server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 2, 1, entries, 1)) + if !(resp.Term == 1 && !resp.Success && err != nil && err.Error() == "raft.Log: Cannot append entry with earlier index in the same term (1:2 <= 1:2)") { + t.Fatalf("AppendEntries should have failed: %v/%v : %v", resp.Term, resp.Success, err) + } +} // Ensure that we uncommitted entries are rolled back if new entries overwrite them. -// func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) { -// server := newTestServer("1", &testTransporter{}) -// server.Initialize() -// server.StartLeader() -// defer server.Stop() - -// entry1 := NewLogEntry(nil, 1, 1, &TestCommand1{"foo", 10}) -// entry2 := NewLogEntry(nil, 2, 1, &TestCommand1{"foo", 15}) -// entry3 := NewLogEntry(nil, 2, 2, &TestCommand1{"bar", 20}) - -// // Append single entry + commit. -// entries := []*LogEntry{entry1, entry2} -// resp, err := server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 0, 0, entries, 1)) -// if !(resp.Term == 1 && resp.Success && err == nil && server.log.CommitIndex() == 1 && reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry2})) { -// t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) -// } +func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) { + server := newTestServer("1", &testTransporter{}) + server.Initialize() + server.StartLeader() + defer server.Stop() + + entry1 := NewLogEntry(nil, 1, 1, &TestCommand1{"foo", 10}) + entry2 := NewLogEntry(nil, 2, 1, &TestCommand1{"foo", 15}) + entry3 := NewLogEntry(nil, 2, 2, &TestCommand1{"bar", 20}) + + // Append single entry + commit. + entries := []*LogEntry{entry1, entry2} + resp, err := server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 0, 0, entries, 1)) + if !(resp.Term == 1 && resp.Success && err == nil && server.log.CommitIndex() == 1 && reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry2})) { + t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) + } -// // Append entry that overwrites the second (uncommitted) entry. -// entries = []*LogEntry{entry3} -// resp, err = server.AppendEntries(NewAppendEntriesRequest(2, "ldr", 1, 1, entries, 2)) -// if !(resp.Term == 2 && resp.Success && err == nil && server.log.CommitIndex() == 2 && reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry3})) { -// t.Fatalf("AppendEntries should have succeeded: %v/%v : %v", resp.Term, resp.Success, err) -// } -// } + // Append entry that overwrites the second (uncommitted) entry. + entries = []*LogEntry{entry3} + resp, err = server.AppendEntries(NewAppendEntriesRequest(2, "ldr", 1, 1, entries, 2)) + if !(resp.Term == 2 && resp.Success && err == nil && server.log.CommitIndex() == 2 && reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry3})) { + t.Fatalf("AppendEntries should have succeeded: %v/%v : %v", resp.Term, resp.Success, err) + } +} //-------------------------------------- // Command Execution //-------------------------------------- // Ensure that a follower cannot execute a command. -// func TestServerDenyCommandExecutionWhenFollower(t *testing.T) { -// server := newTestServer("1", &testTransporter{}) -// server.Initialize() -// server.StartFollower() -// defer server.Stop() -// var err error -// if _, err = server.Do(&TestCommand1{"foo", 10}); err != NotLeaderError { -// t.Fatalf("Expected error: %v, got: %v", NotLeaderError, err) -// } -// } +func TestServerDenyCommandExecutionWhenFollower(t *testing.T) { + server := newTestServer("1", &testTransporter{}) + server.Initialize() + server.StartFollower() + defer server.Stop() + var err error + if _, err = server.Do(&TestCommand1{"foo", 10}); err != NotLeaderError { + t.Fatalf("Expected error: %v, got: %v", NotLeaderError, err) + } +} //-------------------------------------- // Membership //-------------------------------------- // Ensure that we can start a single server and append to its log. -// func TestServerSingleNode(t *testing.T) { -// fmt.Println("-----SignalNodeTest-------") -// server := newTestServer("1", &testTransporter{}) -// if server.state != Stopped { -// t.Fatalf("Unexpected server state: %v", server.state) -// } +func TestServerSingleNode(t *testing.T) { + fmt.Println("-----SignalNodeTest-------") + server := newTestServer("1", &testTransporter{}) + if server.state != Stopped { + t.Fatalf("Unexpected server state: %v", server.state) + } -// server.Initialize() + server.Initialize() + defer server.Stop() -// // Get the server running. -// // if err := server.Start(); err != nil { -// // t.Fatalf("Unable to start server: %v", err) -// // } -// defer server.Stop() -// if server.state != Follower { -// t.Fatalf("Unexpected server state: %v", server.state) -// } + if server.state != Stopped { + t.Fatalf("Unexpected server state: %v", server.state) + } -// server.StartLeader() -// time.Sleep(time.Second) -// // Join the server to itself. -// if _, err := server.Do(&joinCommand{Name: "1"}); err != nil { -// t.Fatalf("Unable to join: %v", err) -// } -// fmt.Println("finish command") + server.StartLeader() + time.Sleep(time.Second) -// if server.state != Leader { -// t.Fatalf("Unexpected server state: %v", server.state) -// } + // Join the server to itself. + if _, err := server.Do(&joinCommand{Name: "1"}); err != nil { + t.Fatalf("Unable to join: %v", err) + } + fmt.Println("finish command") -// // Stop the server. -// server.Stop() -// if server.state != Stopped { -// t.Fatalf("Unexpected server state: %v", server.state) -// } -// } + if server.state != Leader { + t.Fatalf("Unexpected server state: %v", server.state) + } + + // Stop the server. + server.Stop() + if server.state != Stopped { + t.Fatalf("Unexpected server state: %v", server.state) + } +} // Ensure that we can start multiple servers and determine a leader. func TestServerMultiNode(t *testing.T) { @@ -358,22 +356,14 @@ func TestServerMultiNode(t *testing.T) { var mutex sync.Mutex servers := map[string]*Server{} - transporter := &testTransporter{} transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) { - fmt.Println("vote request") - //mutex.Lock() s := servers[peer.name] - fmt.Println("vote request 2") - //mutex.Unlock() resp, err := s.RequestVote(req) - fmt.Println("finish vote request") return resp, err } transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) { - //mutex.Lock() s := servers[peer.name] - //mutex.Unlock() resp, err := s.AppendEntries(req) return resp, err } @@ -386,7 +376,6 @@ func TestServerMultiNode(t *testing.T) { return nil, nil } - var names []string n := 5 @@ -399,9 +388,6 @@ func TestServerMultiNode(t *testing.T) { var leader *Server for _, name := range names { server := newTestServer(name, transporter) - // if err := server.Start(); err != nil { - // t.Fatalf("Unable to start server[%s]: %v", name, err) - // } server.Initialize() mutex.Lock() @@ -410,9 +396,6 @@ func TestServerMultiNode(t *testing.T) { if name == "1" { leader = server - // if err := server.Initialize(); err != nil { - // t.Fatalf("Unable to initialize server[%s]: %v", name, err) - // } server.SetHeartbeatTimeout(testHeartbeatTimeout) server.StartLeader() } else { @@ -437,12 +420,11 @@ func TestServerMultiNode(t *testing.T) { } mutex.Unlock() - i := 0 - for { + for i := 0; i < 15; i++ { i++ fmt.Println("Round ", i) - num:= strconv.Itoa(i % (len(servers)) + 1) + num := strconv.Itoa(i%(len(servers)) + 1) toStop := servers[num] // Stop the first server and wait for a re-election. @@ -460,7 +442,7 @@ func TestServerMultiNode(t *testing.T) { if key != num { if value.State() == Leader { fmt.Println("Found leader") - for i := 0; i < 10; i++{ + for i := 0; i < 10; i++ { fmt.Println("[Test] do ", value.Name()) if _, err := value.Do(&TestCommand2{X: 1}); err != nil { t.Fatalf("Unable to do command") @@ -468,8 +450,8 @@ func TestServerMultiNode(t *testing.T) { fmt.Println("[Test] Done") } - leader ++ - fmt.Println("Leader is ", value.Name()," Index ", value.log.commitIndex) + leader++ + fmt.Println("Leader is ", value.Name(), " Index ", value.log.commitIndex) } fmt.Println("Not Found leader") } @@ -483,5 +465,5 @@ func TestServerMultiNode(t *testing.T) { toStop.SetTransporter(transporter) } - + } diff --git a/snapshot.go b/snapshot.go index 4cfd69e..e5e52e7 100644 --- a/snapshot.go +++ b/snapshot.go @@ -2,11 +2,11 @@ package raft import ( //"bytes" + "encoding/json" "fmt" "hash/crc32" "os" "syscall" - "encoding/json" ) //------------------------------------------------------------------------------ @@ -15,15 +15,15 @@ import ( // //------------------------------------------------------------------------------ -// the in memory SnapShot struct +// the in memory SnapShot struct // TODO add cluster configuration type Snapshot struct { LastIndex uint64 `json:"lastIndex"` LastTerm uint64 `json:"lastTerm"` - // cluster configuration. - Peers []string `json: "peers"` - State []byte `json: "state"` - Path string `json: "path"` + // cluster configuration. + Peers []string `json: "peers"` + State []byte `json: "state"` + Path string `json: "path"` } // Save the snapshot to a file @@ -39,7 +39,6 @@ func (ss *Snapshot) Save() error { defer file.Close() - b, err := json.Marshal(ss) // Generate checksum. diff --git a/snapshot_request.go b/snapshot_request.go index 3cc8157..18d39cc 100644 --- a/snapshot_request.go +++ b/snapshot_request.go @@ -2,11 +2,11 @@ package raft // The request sent to a server to start from the snapshot. type SnapshotRequest struct { - LeaderName string `json:"leaderName"` - LastIndex uint64 `json:"lastTerm"` - LastTerm uint64 `json:"lastIndex"` - Peers []string `json:peers` - State []byte `json:"state"` + LeaderName string `json:"leaderName"` + LastIndex uint64 `json:"lastTerm"` + LastTerm uint64 `json:"lastIndex"` + Peers []string `json:peers` + State []byte `json:"state"` } // The response returned from a server appending entries to the log. diff --git a/statemachine.go b/statemachine.go index 49fff7e..e59036c 100644 --- a/statemachine.go +++ b/statemachine.go @@ -6,7 +6,7 @@ package raft // //------------------------------------------------------------------------------ -// StateMachine is the interface for allowing the host application to save and +// StateMachine is the interface for allowing the host application to save and // recovery the state machine type StateMachine interface { Save() ([]byte, error) diff --git a/test.go b/test.go index 146653e..d18d6a0 100644 --- a/test.go +++ b/test.go @@ -5,7 +5,7 @@ import ( "io/ioutil" "os" "time" - "errors" + //"errors" ) const ( @@ -133,7 +133,6 @@ func (sm *testStateMachine) Recovery(state []byte) error { type joinCommand struct { Name string `json:"name"` - finish chan bool } func (c *joinCommand) CommandName() string { @@ -145,20 +144,6 @@ func (c *joinCommand) Apply(server *Server) ([]byte, error) { return nil, err } - -func (c *joinCommand) Init() { - c.finish = make(chan bool) -} - -func (c *joinCommand) Join() ([]byte, error) { - <-c.finish - return nil, nil -} - -func (c *joinCommand) Finish() { - c.finish <- true -} - //-------------------------------------- // Command1 //-------------------------------------- @@ -166,7 +151,6 @@ func (c *joinCommand) Finish() { type TestCommand1 struct { Val string `json:"val"` I int `json:"i"` - finish chan bool } func (c *TestCommand1) CommandName() string { @@ -174,30 +158,15 @@ func (c *TestCommand1) CommandName() string { } func (c *TestCommand1) Apply(server *Server) ([]byte, error) { - c.finish = make(chan bool) - return nil, nil -} - -func (c *TestCommand1) Init() { - c.finish = make(chan bool) -} - -func (c *TestCommand1) Join() ([]byte, error) { - <-c.finish return nil, nil } -func (c *TestCommand1) Finish() { - c.finish <- true -} - //-------------------------------------- // Command2 //-------------------------------------- type TestCommand2 struct { X int `json:"x"` - finish chan bool } func (c *TestCommand2) CommandName() string { @@ -205,24 +174,6 @@ func (c *TestCommand2) CommandName() string { } func (c *TestCommand2) Apply(server *Server) ([]byte, error) { - - return nil, nil -} - -func (c *TestCommand2) Init() { - c.finish = make(chan bool) -} -func (c *TestCommand2) Join() ([]byte, error) { - select { - case <-c.finish: - return nil, nil - case <-afterBetween(time.Second, time.Second*2): - return nil, errors.New("timeout") - } -} - -func (c *TestCommand2) Finish() { - c.finish <- true + return nil, nil } - diff --git a/timer.go b/timer.go index 209062d..80b44c8 100644 --- a/timer.go +++ b/timer.go @@ -1,10 +1,10 @@ package raft import ( + "fmt" "math/rand" "sync" "time" - "fmt" ) //------------------------------------------------------------------------------ @@ -142,9 +142,9 @@ func (t *Timer) stopInternalTimer() { func (t *Timer) fire() { select { - case t.c <-time.Now(): + case t.c <- time.Now(): return - default : + default: return } } @@ -172,11 +172,11 @@ func (t *Timer) Reset() { // it through to the timer's external channel. select { case v, ok := <-internalTimer.C: - if ok { - // send to the outer channel if we could - select { - case t.c <- v: - default: + if ok { + // send to the outer channel if we could + select { + case t.c <- v: + default: } } case <-resetChannel: