diff --git a/server.go b/server.go index 552760c..7e21636 100644 --- a/server.go +++ b/server.go @@ -62,7 +62,7 @@ type Server struct { heartbeatTimeout time.Duration currentSnapshot *Snapshot lastSnapshot *Snapshot - machineState int //TODO CHANGE THIS TO INTERFACE: recovery and store + stateMachine StateMachine } //------------------------------------------------------------------------------ @@ -812,7 +812,13 @@ func (s *Server) takeSnapshot() error { path := s.SnapshotPath(lastIndex, lastTerm) - s.currentSnapshot = &Snapshot{lastIndex, lastTerm, s.machineState , path} + state, err := s.stateMachine.Save() + + if err !=nil { + return err + } + + s.currentSnapshot = &Snapshot{lastIndex, lastTerm, state, path} s.saveSnapshot() @@ -857,13 +863,13 @@ func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, erro defer s.mutex.Unlock() //recovery machine state - s.machineState = req.MachineState + s.stateMachine.Recovery(req.State) //update term and index s.currentTerm = req.LastTerm s.log.UpdateCommitIndex(req.LastIndex) snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm) - s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.MachineState, snapshotPath} + s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.State, snapshotPath} s.saveSnapshot() s.log.Compact(req.LastIndex, req.LastTerm) @@ -913,15 +919,21 @@ func (s *Server) LoadSnapshot() error { &lastIndex, &lastTerm) if err != nil { - panic(err) + return err } if n != 4 { - panic(n) + return errors.New("Bad snapshot file") } - s.lastSnapshot = &Snapshot{lastIndex, lastTerm, 1, snapshotPath} - s.machineState = machineState + state, err := s.stateMachine.Save() + if err != nil { + return err + } + + s.lastSnapshot = &Snapshot{lastIndex, lastTerm, state, snapshotPath} + err = s.stateMachine.Recovery(state) + s.log.SetStartTerm(lastTerm) s.log.SetStartIndex(lastIndex) s.log.UpdateCommitIndex(lastIndex) diff --git a/snapshot.go b/snapshot.go index 2420cdb..36d82a4 100644 --- a/snapshot.go +++ b/snapshot.go @@ -20,7 +20,7 @@ type Snapshot struct { lastIndex uint64 lastTerm uint64 // cluster configuration. - machineState int + state []byte path string } @@ -46,9 +46,13 @@ func (ss *Snapshot) Save() error { defer file.Close() - // Write log entry with checksum. - if _, err = fmt.Fprintf(file, "%08x\n%s\n%v\n%v\n", checksum, b.String(), - ss.lastIndex, ss.lastTerm); err != nil { + // Write snapshot with checksum. + if _, err = fmt.Fprintf(file, "%08x\n%v\n%v\n", checksum, ss.lastIndex, + ss.lastTerm); err != nil { + return err + } + + if _, err = file.Write(ss.state); err != nil { return err } diff --git a/snapshot_request.go b/snapshot_request.go index 726615c..657e6d3 100644 --- a/snapshot_request.go +++ b/snapshot_request.go @@ -5,7 +5,7 @@ type SnapshotRequest struct { LeaderName string `json:"leaderName"` LastIndex uint64 `json:"lastTerm"` LastTerm uint64 `json:"lastIndex"` - MachineState int `json:"machineState"` + State []byte `json:"state"` } // The response returned from a server appending entries to the log. @@ -27,7 +27,7 @@ func NewSnapshotRequest(leaderName string, snapshot *Snapshot) *SnapshotRequest LeaderName: leaderName, LastIndex: snapshot.lastIndex, LastTerm: snapshot.lastTerm, - MachineState: snapshot.machineState, + State: snapshot.state, } } diff --git a/statemachine.go b/statemachine.go new file mode 100644 index 0000000..8447e23 --- /dev/null +++ b/statemachine.go @@ -0,0 +1,15 @@ +package raft + +//------------------------------------------------------------------------------ +// +// Typedefs +// +//------------------------------------------------------------------------------ + +// StateMachine is the interface for allowing the host application to save and +// recovery the state machine +type StateMachine interface { + Save() ([]byte, error) + Recovery([]byte) error + +} \ No newline at end of file