This repository has been archived by the owner on Sep 6, 2018. It is now read-only.

Commit 1d66f6a: Snapshot test coverage.
benbjohnson committed Jan 21, 2014
1 parent 7b9d282
Showing 12 changed files with 184 additions and 140 deletions.
11 changes: 7 additions & 4 deletions Makefile
@@ -1,8 +1,11 @@
-all: test
+COVERPROFILE=/tmp/c.out

@philips (Member) commented on Jan 21, 2014:

    can this please go into the current working directory?

@benbjohnson (Author, Contributor) commented on Jan 21, 2014:

    Fixed.


-coverage:
-	gocov test github.com/goraft/raft | gocov-html > coverage.html
-	open coverage.html
+default: test
+
+cover:
+	go test -coverprofile=$(COVERPROFILE) .
+	go tool cover -html=$(COVERPROFILE)
+	rm $(COVERPROFILE)

dependencies:
	go get -d .
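With this target, "make cover" runs the tests with profiling enabled, opens the annotated source in a browser via go tool cover, and then deletes the profile file.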
1 change: 0 additions & 1 deletion command.go
@@ -69,7 +69,6 @@ func RegisterCommand(command Command) {
		panic(fmt.Sprintf("raft: Cannot register nil"))
	} else if commandTypes[command.CommandName()] != nil {
		panic(fmt.Sprintf("raft: Duplicate registration: %s", command.CommandName()))
-		return
	}
	commandTypes[command.CommandName()] = command
}
121 changes: 51 additions & 70 deletions server.go
@@ -1102,55 +1102,46 @@ func (s *server) RemovePeer(name string) error {
//--------------------------------------

func (s *server) TakeSnapshot() error {
-	//TODO put a snapshot mutex
+	// TODO: put a snapshot mutex

@philips (Member) commented on Jan 21, 2014:

    Can we comment on what the snapshot will protect? Two snapshots from happening at the same time or something else?

	s.debugln("take Snapshot")

+	// Exit if the server is currently creating a snapshot.

@philips (Member) commented on Jan 21, 2014:

    This is what I would think the mutex would protect...
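For reference, a minimal sketch of the mutex under discussion, assuming a dedicated field on the server (the snapshotMutex name and the shape of the wrapper are illustrative, not part of this commit):

    package raft

    import "sync"

    // Hypothetical sketch: make snapshot creation mutually exclusive so two
    // TakeSnapshot calls cannot interleave; the currentSnapshot != nil check
    // below would then no longer double as a concurrency guard.
    type server struct {
    	snapshotMutex sync.Mutex
    	// ... existing server fields ...
    }

    func (s *server) TakeSnapshot() error {
    	s.snapshotMutex.Lock()
    	defer s.snapshotMutex.Unlock()

    	// ... build the snapshot, save it to disk, compact the log ...
    	return nil
    }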

	if s.currentSnapshot != nil {
		return errors.New("handling snapshot")
	}

+	// Exit if there are no logs yet in the system.
	lastIndex, lastTerm := s.log.commitInfo()

-	path := s.SnapshotPath(lastIndex, lastTerm)
	if lastIndex == 0 {
		return errors.New("No logs")
	}

+	path := s.SnapshotPath(lastIndex, lastTerm)

	var state []byte
	var err error

	if s.stateMachine != nil {
		state, err = s.stateMachine.Save()

		if err != nil {
			return err
		}

	} else {
		state = []byte{0}
	}

-	peers := make([]*Peer, len(s.peers)+1)
-
-	i := 0
+	// Clone the list of peers.
+	peers := make([]*Peer, 0, len(s.peers)+1)
	for _, peer := range s.peers {
-		peers[i] = peer.clone()
-		i++
-	}
-
-	peers[i] = &Peer{
-		Name:             s.Name(),
-		ConnectionString: s.connectionString,
+		peers = append(peers, peer.clone())
	}
+	peers = append(peers, &Peer{Name: s.Name(), ConnectionString: s.connectionString})

+	// Attach current snapshot and save it to disk.
	s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}

	s.saveSnapshot()

-	// We keep some log entries after the snapshot
-	// We do not want to send the whole snapshot
-	// to the slightly slow machines
-	if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
+	// We keep some log entries after the snapshot.
+	// We do not want to send the whole snapshot to the slightly slow machines
+	if lastIndex - s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
		compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
		compactTerm := s.log.getEntry(compactIndex).Term
		s.log.compact(compactIndex, compactTerm)
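(For example, assuming NumberOfLogEntriesAfterSnapshot were 200 and lastIndex were 1000, compaction would cut the log at index 800, keeping the newest 200 entries so slightly lagging peers can still catch up from the log instead of requiring the full snapshot.)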
@@ -1161,25 +1152,25 @@ func (s *server) TakeSnapshot() error {

// Retrieves the log path for the server.
func (s *server) saveSnapshot() error {
	if s.currentSnapshot == nil {
		return errors.New("no snapshot to save")
	}

-	err := s.currentSnapshot.save()
-
-	if err != nil {
+	// Write snapshot to disk.
+	if err := s.currentSnapshot.save(); err != nil {
		return err
	}

+	// Swap the current and last snapshots.
	tmp := s.lastSnapshot
	s.lastSnapshot = s.currentSnapshot

-	// delete the previous snapshot if there is any change
+	// Delete the previous snapshot if there is any change
	if tmp != nil && !(tmp.LastIndex == s.lastSnapshot.LastIndex && tmp.LastTerm == s.lastSnapshot.LastTerm) {
		tmp.remove()
	}
	s.currentSnapshot = nil

@philips (Member) commented on Jan 21, 2014:

    It is a bit confusing that we have a field called "currentSnapshot" that is null when TakeSnapshot completes successfully.
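As the code stands, currentSnapshot acts as an in-flight marker: TakeSnapshot sets it, saveSnapshot persists it and promotes it to lastSnapshot, and it is reset to nil once the snapshot is durable.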


	return nil
}

@@ -1195,18 +1186,15 @@ func (s *server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
}

func (s *server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
	// If the follower’s log contains an entry at the snapshot’s last index with a term
-	// that matches the snapshot’s last term
-	// Then the follower already has all the information found in the snapshot
-	// and can reply false
+	// that matches the snapshot’s last term, then the follower already has all the
+	// information found in the snapshot and can reply false.
	entry := s.log.getEntry(req.LastIndex)

	if entry != nil && entry.Term == req.LastTerm {
		return newSnapshotResponse(false)
	}

+	// Update state.
	s.setState(Snapshotting)

	return newSnapshotResponse(true)
@@ -1219,29 +1207,26 @@ func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *Snapshot
}

func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
-	s.stateMachine.Recovery(req.State)
-
-	// clear the peer map
+	// Recover state sent from request.
+	if err := s.stateMachine.Recovery(req.State); err != nil {
+		return newSnapshotRecoveryResponse(req.LastTerm, false, req.LastIndex)
+	}
+
+	// Recover the cluster configuration.
	s.peers = make(map[string]*Peer)

-	// recovery the cluster configuration
	for _, peer := range req.Peers {
		s.AddPeer(peer.Name, peer.ConnectionString)
	}

-	//update term and index
+	// Update log state.
	s.currentTerm = req.LastTerm
	s.log.updateCommitIndex(req.LastIndex)

-	snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm)
-
-	s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, snapshotPath}
-
+	// Create local snapshot.
+	s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, s.SnapshotPath(req.LastIndex, req.LastTerm)}
	s.saveSnapshot()

-	// clear the previous log entries
+	// Clear the previous log entries.
	s.log.compact(req.LastIndex, req.LastTerm)

	return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
@@ -1250,79 +1235,75 @@ func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *S

// Load a snapshot at restart
func (s *server) LoadSnapshot() error {
+	// Open snapshot/ directory.
	dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
	if err != nil {
		return err
	}

+	// Retrieve a list of all snapshots.
	filenames, err := dir.Readdirnames(-1)
	if err != nil {
		dir.Close()
		panic(err)
	}
	dir.Close()

	if len(filenames) == 0 {
		return errors.New("no snapshot")
	}

-	// not sure how many snapshot we should keep
+	// Grab the latest snapshot.
	sort.Strings(filenames)
	snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])

-	// should not fail
+	// Read snapshot data.
	file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
-	defer file.Close()
	if err != nil {
-		panic(err)
+		return err
	}
+	defer file.Close()

-	// TODO check checksum first
-	var snapshotBytes []byte
+	// Check checksum.
	var checksum uint32
	n, err := fmt.Fscanf(file, "%08x\n", &checksum)
	if err != nil {
		return err
-	}
-	if n != 1 {
+	} else if n != 1 {
		return errors.New("Bad snapshot file")
	}

-	snapshotBytes, _ = ioutil.ReadAll(file)
-	s.debugln(string(snapshotBytes))
+	// Load remaining snapshot contents.
+	b, err := ioutil.ReadAll(file)
+	if err != nil {
+		return err
+	}

	// Generate checksum.
-	byteChecksum := crc32.ChecksumIEEE(snapshotBytes)
+	byteChecksum := crc32.ChecksumIEEE(b)
	if uint32(checksum) != byteChecksum {
		s.debugln(checksum, " ", byteChecksum)
		return errors.New("bad snapshot file")
	}

-	err = json.Unmarshal(snapshotBytes, &s.lastSnapshot)
-
-	if err != nil {
+	// Decode snapshot.
+	if err = json.Unmarshal(b, &s.lastSnapshot); err != nil {
		s.debugln("unmarshal error: ", err)
		return err
	}

-	err = s.stateMachine.Recovery(s.lastSnapshot.State)
-
-	if err != nil {
+	// Recover snapshot into state machine.
+	if err = s.stateMachine.Recovery(s.lastSnapshot.State); err != nil {
		s.debugln("recovery error: ", err)
		return err
	}

+	// Recover cluster configuration.
	for _, peer := range s.lastSnapshot.Peers {
		s.AddPeer(peer.Name, peer.ConnectionString)
	}

+	// Update log state.
	s.log.startTerm = s.lastSnapshot.LastTerm
	s.log.startIndex = s.lastSnapshot.LastIndex
	s.log.updateCommitIndex(s.lastSnapshot.LastIndex)
6 changes: 0 additions & 6 deletions server_test.go
@@ -10,12 +10,6 @@ import (
	"time"
)

-//------------------------------------------------------------------------------
-//
-// Tests
-//
-//------------------------------------------------------------------------------
-
//--------------------------------------
// Request Vote
//--------------------------------------
47 changes: 22 additions & 25 deletions snapshot.go
@@ -1,64 +1,61 @@
package raft

import (
-	//"bytes"
	"encoding/json"
	"fmt"
	"hash/crc32"
	"os"
)

-//------------------------------------------------------------------------------
-//
-// Typedefs
-//
-//------------------------------------------------------------------------------
-
-// the in memory SnapShot struct
-// TODO add cluster configuration
+// Snapshot represents an in-memory representation of the current state of the system.
type Snapshot struct {
	LastIndex uint64 `json:"lastIndex"`
	LastTerm  uint64 `json:"lastTerm"`
-	// cluster configuration.
+
+	// Cluster configuration.
	Peers []*Peer `json:"peers"`
	State []byte  `json:"state"`
	Path  string  `json:"path"`
}

-// Save the snapshot to a file
+// save writes the snapshot to file.
func (ss *Snapshot) save() error {
-	// Write machine state to temporary buffer.
-
-	// open file
+	// Open the file for writing.
	file, err := os.OpenFile(ss.Path, os.O_CREATE|os.O_WRONLY, 0600)
	if err != nil {
		return err
	}
	defer file.Close()

+	// Serialize to JSON.
	b, err := json.Marshal(ss)
	if err != nil {
		return err
	}

-	// Generate checksum.
+	// Generate checksum and write it to disk.
	checksum := crc32.ChecksumIEEE(b)
-
-	// Write snapshot with checksum.
	if _, err = fmt.Fprintf(file, "%08x\n", checksum); err != nil {
		return err
	}

+	// Write the snapshot to disk.
	if _, err = file.Write(b); err != nil {
		return err
	}

-	// force the change writting to disk
-	file.Sync()
-	return err
+	// Ensure that the snapshot has been flushed to disk before continuing.
+	if err := file.Sync(); err != nil {
+		return err
+	}
+
+	return nil
}

-// remove the file of the snapshot
+// remove deletes the snapshot file.
func (ss *Snapshot) remove() error {
-	err := os.Remove(ss.Path)
-	return err
+	if err := os.Remove(ss.Path); err != nil {
+		return err
+	}
+	return nil
}
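Taken together, save and LoadSnapshot define the on-disk snapshot format: a CRC32 checksum rendered as "%08x\n", followed by the JSON-encoded snapshot. A standalone sketch of that round trip, using a trimmed stand-in Snapshot type (the file name and helper names here are illustrative):

    package main

    import (
    	"encoding/json"
    	"fmt"
    	"hash/crc32"
    	"io/ioutil"
    	"os"
    )

    // Stand-in for raft's Snapshot with only a subset of its fields.
    type Snapshot struct {
    	LastIndex uint64 `json:"lastIndex"`
    	LastTerm  uint64 `json:"lastTerm"`
    	State     []byte `json:"state"`
    }

    // write mirrors Snapshot.save: checksum line first, then the JSON payload.
    func write(path string, ss *Snapshot) error {
    	b, err := json.Marshal(ss)
    	if err != nil {
    		return err
    	}
    	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0600)
    	if err != nil {
    		return err
    	}
    	defer f.Close()
    	if _, err := fmt.Fprintf(f, "%08x\n", crc32.ChecksumIEEE(b)); err != nil {
    		return err
    	}
    	if _, err := f.Write(b); err != nil {
    		return err
    	}
    	return f.Sync() // flush before declaring the snapshot durable
    }

    // read mirrors LoadSnapshot's parsing: scan the checksum line, read the
    // rest of the file, verify the CRC, then unmarshal the JSON.
    func read(path string) (*Snapshot, error) {
    	f, err := os.Open(path)
    	if err != nil {
    		return nil, err
    	}
    	defer f.Close()
    	var checksum uint32
    	if _, err := fmt.Fscanf(f, "%08x\n", &checksum); err != nil {
    		return nil, err
    	}
    	b, err := ioutil.ReadAll(f)
    	if err != nil {
    		return nil, err
    	}
    	if crc32.ChecksumIEEE(b) != checksum {
    		return nil, fmt.Errorf("bad snapshot file")
    	}
    	ss := &Snapshot{}
    	return ss, json.Unmarshal(b, ss)
    }

    func main() {
    	if err := write("demo_snapshot", &Snapshot{LastIndex: 3, LastTerm: 1, State: []byte("kv")}); err != nil {
    		panic(err)
    	}
    	ss, err := read("demo_snapshot")
    	fmt.Println(ss.LastIndex, ss.LastTerm, err)
    }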
6 changes: 0 additions & 6 deletions snapshot_recovery_request.go
@@ -16,12 +16,6 @@ type SnapshotRecoveryRequest struct {
	State []byte
}

-//------------------------------------------------------------------------------
-//
-// Constructors
-//
-//------------------------------------------------------------------------------
-
// Creates a new Snapshot request.
func newSnapshotRecoveryRequest(leaderName string, snapshot *Snapshot) *SnapshotRecoveryRequest {
	return &SnapshotRecoveryRequest{
