Skip to content
This repository has been archived by the owner on Sep 6, 2018. It is now read-only.

Commit

Permalink
fix issues (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiang90 committed Jun 5, 2013
1 parent c0e1613 commit 842aa1a
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 63 deletions.
15 changes: 11 additions & 4 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
type Log struct {
ApplyFunc func(Command) error
file *os.File
path string
path string
entries []*LogEntry
commitIndex uint64
mutex sync.Mutex
Expand Down Expand Up @@ -412,7 +412,7 @@ func (l *Log) appendEntry(entry *LogEntry) error {
//--------------------------------------

// compaction the log before index
func (l *Log) Compaction(index uint64, term uint64) error {
func (l *Log) Compact(index uint64, term uint64) error {
var entries []*LogEntry

l.mutex.Lock()
Expand Down Expand Up @@ -444,9 +444,16 @@ func (l *Log) Compaction(index uint64, term uint64) error {
l.file.Close()

// remove the current log file to .bak
os.Remove(l.path)
err = os.Remove(l.path)
if err != nil {
return err
}

// rename the new log file
os.Rename(l.path + ".new", l.path)
err = os.Rename(l.path + ".new", l.path)
if err != nil {
return err
}
l.file = file

// compaction the in memory log
Expand Down
31 changes: 17 additions & 14 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,27 +213,30 @@ func (p *Peer) heartbeatTimeoutFunc(startChannel chan bool) {
p.mutex.Lock()
server, prevLogIndex := p.server, p.prevLogIndex
p.mutex.Unlock()


var req *AppendEntriesRequest
snapShotNeeded := false

// we need to hold the log lock to create AppendEntriesRequest
// avoid snapshot to delete the desired entries before AEQ()
server.log.mutex.Lock()
if prevLogIndex < server.log.StartIndex() {
if prevLogIndex >= server.log.StartIndex() {
req = server.createAppendEntriesRequest(prevLogIndex)
} else {
snapShotNeeded = true
}
server.log.mutex.Unlock()

// request the log before the latest snapshot
// send out the snapshot
server.log.mutex.Unlock()
p.mutex.Lock()
if snapShotNeeded {
req := server.createSnapshotRequest()

p.mutex.Lock()
p.sendSnapshotRequest(req)
p.mutex.Unlock()
} else {

// Lock the server to create a request.
req := server.createAppendEntriesRequest(prevLogIndex)
server.log.mutex.Unlock()
p.mutex.Lock()
p.sendFlushRequest(req)
p.mutex.Unlock()
}
p.mutex.Unlock()


} else {
break
}
Expand Down
11 changes: 6 additions & 5 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"sort"
"bufio"
"path"
)

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -791,7 +792,7 @@ func (s *Server) takeSnapshot() error {

s.saveSnapshot()

s.log.Compaction(lastIndex, lastTerm)
s.log.Compact(lastIndex, lastTerm)

return nil
}
Expand Down Expand Up @@ -822,7 +823,7 @@ func (s *Server) saveSnapshot() error {

// Retrieves the log path for the server.
func (s *Server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
return fmt.Sprintf("%s/snapshot/%v_%v.ss", s.path, lastTerm, lastIndex)
return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
}


Expand All @@ -840,7 +841,7 @@ func (s *Server) SnapshotRecovery(index uint64, term uint64, machineState int) (
snapshotPath := s.SnapshotPath(index, term)
s.currentSnapshot = &Snapshot{index, term, machineState, snapshotPath}
s.saveSnapshot()
s.log.Compaction(index, term)
s.log.Compact(index, term)


return NewSnapshotResponse(term, true, index), nil
Expand All @@ -849,7 +850,7 @@ func (s *Server) SnapshotRecovery(index uint64, term uint64, machineState int) (

// Load a snapshot at restart
func (s *Server) LoadSnapshot() error {
dir, err := os.OpenFile(s.path + "/snapshot", os.O_RDONLY, 0)
dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
if err != nil {
dir.Close()
panic(err)
Expand All @@ -869,7 +870,7 @@ func (s *Server) LoadSnapshot() error {

// not sure how many snapshot we should keep
sort.Strings(filenames)
snapshotPath := s.path + "/snapshot/" + filenames[len(filenames) - 1]
snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames) - 1])

// should not file
file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
Expand Down
40 changes: 0 additions & 40 deletions snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,46 +24,6 @@ type Snapshot struct {
path string
}

// The request sent to a server to start from the snapshot.
type SnapshotRequest struct {
LeaderName string `json:"leaderName"`
LastIndex uint64 `json:"lastTerm"`
LastTerm uint64 `json:"lastIndex"`
MachineState int `json:"machineState"`
}

// The response returned from a server appending entries to the log.
type SnapshotResponse struct {
Term uint64 `json:"term"`
Success bool `json:"success"`
CommitIndex uint64 `json:"commitIndex"`
}

//------------------------------------------------------------------------------
//
// Constructors
//
//------------------------------------------------------------------------------

// Creates a new Snapshot request.
func NewSnapshotRequest(leaderName string, snapshot *Snapshot) *SnapshotRequest {
return &SnapshotRequest{
LeaderName: leaderName,
LastIndex: snapshot.lastIndex,
LastTerm: snapshot.lastTerm,
MachineState: snapshot.machineState,
}
}

// Creates a new Snapshot response.
func NewSnapshotResponse(term uint64, success bool, commitIndex uint64) *SnapshotResponse {
return &SnapshotResponse{
Term: term,
Success: success,
CommitIndex: commitIndex,
}
}

// Save the snapshot to a file
func (ss *Snapshot) Save() error {
// Write machine state to temporary buffer.
Expand Down

0 comments on commit 842aa1a

Please sign in to comment.