Skip to content
This repository has been archived by the owner on Sep 6, 2018. It is now read-only.

Commit

Permalink
go fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Jun 8, 2013
1 parent 3bcf91a commit 769a5ed
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 118 deletions.
43 changes: 21 additions & 22 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Log struct {
commitIndex uint64
mutex sync.Mutex
startIndex uint64 // the index before the first entry in the Log entries
startTerm uint64
startTerm uint64
}

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -56,6 +56,7 @@ func (l *Log) StartIndex() uint64 {
func (l *Log) SetStartTerm(t uint64) {
l.startTerm = t
}

//--------------------------------------
// Log Indices
//--------------------------------------
Expand Down Expand Up @@ -218,7 +219,7 @@ func (l *Log) ContainsEntry(index uint64, term uint64) bool {
l.mutex.Lock()
defer l.mutex.Unlock()

if index <= l.startIndex || index > (l.startIndex + uint64(len(l.entries))) {
if index <= l.startIndex || index > (l.startIndex+uint64(len(l.entries))) {
return false
}
return (l.entries[index-1].Term == term)
Expand All @@ -237,20 +238,20 @@ func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) {
return l.entries, l.startTerm
}
// Determine the term at the given entry and return a subslice.
term := l.entries[index - 1 - l.startIndex].Term
return l.entries[index - l.startIndex:], term
term := l.entries[index-1-l.startIndex].Term
return l.entries[index-l.startIndex:], term
}

// Retrieves the error returned from an entry. The error can only exist after
// the entry has been committed.
func (l *Log) GetEntryError(entry *LogEntry) error {
l.mutex.Lock()
defer l.mutex.Unlock()

if entry == nil {
panic("raft: Log entry required for error retrieval")
}

if entry.Index > 0 && entry.Index <= uint64(len(l.errors)) {
return l.errors[entry.Index-1]
}
Expand All @@ -277,7 +278,7 @@ func (l *Log) CommitInfo() (index uint64, term uint64) {
}

// Return the last index & term from the last committed entry.
lastCommitEntry := l.entries[l.commitIndex - 1 - l.startIndex]
lastCommitEntry := l.entries[l.commitIndex-1-l.startIndex]
return lastCommitEntry.Index, lastCommitEntry.Term
}

Expand All @@ -298,7 +299,7 @@ func (l *Log) SetCommitIndex(index uint64) error {
if index < l.commitIndex {
return fmt.Errorf("raft.Log: Commit index (%d) ahead of requested commit index (%d)", l.commitIndex, index)
}
if index > l.startIndex + uint64(len(l.entries)) {
if index > l.startIndex+uint64(len(l.entries)) {
return fmt.Errorf("raft.Log: Commit index (%d) out of range (%d)", index, len(l.entries))
}

Expand Down Expand Up @@ -338,7 +339,7 @@ func (l *Log) Truncate(index uint64, term uint64) error {
}

// Do not truncate past end of entries.
if index > l.startIndex + uint64(len(l.entries)) {
if index > l.startIndex+uint64(len(l.entries)) {
return fmt.Errorf("raft.Log: Entry index does not exist (MAX=%v): (IDX=%v, TERM=%v)", len(l.entries), index, term)
}

Expand All @@ -347,14 +348,14 @@ func (l *Log) Truncate(index uint64, term uint64) error {
l.entries = []*LogEntry{}
} else {
// Do not truncate if the entry at index does not have the matching term.
entry := l.entries[index - l.startIndex - 1]
entry := l.entries[index-l.startIndex-1]
if len(l.entries) > 0 && entry.Term != term {
return fmt.Errorf("raft.Log: Entry at index does not have matching term (%v): (IDX=%v, TERM=%v)", entry.Term, index, term)
}

// Otherwise truncate up to the desired entry.
if index < l.startIndex + uint64(len(l.entries)) {
l.entries = l.entries[0:index - l.startIndex]
if index < l.startIndex+uint64(len(l.entries)) {
l.entries = l.entries[0 : index-l.startIndex]
}
}

Expand Down Expand Up @@ -413,8 +414,6 @@ func (l *Log) appendEntry(entry *LogEntry) error {
return nil
}



//--------------------------------------
// Log compaction
//--------------------------------------
Expand All @@ -434,20 +433,20 @@ func (l *Log) Compact(index uint64, term uint64) error {
} else {

// get all log entries after index
entries = l.entries[index - l.startIndex:]
entries = l.entries[index-l.startIndex:]
}

// create a new log file and add all the entries
file, err := os.OpenFile(l.path + ".new", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
file, err := os.OpenFile(l.path+".new", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return err
}
for _, entry := range entries {
err = entry.Encode(file)
if err != nil {
return err
}
}
err = entry.Encode(file)
if err != nil {
return err
}
}
// close the current log file
l.file.Close()

Expand All @@ -458,7 +457,7 @@ func (l *Log) Compact(index uint64, term uint64) error {
}

// rename the new log file
err = os.Rename(l.path + ".new", l.path)
err = os.Rename(l.path+".new", l.path)
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer
c := make(chan bool)
go p.heartbeatTimeoutFunc(c)
<-c

return p
}

Expand Down Expand Up @@ -108,7 +108,7 @@ func (p *Peer) flush(internal bool) (uint64, bool, error) {
p.mutex.Lock()
server, prevLogIndex := p.server, p.prevLogIndex
p.mutex.Unlock()

var req *AppendEntriesRequest
snapShotNeeded := false

Expand All @@ -134,11 +134,11 @@ func (p *Peer) flush(internal bool) (uint64, bool, error) {
} else {
return p.sendFlushRequest(req)
}

}

// send Snapshot Request
func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) (uint64, bool, error){
func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) (uint64, bool, error) {
// Ignore any null requests.
if req == nil {
return 0, false, errors.New("raft.Peer: Request required")
Expand All @@ -163,7 +163,7 @@ func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) (uint64, bool, error){
panic(resp)
}

return resp.Term, resp.Success, err
return resp.Term, resp.Success, err
}

// Flushes a request through the server's transport.
Expand Down Expand Up @@ -216,7 +216,7 @@ func (p *Peer) heartbeatTimeoutFunc(startChannel chan bool) {
for {
// Grab the current timer channel.
p.mutex.Lock()

var c chan time.Time
if p.heartbeatTimer != nil {
c = p.heartbeatTimer.C()
Expand All @@ -231,7 +231,7 @@ func (p *Peer) heartbeatTimeoutFunc(startChannel chan bool) {
// Flush the peer when we get a heartbeat timeout. If the channel is
// closed then the peer is getting cleaned up and we should exit.
if _, ok := <-c; ok {
p.flush(false)
p.flush(false)

} else {
break
Expand Down
75 changes: 36 additions & 39 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package raft
import (
"errors"
"fmt"
"sync"
"time"
"io/ioutil"
"os"
"sort"
"path"
"io/ioutil"
"sort"
"sync"
"time"
)

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -47,22 +47,22 @@ var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
// A server is involved in the consensus protocol and can act as a follower,
// candidate or a leader.
type Server struct {
name string
path string
state string
transporter Transporter
context interface{}
currentTerm uint64
votedFor string
log *Log
leader string
peers map[string]*Peer
mutex sync.Mutex
electionTimer *Timer
heartbeatTimeout time.Duration
currentSnapshot *Snapshot
lastSnapshot *Snapshot
stateMachine StateMachine
name string
path string
state string
transporter Transporter
context interface{}
currentTerm uint64
votedFor string
log *Log
leader string
peers map[string]*Peer
mutex sync.Mutex
electionTimer *Timer
heartbeatTimeout time.Duration
currentSnapshot *Snapshot
lastSnapshot *Snapshot
stateMachine StateMachine
}

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -125,7 +125,8 @@ func (s *Server) Leader() string {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.leader
}
}

// Retrieves the object that transports requests.
func (s *Server) Transporter() Transporter {
return s.transporter
Expand Down Expand Up @@ -256,7 +257,7 @@ func (s *Server) Start() error {
}

// create snapshot dir if not exist
os.Mkdir(s.path + "/snapshot", 0700)
os.Mkdir(s.path+"/snapshot", 0700)

// ## open recovery from the newest snapShot
//s.LoadSnapshot()
Expand All @@ -279,7 +280,7 @@ func (s *Server) Start() error {
// Start the election timeout.
c := make(chan bool)
go s.electionTimeoutFunc(c)
<- c
<-c

return nil
}
Expand Down Expand Up @@ -387,10 +388,10 @@ func (s *Server) do(command Command) error {
go func() {

term, success, err := peer.flush(true)

// Demote if we encounter a higher term.
if err != nil {

return
} else if term > currentTerm {
s.mutex.Lock()
Expand Down Expand Up @@ -468,15 +469,15 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons
return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), fmt.Errorf("raft.Server: Stale request term")
}
s.setCurrentTerm(req.Term)

// Update the current leader.
s.leader = req.LeaderName

// Reset election timeout.
if s.electionTimer != nil {
s.electionTimer.Reset()
}

// Reject if log doesn't contain a matching previous entry.
if err := s.log.Truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err
Expand All @@ -490,7 +491,7 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons
// Commit up to the commit index.
if err := s.log.SetCommitIndex(req.CommitIndex); err != nil {
return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err
}
}

return NewAppendEntriesResponse(s.currentTerm, true, s.log.CommitIndex()), nil
}
Expand Down Expand Up @@ -783,7 +784,6 @@ func (s *Server) RemovePeer(name string) error {
return nil
}


//--------------------------------------
// Log compaction
//--------------------------------------
Expand Down Expand Up @@ -819,9 +819,9 @@ func (s *Server) takeSnapshot() error {

path := s.SnapshotPath(lastIndex, lastTerm)

state, err := s.stateMachine.Save()
state, err := s.stateMachine.Save()

if err !=nil {
if err != nil {
return err
}

Expand All @@ -846,7 +846,7 @@ func (s *Server) saveSnapshot() error {
if err != nil {
return err
}

tmp := s.lastSnapshot
s.lastSnapshot = s.currentSnapshot

Expand All @@ -863,8 +863,7 @@ func (s *Server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
}


func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, error){
func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, error) {
//
s.mutex.Lock()
defer s.mutex.Unlock()
Expand All @@ -876,10 +875,9 @@ func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, erro
s.log.UpdateCommitIndex(req.LastIndex)
snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm)
s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.State, snapshotPath}
s.saveSnapshot()
s.saveSnapshot()
s.log.Compact(req.LastIndex, req.LastTerm)


return NewSnapshotResponse(req.LastTerm, true, req.LastIndex), nil

}
Expand All @@ -906,7 +904,7 @@ func (s *Server) LoadSnapshot() error {

// not sure how many snapshot we should keep
sort.Strings(filenames)
snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames) - 1])
snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])

// should not file
file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
Expand All @@ -921,7 +919,7 @@ func (s *Server) LoadSnapshot() error {
var state []byte
var checksum, lastIndex, lastTerm uint64

n , err := fmt.Fscanf(file, "%08x\n%v\n%v", &checksum, &lastIndex, &lastTerm)
n, err := fmt.Fscanf(file, "%08x\n%v\n%v", &checksum, &lastIndex, &lastTerm)

if err != nil {
return err
Expand All @@ -946,4 +944,3 @@ func (s *Server) LoadSnapshot() error {

return err
}

Loading

0 comments on commit 769a5ed

Please sign in to comment.