Skip to content
This repository has been archived by the owner on Sep 6, 2018. It is now read-only.

Commit

Permalink
add comments and gofmt
Browse files Browse the repository at this point in the history
  • Loading branch information
xiang90 committed Jun 24, 2013
1 parent 2bc3845 commit 00fb080
Show file tree
Hide file tree
Showing 13 changed files with 688 additions and 503 deletions.
21 changes: 9 additions & 12 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
// A log is a collection of log entries that are persisted to durable storage.
type Log struct {
ApplyFunc func(Command) ([]byte, error)
callBackFrom uint64
file *os.File
path string
entries []*LogEntry
Expand Down Expand Up @@ -140,7 +139,7 @@ func (l *Log) CurrentTerm() uint64 {
func (l *Log) Open(path string) error {
l.mutex.Lock()
defer l.mutex.Unlock()

// Read all the entries from the log if one exists.
var lastIndex int = 0
if _, err := os.Stat(path); !os.IsNotExist(err) {
Expand Down Expand Up @@ -168,11 +167,11 @@ func (l *Log) Open(path string) error {
}
break
}

// Append entry.
l.entries = append(l.entries, entry)
l.commitIndex = entry.Index

// Apply the command.
entry.result, err = l.ApplyFunc(entry.Command)

Expand All @@ -183,7 +182,7 @@ func (l *Log) Open(path string) error {

file.Close()
}

// Open the file for appending.
var err error
l.file, err = os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
Expand Down Expand Up @@ -240,7 +239,7 @@ func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) {
return l.entries, l.startTerm
}

fmt.Println("[GetEntries] index ", index, "lastIndex", l.entries[len(l.entries) - 1].Index)
fmt.Println("[GetEntries] index ", index, "lastIndex", l.entries[len(l.entries)-1].Index)

// Determine the term at the given entry and return a subslice.
term := l.entries[index-1-l.startIndex].Term
Expand All @@ -264,7 +263,6 @@ func (l *Log) GetEntryError(entry *LogEntry) error {
return nil
}


//--------------------------------------
// Commit
//--------------------------------------
Expand Down Expand Up @@ -299,13 +297,12 @@ func (l *Log) LastInfo() (index uint64, term uint64) {
return l.startIndex, l.startTerm
}

// Return the last index & term
lastEntry := l.entries[len(l.entries) - 1]
// Return the last index & term
lastEntry := l.entries[len(l.entries)-1]
return lastEntry.Index, lastEntry.Term
}


// Updates the commit index
// Updates the commit index
func (l *Log) UpdateCommitIndex(index uint64) {
l.mutex.Lock()
defer l.mutex.Unlock()
Expand Down Expand Up @@ -452,7 +449,7 @@ func (l *Log) Compact(index uint64, term uint64) error {
defer l.mutex.Unlock()

// nothing to compaction
// the index may be greater than the current index if
// the index may be greater than the current index if
// we just recovery from on snapshot
if index >= l.internalCurrentIndex() {
entries = make([]*LogEntry, 0)
Expand Down
10 changes: 5 additions & 5 deletions log_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
// A log entry stores a single item in the log.
type LogEntry struct {
log *Log
Index uint64 `json:"index"`
Term uint64 `json:"term"`
Command Command `json:"command"`
result []byte `json:"-"`
Index uint64 `json:"index"`
Term uint64 `json:"term"`
Command Command `json:"command"`
result []byte `json:"-"`
commit chan bool `json:"-"`
}

Expand All @@ -47,7 +47,7 @@ func NewLogEntry(log *Log, index uint64, term uint64, command Command) *LogEntry
Index: index,
Term: term,
Command: command,
commit: make(chan bool, 3),
commit: make(chan bool, 3),
}
}

Expand Down
37 changes: 37 additions & 0 deletions log_entry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package raft

import (
"encoding/json"
//"reflect"
"testing"
)

//------------------------------------------------------------------------------
//
// Tests
//
//------------------------------------------------------------------------------

//--------------------------------------
// Encoding
//--------------------------------------

// Ensure that we can encode a log entry to JSON.
func TestLogEntryMarshal(t *testing.T) {
e := NewLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"})
if b, err := json.Marshal(e); !(string(b) == `{"command":{"name":"localhost:1000"},"index":1,"name":"test:join","term":2}` && err == nil) {
t.Fatalf("Unexpected log entry marshalling: %v (%v)", string(b), err)
}
}

// // Ensure that we can decode a log entry from JSON.
// func TestLogEntryUnmarshal(t *testing.T) {
// e := &LogEntry{}
// b := []byte(`{"command":{"name":"localhost:1000"},"index":1,"name":"test:join","term":2}`)
// if err := json.Unmarshal(b, e); err != nil {
// t.Fatalf("Log entry unmarshalling error: %v", err)
// }
// if !reflect.DeepEqual(e, NewLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"})) {
// t.Fatalf("Log entry unmarshaled incorrectly: %v", e)
// }
// }
225 changes: 225 additions & 0 deletions log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package raft

import (
"io/ioutil"
"os"
"reflect"
"testing"
)

//------------------------------------------------------------------------------
//
// Tests
//
//------------------------------------------------------------------------------

//--------------------------------------
// Append
//--------------------------------------

// Ensure that we can append to a new log.
func TestLogNewLog(t *testing.T) {
path := getLogPath()
log := NewLog()
log.ApplyFunc = func(c Command) ([]byte, error) {
return nil, nil
}
if err := log.Open(path); err != nil {
t.Fatalf("Unable to open log: %v", err)
}
defer log.Close()
defer os.Remove(path)

if err := log.AppendEntry(NewLogEntry(log, 1, 1, &TestCommand1{"foo", 20})); err != nil {
t.Fatalf("Unable to append: %v", err)
}
if err := log.AppendEntry(NewLogEntry(log, 2, 1, &TestCommand2{100})); err != nil {
t.Fatalf("Unable to append: %v", err)
}
if err := log.AppendEntry(NewLogEntry(log, 3, 2, &TestCommand1{"bar", 0})); err != nil {
t.Fatalf("Unable to append: %v", err)
}

// Partial commit.
if err := log.SetCommitIndex(2); err != nil {
t.Fatalf("Unable to partially commit: %v", err)
}
expected := `cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" +
`4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n"
actual, _ := ioutil.ReadFile(path)
if string(actual) != expected {
t.Fatalf("Unexpected buffer:\nexp:\n%s\ngot:\n%s", expected, string(actual))
}
if index, term := log.CommitInfo(); index != 2 || term != 1 {
t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
}

// Full commit.
if err := log.SetCommitIndex(3); err != nil {
t.Fatalf("Unable to commit: %v", err)
}
expected = `cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" +
`4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" +
`6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}` + "\n"
actual, _ = ioutil.ReadFile(path)
if string(actual) != expected {
t.Fatalf("Unexpected buffer:\nexp:\n%s\ngot:\n%s", expected, string(actual))
}
if index, term := log.CommitInfo(); index != 3 || term != 2 {
t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
}
}

// Ensure that we can decode and encode to an existing log.
// func TestLogExistingLog(t *testing.T) {
// log, path := setupLog(`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" +
// `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" +
// `6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}` + "\n")
// defer log.Close()
// defer os.Remove(path)

// // Validate existing log entries.
// if len(log.entries) != 3 {
// t.Fatalf("Expected 3 entries, got %d", len(log.entries))
// }
// if !reflect.DeepEqual(log.entries[0], NewLogEntry(log, 1, 1, &TestCommand1{"foo", 20})) {
// t.Fatalf("Unexpected entry[0]: %v", log.entries[0])
// }
// if !reflect.DeepEqual(log.entries[1], NewLogEntry(log, 2, 1, &TestCommand2{100})) {
// t.Fatalf("Unexpected entry[1]: %v", log.entries[1])
// }
// if !reflect.DeepEqual(log.entries[2], NewLogEntry(log, 3, 2, &TestCommand1{"bar", 0})) {
// t.Fatalf("Unexpected entry[2]: %v", log.entries[2])
// }
// }

// Ensure that we can check the contents of the log by index/term.
func TestLogContainsEntries(t *testing.T) {
log, path := setupLog(`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" +
`4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" +
`6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}` + "\n")
defer log.Close()
defer os.Remove(path)

if log.ContainsEntry(0, 0) {
t.Fatalf("Zero-index entry should not exist in log.")
}
if log.ContainsEntry(1, 0) {
t.Fatalf("Entry with mismatched term should not exist")
}
if log.ContainsEntry(4, 0) {
t.Fatalf("Out-of-range entry should not exist")
}
if !log.ContainsEntry(2, 1) {
t.Fatalf("Entry 2/1 should exist")
}
if !log.ContainsEntry(3, 2) {
t.Fatalf("Entry 2/1 should exist")
}
}

// Ensure that we can recover from an incomplete/corrupt log and continue logging.
// func TestLogRecovery(t *testing.T) {
// path := setupLogFile(`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" +
// `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" +
// `6ac5807c 0000000000000003 00000000000`)
// log := NewLog()
// log.ApplyFunc = func(c Command) ([]byte, error) {
// return nil,nil
// }
// if err := log.Open(path); err != nil {
// t.Fatalf("Unable to open log: %v", err)
// }
// defer log.Close()
// defer os.Remove(path)

// if err := log.AppendEntry(NewLogEntry(log, 3, 2, &TestCommand1{"bat", -5})); err != nil {
// t.Fatalf("Unable to append: %v", err)
// }

// // Validate existing log entries.
// if len(log.entries) != 3 {
// t.Fatalf("Expected 2 entries, got %d", len(log.entries))
// }
// if !reflect.DeepEqual(log.entries[0], NewLogEntry(log, 1, 1, &TestCommand1{"foo", 20})) {
// t.Fatalf("Unexpected entry[0]: %v", log.entries[0])
// }
// if !reflect.DeepEqual(log.entries[1], NewLogEntry(log, 2, 1, &TestCommand2{100})) {
// t.Fatalf("Unexpected entry[1]: %v", log.entries[1])
// }
// if !reflect.DeepEqual(log.entries[2], NewLogEntry(log, 3, 2, &TestCommand1{"bat", -5})) {
// t.Fatalf("Unexpected entry[2]: %v", log.entries[2])
// }

// // Validate precommit log contents.
// expected := `cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" +
// `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n"
// actual, _ := ioutil.ReadFile(path)
// if string(actual) != expected {
// t.Fatalf("Unexpected buffer:\nexp:\n%s\ngot:\n%s", expected, string(actual))
// }

// // Validate committed log contents.
// if err := log.SetCommitIndex(3); err != nil {
// t.Fatalf("Unable to partially commit: %v", err)
// }
// expected = `cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" +
// `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" +
// `3f3f884c 0000000000000003 0000000000000002 cmd_1 {"val":"bat","i":-5}` + "\n"
// actual, _ = ioutil.ReadFile(path)
// if string(actual) != expected {
// t.Fatalf("Unexpected buffer:\nexp:\n%s\ngot:\n%s", expected, string(actual))
// }
// }

//--------------------------------------
// Append
//--------------------------------------

// Ensure that we can truncate uncommitted entries in the log.
func TestLogTruncate(t *testing.T) {
log, path := setupLog("")
if err := log.Open(path); err != nil {
t.Fatalf("Unable to open log: %v", err)
}
defer log.Close()
defer os.Remove(path)

entry1 := NewLogEntry(log, 1, 1, &TestCommand1{"foo", 20})
if err := log.AppendEntry(entry1); err != nil {
t.Fatalf("Unable to append: %v", err)
}
entry2 := NewLogEntry(log, 2, 1, &TestCommand2{100})
if err := log.AppendEntry(entry2); err != nil {
t.Fatalf("Unable to append: %v", err)
}
entry3 := NewLogEntry(log, 3, 2, &TestCommand1{"bar", 0})
if err := log.AppendEntry(entry3); err != nil {
t.Fatalf("Unable to append: %v", err)
}
if err := log.SetCommitIndex(2); err != nil {
t.Fatalf("Unable to partially commit: %v", err)
}

// Truncate committed entry.
if err := log.Truncate(1, 1); err == nil || err.Error() != "raft.Log: Index is already committed (2): (IDX=1, TERM=1)" {
t.Fatalf("Truncating committed entries shouldn't work: %v", err)
}
// Truncate past end of log.
if err := log.Truncate(4, 2); err == nil || err.Error() != "raft.Log: Entry index does not exist (MAX=3): (IDX=4, TERM=2)" {
t.Fatalf("Truncating past end-of-log shouldn't work: %v", err)
}
// Truncate entry with mismatched term.
if err := log.Truncate(2, 2); err == nil || err.Error() != "raft.Log: Entry at index does not have matching term (1): (IDX=2, TERM=2)" {
t.Fatalf("Truncating mismatched entries shouldn't work: %v", err)
}
// Truncate end of log.
if err := log.Truncate(3, 2); !(err == nil && reflect.DeepEqual(log.entries, []*LogEntry{entry1, entry2, entry3})) {
t.Fatalf("Truncating end of log should work: %v\n\nEntries:\nActual: %v\nExpected: %v", err, log.entries, []*LogEntry{entry1, entry2, entry3})
}
// Truncate at last commit.
if err := log.Truncate(2, 1); !(err == nil && reflect.DeepEqual(log.entries, []*LogEntry{entry1, entry2})) {
t.Fatalf("Truncating at last commit should work: %v\n\nEntries:\nActual: %v\nExpected: %v", err, log.entries, []*LogEntry{entry1, entry2})
}

}
Loading

0 comments on commit 00fb080

Please sign in to comment.