From 769a5ed6a17f146d21149c04377b442eb451c9f7 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 7 Jun 2013 22:19:18 -0400 Subject: [PATCH] go fmt --- log.go | 43 +++++++++++++------------- peer.go | 14 ++++----- server.go | 75 ++++++++++++++++++++++----------------------- server_test.go | 6 ++-- snapshot.go | 17 +++++----- snapshot_request.go | 18 +++++------ snapshot_test.go | 31 +++++++++---------- statemachine.go | 2 +- test.go | 14 ++++----- transporter.go | 6 ++-- 10 files changed, 108 insertions(+), 118 deletions(-) diff --git a/log.go b/log.go index 5c33070..cdc916d 100644 --- a/log.go +++ b/log.go @@ -25,7 +25,7 @@ type Log struct { commitIndex uint64 mutex sync.Mutex startIndex uint64 // the index before the first entry in the Log entries - startTerm uint64 + startTerm uint64 } //------------------------------------------------------------------------------ @@ -56,6 +56,7 @@ func (l *Log) StartIndex() uint64 { func (l *Log) SetStartTerm(t uint64) { l.startTerm = t } + //-------------------------------------- // Log Indices //-------------------------------------- @@ -218,7 +219,7 @@ func (l *Log) ContainsEntry(index uint64, term uint64) bool { l.mutex.Lock() defer l.mutex.Unlock() - if index <= l.startIndex || index > (l.startIndex + uint64(len(l.entries))) { + if index <= l.startIndex || index > (l.startIndex+uint64(len(l.entries))) { return false } return (l.entries[index-1].Term == term) @@ -237,8 +238,8 @@ func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) { return l.entries, l.startTerm } // Determine the term at the given entry and return a subslice. - term := l.entries[index - 1 - l.startIndex].Term - return l.entries[index - l.startIndex:], term + term := l.entries[index-1-l.startIndex].Term + return l.entries[index-l.startIndex:], term } // Retrieves the error returned from an entry. The error can only exist after @@ -246,11 +247,11 @@ func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) { func (l *Log) GetEntryError(entry *LogEntry) error { l.mutex.Lock() defer l.mutex.Unlock() - + if entry == nil { panic("raft: Log entry required for error retrieval") } - + if entry.Index > 0 && entry.Index <= uint64(len(l.errors)) { return l.errors[entry.Index-1] } @@ -277,7 +278,7 @@ func (l *Log) CommitInfo() (index uint64, term uint64) { } // Return the last index & term from the last committed entry. - lastCommitEntry := l.entries[l.commitIndex - 1 - l.startIndex] + lastCommitEntry := l.entries[l.commitIndex-1-l.startIndex] return lastCommitEntry.Index, lastCommitEntry.Term } @@ -298,7 +299,7 @@ func (l *Log) SetCommitIndex(index uint64) error { if index < l.commitIndex { return fmt.Errorf("raft.Log: Commit index (%d) ahead of requested commit index (%d)", l.commitIndex, index) } - if index > l.startIndex + uint64(len(l.entries)) { + if index > l.startIndex+uint64(len(l.entries)) { return fmt.Errorf("raft.Log: Commit index (%d) out of range (%d)", index, len(l.entries)) } @@ -338,7 +339,7 @@ func (l *Log) Truncate(index uint64, term uint64) error { } // Do not truncate past end of entries. - if index > l.startIndex + uint64(len(l.entries)) { + if index > l.startIndex+uint64(len(l.entries)) { return fmt.Errorf("raft.Log: Entry index does not exist (MAX=%v): (IDX=%v, TERM=%v)", len(l.entries), index, term) } @@ -347,14 +348,14 @@ func (l *Log) Truncate(index uint64, term uint64) error { l.entries = []*LogEntry{} } else { // Do not truncate if the entry at index does not have the matching term. - entry := l.entries[index - l.startIndex - 1] + entry := l.entries[index-l.startIndex-1] if len(l.entries) > 0 && entry.Term != term { return fmt.Errorf("raft.Log: Entry at index does not have matching term (%v): (IDX=%v, TERM=%v)", entry.Term, index, term) } // Otherwise truncate up to the desired entry. - if index < l.startIndex + uint64(len(l.entries)) { - l.entries = l.entries[0:index - l.startIndex] + if index < l.startIndex+uint64(len(l.entries)) { + l.entries = l.entries[0 : index-l.startIndex] } } @@ -413,8 +414,6 @@ func (l *Log) appendEntry(entry *LogEntry) error { return nil } - - //-------------------------------------- // Log compaction //-------------------------------------- @@ -434,20 +433,20 @@ func (l *Log) Compact(index uint64, term uint64) error { } else { // get all log entries after index - entries = l.entries[index - l.startIndex:] + entries = l.entries[index-l.startIndex:] } // create a new log file and add all the entries - file, err := os.OpenFile(l.path + ".new", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) + file, err := os.OpenFile(l.path+".new", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) if err != nil { return err } for _, entry := range entries { - err = entry.Encode(file) - if err != nil { - return err - } - } + err = entry.Encode(file) + if err != nil { + return err + } + } // close the current log file l.file.Close() @@ -458,7 +457,7 @@ func (l *Log) Compact(index uint64, term uint64) error { } // rename the new log file - err = os.Rename(l.path + ".new", l.path) + err = os.Rename(l.path+".new", l.path) if err != nil { return err } diff --git a/peer.go b/peer.go index 68b6055..64cefd8 100644 --- a/peer.go +++ b/peer.go @@ -39,7 +39,7 @@ func NewPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer c := make(chan bool) go p.heartbeatTimeoutFunc(c) <-c - + return p } @@ -108,7 +108,7 @@ func (p *Peer) flush(internal bool) (uint64, bool, error) { p.mutex.Lock() server, prevLogIndex := p.server, p.prevLogIndex p.mutex.Unlock() - + var req *AppendEntriesRequest snapShotNeeded := false @@ -134,11 +134,11 @@ func (p *Peer) flush(internal bool) (uint64, bool, error) { } else { return p.sendFlushRequest(req) } - + } // send Snapshot Request -func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) (uint64, bool, error){ +func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) (uint64, bool, error) { // Ignore any null requests. if req == nil { return 0, false, errors.New("raft.Peer: Request required") @@ -163,7 +163,7 @@ func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) (uint64, bool, error){ panic(resp) } - return resp.Term, resp.Success, err + return resp.Term, resp.Success, err } // Flushes a request through the server's transport. @@ -216,7 +216,7 @@ func (p *Peer) heartbeatTimeoutFunc(startChannel chan bool) { for { // Grab the current timer channel. p.mutex.Lock() - + var c chan time.Time if p.heartbeatTimer != nil { c = p.heartbeatTimer.C() @@ -231,7 +231,7 @@ func (p *Peer) heartbeatTimeoutFunc(startChannel chan bool) { // Flush the peer when we get a heartbeat timeout. If the channel is // closed then the peer is getting cleaned up and we should exit. if _, ok := <-c; ok { - p.flush(false) + p.flush(false) } else { break diff --git a/server.go b/server.go index 80c52b8..30b3613 100644 --- a/server.go +++ b/server.go @@ -3,12 +3,12 @@ package raft import ( "errors" "fmt" - "sync" - "time" + "io/ioutil" "os" - "sort" "path" - "io/ioutil" + "sort" + "sync" + "time" ) //------------------------------------------------------------------------------ @@ -47,22 +47,22 @@ var DuplicatePeerError = errors.New("raft.Server: Duplicate peer") // A server is involved in the consensus protocol and can act as a follower, // candidate or a leader. type Server struct { - name string - path string - state string - transporter Transporter - context interface{} - currentTerm uint64 - votedFor string - log *Log - leader string - peers map[string]*Peer - mutex sync.Mutex - electionTimer *Timer - heartbeatTimeout time.Duration - currentSnapshot *Snapshot - lastSnapshot *Snapshot - stateMachine StateMachine + name string + path string + state string + transporter Transporter + context interface{} + currentTerm uint64 + votedFor string + log *Log + leader string + peers map[string]*Peer + mutex sync.Mutex + electionTimer *Timer + heartbeatTimeout time.Duration + currentSnapshot *Snapshot + lastSnapshot *Snapshot + stateMachine StateMachine } //------------------------------------------------------------------------------ @@ -125,7 +125,8 @@ func (s *Server) Leader() string { s.mutex.Lock() defer s.mutex.Unlock() return s.leader -} +} + // Retrieves the object that transports requests. func (s *Server) Transporter() Transporter { return s.transporter @@ -256,7 +257,7 @@ func (s *Server) Start() error { } // create snapshot dir if not exist - os.Mkdir(s.path + "/snapshot", 0700) + os.Mkdir(s.path+"/snapshot", 0700) // ## open recovery from the newest snapShot //s.LoadSnapshot() @@ -279,7 +280,7 @@ func (s *Server) Start() error { // Start the election timeout. c := make(chan bool) go s.electionTimeoutFunc(c) - <- c + <-c return nil } @@ -387,10 +388,10 @@ func (s *Server) do(command Command) error { go func() { term, success, err := peer.flush(true) - + // Demote if we encounter a higher term. if err != nil { - + return } else if term > currentTerm { s.mutex.Lock() @@ -468,7 +469,7 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), fmt.Errorf("raft.Server: Stale request term") } s.setCurrentTerm(req.Term) - + // Update the current leader. s.leader = req.LeaderName @@ -476,7 +477,7 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons if s.electionTimer != nil { s.electionTimer.Reset() } - + // Reject if log doesn't contain a matching previous entry. if err := s.log.Truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil { return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err @@ -490,7 +491,7 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons // Commit up to the commit index. if err := s.log.SetCommitIndex(req.CommitIndex); err != nil { return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err - } + } return NewAppendEntriesResponse(s.currentTerm, true, s.log.CommitIndex()), nil } @@ -783,7 +784,6 @@ func (s *Server) RemovePeer(name string) error { return nil } - //-------------------------------------- // Log compaction //-------------------------------------- @@ -819,9 +819,9 @@ func (s *Server) takeSnapshot() error { path := s.SnapshotPath(lastIndex, lastTerm) - state, err := s.stateMachine.Save() + state, err := s.stateMachine.Save() - if err !=nil { + if err != nil { return err } @@ -846,7 +846,7 @@ func (s *Server) saveSnapshot() error { if err != nil { return err } - + tmp := s.lastSnapshot s.lastSnapshot = s.currentSnapshot @@ -863,8 +863,7 @@ func (s *Server) SnapshotPath(lastIndex uint64, lastTerm uint64) string { return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex)) } - -func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, error){ +func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, error) { // s.mutex.Lock() defer s.mutex.Unlock() @@ -876,10 +875,9 @@ func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, erro s.log.UpdateCommitIndex(req.LastIndex) snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm) s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.State, snapshotPath} - s.saveSnapshot() + s.saveSnapshot() s.log.Compact(req.LastIndex, req.LastTerm) - return NewSnapshotResponse(req.LastTerm, true, req.LastIndex), nil } @@ -906,7 +904,7 @@ func (s *Server) LoadSnapshot() error { // not sure how many snapshot we should keep sort.Strings(filenames) - snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames) - 1]) + snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1]) // should not file file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0) @@ -921,7 +919,7 @@ func (s *Server) LoadSnapshot() error { var state []byte var checksum, lastIndex, lastTerm uint64 - n , err := fmt.Fscanf(file, "%08x\n%v\n%v", &checksum, &lastIndex, &lastTerm) + n, err := fmt.Fscanf(file, "%08x\n%v\n%v", &checksum, &lastIndex, &lastTerm) if err != nil { return err @@ -946,4 +944,3 @@ func (s *Server) LoadSnapshot() error { return err } - diff --git a/server_test.go b/server_test.go index 6487189..012c3fc 100644 --- a/server_test.go +++ b/server_test.go @@ -315,7 +315,7 @@ func TestServerSingleNode(t *testing.T) { // Join the server to itself. server.Initialize() - if err := server.Do(&joinCommand{Name:"1"}); err != nil { + if err := server.Do(&joinCommand{Name: "1"}); err != nil { t.Fatalf("Unable to join: %v", err) } if server.state != Leader { @@ -368,7 +368,7 @@ func TestServerMultiNode(t *testing.T) { t.Fatalf("Unable to initialize server[%s]: %v", name, err) } } - if err := leader.Do(&joinCommand{Name:name}); err != nil { + if err := leader.Do(&joinCommand{Name: name}); err != nil { t.Fatalf("Unable to join server[%s]: %v", name, err) } @@ -396,5 +396,3 @@ func TestServerMultiNode(t *testing.T) { } mutex.Unlock() } - - diff --git a/snapshot.go b/snapshot.go index 059cd2c..0f15a2e 100644 --- a/snapshot.go +++ b/snapshot.go @@ -1,12 +1,12 @@ package raft import ( - "hash/crc32" - "fmt" - "syscall" "bytes" + "fmt" + "hash/crc32" "os" - ) + "syscall" +) //------------------------------------------------------------------------------ // @@ -18,10 +18,10 @@ import ( // TODO add cluster configuration type Snapshot struct { lastIndex uint64 - lastTerm uint64 + lastTerm uint64 // cluster configuration. state []byte - path string + path string } // Save the snapshot to a file @@ -45,14 +45,13 @@ func (ss *Snapshot) Save() error { defer file.Close() - // Write snapshot with checksum. - if _, err = fmt.Fprintf(file, "%08x\n%v\n%v\n", checksum, ss.lastIndex, + if _, err = fmt.Fprintf(file, "%08x\n%v\n%v\n", checksum, ss.lastIndex, ss.lastTerm); err != nil { return err } - if _, err = file.Write(ss.state); err != nil { + if _, err = file.Write(ss.state); err != nil { return err } diff --git a/snapshot_request.go b/snapshot_request.go index 657e6d3..3e862d7 100644 --- a/snapshot_request.go +++ b/snapshot_request.go @@ -2,10 +2,10 @@ package raft // The request sent to a server to start from the snapshot. type SnapshotRequest struct { - LeaderName string `json:"leaderName"` - LastIndex uint64 `json:"lastTerm"` - LastTerm uint64 `json:"lastIndex"` - State []byte `json:"state"` + LeaderName string `json:"leaderName"` + LastIndex uint64 `json:"lastTerm"` + LastTerm uint64 `json:"lastIndex"` + State []byte `json:"state"` } // The response returned from a server appending entries to the log. @@ -24,10 +24,10 @@ type SnapshotResponse struct { // Creates a new Snapshot request. func NewSnapshotRequest(leaderName string, snapshot *Snapshot) *SnapshotRequest { return &SnapshotRequest{ - LeaderName: leaderName, - LastIndex: snapshot.lastIndex, - LastTerm: snapshot.lastTerm, - State: snapshot.state, + LeaderName: leaderName, + LastIndex: snapshot.lastIndex, + LastTerm: snapshot.lastTerm, + State: snapshot.state, } } @@ -38,4 +38,4 @@ func NewSnapshotResponse(term uint64, success bool, commitIndex uint64) *Snapsho Success: success, CommitIndex: commitIndex, } -} \ No newline at end of file +} diff --git a/snapshot_test.go b/snapshot_test.go index 52b463b..25d2707 100644 --- a/snapshot_test.go +++ b/snapshot_test.go @@ -1,10 +1,10 @@ package raft import ( + "bytes" "sync" "testing" "time" - "bytes" ) // test take and send snapshot @@ -43,8 +43,8 @@ func TestTakeAndSendSnapshot(t *testing.T) { stateMachine := &testStateMachine{} - stateMachine.saveFunc = func() ([]byte,error) { - return []byte{0x8},nil + stateMachine.saveFunc = func() ([]byte, error) { + return []byte{0x8}, nil } stateMachine.recoveryFunc = func(state []byte) error { @@ -66,7 +66,7 @@ func TestTakeAndSendSnapshot(t *testing.T) { t.Fatalf("Unable to initialize server[%s]: %v", name, err) } } - if err := leader.Do(&joinCommand{Name:name}); err != nil { + if err := leader.Do(&joinCommand{Name: name}); err != nil { t.Fatalf("Unable to join server[%s]: %v", name, err) } @@ -106,7 +106,7 @@ func TestTakeAndSendSnapshot(t *testing.T) { } if leader.log.startIndex != 4 || leader.log.startTerm != 1 { - t.Fatalf("Invalid log info [StartIndex=%v, StartTERM=%v]", + t.Fatalf("Invalid log info [StartIndex=%v, StartTERM=%v]", leader.log.startIndex, leader.log.startTerm) } @@ -118,7 +118,7 @@ func TestTakeAndSendSnapshot(t *testing.T) { t.Fatalf("Unable to start server[4]: %v", err) } - if err := leader.Do(&joinCommand{Name:"4"}); err != nil { + if err := leader.Do(&joinCommand{Name: "4"}); err != nil { t.Fatalf("Unable to join server[4]: %v", err) } @@ -130,24 +130,23 @@ func TestTakeAndSendSnapshot(t *testing.T) { time.Sleep(100 * time.Millisecond) if leader.log.startIndex != 4 || leader.log.startTerm != 1 { - t.Fatalf("Invalid log info [StartIndex=%v, StartTERM=%v]", + t.Fatalf("Invalid log info [StartIndex=%v, StartTERM=%v]", leader.log.startIndex, leader.log.startTerm) } time.Sleep(100 * time.Millisecond) } - func TestStartFormSnapshot(t *testing.T) { server := newTestServer("1", &testTransporter{}) stateMachine := &testStateMachine{} - stateMachine.saveFunc = func() ([]byte,error) { - return []byte{0x60,0x61,0x62,0x63,0x64,0x65},nil + stateMachine.saveFunc = func() ([]byte, error) { + return []byte{0x60, 0x61, 0x62, 0x63, 0x64, 0x65}, nil } stateMachine.recoveryFunc = func(state []byte) error { - expect := []byte{0x60,0x61,0x62,0x63,0x64,0x65} + expect := []byte{0x60, 0x61, 0x62, 0x63, 0x64, 0x65} if !(bytes.Equal(state, expect)) { t.Fatalf("Invalid State [Expcet=%v, Actual=%v]", expect, state) } @@ -156,7 +155,7 @@ func TestStartFormSnapshot(t *testing.T) { server.stateMachine = stateMachine oldPath := server.path server.Start() - + server.Initialize() // commit single entry. @@ -175,7 +174,7 @@ func TestStartFormSnapshot(t *testing.T) { } if server.log.startIndex != 1 || server.log.startTerm != 1 { - t.Fatalf("Invalid log info [StartIndex=%v, StartTERM=%v]", + t.Fatalf("Invalid log info [StartIndex=%v, StartTERM=%v]", server.log.startIndex, server.log.startTerm) } @@ -199,14 +198,14 @@ func TestStartFormSnapshot(t *testing.T) { } if server.log.startIndex != 0 || server.log.startTerm != 0 { - t.Fatalf("Invalid log info [StartIndex=%v, StartTERM=%v]", + t.Fatalf("Invalid log info [StartIndex=%v, StartTERM=%v]", server.log.startIndex, server.log.startTerm) } server.LoadSnapshot() if server.log.startIndex != 1 || server.log.startTerm != 1 { - t.Fatalf("Invalid log info [StartIndex=%v, StartTERM=%v]", + t.Fatalf("Invalid log info [StartIndex=%v, StartTERM=%v]", server.log.startIndex, server.log.startTerm) } -} \ No newline at end of file +} diff --git a/statemachine.go b/statemachine.go index 555e162..49fff7e 100644 --- a/statemachine.go +++ b/statemachine.go @@ -11,4 +11,4 @@ package raft type StateMachine interface { Save() ([]byte, error) Recovery([]byte) error -} \ No newline at end of file +} diff --git a/test.go b/test.go index d5a87c1..bf95681 100644 --- a/test.go +++ b/test.go @@ -4,7 +4,7 @@ import ( "fmt" "io/ioutil" "os" - "time" + "time" ) const ( @@ -96,9 +96,9 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]* //-------------------------------------- type testTransporter struct { - sendVoteRequestFunc func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) - sendAppendEntriesRequestFunc func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) - sendSnapshotRequestFunc func(server *Server, peer *Peer, req *SnapshotRequest) (*SnapshotResponse, error) + sendVoteRequestFunc func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) + sendAppendEntriesRequestFunc func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) + sendSnapshotRequestFunc func(server *Server, peer *Peer, req *SnapshotRequest) (*SnapshotResponse, error) } func (t *testTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) { @@ -111,11 +111,10 @@ func (t *testTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r func (t *testTransporter) SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) (*SnapshotResponse, error) { return t.sendSnapshotRequestFunc(server, peer, req) -} - +} type testStateMachine struct { - saveFunc func() ([]byte, error) + saveFunc func() ([]byte, error) recoveryFunc func([]byte) error } @@ -127,7 +126,6 @@ func (sm *testStateMachine) Recovery(state []byte) error { return sm.recoveryFunc(state) } - //-------------------------------------- // Join Command //-------------------------------------- diff --git a/transporter.go b/transporter.go index 6b5bd80..0746671 100644 --- a/transporter.go +++ b/transporter.go @@ -9,7 +9,7 @@ package raft // Transporter is the interface for allowing the host application to transport // requests to other nodes. type Transporter interface { - SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) - SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) - SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) (*SnapshotResponse, error) + SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) + SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) + SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) (*SnapshotResponse, error) }