From 77c63cc1f187238d4a5e2d3db09cd67f0fbab431 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 5 Jun 2013 13:57:31 -0400 Subject: [PATCH] Add error lookup for committed log entries. --- log.go | 47 +++++++++++++++++++++++++++++------------------ server.go | 29 +++++++++++++++++++++++++++-- 2 files changed, 56 insertions(+), 20 deletions(-) diff --git a/log.go b/log.go index c0fa778..90969fa 100644 --- a/log.go +++ b/log.go @@ -20,6 +20,7 @@ type Log struct { ApplyFunc func(Command) error file *os.File entries []*LogEntry + errors []error commitIndex uint64 mutex sync.Mutex } @@ -144,16 +145,14 @@ func (l *Log) Open(path string) error { break } - // Apply the command. - if err = l.ApplyFunc(entry.Command); err != nil { - file.Close() - return err - } - // Append entry. l.entries = append(l.entries, entry) - l.commitIndex = entry.Index + + // Apply the command. + err = l.ApplyFunc(entry.Command) + l.errors = append(l.errors, err) + lastIndex += n } @@ -180,6 +179,7 @@ func (l *Log) Close() { l.file = nil } l.entries = make([]*LogEntry, 0) + l.errors = make([]error, 0) } //-------------------------------------- @@ -223,6 +223,22 @@ func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) { return l.entries[index:], term } +// Retrieves the error returned from an entry. The error can only exist after +// the entry has been committed. +func (l *Log) GetEntryError(entry *LogEntry) error { + l.mutex.Lock() + defer l.mutex.Unlock() + + if entry == nil { + panic("raft: Log entry required for error retrieval") + } + + if entry.Index > 0 && entry.Index <= uint64(len(l.errors)) { + return l.errors[entry.Index-1] + } + return nil +} + //-------------------------------------- // Commit //-------------------------------------- @@ -247,11 +263,6 @@ func (l *Log) SetCommitIndex(index uint64) error { l.mutex.Lock() defer l.mutex.Unlock() - // Panic if we don't have any way to apply commands. - if l.ApplyFunc == nil { - panic("raft.Log: Apply function not set") - } - // Do not allow previous indices to be committed again. if index < l.commitIndex { return fmt.Errorf("raft.Log: Commit index (%d) ahead of requested commit index (%d)", l.commitIndex, index) @@ -262,12 +273,8 @@ func (l *Log) SetCommitIndex(index uint64) error { // Find all entries whose index is between the previous index and the current index. for i := l.commitIndex + 1; i <= index; i++ { - entry := l.entries[i-1] - - // Apply the changes to the state machine. - if err := l.ApplyFunc(entry.Command); err != nil { - return err - } + entryIndex := i-1 + entry := l.entries[entryIndex] // Write to storage. if err := entry.Encode(l.file); err != nil { @@ -276,6 +283,9 @@ func (l *Log) SetCommitIndex(index uint64) error { // Update commit index. l.commitIndex = entry.Index + + // Apply the changes to the state machine and store the error code. + l.errors[entryIndex] = l.ApplyFunc(entry.Command) } return nil @@ -367,6 +377,7 @@ func (l *Log) appendEntry(entry *LogEntry) error { // Append to entries list if stored on disk. l.entries = append(l.entries, entry) + l.errors = append(l.errors, nil) return nil } diff --git a/server.go b/server.go index 3f2b100..8994d1c 100644 --- a/server.go +++ b/server.go @@ -415,10 +415,15 @@ loop: // Commit to log and flush to peers again. if committed { - return s.log.SetCommitIndex(entry.Index) + if err := s.log.SetCommitIndex(entry.Index); err != nil { + return err + } + return s.log.GetEntryError(entry) } - return nil + // TODO: This will be removed after the timeout above is changed to a + // demotion callback. + return fmt.Errorf("raft: Unable to commit entry: %d", entry.Index) } // Appends a log entry from the leader to this server. @@ -724,3 +729,23 @@ func (s *Server) AddPeer(name string) error { return nil } + +// Removes a peer from the server. This should be called by a system's join command +// within the context so that it is within the context of the server lock. +func (s *Server) RemovePeer(name string) error { + // Ignore removal of the server itself. + if s.name == name { + return nil + } + // Return error if peer doesn't exist. + peer := s.peers[name] + if peer != nil { + return fmt.Errorf("raft: Peer not found: %s", name) + } + + // Stop peer and remove it. + peer.stop() + delete(s.peers, name) + + return nil +}