diff --git a/apps/nsq_data_tool/tool.go b/apps/nsq_data_tool/tool.go index 31bc0e4d..91e9304b 100644 --- a/apps/nsq_data_tool/tool.go +++ b/apps/nsq_data_tool/tool.go @@ -106,7 +106,7 @@ func main() { nsqd.NsqLogger().Infof("peeked channel msg : %v", m) } - cnt, err = delayQ.PeekAll(rets) + cnt, _ = delayQ.PeekAll(rets) for _, m := range rets[:cnt] { nsqd.NsqLogger().Infof("peeked msg : %v", m) } diff --git a/consistence/data_placement_mgr.go b/consistence/data_placement_mgr.go index 9088d4e3..4901b215 100644 --- a/consistence/data_placement_mgr.go +++ b/consistence/data_placement_mgr.go @@ -4,13 +4,14 @@ import ( "container/heap" "errors" "fmt" - "github.com/youzan/nsq/nsqd" "math" "sort" "strconv" "strings" "sync/atomic" "time" + + "github.com/youzan/nsq/nsqd" ) // new topic can have an advice load factor to give the suggestion about the diff --git a/consistence/nsqd_node_etcd_test.go b/consistence/nsqd_node_etcd_test.go index 07bbb7d0..b44ade36 100644 --- a/consistence/nsqd_node_etcd_test.go +++ b/consistence/nsqd_node_etcd_test.go @@ -5,8 +5,8 @@ import ( "testing" "time" - "github.com/youzan/nsq/internal/test" etcdlock "github.com/reechou/xlock2" + "github.com/youzan/nsq/internal/test" "golang.org/x/net/context" ) diff --git a/internal/http_api/api_request.go b/internal/http_api/api_request.go index cd283847..12f90a6c 100644 --- a/internal/http_api/api_request.go +++ b/internal/http_api/api_request.go @@ -189,7 +189,7 @@ retry: } func (c *Client) POSTV1WithContent(endpoint string, content string) (int, error) { - retry: +retry: req, err := http.NewRequest("POST", endpoint, strings.NewReader(content)) if err != nil { return -1, err diff --git a/nsqd/delay_queue_test.go b/nsqd/delay_queue_test.go index 5523a537..24eba9fd 100644 --- a/nsqd/delay_queue_test.go +++ b/nsqd/delay_queue_test.go @@ -2,15 +2,16 @@ package nsqd import ( //"github.com/youzan/nsq/internal/levellogger" + "encoding/json" "fmt" - "github.com/youzan/nsq/internal/ext" - "github.com/youzan/nsq/internal/test" "io/ioutil" "os" "path" "testing" "time" - "encoding/json" + + "github.com/youzan/nsq/internal/ext" + "github.com/youzan/nsq/internal/test" ) func TestDelayQueuePutChannelDelayed(t *testing.T) { @@ -158,6 +159,7 @@ func TestDelayQueueEmptyUntil(t *testing.T) { msg.DelayedChannel = "test2" msg.DelayedOrigID = MessageID(i + 1) _, _, _, _, err = dq.PutDelayMessage(msg) + test.Nil(t, err) time.Sleep(time.Millisecond * 100) } @@ -170,6 +172,7 @@ func TestDelayQueueEmptyUntil(t *testing.T) { recent, _, _ := dq.GetOldestConsumedState([]string{"test"}, true) test.Equal(t, 1, len(recent)) _, ts, id, ch, err := decodeDelayedMsgDBKey(recent[0]) + test.Nil(t, err) test.Equal(t, middle.DelayedChannel, ch) test.Equal(t, middle.ID, id) test.Equal(t, middle.DelayedTs, ts) @@ -215,6 +218,7 @@ func TestDelayQueuePeekRecent(t *testing.T) { msg.DelayedChannel = "test2" msg.DelayedOrigID = MessageID(i + 1) _, _, _, _, err = dq.PutDelayMessage(msg) + test.Nil(t, err) time.Sleep(time.Millisecond * 100) } @@ -272,6 +276,7 @@ func TestDelayQueueConfirmMsg(t *testing.T) { msg.DelayedChannel = "test2" msg.DelayedOrigID = MessageID(i + 1) _, _, _, _, err = dq.PutDelayMessage(msg) + test.Nil(t, err) time.Sleep(time.Millisecond * 100) } @@ -381,6 +386,7 @@ func TestDelayQueueBackupRestore(t *testing.T) { msg.DelayedChannel = "test2" msg.DelayedOrigID = MessageID(i + 1) _, _, _, _, err = dq.PutDelayMessage(msg) + test.Nil(t, err) time.Sleep(time.Millisecond * 100) } @@ -509,6 +515,7 @@ func TestDelayQueueCompactStore(t *testing.T) { err = dq.compactStore(false) test.Nil(t, err) fi2, err = os.Stat(dq.getStore().Path()) + test.Nil(t, err) t.Log(fi) t.Log(fi2) afterCompact, _ = dq.GetCurrentDelayedCnt(ChannelDelayed, "test") diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 100e7191..a236d7a1 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -35,6 +35,7 @@ type errStore struct { err error } + var ( ErrTopicPartitionMismatch = errors.New("topic partition mismatch") ErrTopicNotExist = errors.New("topic does not exist") diff --git a/nsqd/topic_test.go b/nsqd/topic_test.go index 946be002..0bd3cddb 100644 --- a/nsqd/topic_test.go +++ b/nsqd/topic_test.go @@ -4,13 +4,14 @@ import ( "errors" "os" //"runtime" - "github.com/absolute8511/glog" - "github.com/youzan/nsq/internal/test" "path" "path/filepath" "strconv" "testing" "time" + + "github.com/absolute8511/glog" + "github.com/youzan/nsq/internal/test" ) func TestGetTopic(t *testing.T) { @@ -508,7 +509,7 @@ func TestTopicResetWithQueueStart(t *testing.T) { for i := 0; i < msgNum; i++ { msg.ID = 0 - _, _, msgSize, _, _ = topic.PutMessage(msg) + topic.PutMessage(msg) } topic.ForceFlush() newEnd = topic.backend.GetQueueWriteEnd().(*diskQueueEndInfo) diff --git a/nsqdserver/protocol_v2_test.go b/nsqdserver/protocol_v2_test.go index 9c918dac..c8341dc2 100644 --- a/nsqdserver/protocol_v2_test.go +++ b/nsqdserver/protocol_v2_test.go @@ -25,13 +25,13 @@ import ( "testing" "time" + "github.com/golang/snappy" "github.com/youzan/go-nsq" "github.com/youzan/nsq/internal/ext" "github.com/youzan/nsq/internal/levellogger" "github.com/youzan/nsq/internal/protocol" "github.com/youzan/nsq/internal/test" nsqdNs "github.com/youzan/nsq/nsqd" - "github.com/golang/snappy" ) func identify(t *testing.T, conn io.ReadWriter, extra map[string]interface{}, f int32) []byte { @@ -118,6 +118,7 @@ func subFail(t *testing.T, conn io.ReadWriter, topicName string, channelName str _, err := nsq.Subscribe(topicName, channelName).WriteTo(conn) test.Equal(t, err, nil) resp, err := nsq.ReadResponse(conn) + test.Nil(t, err) frameType, _, _ := nsq.UnpackResponse(resp) test.Equal(t, frameType, frameTypeError) }