Skip to content
This repository has been archived by the owner on Sep 6, 2018. It is now read-only.

Commit

Permalink
solve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
xiang90 committed Jun 6, 2013
2 parents f15cade + 77c63cc commit 349a1ab
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 20 deletions.
47 changes: 29 additions & 18 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Log struct {
file *os.File
path string
entries []*LogEntry
errors []error
commitIndex uint64
mutex sync.Mutex
startIndex uint64 // the index before the first entry in the Log entries
Expand Down Expand Up @@ -166,16 +167,14 @@ func (l *Log) Open(path string) error {
break
}

// Apply the command.
if err = l.ApplyFunc(entry.Command); err != nil {
file.Close()
return err
}

// Append entry.
l.entries = append(l.entries, entry)

l.commitIndex = entry.Index

// Apply the command.
err = l.ApplyFunc(entry.Command)
l.errors = append(l.errors, err)

lastIndex += n
}

Expand All @@ -202,6 +201,7 @@ func (l *Log) Close() {
l.file = nil
}
l.entries = make([]*LogEntry, 0)
l.errors = make([]error, 0)
}

//--------------------------------------
Expand Down Expand Up @@ -244,6 +244,22 @@ func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) {
return l.entries[index - l.startIndex:], term
}

// Retrieves the error returned from an entry. The error can only exist after
// the entry has been committed.
func (l *Log) GetEntryError(entry *LogEntry) error {
l.mutex.Lock()
defer l.mutex.Unlock()

if entry == nil {
panic("raft: Log entry required for error retrieval")
}

if entry.Index > 0 && entry.Index <= uint64(len(l.errors)) {
return l.errors[entry.Index-1]
}
return nil
}

//--------------------------------------
// Commit
//--------------------------------------
Expand Down Expand Up @@ -281,11 +297,6 @@ func (l *Log) SetCommitIndex(index uint64) error {
l.mutex.Lock()
defer l.mutex.Unlock()

// Panic if we don't have any way to apply commands.
if l.ApplyFunc == nil {
panic("raft.Log: Apply function not set")
}

// Do not allow previous indices to be committed again.
if index < l.commitIndex {
return fmt.Errorf("raft.Log: Commit index (%d) ahead of requested commit index (%d)", l.commitIndex, index)
Expand All @@ -296,12 +307,8 @@ func (l *Log) SetCommitIndex(index uint64) error {

// Find all entries whose index is between the previous index and the current index.
for i := l.commitIndex + 1; i <= index; i++ {
entry := l.entries[i - 1 - l.startIndex]

// Apply the changes to the state machine.
if err := l.ApplyFunc(entry.Command); err != nil {
return err
}
entryIndex := i - 1 - l.startIndex
entry := l.entries[entryIndex]

// Write to storage.
if err := entry.Encode(l.file); err != nil {
Expand All @@ -310,6 +317,9 @@ func (l *Log) SetCommitIndex(index uint64) error {

// Update commit index.
l.commitIndex = entry.Index

// Apply the changes to the state machine and store the error code.
l.errors[entryIndex] = l.ApplyFunc(entry.Command)
}

return nil
Expand Down Expand Up @@ -401,6 +411,7 @@ func (l *Log) appendEntry(entry *LogEntry) error {

// Append to entries list if stored on disk.
l.entries = append(l.entries, entry)
l.errors = append(l.errors, nil)

return nil
}
Expand Down
30 changes: 28 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,15 @@ loop:

// Commit to log and flush to peers again.
if committed {
return s.log.SetCommitIndex(entry.Index)
if err := s.log.SetCommitIndex(entry.Index); err != nil {
return err
}
return s.log.GetEntryError(entry)
}
return nil

// TODO: This will be removed after the timeout above is changed to a
// demotion callback.
return fmt.Errorf("raft: Unable to commit entry: %d", entry.Index)
}

// Appends a log entry from the leader to this server.
Expand Down Expand Up @@ -750,6 +756,26 @@ func (s *Server) AddPeer(name string) error {
return nil
}

// Removes a peer from the server. This should be called by a system's join command
// within the context so that it is within the context of the server lock.
func (s *Server) RemovePeer(name string) error {
// Ignore removal of the server itself.
if s.name == name {
return nil
}
// Return error if peer doesn't exist.
peer := s.peers[name]
if peer != nil {
return fmt.Errorf("raft: Peer not found: %s", name)
}

// Stop peer and remove it.
peer.stop()
delete(s.peers, name)

return nil
}


//--------------------------------------
// Log compaction
Expand Down

0 comments on commit 349a1ab

Please sign in to comment.