From 65679b91ddf060bbab335f0f2c6248068e8a0911 Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Fri, 19 Jul 2024 13:57:02 +0800 Subject: [PATCH 1/3] feat(market): add index provider api --- venus-shared/api/market/v1/api.go | 8 ++ venus-shared/api/market/v1/method.md | 114 ++++++++++++++++++ .../api/market/v1/mock/mock_imarket.go | 90 ++++++++++++++ venus-shared/api/market/v1/proxy_gen.go | 25 ++++ 4 files changed, 237 insertions(+) diff --git a/venus-shared/api/market/v1/api.go b/venus-shared/api/market/v1/api.go index c80aa8b6af..6442bedd5d 100644 --- a/venus-shared/api/market/v1/api.go +++ b/venus-shared/api/market/v1/api.go @@ -7,6 +7,7 @@ import ( "github.com/google/uuid" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multihash" "github.com/filecoin-project/go-address" datatransfer "github.com/filecoin-project/go-data-transfer/v2" @@ -200,5 +201,12 @@ type IMarket interface { UpdateStorageDealPayloadSize(ctx context.Context, dealProposal cid.Cid, payloadSize uint64) error //perm:write + IndexerAnnounceAllDeals(ctx context.Context, minerAddr address.Address) error //perm:admin + IndexerListMultihashes(ctx context.Context, contextID []byte) ([]multihash.Multihash, error) //perm:read + IndexerAnnounceLatest(ctx context.Context) (cid.Cid, error) //perm:admin + IndexerAnnounceLatestHttp(ctx context.Context, urls []string) (cid.Cid, error) //perm:admin + IndexerAnnounceDealRemoved(ctx context.Context, propCid cid.Cid) (cid.Cid, error) //perm:admin + IndexerAnnounceDeal(ctx context.Context, contextID []byte) (cid.Cid, error) //perm:admin + api.Version } diff --git a/venus-shared/api/market/v1/method.md b/venus-shared/api/market/v1/method.md index 3740b26863..77d18c6fe8 100644 --- a/venus-shared/api/market/v1/method.md +++ b/venus-shared/api/market/v1/method.md @@ -56,6 +56,12 @@ curl http://:/rpc/v1 -X POST -H "Content-Type: application/json" -H " * [GetUnPackedDeals](#getunpackeddeals) * [ID](#id) * [ImportDirectDeal](#importdirectdeal) + * [IndexerAnnounceAllDeals](#indexerannouncealldeals) + * [IndexerAnnounceDeal](#indexerannouncedeal) + * [IndexerAnnounceDealRemoved](#indexerannouncedealremoved) + * [IndexerAnnounceLatest](#indexerannouncelatest) + * [IndexerAnnounceLatestHttp](#indexerannouncelatesthttp) + * [IndexerListMultihashes](#indexerlistmultihashes) * [ListDirectDeals](#listdirectdeals) * [ListPieceStorageInfos](#listpiecestorageinfos) * [ListenMarketEvent](#listenmarketevent) @@ -1234,6 +1240,114 @@ Inputs: Response: `{}` +### IndexerAnnounceAllDeals + + +Perms: admin + +Inputs: +```json +[ + "f01234" +] +``` + +Response: `{}` + +### IndexerAnnounceDeal + + +Perms: admin + +Inputs: +```json +[ + "Ynl0ZSBhcnJheQ==" +] +``` + +Response: +```json +{ + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" +} +``` + +### IndexerAnnounceDealRemoved + + +Perms: admin + +Inputs: +```json +[ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + } +] +``` + +Response: +```json +{ + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" +} +``` + +### IndexerAnnounceLatest + + +Perms: admin + +Inputs: `[]` + +Response: +```json +{ + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" +} +``` + +### IndexerAnnounceLatestHttp + + +Perms: admin + +Inputs: +```json +[ + [ + "string value" + ] +] +``` + +Response: +```json +{ + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" +} +``` + +### IndexerListMultihashes + + +Perms: read + +Inputs: +```json +[ + "Ynl0ZSBhcnJheQ==" +] +``` + +Response: +```json +[ + "Bw==" +] +``` + ### ListDirectDeals diff --git a/venus-shared/api/market/v1/mock/mock_imarket.go b/venus-shared/api/market/v1/mock/mock_imarket.go index f3645ade1d..2621c5fea3 100644 --- a/venus-shared/api/market/v1/mock/mock_imarket.go +++ b/venus-shared/api/market/v1/mock/mock_imarket.go @@ -25,6 +25,7 @@ import ( uuid "github.com/google/uuid" cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p/core/peer" + multihash "github.com/multiformats/go-multihash" ) // MockIMarket is a mock of IMarket interface. @@ -765,6 +766,95 @@ func (mr *MockIMarketMockRecorder) ImportDirectDeal(arg0, arg1 interface{}) *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImportDirectDeal", reflect.TypeOf((*MockIMarket)(nil).ImportDirectDeal), arg0, arg1) } +// IndexerAnnounceAllDeals mocks base method. +func (m *MockIMarket) IndexerAnnounceAllDeals(arg0 context.Context, arg1 address.Address) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IndexerAnnounceAllDeals", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// IndexerAnnounceAllDeals indicates an expected call of IndexerAnnounceAllDeals. +func (mr *MockIMarketMockRecorder) IndexerAnnounceAllDeals(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IndexerAnnounceAllDeals", reflect.TypeOf((*MockIMarket)(nil).IndexerAnnounceAllDeals), arg0, arg1) +} + +// IndexerAnnounceDeal mocks base method. +func (m *MockIMarket) IndexerAnnounceDeal(arg0 context.Context, arg1 []byte) (cid.Cid, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IndexerAnnounceDeal", arg0, arg1) + ret0, _ := ret[0].(cid.Cid) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IndexerAnnounceDeal indicates an expected call of IndexerAnnounceDeal. +func (mr *MockIMarketMockRecorder) IndexerAnnounceDeal(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IndexerAnnounceDeal", reflect.TypeOf((*MockIMarket)(nil).IndexerAnnounceDeal), arg0, arg1) +} + +// IndexerAnnounceDealRemoved mocks base method. +func (m *MockIMarket) IndexerAnnounceDealRemoved(arg0 context.Context, arg1 cid.Cid) (cid.Cid, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IndexerAnnounceDealRemoved", arg0, arg1) + ret0, _ := ret[0].(cid.Cid) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IndexerAnnounceDealRemoved indicates an expected call of IndexerAnnounceDealRemoved. +func (mr *MockIMarketMockRecorder) IndexerAnnounceDealRemoved(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IndexerAnnounceDealRemoved", reflect.TypeOf((*MockIMarket)(nil).IndexerAnnounceDealRemoved), arg0, arg1) +} + +// IndexerAnnounceLatest mocks base method. +func (m *MockIMarket) IndexerAnnounceLatest(arg0 context.Context) (cid.Cid, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IndexerAnnounceLatest", arg0) + ret0, _ := ret[0].(cid.Cid) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IndexerAnnounceLatest indicates an expected call of IndexerAnnounceLatest. +func (mr *MockIMarketMockRecorder) IndexerAnnounceLatest(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IndexerAnnounceLatest", reflect.TypeOf((*MockIMarket)(nil).IndexerAnnounceLatest), arg0) +} + +// IndexerAnnounceLatestHttp mocks base method. +func (m *MockIMarket) IndexerAnnounceLatestHttp(arg0 context.Context, arg1 []string) (cid.Cid, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IndexerAnnounceLatestHttp", arg0, arg1) + ret0, _ := ret[0].(cid.Cid) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IndexerAnnounceLatestHttp indicates an expected call of IndexerAnnounceLatestHttp. +func (mr *MockIMarketMockRecorder) IndexerAnnounceLatestHttp(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IndexerAnnounceLatestHttp", reflect.TypeOf((*MockIMarket)(nil).IndexerAnnounceLatestHttp), arg0, arg1) +} + +// IndexerListMultihashes mocks base method. +func (m *MockIMarket) IndexerListMultihashes(arg0 context.Context, arg1 []byte) ([]multihash.Multihash, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IndexerListMultihashes", arg0, arg1) + ret0, _ := ret[0].([]multihash.Multihash) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IndexerListMultihashes indicates an expected call of IndexerListMultihashes. +func (mr *MockIMarketMockRecorder) IndexerListMultihashes(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IndexerListMultihashes", reflect.TypeOf((*MockIMarket)(nil).IndexerListMultihashes), arg0, arg1) +} + // ListDirectDeals mocks base method. func (m *MockIMarket) ListDirectDeals(arg0 context.Context, arg1 market.DirectDealQueryParams) ([]*market.DirectDeal, error) { m.ctrl.T.Helper() diff --git a/venus-shared/api/market/v1/proxy_gen.go b/venus-shared/api/market/v1/proxy_gen.go index 5b9d76b733..44fb919aaf 100644 --- a/venus-shared/api/market/v1/proxy_gen.go +++ b/venus-shared/api/market/v1/proxy_gen.go @@ -15,6 +15,7 @@ import ( "github.com/google/uuid" cid "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/core/peer" + multihash "github.com/multiformats/go-multihash" "github.com/filecoin-project/venus/venus-shared/types" "github.com/filecoin-project/venus/venus-shared/types/gateway" @@ -72,6 +73,12 @@ type IMarketStruct struct { GetUnPackedDeals func(ctx context.Context, miner address.Address, spec *market.GetDealSpec) ([]*market.DealInfoIncludePath, error) `perm:"read"` ID func(context.Context) (peer.ID, error) `perm:"read"` ImportDirectDeal func(ctx context.Context, deal *market.DirectDealParams) error `perm:"write"` + IndexerAnnounceAllDeals func(ctx context.Context, minerAddr address.Address) error `perm:"admin"` + IndexerAnnounceDeal func(ctx context.Context, contextID []byte) (cid.Cid, error) `perm:"admin"` + IndexerAnnounceDealRemoved func(ctx context.Context, propCid cid.Cid) (cid.Cid, error) `perm:"admin"` + IndexerAnnounceLatest func(ctx context.Context) (cid.Cid, error) `perm:"admin"` + IndexerAnnounceLatestHttp func(ctx context.Context, urls []string) (cid.Cid, error) `perm:"admin"` + IndexerListMultihashes func(ctx context.Context, contextID []byte) ([]multihash.Multihash, error) `perm:"read"` ListDirectDeals func(ctx context.Context, queryParams market.DirectDealQueryParams) ([]*market.DirectDeal, error) `perm:"read"` ListPieceStorageInfos func(ctx context.Context) market.PieceStorageInfos `perm:"read"` ListenMarketEvent func(ctx context.Context, policy *gateway.MarketRegisterPolicy) (<-chan *gateway.RequestEvent, error) `perm:"read"` @@ -276,6 +283,24 @@ func (s *IMarketStruct) ID(p0 context.Context) (peer.ID, error) { return s.Inter func (s *IMarketStruct) ImportDirectDeal(p0 context.Context, p1 *market.DirectDealParams) error { return s.Internal.ImportDirectDeal(p0, p1) } +func (s *IMarketStruct) IndexerAnnounceAllDeals(p0 context.Context, p1 address.Address) error { + return s.Internal.IndexerAnnounceAllDeals(p0, p1) +} +func (s *IMarketStruct) IndexerAnnounceDeal(p0 context.Context, p1 []byte) (cid.Cid, error) { + return s.Internal.IndexerAnnounceDeal(p0, p1) +} +func (s *IMarketStruct) IndexerAnnounceDealRemoved(p0 context.Context, p1 cid.Cid) (cid.Cid, error) { + return s.Internal.IndexerAnnounceDealRemoved(p0, p1) +} +func (s *IMarketStruct) IndexerAnnounceLatest(p0 context.Context) (cid.Cid, error) { + return s.Internal.IndexerAnnounceLatest(p0) +} +func (s *IMarketStruct) IndexerAnnounceLatestHttp(p0 context.Context, p1 []string) (cid.Cid, error) { + return s.Internal.IndexerAnnounceLatestHttp(p0, p1) +} +func (s *IMarketStruct) IndexerListMultihashes(p0 context.Context, p1 []byte) ([]multihash.Multihash, error) { + return s.Internal.IndexerListMultihashes(p0, p1) +} func (s *IMarketStruct) ListDirectDeals(p0 context.Context, p1 market.DirectDealQueryParams) ([]*market.DirectDeal, error) { return s.Internal.ListDirectDeals(p0, p1) } From 524bcbcc285e8fa0cf73e6cd9bcd127233246c61 Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Fri, 19 Jul 2024 14:06:44 +0800 Subject: [PATCH 2/3] feat: pubsub indexer message --- app/node/builder.go | 2 +- app/submodule/mining/mining_submodule.go | 4 - app/submodule/network/network_submodule.go | 70 ++++++-- go.mod | 2 + go.sum | 7 +- pkg/net/pubsub/indexer_message.go | 198 +++++++++++++++++++++ pkg/net/pubsub/ratelimit/queue.go | 89 +++++++++ pkg/net/pubsub/ratelimit/queue_test.go | 65 +++++++ pkg/net/pubsub/ratelimit/window.go | 70 ++++++++ pkg/net/pubsub/ratelimit/window_test.go | 65 +++++++ 10 files changed, 545 insertions(+), 27 deletions(-) create mode 100644 pkg/net/pubsub/indexer_message.go create mode 100644 pkg/net/pubsub/ratelimit/queue.go create mode 100644 pkg/net/pubsub/ratelimit/queue_test.go create mode 100644 pkg/net/pubsub/ratelimit/window.go create mode 100644 pkg/net/pubsub/ratelimit/window_test.go diff --git a/app/node/builder.go b/app/node/builder.go index acc54f56fb..19874d5d5a 100644 --- a/app/node/builder.go +++ b/app/node/builder.go @@ -151,7 +151,7 @@ func (b *Builder) build(ctx context.Context) (*Node, error) { if err != nil { return nil, errors.Wrap(err, "failed to build node.storageNetworking") } - nd.mining = mining.NewMiningModule(nd.syncer.Stmgr, (*builder)(b), nd.chain, nd.blockstore, nd.network, nd.syncer, *nd.wallet) + nd.mining = mining.NewMiningModule(nd.syncer.Stmgr, (*builder)(b), nd.chain, nd.blockstore, nd.syncer, *nd.wallet) mgrps := &paychmgr.ManagerParams{ MPoolAPI: nd.mpool.API(), diff --git a/app/submodule/mining/mining_submodule.go b/app/submodule/mining/mining_submodule.go index 6a9b3e035f..738b4c766b 100644 --- a/app/submodule/mining/mining_submodule.go +++ b/app/submodule/mining/mining_submodule.go @@ -3,7 +3,6 @@ package mining import ( "github.com/filecoin-project/venus/app/submodule/blockstore" chain2 "github.com/filecoin-project/venus/app/submodule/chain" - "github.com/filecoin-project/venus/app/submodule/network" "github.com/filecoin-project/venus/app/submodule/syncer" "github.com/filecoin-project/venus/app/submodule/wallet" "github.com/filecoin-project/venus/pkg/repo" @@ -23,7 +22,6 @@ type MiningModule struct { //nolint Config miningConfig ChainModule *chain2.ChainSubmodule BlockStore *blockstore.BlockstoreSubmodule - NetworkModule *network.NetworkSubmodule SyncModule *syncer.SyncerSubmodule Wallet wallet.WalletSubmodule proofVerifier ffiwrapper.Verifier @@ -45,7 +43,6 @@ func NewMiningModule( conf miningConfig, chainModule *chain2.ChainSubmodule, blockStore *blockstore.BlockstoreSubmodule, - networkModule *network.NetworkSubmodule, syncModule *syncer.SyncerSubmodule, wallet wallet.WalletSubmodule, ) *MiningModule { @@ -54,7 +51,6 @@ func NewMiningModule( Config: conf, ChainModule: chainModule, BlockStore: blockStore, - NetworkModule: networkModule, SyncModule: syncModule, Wallet: wallet, proofVerifier: conf.Verifier(), diff --git a/app/submodule/network/network_submodule.go b/app/submodule/network/network_submodule.go index d36f99b258..ed4cfcc489 100644 --- a/app/submodule/network/network_submodule.go +++ b/app/submodule/network/network_submodule.go @@ -7,8 +7,6 @@ import ( "os" "time" - "github.com/filecoin-project/venus/pkg/net/helloprotocol" - "github.com/dchest/blake2b" "github.com/ipfs/boxo/bitswap" bsnet "github.com/ipfs/boxo/bitswap/network" @@ -47,7 +45,9 @@ import ( "github.com/filecoin-project/venus/pkg/config" "github.com/filecoin-project/venus/pkg/net" filexchange "github.com/filecoin-project/venus/pkg/net/exchange" + "github.com/filecoin-project/venus/pkg/net/helloprotocol" "github.com/filecoin-project/venus/pkg/net/peermgr" + "github.com/filecoin-project/venus/pkg/net/pubsub" "github.com/filecoin-project/venus/pkg/repo" appstate "github.com/filecoin-project/venus/pkg/state" "github.com/filecoin-project/venus/venus-shared/types" @@ -88,6 +88,8 @@ type NetworkSubmodule struct { //nolint ScoreKeeper *net.ScoreKeeper + indexMsgRelayCancelFunc libp2pps.RelayCancelFunc + cfg networkConfig } @@ -112,6 +114,7 @@ func (networkSubmodule *NetworkSubmodule) Stop(ctx context.Context) { if err := networkSubmodule.Router.(*dht.IpfsDHT).Close(); err != nil { networkLogger.Errorf("error closing dht: %s", err.Error()) } + networkSubmodule.indexMsgRelayCancelFunc() } type networkConfig interface { @@ -183,6 +186,11 @@ func NewNetworkSubmodule(ctx context.Context, return nil, errors.Wrap(err, "failed to set up network") } + indexMsgRelayCancelFunc, err := relayIndexerMessages(gsub, networkName, peerHost, chainStore) + if err != nil { + return nil, fmt.Errorf("failed to set up relay indexer messages: %w", err) + } + // set up bitswap nwork := bsnet.NewFromIpfsHost(peerHost, router, bsnet.Prefix("/chain")) bitswapOptions := []bitswap.Option{bitswap.ProvideEnabled(false)} @@ -215,22 +223,23 @@ func NewNetworkSubmodule(ctx context.Context, helloHandler := helloprotocol.NewHelloProtocolHandler(peerHost, peerMgr, exchangeClient, chainStore, messageStore, config.GenesisCid(), time.Duration(config.Repo().Config().NetworkParams.BlockDelay)*time.Second) // build the network submdule return &NetworkSubmodule{ - NetworkName: networkName, - Host: peerHost, - RawHost: rawHost, - Router: router, - Pubsub: gsub, - Bitswap: bswap, - GraphExchange: gsync, - ExchangeClient: exchangeClient, - exchangeServer: exchangeServer, - Network: network, - DataTransfer: dt, - DataTransferHost: dtNet, - PeerMgr: peerMgr, - HelloHandler: helloHandler, - cfg: config, - ScoreKeeper: sk, + NetworkName: networkName, + Host: peerHost, + RawHost: rawHost, + Router: router, + Pubsub: gsub, + Bitswap: bswap, + GraphExchange: gsync, + ExchangeClient: exchangeClient, + exchangeServer: exchangeServer, + Network: network, + DataTransfer: dt, + DataTransferHost: dtNet, + PeerMgr: peerMgr, + HelloHandler: helloHandler, + cfg: config, + ScoreKeeper: sk, + indexMsgRelayCancelFunc: indexMsgRelayCancelFunc, }, nil } @@ -341,7 +350,7 @@ func retrieveNetworkName(ctx context.Context, genCid cid.Cid, cborStore cbor.Ipl // address determines if we are publically dialable. If so use public // address, if not configure node to announce relay address. -func buildHost(ctx context.Context, config networkConfig, libP2pOpts []libp2p.Option, cfg *config.Config) (types.RawHost, error) { +func buildHost(_ context.Context, config networkConfig, libP2pOpts []libp2p.Option, cfg *config.Config) (types.RawHost, error) { if config.IsRelay() { publicAddr, err := ma.NewMultiaddr(cfg.Swarm.PublicRelayAddress) if err != nil { @@ -448,3 +457,26 @@ func connectionManager(low, high uint, grace time.Duration, protected []string, return cm, nil } + +func relayIndexerMessages(ps *libp2pps.PubSub, nn string, h host.Host, chainReader *chain.Store) (libp2pps.RelayCancelFunc, error) { + topicName := types.IndexerIngestTopic(nn) + + v := pubsub.NewIndexerMessageValidator(h.ID(), chainReader) + + if err := ps.RegisterTopicValidator(topicName, v.Validate); err != nil { + return nil, fmt.Errorf("failed to register validator for topic %s, err: %w", topicName, err) + } + + topicHandle, err := ps.Join(topicName) + if err != nil { + return nil, fmt.Errorf("failed to join pubsub topic %s: %w", topicName, err) + } + cancelFunc, err := topicHandle.Relay() + if err != nil { + return nil, fmt.Errorf("failed to relay to pubsub messages for topic %s: %w", topicName, err) + } + + networkLogger.Infof("relaying messages for pubsub topic %s", topicName) + + return cancelFunc, nil +} diff --git a/go.mod b/go.mod index 4a276c055f..b6127c3348 100644 --- a/go.mod +++ b/go.mod @@ -75,6 +75,7 @@ require ( github.com/ipfs/go-unixfs v0.4.5 github.com/ipld/go-car v0.6.2 github.com/ipld/go-car/v2 v2.13.1 + github.com/ipni/go-libipni v0.0.7 github.com/jbenet/goprocess v0.1.4 github.com/libp2p/go-libp2p v0.35.0 github.com/libp2p/go-libp2p-kad-dht v0.25.2 @@ -145,6 +146,7 @@ require ( github.com/whyrusleeping/go-logging v0.0.1 // indirect go.uber.org/mock v0.4.0 // indirect gonum.org/v1/gonum v0.15.0 // indirect + google.golang.org/genproto v0.0.0-20230629202037-9506855d4529 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect ) diff --git a/go.sum b/go.sum index 001d24e466..3898ca5f91 100644 --- a/go.sum +++ b/go.sum @@ -1269,8 +1269,9 @@ github.com/sirupsen/logrus v1.9.2/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM= github.com/smartystreets/assertions v1.0.1/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM= -github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= +github.com/smartystreets/assertions v1.13.0 h1:Dx1kYM01xsSqKPno3aqLnrwac2LetPvN23diwyr69Qs= +github.com/smartystreets/assertions v1.13.0/go.mod h1:wDmR7qL282YbGsPy6H/yAsesrxfxaaSlJazyFLYVFx8= github.com/smartystreets/goconvey v0.0.0-20190222223459-a17d461953aa/go.mod h1:2RVY1rIf+2J2o/IM9+vPq9RzmHDSseB7FoXiSNIUsoU= github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= @@ -2064,8 +2065,8 @@ google.golang.org/genproto v0.0.0-20220421151946-72621c1f0bd3/go.mod h1:8w6bsBMX google.golang.org/genproto v0.0.0-20220429170224-98d788798c3e/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo= google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= -google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc h1:8DyZCyvI8mE1IdLy/60bS+52xfymkE72wv1asokgtao= -google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:xZnkP7mREFX5MORlOPEzLMr+90PPZQ2QWzrVTWfAq64= +google.golang.org/genproto v0.0.0-20230629202037-9506855d4529 h1:9JucMWR7sPvCxUFd6UsOUNmA5kCcWOfORaT3tpAsKQs= +google.golang.org/genproto v0.0.0-20230629202037-9506855d4529/go.mod h1:xZnkP7mREFX5MORlOPEzLMr+90PPZQ2QWzrVTWfAq64= google.golang.org/genproto/googleapis/api v0.0.0-20240515191416-fc5f0ca64291 h1:4HZJ3Xv1cmrJ+0aFo304Zn79ur1HMxptAE7aCPNLSqc= google.golang.org/genproto/googleapis/api v0.0.0-20240515191416-fc5f0ca64291/go.mod h1:RGnPtTG7r4i8sPlNyDeikXF99hMM+hN6QMm4ooG9g2g= google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 h1:AgADTJarZTBqgjiUzRgfaBchgYB3/WFTC80GPwsMcRI= diff --git a/pkg/net/pubsub/indexer_message.go b/pkg/net/pubsub/indexer_message.go new file mode 100644 index 0000000000..a547b303aa --- /dev/null +++ b/pkg/net/pubsub/indexer_message.go @@ -0,0 +1,198 @@ +package pubsub + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "sync" + "time" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/venus/pkg/chain" + "github.com/filecoin-project/venus/pkg/net/pubsub/ratelimit" + "github.com/filecoin-project/venus/pkg/state" + "github.com/filecoin-project/venus/venus-shared/actors/builtin/miner" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + message2 "github.com/ipni/go-libipni/announce/message" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/peer" +) + +var log = logging.Logger("pubsub-index-message") + +type peerMsgInfo struct { + peerID peer.ID + lastCid cid.Cid + lastSeqno uint64 + rateLimit *ratelimit.Window + mutex sync.Mutex +} + +type IndexerMessageValidator struct { + self peer.ID + + peerCache *lru.TwoQueueCache[address.Address, *peerMsgInfo] + chainStore *chain.Store +} + +func NewIndexerMessageValidator(self peer.ID, chainStore *chain.Store) *IndexerMessageValidator { + peerCache, _ := lru.New2Q[address.Address, *peerMsgInfo](8192) + + return &IndexerMessageValidator{ + self: self, + peerCache: peerCache, + chainStore: chainStore, + } +} + +func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { + // This chain-node should not be publishing its own messages. These are + // relayed from market-nodes. If a node appears to be local, reject it. + if pid == v.self { + log.Debug("ignoring indexer message from self") + return pubsub.ValidationIgnore + } + originPeer := msg.GetFrom() + if originPeer == v.self { + log.Debug("ignoring indexer message originating from self") + return pubsub.ValidationIgnore + } + + idxrMsg := message2.Message{} + err := idxrMsg.UnmarshalCBOR(bytes.NewBuffer(msg.Data)) + if err != nil { + log.Errorw("Could not decode indexer pubsub message", "err", err) + return pubsub.ValidationReject + } + if len(idxrMsg.ExtraData) == 0 { + log.Debugw("ignoring message missing miner id", "peer", originPeer) + return pubsub.ValidationIgnore + } + + // Get miner info from lotus + minerAddr, err := address.NewFromBytes(idxrMsg.ExtraData) + if err != nil { + log.Warnw("cannot parse extra data as miner address", "err", err, "extraData", idxrMsg.ExtraData) + return pubsub.ValidationReject + } + + msgCid := idxrMsg.Cid + + msgInfo, cached := v.peerCache.Get(minerAddr) + if !cached { + msgInfo = &peerMsgInfo{} + } + + // Lock this peer's message info. + msgInfo.mutex.Lock() + defer msgInfo.mutex.Unlock() + + var seqno uint64 + if cached { + // Reject replayed messages. + seqno = binary.BigEndian.Uint64(msg.Message.GetSeqno()) + if seqno <= msgInfo.lastSeqno { + log.Debugf("ignoring replayed indexer message") + return pubsub.ValidationIgnore + } + } + + if !cached || originPeer != msgInfo.peerID { + // Check that the miner ID maps to the peer that sent the message. + err = v.authenticateMessage(ctx, minerAddr, originPeer) + if err != nil { + log.Warnw("cannot authenticate message", "err", err, "peer", originPeer, "minerID", minerAddr) + + return pubsub.ValidationReject + } + msgInfo.peerID = originPeer + if !cached { + // Add msgInfo to cache only after being authenticated. If two + // messages from the same peer are handled concurrently, there is a + // small chance that one msgInfo could replace the other here when + // the info is first cached. This is OK, so no need to prevent it. + v.peerCache.Add(minerAddr, msgInfo) + } + } + + // Update message info cache with the latest message's sequence number. + msgInfo.lastSeqno = seqno + + // See if message needs to be ignored due to rate limiting. + if v.rateLimitPeer(msgInfo, msgCid) { + return pubsub.ValidationIgnore + } + + return pubsub.ValidationAccept +} + +func (v *IndexerMessageValidator) rateLimitPeer(msgInfo *peerMsgInfo, msgCid cid.Cid) bool { + const ( + msgLimit = 5 + msgTimeLimit = 10 * time.Second + repeatTimeLimit = 2 * time.Hour + ) + + timeWindow := msgInfo.rateLimit + + // Check overall message rate. + if timeWindow == nil { + timeWindow = ratelimit.NewWindow(msgLimit, msgTimeLimit) + msgInfo.rateLimit = timeWindow + } else if msgInfo.lastCid == msgCid { + // Check if this is a repeat of the previous message data. + if time.Since(timeWindow.Newest()) < repeatTimeLimit { + log.Warnw("ignoring repeated indexer message", "sender", msgInfo.peerID) + return true + } + } + + err := timeWindow.Add() + if err != nil { + log.Warnw("ignoring indexer message", "sender", msgInfo.peerID, "err", err) + return true + } + + msgInfo.lastCid = msgCid + + return false +} + +func (v *IndexerMessageValidator) authenticateMessage(ctx context.Context, minerAddress address.Address, peerID peer.ID) error { + minerPeerID, err := getMinerPeer(ctx, minerAddress, v.chainStore) + if err != nil { + return err + } + + if minerPeerID != peerID { + return fmt.Errorf("miner id does not map to peer that sent message") + } + + return nil +} + +func getMinerPeer(ctx context.Context, maddr address.Address, chainStore *chain.Store) (peer.ID, error) { + view := state.NewView(chainStore.Store(ctx), chainStore.GetHead().ParentState()) + act, err := view.LoadActor(ctx, maddr) + if err != nil { + return peer.ID(""), fmt.Errorf("failed to load miner actor: %v", err) + } + mas, err := miner.Load(chainStore.Store(ctx), act) + if err != nil { + return peer.ID(""), fmt.Errorf("failed to load miner actor state: %v", err) + } + info, err := mas.Info() + if err != nil { + return peer.ID(""), fmt.Errorf("failed to load miner info: %v", err) + } + + peerID, err := peer.IDFromBytes(info.PeerId) + if err != nil { + return peer.ID(""), fmt.Errorf("miner not set peer id") + } + + return peerID, nil +} diff --git a/pkg/net/pubsub/ratelimit/queue.go b/pkg/net/pubsub/ratelimit/queue.go new file mode 100644 index 0000000000..49f9bef668 --- /dev/null +++ b/pkg/net/pubsub/ratelimit/queue.go @@ -0,0 +1,89 @@ +package ratelimit + +import "errors" + +var ErrRateLimitExceeded = errors.New("rate limit exceeded") + +type queue struct { + buf []int64 + count int + head int + tail int +} + +// cap returns the queue capacity +func (q *queue) cap() int { + return len(q.buf) +} + +// len returns the number of items in the queue +func (q *queue) len() int { + return q.count +} + +// push adds an element to the end of the queue. +func (q *queue) push(elem int64) error { + if q.count == len(q.buf) { + return ErrRateLimitExceeded + } + + q.buf[q.tail] = elem + // Calculate new tail position. + q.tail = q.next(q.tail) + q.count++ + return nil +} + +// pop removes and returns the element from the front of the queue. +func (q *queue) pop() int64 { + if q.count <= 0 { + panic("pop from empty queue") + } + ret := q.buf[q.head] + + // Calculate new head position. + q.head = q.next(q.head) + q.count-- + + return ret +} + +// front returns the element at the front of the queue. This is the element +// that would be returned by pop(). This call panics if the queue is empty. +func (q *queue) front() int64 { + if q.count <= 0 { + panic("front() called when empty") + } + return q.buf[q.head] +} + +// back returns the element at the back of the queue. This call panics if the +// queue is empty. +func (q *queue) back() int64 { + if q.count <= 0 { + panic("back() called when empty") + } + return q.buf[q.prev(q.tail)] +} + +// prev returns the previous buffer position wrapping around buffer. +func (q *queue) prev(i int) int { + if i == 0 { + return len(q.buf) - 1 + } + return (i - 1) % len(q.buf) +} + +// next returns the next buffer position wrapping around buffer. +func (q *queue) next(i int) int { + return (i + 1) % len(q.buf) +} + +// truncate pops values that are less than or equal the specified threshold. +func (q *queue) truncate(threshold int64) { + for q.count != 0 && q.buf[q.head] <= threshold { + // pop() without returning a value + q.head = q.next(q.head) + q.count-- + } +} diff --git a/pkg/net/pubsub/ratelimit/queue_test.go b/pkg/net/pubsub/ratelimit/queue_test.go new file mode 100644 index 0000000000..5ba0f5a23a --- /dev/null +++ b/pkg/net/pubsub/ratelimit/queue_test.go @@ -0,0 +1,65 @@ +package ratelimit + +import ( + "testing" + + tf "github.com/filecoin-project/venus/pkg/testhelpers/testflags" +) + +func TestQueue(t *testing.T) { + tf.UnitTest(t) + + const size = 3 + q := &queue{buf: make([]int64, size)} + + if q.len() != 0 { + t.Fatalf("q.len() = %d, expect 0", q.len()) + } + + if q.cap() != size { + t.Fatalf("q.cap() = %d, expect %d", q.cap(), size) + } + + for i := int64(0); i < int64(size); i++ { + err := q.push(i) + if err != nil { + t.Fatalf("cannot push element %d", i) + } + } + + if q.len() != size { + t.Fatalf("q.len() = %d, expect %d", q.len(), size) + } + + err := q.push(int64(size)) + if err != ErrRateLimitExceeded { + t.Fatalf("pushing element beyond capacity should have failed with err: %s, got %s", ErrRateLimitExceeded, err) + } + + if q.front() != 0 { + t.Fatalf("q.front() = %d, expect 0", q.front()) + } + + if q.back() != int64(size-1) { + t.Fatalf("q.back() = %d, expect %d", q.back(), size-1) + } + + popVal := q.pop() + if popVal != 0 { + t.Fatalf("q.pop() = %d, expect 0", popVal) + } + + if q.len() != size-1 { + t.Fatalf("q.len() = %d, expect %d", q.len(), size-1) + } + + // Testing truncation. + threshold := int64(1) + q.truncate(threshold) + if q.len() != 1 { + t.Fatalf("q.len() after truncate = %d, expect 1", q.len()) + } + if q.front() != 2 { + t.Fatalf("q.front() after truncate = %d, expect 2", q.front()) + } +} diff --git a/pkg/net/pubsub/ratelimit/window.go b/pkg/net/pubsub/ratelimit/window.go new file mode 100644 index 0000000000..8350b109eb --- /dev/null +++ b/pkg/net/pubsub/ratelimit/window.go @@ -0,0 +1,70 @@ +package ratelimit + +import "time" + +// Window is a time windows for counting events within a span of time. The +// windows slides forward in time so that it spans from the most recent event +// to size time in the past. +type Window struct { + q *queue + size int64 +} + +// NewWindow creates a new Window that limits the number of events to maximum +// count of events within a duration of time. The capacity sets the maximum +// number of events, and size sets the span of time over which the events are +// counted. +func NewWindow(capacity int, size time.Duration) *Window { + return &Window{ + q: &queue{ + buf: make([]int64, capacity), + }, + size: int64(size), + } +} + +// Add attempts to append a new timestamp into the current window. Previously +// added values that are not within `size` difference from the value being +// added are first removed. Add fails if adding the value would cause the +// window to exceed capacity. +func (w *Window) Add() error { + now := time.Now().UnixNano() + if w.Len() != 0 { + w.q.truncate(now - w.size) + } + return w.q.push(now) +} + +// Cap returns the maximum number of items the window can hold. +func (w *Window) Cap() int { + return w.q.cap() +} + +// Len returns the number of elements currently in the window. +func (w *Window) Len() int { + return w.q.len() +} + +// Span returns the distance from the first to the last item in the window. +func (w *Window) Span() time.Duration { + if w.q.len() < 2 { + return 0 + } + return time.Duration(w.q.back() - w.q.front()) +} + +// Oldest returns the oldest timestamp in the window. +func (w *Window) Oldest() time.Time { + if w.q.len() == 0 { + return time.Time{} + } + return time.Unix(0, w.q.front()) +} + +// Newest returns the newest timestamp in the window. +func (w *Window) Newest() time.Time { + if w.q.len() == 0 { + return time.Time{} + } + return time.Unix(0, w.q.back()) +} diff --git a/pkg/net/pubsub/ratelimit/window_test.go b/pkg/net/pubsub/ratelimit/window_test.go new file mode 100644 index 0000000000..02982c3852 --- /dev/null +++ b/pkg/net/pubsub/ratelimit/window_test.go @@ -0,0 +1,65 @@ +package ratelimit + +import ( + "testing" + "time" + + tf "github.com/filecoin-project/venus/pkg/testhelpers/testflags" +) + +func TestWindow(t *testing.T) { + tf.UnitTest(t) + + const ( + maxEvents = 3 + timeLimit = 100 * time.Millisecond + ) + w := NewWindow(maxEvents, timeLimit) + if w.Len() != 0 { + t.Fatal("q.Len() =", w.Len(), "expect 0") + } + if w.Cap() != maxEvents { + t.Fatal("q.Cap() =", w.Cap(), "expect 3") + } + if !w.Newest().IsZero() { + t.Fatal("expected newest to be zero time with empty window") + } + if !w.Oldest().IsZero() { + t.Fatal("expected oldest to be zero time with empty window") + } + if w.Span() != 0 { + t.Fatal("expected span to be zero time with empty window") + } + + var err error + for i := 0; i < maxEvents; i++ { + err = w.Add() + if err != nil { + t.Fatalf("cannot add event %d", i) + } + } + if w.Len() != maxEvents { + t.Fatalf("q.Len() is %d, expected %d", w.Len(), maxEvents) + } + if err = w.Add(); err != ErrRateLimitExceeded { + t.Fatalf("add event %d within time limit should have failed with err: %s", maxEvents+1, ErrRateLimitExceeded) + } + + time.Sleep(timeLimit) + if err = w.Add(); err != nil { + t.Fatalf("cannot add event after time limit: %s", err) + } + + prev := w.Newest() + time.Sleep(timeLimit) + err = w.Add() + if err != nil { + t.Fatalf("cannot add event") + } + if w.Newest().Before(prev) { + t.Fatal("newest is before previous value") + } + if w.Oldest().Before(prev) { + t.Fatal("oldest is before previous value") + } +} From 09b07610715b6dba70dc7c44e0742700088959ed Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Mon, 22 Jul 2024 13:13:09 +0800 Subject: [PATCH 3/3] fix: update IndexerAnnounceDealRemoved --- venus-shared/api/market/v1/api.go | 2 +- venus-shared/api/market/v1/method.md | 4 +--- venus-shared/api/market/v1/mock/mock_imarket.go | 2 +- venus-shared/api/market/v1/proxy_gen.go | 4 ++-- 4 files changed, 5 insertions(+), 7 deletions(-) diff --git a/venus-shared/api/market/v1/api.go b/venus-shared/api/market/v1/api.go index 6442bedd5d..69e806d358 100644 --- a/venus-shared/api/market/v1/api.go +++ b/venus-shared/api/market/v1/api.go @@ -205,7 +205,7 @@ type IMarket interface { IndexerListMultihashes(ctx context.Context, contextID []byte) ([]multihash.Multihash, error) //perm:read IndexerAnnounceLatest(ctx context.Context) (cid.Cid, error) //perm:admin IndexerAnnounceLatestHttp(ctx context.Context, urls []string) (cid.Cid, error) //perm:admin - IndexerAnnounceDealRemoved(ctx context.Context, propCid cid.Cid) (cid.Cid, error) //perm:admin + IndexerAnnounceDealRemoved(ctx context.Context, contextID []byte) (cid.Cid, error) //perm:admin IndexerAnnounceDeal(ctx context.Context, contextID []byte) (cid.Cid, error) //perm:admin api.Version diff --git a/venus-shared/api/market/v1/method.md b/venus-shared/api/market/v1/method.md index 77d18c6fe8..1d378cec22 100644 --- a/venus-shared/api/market/v1/method.md +++ b/venus-shared/api/market/v1/method.md @@ -1281,9 +1281,7 @@ Perms: admin Inputs: ```json [ - { - "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" - } + "Ynl0ZSBhcnJheQ==" ] ``` diff --git a/venus-shared/api/market/v1/mock/mock_imarket.go b/venus-shared/api/market/v1/mock/mock_imarket.go index 2621c5fea3..a42966055d 100644 --- a/venus-shared/api/market/v1/mock/mock_imarket.go +++ b/venus-shared/api/market/v1/mock/mock_imarket.go @@ -796,7 +796,7 @@ func (mr *MockIMarketMockRecorder) IndexerAnnounceDeal(arg0, arg1 interface{}) * } // IndexerAnnounceDealRemoved mocks base method. -func (m *MockIMarket) IndexerAnnounceDealRemoved(arg0 context.Context, arg1 cid.Cid) (cid.Cid, error) { +func (m *MockIMarket) IndexerAnnounceDealRemoved(arg0 context.Context, arg1 []byte) (cid.Cid, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "IndexerAnnounceDealRemoved", arg0, arg1) ret0, _ := ret[0].(cid.Cid) diff --git a/venus-shared/api/market/v1/proxy_gen.go b/venus-shared/api/market/v1/proxy_gen.go index 44fb919aaf..979015202c 100644 --- a/venus-shared/api/market/v1/proxy_gen.go +++ b/venus-shared/api/market/v1/proxy_gen.go @@ -75,7 +75,7 @@ type IMarketStruct struct { ImportDirectDeal func(ctx context.Context, deal *market.DirectDealParams) error `perm:"write"` IndexerAnnounceAllDeals func(ctx context.Context, minerAddr address.Address) error `perm:"admin"` IndexerAnnounceDeal func(ctx context.Context, contextID []byte) (cid.Cid, error) `perm:"admin"` - IndexerAnnounceDealRemoved func(ctx context.Context, propCid cid.Cid) (cid.Cid, error) `perm:"admin"` + IndexerAnnounceDealRemoved func(ctx context.Context, contextID []byte) (cid.Cid, error) `perm:"admin"` IndexerAnnounceLatest func(ctx context.Context) (cid.Cid, error) `perm:"admin"` IndexerAnnounceLatestHttp func(ctx context.Context, urls []string) (cid.Cid, error) `perm:"admin"` IndexerListMultihashes func(ctx context.Context, contextID []byte) ([]multihash.Multihash, error) `perm:"read"` @@ -289,7 +289,7 @@ func (s *IMarketStruct) IndexerAnnounceAllDeals(p0 context.Context, p1 address.A func (s *IMarketStruct) IndexerAnnounceDeal(p0 context.Context, p1 []byte) (cid.Cid, error) { return s.Internal.IndexerAnnounceDeal(p0, p1) } -func (s *IMarketStruct) IndexerAnnounceDealRemoved(p0 context.Context, p1 cid.Cid) (cid.Cid, error) { +func (s *IMarketStruct) IndexerAnnounceDealRemoved(p0 context.Context, p1 []byte) (cid.Cid, error) { return s.Internal.IndexerAnnounceDealRemoved(p0, p1) } func (s *IMarketStruct) IndexerAnnounceLatest(p0 context.Context) (cid.Cid, error) {