Skip to content
This repository has been archived by the owner on Sep 6, 2018. It is now read-only.

Commit

Permalink
Merge branch 'master' of https://github.com/xiangli-cmu/go-raft into …
Browse files Browse the repository at this point in the history
…xiangli-cmu-master

Conflicts:
	log.go
  • Loading branch information
benbjohnson committed Jun 7, 2013
2 parents bb7caaf + 86bff03 commit 3bcf91a
Show file tree
Hide file tree
Showing 11 changed files with 735 additions and 54 deletions.
2 changes: 0 additions & 2 deletions append_entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package raft

// The request sent to a server to append entries to the log.
type AppendEntriesRequest struct {
peer *Peer
Term uint64 `json:"term"`
LeaderName string `json:"leaderName"`
PrevLogIndex uint64 `json:"prevLogIndex"`
Expand All @@ -19,7 +18,6 @@ type AppendEntriesRequest struct {

// The response returned from a server appending entries to the log.
type AppendEntriesResponse struct {
peer *Peer
Term uint64 `json:"term"`
Success bool `json:"success"`
CommitIndex uint64 `json:"commitIndex"`
Expand Down
128 changes: 110 additions & 18 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import (
type Log struct {
ApplyFunc func(Command) error
file *os.File
path string
entries []*LogEntry
errors []error
commitIndex uint64
mutex sync.Mutex
startIndex uint64 // the index before the first entry in the Log entries
startTerm uint64
}

//------------------------------------------------------------------------------
Expand All @@ -42,6 +45,17 @@ func NewLog() *Log {
//
//------------------------------------------------------------------------------

func (l *Log) SetStartIndex(i uint64) {
l.startIndex = i
}

func (l *Log) StartIndex() uint64 {
return l.startIndex
}

func (l *Log) SetStartTerm(t uint64) {
l.startTerm = t
}
//--------------------------------------
// Log Indices
//--------------------------------------
Expand All @@ -52,7 +66,15 @@ func (l *Log) CurrentIndex() uint64 {
defer l.mutex.Unlock()

if len(l.entries) == 0 {
return 0
return l.startIndex
}
return l.entries[len(l.entries)-1].Index
}

// The current index in the log without locking
func (l *Log) internalCurrentIndex() uint64 {
if len(l.entries) == 0 {
return l.startIndex
}
return l.entries[len(l.entries)-1].Index
}
Expand Down Expand Up @@ -96,7 +118,7 @@ func (l *Log) CurrentTerm() uint64 {
defer l.mutex.Unlock()

if len(l.entries) == 0 {
return 0
return l.startTerm
}
return l.entries[len(l.entries)-1].Term
}
Expand Down Expand Up @@ -165,7 +187,7 @@ func (l *Log) Open(path string) error {
if err != nil {
return err
}

l.path = path
return nil
}

Expand Down Expand Up @@ -196,7 +218,7 @@ func (l *Log) ContainsEntry(index uint64, term uint64) bool {
l.mutex.Lock()
defer l.mutex.Unlock()

if index == 0 || index > uint64(len(l.entries)) {
if index <= l.startIndex || index > (l.startIndex + uint64(len(l.entries))) {
return false
}
return (l.entries[index-1].Term == term)
Expand All @@ -206,18 +228,17 @@ func (l *Log) ContainsEntry(index uint64, term uint64) bool {
// the term of the index provided.
func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) {
// Return an error if the index doesn't exist.
if index > uint64(len(l.entries)) {
if index > (uint64(len(l.entries)) + l.startIndex) {
panic(fmt.Sprintf("raft.Log: Index is beyond end of log: %v", index))
}

// If we're going from the beginning of the log then return the whole log.
if index == 0 {
return l.entries, 0
if index == l.startIndex {
return l.entries, l.startTerm
}

// Determine the term at the given entry and return a subslice.
term := l.entries[index-1].Term
return l.entries[index:], term
term := l.entries[index - 1 - l.startIndex].Term
return l.entries[index - l.startIndex:], term
}

// Retrieves the error returned from an entry. The error can only exist after
Expand Down Expand Up @@ -250,11 +271,24 @@ func (l *Log) CommitInfo() (index uint64, term uint64) {
return 0, 0
}

// no new commit log after snapshot
if l.commitIndex == l.startIndex {
return l.startIndex, l.startTerm
}

// Return the last index & term from the last committed entry.
lastCommitEntry := l.entries[l.commitIndex-1]
lastCommitEntry := l.entries[l.commitIndex - 1 - l.startIndex]
return lastCommitEntry.Index, lastCommitEntry.Term
}

// Updates the commit index
func (l *Log) UpdateCommitIndex(index uint64) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.commitIndex = index

}

// Updates the commit index and writes entries after that index to the stable storage.
func (l *Log) SetCommitIndex(index uint64) error {
l.mutex.Lock()
Expand All @@ -264,13 +298,13 @@ func (l *Log) SetCommitIndex(index uint64) error {
if index < l.commitIndex {
return fmt.Errorf("raft.Log: Commit index (%d) ahead of requested commit index (%d)", l.commitIndex, index)
}
if index > uint64(len(l.entries)) {
if index > l.startIndex + uint64(len(l.entries)) {
return fmt.Errorf("raft.Log: Commit index (%d) out of range (%d)", index, len(l.entries))
}

// Find all entries whose index is between the previous index and the current index.
for i := l.commitIndex + 1; i <= index; i++ {
entryIndex := i-1
entryIndex := i - 1 - l.startIndex
entry := l.entries[entryIndex]

// Write to storage.
Expand Down Expand Up @@ -304,23 +338,23 @@ func (l *Log) Truncate(index uint64, term uint64) error {
}

// Do not truncate past end of entries.
if index > uint64(len(l.entries)) {
if index > l.startIndex + uint64(len(l.entries)) {
return fmt.Errorf("raft.Log: Entry index does not exist (MAX=%v): (IDX=%v, TERM=%v)", len(l.entries), index, term)
}

// If we're truncating everything then just clear the entries.
if index == 0 {
if index == l.startIndex {
l.entries = []*LogEntry{}
} else {
// Do not truncate if the entry at index does not have the matching term.
entry := l.entries[index-1]
entry := l.entries[index - l.startIndex - 1]
if len(l.entries) > 0 && entry.Term != term {
return fmt.Errorf("raft.Log: Entry at index does not have matching term (%v): (IDX=%v, TERM=%v)", entry.Term, index, term)
}

// Otherwise truncate up to the desired entry.
if index < uint64(len(l.entries)) {
l.entries = l.entries[0:index]
if index < l.startIndex + uint64(len(l.entries)) {
l.entries = l.entries[0:index - l.startIndex]
}
}

Expand Down Expand Up @@ -378,3 +412,61 @@ func (l *Log) appendEntry(entry *LogEntry) error {

return nil
}



//--------------------------------------
// Log compaction
//--------------------------------------

// compaction the log before index
func (l *Log) Compact(index uint64, term uint64) error {
var entries []*LogEntry

l.mutex.Lock()
defer l.mutex.Unlock()

// nothing to compaction
// the index may be greater than the current index if
// we just recovery from on snapshot
if index >= l.internalCurrentIndex() {
entries = make([]*LogEntry, 0)
} else {

// get all log entries after index
entries = l.entries[index - l.startIndex:]
}

// create a new log file and add all the entries
file, err := os.OpenFile(l.path + ".new", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return err
}
for _, entry := range entries {
err = entry.Encode(file)
if err != nil {
return err
}
}
// close the current log file
l.file.Close()

// remove the current log file to .bak
err = os.Remove(l.path)
if err != nil {
return err
}

// rename the new log file
err = os.Rename(l.path + ".new", l.path)
if err != nil {
return err
}
l.file = file

// compaction the in memory log
l.entries = entries
l.startIndex = index
l.startTerm = term
return nil
}
87 changes: 68 additions & 19 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,71 @@ func (p *Peer) stop() {
// Flush
//--------------------------------------

// Sends an AppendEntries RPC but does not obtain a lock on the server. This
// method should only be called from the server.
func (p *Peer) internalFlush() (uint64, bool, error) {
// if internal is set true, sends an AppendEntries RPC but does not obtain a lock
// on the server.
func (p *Peer) flush(internal bool) (uint64, bool, error) {
// Retrieve the peer data within a lock that is separate from the
// server lock when creating the request. Otherwise a deadlock can
// occur.
p.mutex.Lock()
server, prevLogIndex := p.server, p.prevLogIndex
p.mutex.Unlock()

var req *AppendEntriesRequest
snapShotNeeded := false

// we need to hold the log lock to create AppendEntriesRequest
// avoid snapshot to delete the desired entries before AEQ()
server.log.mutex.Lock()
if prevLogIndex >= server.log.StartIndex() {
if internal {
req = server.createInternalAppendEntriesRequest(prevLogIndex)
} else {
req = server.createAppendEntriesRequest(prevLogIndex)
}
} else {
snapShotNeeded = true
}
server.log.mutex.Unlock()

p.mutex.Lock()
defer p.mutex.Unlock()
req := p.server.createInternalAppendEntriesRequest(p.prevLogIndex)
return p.sendFlushRequest(req)
if snapShotNeeded {
req := server.createSnapshotRequest()
return p.sendSnapshotRequest(req)
} else {
return p.sendFlushRequest(req)
}

}

// send Snapshot Request
func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) (uint64, bool, error){
// Ignore any null requests.
if req == nil {
return 0, false, errors.New("raft.Peer: Request required")
}

// Generate an snapshot request based on the state of the server and
// log. Send the request through the user-provided handler and process the
// result.
resp, err := p.server.transporter.SendSnapshotRequest(p.server, p, req)
p.heartbeatTimer.Reset()
if resp == nil {
return 0, false, err
}

// If successful then update the previous log index. If it was
// unsuccessful then decrement the previous log index and we'll try again
// next time.
if resp.Success {
p.prevLogIndex = req.LastIndex

} else {
panic(resp)
}

return resp.Term, resp.Success, err
}

// Flushes a request through the server's transport.
Expand All @@ -119,6 +177,7 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error)
// log. Send the request through the user-provided handler and process the
// result.
resp, err := p.server.transporter.SendAppendEntriesRequest(p.server, p, req)

p.heartbeatTimer.Reset()
if resp == nil {
return 0, false, err
Expand Down Expand Up @@ -153,10 +212,11 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error)
// Listens to the heartbeat timeout and flushes an AppendEntries RPC.
func (p *Peer) heartbeatTimeoutFunc(startChannel chan bool) {
startChannel <- true

for {
// Grab the current timer channel.
p.mutex.Lock()

var c chan time.Time
if p.heartbeatTimer != nil {
c = p.heartbeatTimer.C()
Expand All @@ -171,19 +231,8 @@ func (p *Peer) heartbeatTimeoutFunc(startChannel chan bool) {
// Flush the peer when we get a heartbeat timeout. If the channel is
// closed then the peer is getting cleaned up and we should exit.
if _, ok := <-c; ok {
// Retrieve the peer data within a lock that is separate from the
// server lock when creating the request. Otherwise a deadlock can
// occur.
p.mutex.Lock()
server, prevLogIndex := p.server, p.prevLogIndex
p.mutex.Unlock()

// Lock the server to create a request.
req := server.createAppendEntriesRequest(prevLogIndex)

p.mutex.Lock()
p.sendFlushRequest(req)
p.mutex.Unlock()
p.flush(false)

} else {
break
}
Expand Down
Loading

0 comments on commit 3bcf91a

Please sign in to comment.