diff --git a/server.go b/server.go index 7e21636..46faeba 100644 --- a/server.go +++ b/server.go @@ -7,8 +7,8 @@ import ( "time" "os" "sort" - "bufio" "path" + "io/ioutil" ) //------------------------------------------------------------------------------ @@ -862,7 +862,6 @@ func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, erro s.mutex.Lock() defer s.mutex.Unlock() - //recovery machine state s.stateMachine.Recovery(req.State) //update term and index @@ -912,21 +911,21 @@ func (s *Server) LoadSnapshot() error { // TODO check checksum first // TODO recovery state machine - var machineState int + var state []byte var checksum, lastIndex, lastTerm uint64 - reader := bufio.NewReader(file) - n , err := fmt.Fscanf(reader, "%08x\n%v\n%v\n%v", &checksum, &machineState, - &lastIndex, &lastTerm) + + n , err := fmt.Fscanf(file, "%08x\n%v\n%v", &checksum, &lastIndex, &lastTerm) if err != nil { return err } - if n != 4 { + if n != 3 { return errors.New("Bad snapshot file") } - state, err := s.stateMachine.Save() + state, _ = ioutil.ReadAll(file) + if err != nil { return err } diff --git a/snapshot.go b/snapshot.go index 36d82a4..059cd2c 100644 --- a/snapshot.go +++ b/snapshot.go @@ -52,7 +52,7 @@ func (ss *Snapshot) Save() error { return err } - if _, err = file.Write(ss.state); err != nil { + if _, err = file.Write(ss.state); err != nil { return err } diff --git a/snapshot_test.go b/snapshot_test.go index a4f95e7..52b463b 100644 --- a/snapshot_test.go +++ b/snapshot_test.go @@ -4,7 +4,7 @@ import ( "sync" "testing" "time" - //"fmt" + "bytes" ) // test take and send snapshot @@ -41,9 +41,20 @@ func TestTakeAndSendSnapshot(t *testing.T) { return resp, err } + stateMachine := &testStateMachine{} + + stateMachine.saveFunc = func() ([]byte,error) { + return []byte{0x8},nil + } + + stateMachine.recoveryFunc = func(state []byte) error { + return nil + } + var leader *Server for _, name := range names { server := newTestServer(name, transporter) + server.stateMachine = stateMachine server.SetElectionTimeout(testElectionTimeout) server.SetHeartbeatTimeout(testHeartbeatTimeout) if err := server.Start(); err != nil { @@ -102,6 +113,7 @@ func TestTakeAndSendSnapshot(t *testing.T) { // test send snapshot to a new node // send from heartbeat newServer := newTestServer("4", transporter) + newServer.stateMachine = stateMachine if err := newServer.Start(); err != nil { t.Fatalf("Unable to start server[4]: %v", err) } @@ -128,6 +140,20 @@ func TestTakeAndSendSnapshot(t *testing.T) { func TestStartFormSnapshot(t *testing.T) { server := newTestServer("1", &testTransporter{}) + + stateMachine := &testStateMachine{} + stateMachine.saveFunc = func() ([]byte,error) { + return []byte{0x60,0x61,0x62,0x63,0x64,0x65},nil + } + + stateMachine.recoveryFunc = func(state []byte) error { + expect := []byte{0x60,0x61,0x62,0x63,0x64,0x65} + if !(bytes.Equal(state, expect)) { + t.Fatalf("Invalid State [Expcet=%v, Actual=%v]", expect, state) + } + return nil + } + server.stateMachine = stateMachine oldPath := server.path server.Start() @@ -156,6 +182,7 @@ func TestStartFormSnapshot(t *testing.T) { server.Stop() server = newTestServer("1", &testTransporter{}) + server.stateMachine = stateMachine // reset the oldPath server.path = oldPath diff --git a/statemachine.go b/statemachine.go index 8447e23..555e162 100644 --- a/statemachine.go +++ b/statemachine.go @@ -11,5 +11,4 @@ package raft type StateMachine interface { Save() ([]byte, error) Recovery([]byte) error - } \ No newline at end of file diff --git a/test.go b/test.go index c862568..d5a87c1 100644 --- a/test.go +++ b/test.go @@ -113,6 +113,21 @@ func (t *testTransporter) SendSnapshotRequest(server *Server, peer *Peer, req *S return t.sendSnapshotRequestFunc(server, peer, req) } + +type testStateMachine struct { + saveFunc func() ([]byte, error) + recoveryFunc func([]byte) error +} + +func (sm *testStateMachine) Save() ([]byte, error) { + return sm.saveFunc() +} + +func (sm *testStateMachine) Recovery(state []byte) error { + return sm.recoveryFunc(state) +} + + //-------------------------------------- // Join Command //--------------------------------------