Skip to content

Commit

Permalink
Merge pull request #6382 from filecoin-project/feat/market/index-prov…
Browse files Browse the repository at this point in the history
…ider

feat(market): add index provider api
  • Loading branch information
0x5459 authored Jul 25, 2024
2 parents da0560e + 09b0761 commit 87dcb28
Show file tree
Hide file tree
Showing 14 changed files with 780 additions and 27 deletions.
2 changes: 1 addition & 1 deletion app/node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (b *Builder) build(ctx context.Context) (*Node, error) {
if err != nil {
return nil, errors.Wrap(err, "failed to build node.storageNetworking")
}
nd.mining = mining.NewMiningModule(nd.syncer.Stmgr, (*builder)(b), nd.chain, nd.blockstore, nd.network, nd.syncer, *nd.wallet)
nd.mining = mining.NewMiningModule(nd.syncer.Stmgr, (*builder)(b), nd.chain, nd.blockstore, nd.syncer, *nd.wallet)

mgrps := &paychmgr.ManagerParams{
MPoolAPI: nd.mpool.API(),
Expand Down
4 changes: 0 additions & 4 deletions app/submodule/mining/mining_submodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package mining
import (
"github.com/filecoin-project/venus/app/submodule/blockstore"
chain2 "github.com/filecoin-project/venus/app/submodule/chain"
"github.com/filecoin-project/venus/app/submodule/network"
"github.com/filecoin-project/venus/app/submodule/syncer"
"github.com/filecoin-project/venus/app/submodule/wallet"
"github.com/filecoin-project/venus/pkg/repo"
Expand All @@ -23,7 +22,6 @@ type MiningModule struct { //nolint
Config miningConfig
ChainModule *chain2.ChainSubmodule
BlockStore *blockstore.BlockstoreSubmodule
NetworkModule *network.NetworkSubmodule
SyncModule *syncer.SyncerSubmodule
Wallet wallet.WalletSubmodule
proofVerifier ffiwrapper.Verifier
Expand All @@ -45,7 +43,6 @@ func NewMiningModule(
conf miningConfig,
chainModule *chain2.ChainSubmodule,
blockStore *blockstore.BlockstoreSubmodule,
networkModule *network.NetworkSubmodule,
syncModule *syncer.SyncerSubmodule,
wallet wallet.WalletSubmodule,
) *MiningModule {
Expand All @@ -54,7 +51,6 @@ func NewMiningModule(
Config: conf,
ChainModule: chainModule,
BlockStore: blockStore,
NetworkModule: networkModule,
SyncModule: syncModule,
Wallet: wallet,
proofVerifier: conf.Verifier(),
Expand Down
70 changes: 51 additions & 19 deletions app/submodule/network/network_submodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"os"
"time"

"github.com/filecoin-project/venus/pkg/net/helloprotocol"

"github.com/dchest/blake2b"
"github.com/ipfs/boxo/bitswap"
bsnet "github.com/ipfs/boxo/bitswap/network"
Expand Down Expand Up @@ -47,7 +45,9 @@ import (
"github.com/filecoin-project/venus/pkg/config"
"github.com/filecoin-project/venus/pkg/net"
filexchange "github.com/filecoin-project/venus/pkg/net/exchange"
"github.com/filecoin-project/venus/pkg/net/helloprotocol"
"github.com/filecoin-project/venus/pkg/net/peermgr"
"github.com/filecoin-project/venus/pkg/net/pubsub"
"github.com/filecoin-project/venus/pkg/repo"
appstate "github.com/filecoin-project/venus/pkg/state"
"github.com/filecoin-project/venus/venus-shared/types"
Expand Down Expand Up @@ -88,6 +88,8 @@ type NetworkSubmodule struct { //nolint

ScoreKeeper *net.ScoreKeeper

indexMsgRelayCancelFunc libp2pps.RelayCancelFunc

cfg networkConfig
}

Expand All @@ -112,6 +114,7 @@ func (networkSubmodule *NetworkSubmodule) Stop(ctx context.Context) {
if err := networkSubmodule.Router.(*dht.IpfsDHT).Close(); err != nil {
networkLogger.Errorf("error closing dht: %s", err.Error())
}
networkSubmodule.indexMsgRelayCancelFunc()
}

type networkConfig interface {
Expand Down Expand Up @@ -183,6 +186,11 @@ func NewNetworkSubmodule(ctx context.Context,
return nil, errors.Wrap(err, "failed to set up network")
}

indexMsgRelayCancelFunc, err := relayIndexerMessages(gsub, networkName, peerHost, chainStore)
if err != nil {
return nil, fmt.Errorf("failed to set up relay indexer messages: %w", err)
}

// set up bitswap
nwork := bsnet.NewFromIpfsHost(peerHost, router, bsnet.Prefix("/chain"))
bitswapOptions := []bitswap.Option{bitswap.ProvideEnabled(false)}
Expand Down Expand Up @@ -215,22 +223,23 @@ func NewNetworkSubmodule(ctx context.Context,
helloHandler := helloprotocol.NewHelloProtocolHandler(peerHost, peerMgr, exchangeClient, chainStore, messageStore, config.GenesisCid(), time.Duration(config.Repo().Config().NetworkParams.BlockDelay)*time.Second)
// build the network submdule
return &NetworkSubmodule{
NetworkName: networkName,
Host: peerHost,
RawHost: rawHost,
Router: router,
Pubsub: gsub,
Bitswap: bswap,
GraphExchange: gsync,
ExchangeClient: exchangeClient,
exchangeServer: exchangeServer,
Network: network,
DataTransfer: dt,
DataTransferHost: dtNet,
PeerMgr: peerMgr,
HelloHandler: helloHandler,
cfg: config,
ScoreKeeper: sk,
NetworkName: networkName,
Host: peerHost,
RawHost: rawHost,
Router: router,
Pubsub: gsub,
Bitswap: bswap,
GraphExchange: gsync,
ExchangeClient: exchangeClient,
exchangeServer: exchangeServer,
Network: network,
DataTransfer: dt,
DataTransferHost: dtNet,
PeerMgr: peerMgr,
HelloHandler: helloHandler,
cfg: config,
ScoreKeeper: sk,
indexMsgRelayCancelFunc: indexMsgRelayCancelFunc,
}, nil
}

Expand Down Expand Up @@ -341,7 +350,7 @@ func retrieveNetworkName(ctx context.Context, genCid cid.Cid, cborStore cbor.Ipl

// address determines if we are publically dialable. If so use public
// address, if not configure node to announce relay address.
func buildHost(ctx context.Context, config networkConfig, libP2pOpts []libp2p.Option, cfg *config.Config) (types.RawHost, error) {
func buildHost(_ context.Context, config networkConfig, libP2pOpts []libp2p.Option, cfg *config.Config) (types.RawHost, error) {
if config.IsRelay() {
publicAddr, err := ma.NewMultiaddr(cfg.Swarm.PublicRelayAddress)
if err != nil {
Expand Down Expand Up @@ -448,3 +457,26 @@ func connectionManager(low, high uint, grace time.Duration, protected []string,

return cm, nil
}

func relayIndexerMessages(ps *libp2pps.PubSub, nn string, h host.Host, chainReader *chain.Store) (libp2pps.RelayCancelFunc, error) {
topicName := types.IndexerIngestTopic(nn)

v := pubsub.NewIndexerMessageValidator(h.ID(), chainReader)

if err := ps.RegisterTopicValidator(topicName, v.Validate); err != nil {
return nil, fmt.Errorf("failed to register validator for topic %s, err: %w", topicName, err)
}

topicHandle, err := ps.Join(topicName)
if err != nil {
return nil, fmt.Errorf("failed to join pubsub topic %s: %w", topicName, err)
}
cancelFunc, err := topicHandle.Relay()
if err != nil {
return nil, fmt.Errorf("failed to relay to pubsub messages for topic %s: %w", topicName, err)
}

networkLogger.Infof("relaying messages for pubsub topic %s", topicName)

return cancelFunc, nil
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ require (
github.com/ipfs/go-unixfs v0.4.5
github.com/ipld/go-car v0.6.2
github.com/ipld/go-car/v2 v2.13.1
github.com/ipni/go-libipni v0.0.7
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-libp2p v0.35.0
github.com/libp2p/go-libp2p-kad-dht v0.25.2
Expand Down Expand Up @@ -145,6 +146,7 @@ require (
github.com/whyrusleeping/go-logging v0.0.1 // indirect
go.uber.org/mock v0.4.0 // indirect
gonum.org/v1/gonum v0.15.0 // indirect
google.golang.org/genproto v0.0.0-20230629202037-9506855d4529 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect
)

Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1269,8 +1269,9 @@ github.com/sirupsen/logrus v1.9.2/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
github.com/smartystreets/assertions v1.0.1/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs=
github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
github.com/smartystreets/assertions v1.13.0 h1:Dx1kYM01xsSqKPno3aqLnrwac2LetPvN23diwyr69Qs=
github.com/smartystreets/assertions v1.13.0/go.mod h1:wDmR7qL282YbGsPy6H/yAsesrxfxaaSlJazyFLYVFx8=
github.com/smartystreets/goconvey v0.0.0-20190222223459-a17d461953aa/go.mod h1:2RVY1rIf+2J2o/IM9+vPq9RzmHDSseB7FoXiSNIUsoU=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
Expand Down Expand Up @@ -2064,8 +2065,8 @@ google.golang.org/genproto v0.0.0-20220421151946-72621c1f0bd3/go.mod h1:8w6bsBMX
google.golang.org/genproto v0.0.0-20220429170224-98d788798c3e/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc h1:8DyZCyvI8mE1IdLy/60bS+52xfymkE72wv1asokgtao=
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:xZnkP7mREFX5MORlOPEzLMr+90PPZQ2QWzrVTWfAq64=
google.golang.org/genproto v0.0.0-20230629202037-9506855d4529 h1:9JucMWR7sPvCxUFd6UsOUNmA5kCcWOfORaT3tpAsKQs=
google.golang.org/genproto v0.0.0-20230629202037-9506855d4529/go.mod h1:xZnkP7mREFX5MORlOPEzLMr+90PPZQ2QWzrVTWfAq64=
google.golang.org/genproto/googleapis/api v0.0.0-20240515191416-fc5f0ca64291 h1:4HZJ3Xv1cmrJ+0aFo304Zn79ur1HMxptAE7aCPNLSqc=
google.golang.org/genproto/googleapis/api v0.0.0-20240515191416-fc5f0ca64291/go.mod h1:RGnPtTG7r4i8sPlNyDeikXF99hMM+hN6QMm4ooG9g2g=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 h1:AgADTJarZTBqgjiUzRgfaBchgYB3/WFTC80GPwsMcRI=
Expand Down
Loading

0 comments on commit 87dcb28

Please sign in to comment.