diff --git a/logging/logging.go b/logging/logging.go index ac895e9d5..f8742ac51 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -74,6 +74,14 @@ func (t timestamp) String() string { return time.Unix(0, int64(t)).Format(time.RFC3339) } +func Timep(key string, time *int64) zapcore.Field { + if time == nil { + return zap.String(key, "-") + } else { + return Time(key, *time) + } +} + func Epoch(key string, time time.Time) zap.Field { return zap.String(key, fmt.Sprintf("%d", time.UnixNano())) } diff --git a/waku/v2/api/history/cycle.go b/waku/v2/api/history/cycle.go new file mode 100644 index 000000000..313ee0a45 --- /dev/null +++ b/waku/v2/api/history/cycle.go @@ -0,0 +1,524 @@ +package history + +import ( + "context" + "crypto/rand" + "errors" + "fmt" + "math" + "math/big" + "net" + "net/http" + "runtime" + "sort" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/waku-org/go-waku/waku/v2/protocol/store" + "go.uber.org/zap" +) + +const defaultBackoff = 10 * time.Second +const graylistBackoff = 3 * time.Minute +const storenodeVerificationInterval = time.Second +const storenodeMaxFailedRequests uint = 2 +const minStorenodesToChooseFrom = 3 +const isAndroidEmulator = runtime.GOOS == "android" && runtime.GOARCH == "amd64" +const findNearestMailServer = !isAndroidEmulator +const overrideDNS = runtime.GOOS == "android" || runtime.GOOS == "ios" +const bootstrapDNS = "8.8.8.8:53" + +type connStatus int + +const ( + disconnected connStatus = iota + 1 + connected +) + +type peerStatus struct { + status connStatus + canConnectAfter time.Time + lastConnectionAttempt time.Time +} + +type StorenodeConfigProvider interface { + UseStorenodes() (bool, error) + GetPinnedStorenode() (peer.ID, error) + Storenodes() ([]peer.ID, error) +} + +type StorenodeCycle struct { + sync.RWMutex + + logger *zap.Logger + + host host.Host + + storenodeConfigProvider StorenodeConfigProvider + + StorenodeAvailableOneshotEmitter *OneShotEmitter[struct{}] + StorenodeChangedEmitter *Emitter[peer.ID] + StorenodeNotWorkingEmitter *Emitter[struct{}] + StorenodeAvailableEmitter *Emitter[peer.ID] + + failedRequests map[peer.ID]uint + + peersMutex sync.RWMutex + activeStorenode peer.ID + peers map[peer.ID]peerStatus +} + +func NewStorenodeCycle(logger *zap.Logger) *StorenodeCycle { + return &StorenodeCycle{ + StorenodeAvailableOneshotEmitter: NewOneshotEmitter[struct{}](), + StorenodeChangedEmitter: NewEmitter[peer.ID](), + StorenodeNotWorkingEmitter: NewEmitter[struct{}](), + StorenodeAvailableEmitter: NewEmitter[peer.ID](), + logger: logger.Named("storenode-cycle"), + } +} + +func (m *StorenodeCycle) Start(ctx context.Context, h host.Host) { + m.logger.Debug("starting storenode cycle") + m.host = h + m.failedRequests = make(map[peer.ID]uint) + m.peers = make(map[peer.ID]peerStatus) + + go m.verifyStorenodeStatus(ctx) +} + +func (m *StorenodeCycle) DisconnectActiveStorenode(backoff time.Duration) { + m.Lock() + defer m.Unlock() + + m.disconnectActiveStorenode(backoff) +} + +func (m *StorenodeCycle) connectToNewStorenodeAndWait(ctx context.Context) error { + // Handle pinned storenodes + m.logger.Info("disconnecting storenode") + pinnedStorenode, err := m.storenodeConfigProvider.GetPinnedStorenode() + if err != nil { + m.logger.Error("could not obtain the pinned storenode", zap.Error(err)) + return err + } + + // If no pinned storenode, no need to disconnect and wait for it to be available + if pinnedStorenode == "" { + m.disconnectActiveStorenode(graylistBackoff) + } + + return m.findNewStorenode(ctx) +} + +func (m *StorenodeCycle) disconnectStorenode(backoffDuration time.Duration) error { + if m.activeStorenode == "" { + m.logger.Info("no active storenode") + return nil + } + + m.logger.Info("disconnecting active storenode", zap.Stringer("peerID", m.activeStorenode)) + + m.peersMutex.Lock() + pInfo, ok := m.peers[m.activeStorenode] + if ok { + pInfo.status = disconnected + pInfo.canConnectAfter = time.Now().Add(backoffDuration) + m.peers[m.activeStorenode] = pInfo + } else { + m.peers[m.activeStorenode] = peerStatus{ + status: disconnected, + canConnectAfter: time.Now().Add(backoffDuration), + } + } + m.peersMutex.Unlock() + + m.activeStorenode = "" + + return nil +} + +func (m *StorenodeCycle) disconnectActiveStorenode(backoffDuration time.Duration) { + err := m.disconnectStorenode(backoffDuration) + if err != nil { + m.logger.Error("failed to disconnect storenode", zap.Error(err)) + } + + m.StorenodeChangedEmitter.Emit("") +} + +func (m *StorenodeCycle) Cycle(ctx context.Context) { + if m.storenodeConfigProvider == nil { + m.logger.Debug("storenodeConfigProvider not yet setup") + return + } + + m.logger.Info("Automatically switching storenode") + + if m.activeStorenode != "" { + m.disconnectActiveStorenode(graylistBackoff) + } + + useStorenode, err := m.storenodeConfigProvider.UseStorenodes() + if err != nil { + m.logger.Error("failed to get use storenodes", zap.Error(err)) + return + } + + if !useStorenode { + m.logger.Info("Skipping storenode search due to useStorenode being false") + return + } + + err = m.findNewStorenode(ctx) + if err != nil { + m.logger.Error("Error getting new storenode", zap.Error(err)) + } +} + +func poolSize(fleetSize int) int { + return int(math.Ceil(float64(fleetSize) / 4)) +} + +func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, allStorenodes []peer.ID) []peer.ID { + availableStorenodes := make(map[peer.ID]time.Duration) + availableStorenodesMutex := sync.Mutex{} + availableStorenodesWg := sync.WaitGroup{} + for _, storenode := range allStorenodes { + availableStorenodesWg.Add(1) + go func(peerID peer.ID) { + defer availableStorenodesWg.Done() + ctx, cancel := context.WithTimeout(ctx, 4*time.Second) + defer cancel() + + rtt, err := m.pingPeer(ctx, peerID) + if err == nil { // pinging storenodes might fail, but we don't care + availableStorenodesMutex.Lock() + availableStorenodes[peerID] = rtt + availableStorenodesMutex.Unlock() + } + }(storenode) + } + availableStorenodesWg.Wait() + + if len(availableStorenodes) == 0 { + m.logger.Warn("No storenodes available") // Do nothing.. + return nil + } + + var sortedStorenodes []SortedStorenode + for storenodeID, rtt := range availableStorenodes { + sortedStorenode := SortedStorenode{ + Storenode: storenodeID, + RTT: rtt, + } + m.peersMutex.Lock() + pInfo, ok := m.peers[storenodeID] + m.peersMutex.Unlock() + if ok && time.Now().Before(pInfo.canConnectAfter) { + continue // We can't connect to this node yet + } + sortedStorenodes = append(sortedStorenodes, sortedStorenode) + } + sort.Sort(byRTTMsAndCanConnectBefore(sortedStorenodes)) + + result := make([]peer.ID, len(sortedStorenodes)) + for i, s := range sortedStorenodes { + result[i] = s.Storenode + } + + return result +} + +func (m *StorenodeCycle) pingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) { + pingResultCh := ping.Ping(ctx, m.host, peerID) + select { + case <-ctx.Done(): + return 0, ctx.Err() + case r := <-pingResultCh: + if r.Error != nil { + return 0, r.Error + } + return r.RTT, nil + } +} + +func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error { + // we have to override DNS manually because of https://github.com/status-im/status-mobile/issues/19581 + if overrideDNS { + var dialer net.Dialer + net.DefaultResolver = &net.Resolver{ + PreferGo: false, + Dial: func(context context.Context, _, _ string) (net.Conn, error) { + conn, err := dialer.DialContext(context, "udp", bootstrapDNS) + if err != nil { + return nil, err + } + return conn, nil + }, + } + } + + pinnedStorenode, err := m.storenodeConfigProvider.GetPinnedStorenode() + if err != nil { + m.logger.Error("Could not obtain the pinned storenode", zap.Error(err)) + return err + } + + if pinnedStorenode != "" { + return m.setActiveStorenode(pinnedStorenode) + } + + m.logger.Info("Finding a new storenode..") + + allStorenodes, err := m.storenodeConfigProvider.Storenodes() + if err != nil { + return err + } + + // TODO: remove this check once sockets are stable on x86_64 emulators + if findNearestMailServer { + allStorenodes = m.getAvailableStorenodesSortedByRTT(ctx, allStorenodes) + } + + // Picks a random storenode amongs the ones with the lowest latency + // The pool size is 1/4 of the storenodes were pinged successfully + // If the pool size is less than `minStorenodesToChooseFrom`, it will + // pick a storenode fromm all the available storenodes + pSize := poolSize(len(allStorenodes) - 1) + if pSize <= minStorenodesToChooseFrom { + pSize = len(allStorenodes) + if pSize <= 0 { + m.logger.Warn("No storenodes available") // Do nothing.. + return nil + } + } + + r, err := rand.Int(rand.Reader, big.NewInt(int64(pSize))) + if err != nil { + return err + } + + ms := allStorenodes[r.Int64()] + return m.setActiveStorenode(ms) +} + +func (m *StorenodeCycle) storenodeStatus(peerID peer.ID) connStatus { + m.peersMutex.RLock() + defer m.peersMutex.RUnlock() + + peer, ok := m.peers[peerID] + if !ok { + return disconnected + } + return peer.status +} + +func (m *StorenodeCycle) setActiveStorenode(peerID peer.ID) error { + m.activeStorenode = peerID + + m.StorenodeChangedEmitter.Emit(m.activeStorenode) + + storenodeStatus := m.storenodeStatus(peerID) + if storenodeStatus != connected { + m.peersMutex.Lock() + m.peers[peerID] = peerStatus{ + status: connected, + lastConnectionAttempt: time.Now(), + canConnectAfter: time.Now().Add(defaultBackoff), + } + m.peersMutex.Unlock() + + m.failedRequests[peerID] = 0 + m.logger.Info("storenode available", zap.Stringer("peerID", m.activeStorenode)) + + m.StorenodeAvailableOneshotEmitter.Emit(struct{}{}) // Maybe can be refactored away? + m.StorenodeAvailableEmitter.Emit(m.activeStorenode) + } + return nil +} + +func (m *StorenodeCycle) GetActiveStorenode() peer.ID { + m.RLock() + defer m.RUnlock() + + return m.activeStorenode +} + +func (m *StorenodeCycle) IsStorenodeAvailable(peerID peer.ID) bool { + return m.storenodeStatus(peerID) == connected +} + +func (m *StorenodeCycle) penalizeStorenode(id peer.ID) { + m.peersMutex.Lock() + defer m.peersMutex.Unlock() + pInfo, ok := m.peers[id] + if !ok { + pInfo.status = disconnected + } + + pInfo.canConnectAfter = time.Now().Add(graylistBackoff) + m.peers[id] = pInfo +} + +func (m *StorenodeCycle) verifyStorenodeStatus(ctx context.Context) { + ticker := time.NewTicker(storenodeVerificationInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + err := m.disconnectStorenodeIfRequired(ctx) + if err != nil { + m.logger.Error("failed to handle storenode cycle event", zap.Error(err)) + continue + } + + case <-ctx.Done(): + return + } + } +} + +func (m *StorenodeCycle) disconnectStorenodeIfRequired(ctx context.Context) error { + m.logger.Debug("wakuV2 storenode status verification") + + if m.activeStorenode == "" { + // No active storenode, find a new one + m.Cycle(ctx) + return nil + } + + // Check whether we want to disconnect the active storenode + if m.failedRequests[m.activeStorenode] >= storenodeMaxFailedRequests { + m.penalizeStorenode(m.activeStorenode) + m.StorenodeNotWorkingEmitter.Emit(struct{}{}) + + m.logger.Info("too many failed requests", zap.Stringer("storenode", m.activeStorenode)) + m.failedRequests[m.activeStorenode] = 0 + return m.connectToNewStorenodeAndWait(ctx) + } + + return nil +} + +func (m *StorenodeCycle) SetStorenodeConfigProvider(provider StorenodeConfigProvider) { + m.storenodeConfigProvider = provider +} + +func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context, timeout time.Duration) bool { + // Add 1 second to timeout, because the storenode cycle has 1 second ticker, which doesn't tick on start. + // This can be improved after merging https://github.com/status-im/status-go/pull/4380. + // NOTE: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately + timeout += time.Second + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for !m.IsStorenodeAvailable(m.activeStorenode) { + select { + case <-m.StorenodeAvailableOneshotEmitter.Subscribe(): + case <-ctx.Done(): + return + } + } + }() + + select { + case <-waitForWaitGroup(&wg): + case <-ctx.Done(): + } + + return m.IsStorenodeAvailable(m.activeStorenode) +} + +func waitForWaitGroup(wg *sync.WaitGroup) <-chan struct{} { + ch := make(chan struct{}) + go func() { + wg.Wait() + close(ch) + }() + return ch +} + +type storenodeTaskParameters struct { + customPeerID peer.ID +} + +type StorenodeTaskOption func(*storenodeTaskParameters) + +func WithPeerID(peerID peer.ID) StorenodeTaskOption { + return func(stp *storenodeTaskParameters) { + stp.customPeerID = peerID + } +} + +func (m *StorenodeCycle) PerformStorenodeTask(fn func() error, options ...StorenodeTaskOption) error { + params := storenodeTaskParameters{} + for _, opt := range options { + opt(¶ms) + } + + peerID := params.customPeerID + if peerID == "" { + peerID = m.GetActiveStorenode() + } + + if peerID == "" { + return errors.New("storenode not available") + } + + m.RLock() + defer m.RUnlock() + + var tries uint = 0 + for tries < storenodeMaxFailedRequests { + if params.customPeerID == "" && m.storenodeStatus(peerID) != connected { + return errors.New("storenode not available") + } + m.logger.Info("trying performing history requests", zap.Uint("try", tries), zap.Stringer("peerID", peerID)) + + // Peform request + err := fn() + if err == nil { + // Reset failed requests + m.logger.Debug("history request performed successfully", zap.Stringer("peerID", peerID)) + m.failedRequests[peerID] = 0 + return nil + } + + m.logger.Error("failed to perform history request", + zap.Stringer("peerID", peerID), + zap.Uint("tries", tries), + zap.Error(err), + ) + + tries++ + + if storeErr, ok := err.(*store.StoreError); ok { + if storeErr.Code == http.StatusTooManyRequests { + m.disconnectActiveStorenode(defaultBackoff) + return fmt.Errorf("ratelimited at storenode %s: %w", peerID, err) + } + } + + // Increment failed requests + m.failedRequests[peerID]++ + + // Change storenode + if m.failedRequests[peerID] >= storenodeMaxFailedRequests { + return errors.New("too many failed requests") + } + // Wait a couple of second not to spam + time.Sleep(2 * time.Second) + + } + return errors.New("failed to perform history request") +} diff --git a/waku/v2/api/history/emitters.go b/waku/v2/api/history/emitters.go new file mode 100644 index 000000000..a12d4db90 --- /dev/null +++ b/waku/v2/api/history/emitters.go @@ -0,0 +1,48 @@ +package history + +import "sync" + +type Emitter[T any] struct { + sync.Mutex + subscriptions []chan T +} + +func NewEmitter[T any]() *Emitter[T] { + return &Emitter[T]{} +} + +func (s *Emitter[T]) Subscribe() <-chan T { + s.Lock() + defer s.Unlock() + c := make(chan T) + s.subscriptions = append(s.subscriptions, c) + return c +} + +func (s *Emitter[T]) Emit(value T) { + s.Lock() + defer s.Unlock() + + for _, sub := range s.subscriptions { + sub <- value + } +} + +type OneShotEmitter[T any] struct { + Emitter[T] +} + +func NewOneshotEmitter[T any]() *OneShotEmitter[T] { + return &OneShotEmitter[T]{} +} + +func (s *OneShotEmitter[T]) Emit(value T) { + s.Lock() + defer s.Unlock() + + for _, subs := range s.subscriptions { + subs <- value + close(subs) + } + s.subscriptions = nil +} diff --git a/waku/v2/api/history/emitters_test.go b/waku/v2/api/history/emitters_test.go new file mode 100644 index 000000000..a90eda3b3 --- /dev/null +++ b/waku/v2/api/history/emitters_test.go @@ -0,0 +1,67 @@ +package history + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestEmitter(t *testing.T) { + emitter := NewEmitter[int]() + + subscr1 := emitter.Subscribe() + subscr2 := emitter.Subscribe() + + wg := sync.WaitGroup{} + wg.Add(3) + + go func() { + defer wg.Done() + emitter.Emit(1) + emitter.Emit(2) + }() + + go func() { + defer wg.Done() + require.Equal(t, 1, <-subscr1) + require.Equal(t, 2, <-subscr1) + }() + + go func() { + defer wg.Done() + require.Equal(t, 1, <-subscr2) + require.Equal(t, 2, <-subscr2) + }() + + wg.Wait() +} + +func TestOneShotEmitter(t *testing.T) { + emitter := NewOneshotEmitter[struct{}]() + + subscr1 := emitter.Subscribe() + subscr2 := emitter.Subscribe() + + wg := sync.WaitGroup{} + wg.Add(3) + + go func() { + defer wg.Done() + emitter.Emit(struct{}{}) + }() + + go func() { + defer wg.Done() + for range subscr1 { + } + }() + + go func() { + defer wg.Done() + for range subscr2 { + } + }() + + wg.Wait() +} diff --git a/waku/v2/api/history/history.go b/waku/v2/api/history/history.go new file mode 100644 index 000000000..e95f01a57 --- /dev/null +++ b/waku/v2/api/history/history.go @@ -0,0 +1,296 @@ +package history + +import ( + "context" + "errors" + "math" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/store" + "go.uber.org/zap" +) + +const maxTopicsPerRequest int = 10 +const mailserverRequestTimeout = 30 * time.Second + +type work struct { + criteria store.FilterCriteria + cursor []byte + limit uint64 +} + +type HistoryRetriever struct { + store Store + logger *zap.Logger + historyProcessor HistoryProcessor +} + +type HistoryProcessor interface { + OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error + OnRequestFailed(requestID []byte, peerID peer.ID, err error) +} + +type Store interface { + Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error) +} + +func NewHistoryRetriever(store Store, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever { + return &HistoryRetriever{ + store: store, + logger: logger.Named("history-retriever"), + historyProcessor: historyProcessor, + } +} + +func (hr *HistoryRetriever) Query( + ctx context.Context, + criteria store.FilterCriteria, + storenodeID peer.ID, + pageLimit uint64, + shouldProcessNextPage func(int) (bool, uint64), + processEnvelopes bool, +) error { + logger := hr.logger.With( + logging.Timep("fromString", criteria.TimeStart), + logging.Timep("toString", criteria.TimeEnd), + zap.String("pubsubTopic", criteria.PubsubTopic), + zap.Strings("contentTopics", criteria.ContentTopicsList()), + zap.Int64p("from", criteria.TimeStart), + zap.Int64p("to", criteria.TimeEnd), + ) + + logger.Info("syncing") + + wg := sync.WaitGroup{} + workWg := sync.WaitGroup{} + workCh := make(chan work, 1000) // each batch item is split in 10 topics bunch and sent to this channel + workCompleteCh := make(chan struct{}) // once all batch items are processed, this channel is triggered + semaphore := make(chan struct{}, 3) // limit the number of concurrent queries + errCh := make(chan error) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // TODO: refactor this by extracting the consumer into a separate go routine. + + // Producer + wg.Add(1) + go func() { + defer func() { + logger.Debug("mailserver batch producer complete") + wg.Done() + }() + + contentTopicList := criteria.ContentTopics.ToList() + + // TODO: split into 24h batches + + allWorks := int(math.Ceil(float64(len(contentTopicList)) / float64(maxTopicsPerRequest))) + workWg.Add(allWorks) + + for i := 0; i < len(contentTopicList); i += maxTopicsPerRequest { + j := i + maxTopicsPerRequest + if j > len(contentTopicList) { + j = len(contentTopicList) + } + + select { + case <-ctx.Done(): + logger.Debug("processBatch producer - context done") + return + default: + logger.Debug("processBatch producer - creating work") + workCh <- work{ + criteria: store.FilterCriteria{ + ContentFilter: protocol.NewContentFilter(criteria.PubsubTopic, contentTopicList[i:j]...), + TimeStart: criteria.TimeStart, + TimeEnd: criteria.TimeEnd, + }, + limit: pageLimit, + } + } + } + + go func() { + workWg.Wait() + workCompleteCh <- struct{}{} + }() + + logger.Debug("processBatch producer complete") + }() + + var result error + +loop: + for { + select { + case <-ctx.Done(): + logger.Debug("processBatch cleanup - context done") + result = ctx.Err() + if errors.Is(result, context.Canceled) { + result = nil + } + break loop + case w, ok := <-workCh: + if !ok { + continue + } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + // continue... + } + + logger.Debug("processBatch - received work") + + semaphore <- struct{}{} + go func(w work) { // Consumer + defer func() { + workWg.Done() + <-semaphore + }() + + queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout) + cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenodeID, w.criteria, w.cursor, w.limit, true, processEnvelopes, logger) + queryCancel() + + if err != nil { + logger.Debug("failed to send request", zap.Error(err)) + errCh <- err + return + } + + processNextPage := true + nextPageLimit := pageLimit + if shouldProcessNextPage != nil { + processNextPage, nextPageLimit = shouldProcessNextPage(envelopesCount) + } + + if !processNextPage { + return + } + + // Check the cursor after calling `shouldProcessNextPage`. + // The app might use process the fetched envelopes in the callback for own needs. + if cursor == nil { + return + } + + logger.Debug("processBatch producer - creating work (cursor)") + + workWg.Add(1) + workCh <- work{ + criteria: w.criteria, + cursor: cursor, + limit: nextPageLimit, + } + }(w) + case err := <-errCh: + logger.Debug("processBatch - received error", zap.Error(err)) + cancel() // Kill go routines + return err + case <-workCompleteCh: + logger.Debug("processBatch - all jobs complete") + cancel() // Kill go routines + } + } + + wg.Wait() + + logger.Info("synced topic", zap.NamedError("hasError", result)) + + return result +} + +func (hr *HistoryRetriever) createMessagesRequest( + ctx context.Context, + peerID peer.ID, + criteria store.FilterCriteria, + cursor []byte, + limit uint64, + waitForResponse bool, + processEnvelopes bool, + logger *zap.Logger, +) (storeCursor []byte, envelopesCount int, err error) { + if waitForResponse { + resultCh := make(chan struct { + storeCursor []byte + envelopesCount int + err error + }) + + go func() { + storeCursor, envelopesCount, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, processEnvelopes) + resultCh <- struct { + storeCursor []byte + envelopesCount int + err error + }{storeCursor, envelopesCount, err} + }() + + select { + case result := <-resultCh: + return result.storeCursor, result.envelopesCount, result.err + case <-ctx.Done(): + return nil, 0, ctx.Err() + } + } else { + go func() { + _, _, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, false) + if err != nil { + logger.Error("failed to request store messages", zap.Error(err)) + } + }() + } + + return +} + +func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID peer.ID, criteria store.FilterCriteria, cursor []byte, limit uint64, processEnvelopes bool) ([]byte, int, error) { + requestID := protocol.GenerateRequestID() + logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID)) + + opts := []store.RequestOption{ + store.WithPaging(false, limit), + store.WithRequestID(requestID), + store.WithPeer(peerID), + store.WithCursor(cursor)} + + logger.Debug("store.query", + logging.Timep("startTime", criteria.TimeStart), + logging.Timep("endTime", criteria.TimeEnd), + zap.Strings("contentTopics", criteria.ContentTopics.ToList()), + zap.String("pubsubTopic", criteria.PubsubTopic), + zap.String("cursor", hexutil.Encode(cursor)), + ) + + queryStart := time.Now() + result, err := hr.store.Query(ctx, criteria, opts...) + queryDuration := time.Since(queryStart) + if err != nil { + logger.Error("error querying storenode", zap.Error(err)) + + hr.historyProcessor.OnRequestFailed(requestID, peerID, err) + + return nil, 0, err + } + + messages := result.Messages() + envelopesCount := len(messages) + logger.Debug("store.query response", zap.Duration("queryDuration", queryDuration), zap.Int("numMessages", envelopesCount), zap.Bool("hasCursor", result.IsComplete() && result.Cursor() != nil)) + for _, mkv := range messages { + envelope := protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic()) + err := hr.historyProcessor.OnEnvelope(envelope, processEnvelopes) + if err != nil { + return nil, 0, err + } + } + return result.Cursor(), envelopesCount, nil +} diff --git a/waku/v2/api/history/history_test.go b/waku/v2/api/history/history_test.go new file mode 100644 index 000000000..d256850a8 --- /dev/null +++ b/waku/v2/api/history/history_test.go @@ -0,0 +1,254 @@ +package history + +import ( + "context" + "crypto/rand" + "encoding/hex" + "errors" + "math/big" + "sort" + "testing" + "time" + + "github.com/google/uuid" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/protocol" + proto_pb "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/store" + "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" + "google.golang.org/protobuf/proto" + + "github.com/waku-org/go-waku/waku/v2/utils" +) + +type queryResponse struct { + contentTopics []string + messages []*pb.WakuMessageKeyValue + err error // Indicates if this response will simulate an error returned by SendMessagesRequestForTopics + cursor []byte +} + +type mockResult struct { + cursor []byte + messages []*pb.WakuMessageKeyValue +} + +func (r *mockResult) Cursor() []byte { + return r.cursor +} + +func (r *mockResult) Messages() []*pb.WakuMessageKeyValue { + return r.messages +} + +func (r *mockResult) IsComplete() bool { + return false +} + +func (r *mockResult) PeerID() peer.ID { + return "" +} + +func (r *mockResult) Query() *pb.StoreQueryRequest { + return nil +} + +func (r *mockResult) Response() *pb.StoreQueryResponse { + return nil +} + +func (r *mockResult) Next(ctx context.Context, opts ...store.RequestOption) error { + return nil +} + +type mockHistoryProcessor struct { +} + +func (h *mockHistoryProcessor) OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error { + return nil +} + +func (h *mockHistoryProcessor) OnRequestFailed(requestID []byte, peerID peer.ID, err error) { +} + +func newMockHistoryProcessor() *mockHistoryProcessor { + return &mockHistoryProcessor{} +} + +type mockStore struct { + queryResponses map[string]queryResponse +} + +func newMockStore() *mockStore { + return &mockStore{ + queryResponses: make(map[string]queryResponse), + } +} + +func getInitialResponseKey(contentTopics []string) string { + sort.Strings(contentTopics) + return hex.EncodeToString(append([]byte("start"), []byte(contentTopics[0])...)) +} + +func (t *mockStore) Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error) { + params := store.Parameters{} + for _, opt := range opts { + _ = opt(¶ms) + } + result := &mockResult{} + if params.Cursor() == nil { + initialResponse := getInitialResponseKey(criteria.ContentTopicsList()) + response := t.queryResponses[initialResponse] + if response.err != nil { + return nil, response.err + } + result.cursor = response.cursor + result.messages = response.messages + } else { + response := t.queryResponses[hex.EncodeToString(params.Cursor())] + if response.err != nil { + return nil, response.err + } + result.cursor = response.cursor + result.messages = response.messages + } + + return result, nil +} + +func (t *mockStore) Populate(topics []string, responses int, includeRandomError bool) error { + if responses <= 0 || len(topics) == 0 { + return errors.New("invalid input parameters") + } + + var topicBatches [][]string + + for i := 0; i < len(topics); i += maxTopicsPerRequest { + // Split batch in 10-contentTopic subbatches + j := i + maxTopicsPerRequest + if j > len(topics) { + j = len(topics) + } + topicBatches = append(topicBatches, topics[i:j]) + } + + randomErrIdx, err := rand.Int(rand.Reader, big.NewInt(int64(len(topicBatches)))) + if err != nil { + return err + } + randomErrIdxInt := int(randomErrIdx.Int64()) + + for i, topicBatch := range topicBatches { + // Setup initial response + initialResponseKey := getInitialResponseKey(topicBatch) + t.queryResponses[initialResponseKey] = queryResponse{ + contentTopics: topicBatch, + messages: []*pb.WakuMessageKeyValue{ + { + MessageHash: protocol.GenerateRequestID(), + Message: &proto_pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: "abc", + Timestamp: proto.Int64(time.Now().UnixNano()), + }, + PubsubTopic: proto.String("test"), + }, + }, + err: nil, + } + + prevKey := initialResponseKey + for x := 0; x < responses-1; x++ { + newResponseCursor := []byte(uuid.New().String()) + newResponseKey := hex.EncodeToString(newResponseCursor) + + var err error + if includeRandomError && i == randomErrIdxInt && x == responses-2 { // Include an error in last request + err = errors.New("random error") + } + + t.queryResponses[newResponseKey] = queryResponse{ + contentTopics: topicBatch, + messages: []*pb.WakuMessageKeyValue{ + { + MessageHash: protocol.GenerateRequestID(), + Message: &proto_pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: "abc", + Timestamp: proto.Int64(time.Now().UnixNano()), + }, + PubsubTopic: proto.String("test"), + }, + }, + err: err, + } + + // Updating prev response cursor to point to the new response + prevResponse := t.queryResponses[prevKey] + prevResponse.cursor = newResponseCursor + t.queryResponses[prevKey] = prevResponse + + prevKey = newResponseKey + } + + } + + return nil +} + +func TestSuccessBatchExecution(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + + storenodeID, err := peer.Decode("16Uiu2HAkw3x97MbbZSWHbdF5bob45vcZvPPK4s4Mjyv2mxyB9GS3") + require.NoError(t, err) + + topics := []string{} + for i := 0; i < 50; i++ { + topics = append(topics, uuid.NewString()) + } + + testStore := newMockStore() + err = testStore.Populate(topics, 10, false) + require.NoError(t, err) + + historyProcessor := newMockHistoryProcessor() + + historyRetriever := NewHistoryRetriever(testStore, historyProcessor, utils.Logger()) + + criteria := store.FilterCriteria{ + ContentFilter: protocol.NewContentFilter("test", topics...), + } + + err = historyRetriever.Query(ctx, criteria, storenodeID, 10, func(i int) (bool, uint64) { return true, 10 }, true) + require.NoError(t, err) +} + +func TestFailedBatchExecution(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + + storenodeID, err := peer.Decode("16Uiu2HAkw3x97MbbZSWHbdF5bob45vcZvPPK4s4Mjyv2mxyB9GS3") + require.NoError(t, err) + + topics := []string{} + for i := 0; i < 2; i++ { + topics = append(topics, uuid.NewString()) + } + + testStore := newMockStore() + err = testStore.Populate(topics, 10, true) + require.NoError(t, err) + + historyProcessor := newMockHistoryProcessor() + + historyRetriever := NewHistoryRetriever(testStore, historyProcessor, utils.Logger()) + + criteria := store.FilterCriteria{ + ContentFilter: protocol.NewContentFilter("test", topics...), + } + + err = historyRetriever.Query(ctx, criteria, storenodeID, 10, func(i int) (bool, uint64) { return true, 10 }, true) + require.Error(t, err) +} diff --git a/waku/v2/api/history/sort.go b/waku/v2/api/history/sort.go new file mode 100644 index 000000000..22e94c571 --- /dev/null +++ b/waku/v2/api/history/sort.go @@ -0,0 +1,32 @@ +package history + +import ( + "time" + + "github.com/libp2p/go-libp2p/core/peer" +) + +type SortedStorenode struct { + Storenode peer.ID + RTT time.Duration + CanConnectAfter time.Time +} + +type byRTTMsAndCanConnectBefore []SortedStorenode + +func (s byRTTMsAndCanConnectBefore) Len() int { + return len(s) +} + +func (s byRTTMsAndCanConnectBefore) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func (s byRTTMsAndCanConnectBefore) Less(i, j int) bool { + // Slightly inaccurate as time sensitive sorting, but it does not matter so much + now := time.Now() + if s[i].CanConnectAfter.Before(now) && s[j].CanConnectAfter.Before(now) { + return s[i].RTT < s[j].RTT + } + return s[i].CanConnectAfter.Before(s[j].CanConnectAfter) +} diff --git a/waku/v2/api/missing/missing_messages.go b/waku/v2/api/missing/missing_messages.go index ca8b63fb7..095e32419 100644 --- a/waku/v2/api/missing/missing_messages.go +++ b/waku/v2/api/missing/missing_messages.go @@ -178,7 +178,7 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter } } -func (m *MissingMessageVerifier) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (*store.Result, error), logger *zap.Logger, logMsg string) (*store.Result, error) { +func (m *MissingMessageVerifier) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (store.Result, error), logger *zap.Logger, logMsg string) (store.Result, error) { retry := true count := 1 for retry && count <= m.params.maxAttemptsToRetrieveHistory { @@ -212,7 +212,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, logging.Epoch("to", now), ) - result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { + result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) { return m.store.Query(ctx, store.FilterCriteria{ ContentFilter: protocol.NewContentFilter(interest.contentFilter.PubsubTopic, contentTopics[batchFrom:batchTo]...), TimeStart: proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()), @@ -243,7 +243,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, missingHashes = append(missingHashes, hash) } - result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { + result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) { if err = result.Next(ctx); err != nil { return nil, err } @@ -282,7 +282,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, defer utils.LogOnPanic() defer wg.Wait() - result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { + result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) { queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout) defer cancel() return m.store.QueryByHash(queryCtx, messageHashes, store.WithPeer(interest.peerID), store.WithPaging(false, maxMsgHashesPerRequest)) @@ -303,7 +303,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, } } - result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { + result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) { if err = result.Next(ctx); err != nil { return nil, err } diff --git a/waku/v2/api/publish/message_check.go b/waku/v2/api/publish/message_check.go index c6df0f29a..059165740 100644 --- a/waku/v2/api/publish/message_check.go +++ b/waku/v2/api/publish/message_check.go @@ -8,8 +8,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/libp2p/go-libp2p/core/peer" apicommon "github.com/waku-org/go-waku/waku/v2/api/common" + "github.com/waku-org/go-waku/waku/v2/api/history" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" @@ -29,7 +29,6 @@ type ISentCheck interface { Start() Add(topic string, messageID common.Hash, sentTime uint32) DeleteByMessageIDs(messageIDs []common.Hash) - SetStorePeerID(peerID peer.ID) } // MessageSentCheck tracks the outgoing messages and check against store node @@ -38,11 +37,11 @@ type ISentCheck interface { type MessageSentCheck struct { messageIDs map[string]map[common.Hash]uint32 messageIDsMu sync.RWMutex - storePeerID peer.ID messageStoredChan chan common.Hash messageExpiredChan chan common.Hash ctx context.Context store *store.WakuStore + storenodeCycle *history.StorenodeCycle timesource timesource.Timesource logger *zap.Logger maxHashQueryLength uint64 @@ -53,7 +52,7 @@ type MessageSentCheck struct { } // NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters -func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource timesource.Timesource, msgStoredChan chan common.Hash, msgExpiredChan chan common.Hash, logger *zap.Logger) *MessageSentCheck { +func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, cycle *history.StorenodeCycle, timesource timesource.Timesource, msgStoredChan chan common.Hash, msgExpiredChan chan common.Hash, logger *zap.Logger) *MessageSentCheck { return &MessageSentCheck{ messageIDs: make(map[string]map[common.Hash]uint32), messageIDsMu: sync.RWMutex{}, @@ -61,6 +60,7 @@ func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource messageExpiredChan: msgExpiredChan, ctx: ctx, store: store, + storenodeCycle: cycle, timesource: timesource, logger: logger, maxHashQueryLength: DefaultMaxHashQueryLength, @@ -139,11 +139,6 @@ func (m *MessageSentCheck) DeleteByMessageIDs(messageIDs []common.Hash) { } } -// SetStorePeerID sets the peer id of store node -func (m *MessageSentCheck) SetStorePeerID(peerID peer.ID) { - m.storePeerID = peerID -} - // Start checks if the tracked outgoing messages are stored periodically func (m *MessageSentCheck) Start() { defer utils.LogOnPanic() @@ -211,7 +206,7 @@ func (m *MessageSentCheck) Start() { } func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []common.Hash, relayTime []uint32, pubsubTopic string) []common.Hash { - selectedPeer := m.storePeerID + selectedPeer := m.storenodeCycle.GetActiveStorenode() if selectedPeer == "" { m.logger.Error("no store peer id available", zap.String("pubsubTopic", pubsubTopic)) return []common.Hash{} diff --git a/waku/v2/api/publish/message_check_test.go b/waku/v2/api/publish/message_check_test.go index ef53f4d36..4fe7b6eea 100644 --- a/waku/v2/api/publish/message_check_test.go +++ b/waku/v2/api/publish/message_check_test.go @@ -10,7 +10,7 @@ import ( func TestAddAndDelete(t *testing.T) { ctx := context.TODO() - messageSentCheck := NewMessageSentCheck(ctx, nil, nil, nil, nil, nil) + messageSentCheck := NewMessageSentCheck(ctx, nil, nil, nil, nil, nil, nil) messageSentCheck.Add("topic", [32]byte{1}, 1) messageSentCheck.Add("topic", [32]byte{2}, 2) diff --git a/waku/v2/api/publish/message_sender.go b/waku/v2/api/publish/message_sender.go index 479d894ad..c1e9a4ca7 100644 --- a/waku/v2/api/publish/message_sender.go +++ b/waku/v2/api/publish/message_sender.go @@ -6,7 +6,6 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -162,9 +161,3 @@ func (ms *MessageSender) MessagesDelivered(messageIDs []common.Hash) { ms.messageSentCheck.DeleteByMessageIDs(messageIDs) } } - -func (ms *MessageSender) SetStorePeerID(peerID peer.ID) { - if ms.messageSentCheck != nil { - ms.messageSentCheck.SetStorePeerID(peerID) - } -} diff --git a/waku/v2/protocol/store/client.go b/waku/v2/protocol/store/client.go index f7427b979..090ef8f04 100644 --- a/waku/v2/protocol/store/client.go +++ b/waku/v2/protocol/store/client.go @@ -50,8 +50,8 @@ type StoreError struct { } // NewStoreError creates a new instance of StoreError -func NewStoreError(code int, message string) StoreError { - return StoreError{ +func NewStoreError(code int, message string) *StoreError { + return &StoreError{ Code: code, Message: message, } @@ -99,7 +99,7 @@ func (s *WakuStore) SetHost(h host.Host) { // Request is used to send a store query. This function requires understanding how to prepare a store query // and most of the time you can use `Query`, `QueryByHash` and `Exists` instead, as they provide // a simpler API -func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...RequestOption) (*Result, error) { +func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...RequestOption) (Result, error) { params := new(Parameters) optList := DefaultOptions() @@ -182,7 +182,7 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ return nil, err } - result := &Result{ + result := &resultImpl{ store: s, messages: response.Messages, storeRequest: storeRequest, @@ -195,12 +195,12 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ } // Query retrieves all the messages that match a criteria. Use the options to indicate whether to return the message themselves or not. -func (s *WakuStore) Query(ctx context.Context, criteria FilterCriteria, opts ...RequestOption) (*Result, error) { +func (s *WakuStore) Query(ctx context.Context, criteria FilterCriteria, opts ...RequestOption) (Result, error) { return s.Request(ctx, criteria, opts...) } // Query retrieves all the messages with specific message hashes -func (s *WakuStore) QueryByHash(ctx context.Context, messageHashes []wpb.MessageHash, opts ...RequestOption) (*Result, error) { +func (s *WakuStore) QueryByHash(ctx context.Context, messageHashes []wpb.MessageHash, opts ...RequestOption) (Result, error) { return s.Request(ctx, MessageHashCriteria{messageHashes}, opts...) } @@ -214,17 +214,17 @@ func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opt return false, err } - return len(result.messages) != 0, nil + return len(result.Messages()) != 0, nil } -func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) (*Result, error) { +func (s *WakuStore) next(ctx context.Context, r Result, opts ...RequestOption) (*resultImpl, error) { if r.IsComplete() { - return &Result{ + return &resultImpl{ store: s, messages: nil, cursor: nil, - storeRequest: r.storeRequest, - storeResponse: r.storeResponse, + storeRequest: r.Query(), + storeResponse: r.Response(), peerID: r.PeerID(), }, nil } @@ -240,7 +240,7 @@ func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) } } - storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest) + storeRequest := proto.Clone(r.Query()).(*pb.StoreQueryRequest) storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID()) storeRequest.PaginationCursor = r.Cursor() @@ -249,7 +249,7 @@ func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) return nil, err } - result := &Result{ + result := &resultImpl{ store: s, messages: response.Messages, storeRequest: storeRequest, @@ -317,7 +317,7 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe if storeResponse.GetStatusCode() != ok { err := NewStoreError(int(storeResponse.GetStatusCode()), storeResponse.GetStatusDesc()) - return nil, &err + return nil, err } return storeResponse, nil } diff --git a/waku/v2/protocol/store/client_test.go b/waku/v2/protocol/store/client_test.go index 733a27e93..be42dec07 100644 --- a/waku/v2/protocol/store/client_test.go +++ b/waku/v2/protocol/store/client_test.go @@ -128,33 +128,33 @@ func TestStoreClient(t *testing.T) { // -- First page: require.False(t, response.IsComplete()) - require.Len(t, response.messages, 2) - require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[0].GetTimestamp()) - require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[1].GetTimestamp()) + require.Len(t, response.Messages(), 2) + require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[0].GetTimestamp()) + require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[1].GetTimestamp()) err = response.Next(ctx) require.NoError(t, err) // -- Second page: require.False(t, response.IsComplete()) - require.Len(t, response.messages, 2) - require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[2].GetTimestamp()) - require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[3].GetTimestamp()) + require.Len(t, response.Messages(), 2) + require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[2].GetTimestamp()) + require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[3].GetTimestamp()) err = response.Next(ctx) require.NoError(t, err) // -- Third page: require.False(t, response.IsComplete()) - require.Len(t, response.messages, 1) - require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[4].GetTimestamp()) + require.Len(t, response.Messages(), 1) + require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[4].GetTimestamp()) err = response.Next(ctx) require.NoError(t, err) // -- Trying to continue a completed cursor require.True(t, response.IsComplete()) - require.Len(t, response.messages, 0) + require.Len(t, response.Messages(), 0) err = response.Next(ctx) require.NoError(t, err) @@ -165,26 +165,26 @@ func TestStoreClient(t *testing.T) { // -- First page: require.False(t, response.IsComplete()) - require.Len(t, response.messages, 2) - require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[3].GetTimestamp()) - require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[4].GetTimestamp()) + require.Len(t, response.Messages(), 2) + require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[3].GetTimestamp()) + require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[4].GetTimestamp()) err = response.Next(ctx) require.NoError(t, err) // -- Second page: require.False(t, response.IsComplete()) - require.Len(t, response.messages, 2) - require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[1].GetTimestamp()) - require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[2].GetTimestamp()) + require.Len(t, response.Messages(), 2) + require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[1].GetTimestamp()) + require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[2].GetTimestamp()) err = response.Next(ctx) require.NoError(t, err) // -- Third page: require.False(t, response.IsComplete()) - require.Len(t, response.messages, 1) - require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[0].GetTimestamp()) + require.Len(t, response.Messages(), 1) + require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[0].GetTimestamp()) err = response.Next(ctx) require.NoError(t, err) @@ -197,13 +197,13 @@ func TestStoreClient(t *testing.T) { // No cursor should be returned if there are no messages that match the criteria response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "no-messages"), TimeStart: startTime, TimeEnd: endTime}, WithPaging(true, 2)) require.NoError(t, err) - require.Len(t, response.messages, 0) + require.Len(t, response.Messages(), 0) require.Empty(t, response.Cursor()) // If the page size is larger than the number of existing messages, it should not return a cursor response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "test"), TimeStart: startTime, TimeEnd: endTime}, WithPaging(true, 100)) require.NoError(t, err) - require.Len(t, response.messages, 5) + require.Len(t, response.Messages(), 5) require.Empty(t, response.Cursor()) // Invalid cursors should fail @@ -225,17 +225,17 @@ func TestStoreClient(t *testing.T) { // Handle temporal history query with a zero-size time window response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "test"), TimeStart: startTime, TimeEnd: startTime}) require.NoError(t, err) - require.Len(t, response.messages, 0) + require.Len(t, response.Messages(), 0) require.Empty(t, response.Cursor()) // Should not include data response, err = wakuStore.Request(ctx, MessageHashCriteria{MessageHashes: []pb.MessageHash{messages[0].Hash(pubsubTopic)}}, IncludeData(false), WithPeer(storenode.ID)) require.NoError(t, err) - require.Len(t, response.messages, 1) - require.Nil(t, response.messages[0].Message) + require.Len(t, response.Messages(), 1) + require.Nil(t, response.Messages()[0].Message) response, err = wakuStore.Request(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "test")}, IncludeData(false)) require.NoError(t, err) - require.GreaterOrEqual(t, len(response.messages), 1) - require.Nil(t, response.messages[0].Message) + require.GreaterOrEqual(t, len(response.Messages()), 1) + require.Nil(t, response.Messages()[0].Message) } diff --git a/waku/v2/protocol/store/options.go b/waku/v2/protocol/store/options.go index b8deba47c..facb3f54f 100644 --- a/waku/v2/protocol/store/options.go +++ b/waku/v2/protocol/store/options.go @@ -22,6 +22,10 @@ type Parameters struct { skipRatelimit bool } +func (p *Parameters) Cursor() []byte { + return p.cursor +} + type RequestOption func(*Parameters) error // WithPeer is an option used to specify the peerID to request the message history. diff --git a/waku/v2/protocol/store/result.go b/waku/v2/protocol/store/result.go index 604d6453c..f5f70106d 100644 --- a/waku/v2/protocol/store/result.go +++ b/waku/v2/protocol/store/result.go @@ -8,7 +8,17 @@ import ( ) // Result represents a valid response from a store node -type Result struct { +type Result interface { + Cursor() []byte + IsComplete() bool + PeerID() peer.ID + Query() *pb.StoreQueryRequest + Response() *pb.StoreQueryResponse + Next(ctx context.Context, opts ...RequestOption) error + Messages() []*pb.WakuMessageKeyValue +} + +type resultImpl struct { done bool messages []*pb.WakuMessageKeyValue @@ -19,27 +29,27 @@ type Result struct { peerID peer.ID } -func (r *Result) Cursor() []byte { +func (r *resultImpl) Cursor() []byte { return r.cursor } -func (r *Result) IsComplete() bool { +func (r *resultImpl) IsComplete() bool { return r.done } -func (r *Result) PeerID() peer.ID { +func (r *resultImpl) PeerID() peer.ID { return r.peerID } -func (r *Result) Query() *pb.StoreQueryRequest { +func (r *resultImpl) Query() *pb.StoreQueryRequest { return r.storeRequest } -func (r *Result) Response() *pb.StoreQueryResponse { +func (r *resultImpl) Response() *pb.StoreQueryResponse { return r.storeResponse } -func (r *Result) Next(ctx context.Context, opts ...RequestOption) error { +func (r *resultImpl) Next(ctx context.Context, opts ...RequestOption) error { if r.cursor == nil { r.done = true r.messages = nil @@ -57,6 +67,6 @@ func (r *Result) Next(ctx context.Context, opts ...RequestOption) error { return nil } -func (r *Result) Messages() []*pb.WakuMessageKeyValue { +func (r *resultImpl) Messages() []*pb.WakuMessageKeyValue { return r.messages }