Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(market): add index provider api #6382

Merged
merged 3 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (b *Builder) build(ctx context.Context) (*Node, error) {
if err != nil {
return nil, errors.Wrap(err, "failed to build node.storageNetworking")
}
nd.mining = mining.NewMiningModule(nd.syncer.Stmgr, (*builder)(b), nd.chain, nd.blockstore, nd.network, nd.syncer, *nd.wallet)
nd.mining = mining.NewMiningModule(nd.syncer.Stmgr, (*builder)(b), nd.chain, nd.blockstore, nd.syncer, *nd.wallet)

mgrps := &paychmgr.ManagerParams{
MPoolAPI: nd.mpool.API(),
Expand Down
4 changes: 0 additions & 4 deletions app/submodule/mining/mining_submodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package mining
import (
"github.com/filecoin-project/venus/app/submodule/blockstore"
chain2 "github.com/filecoin-project/venus/app/submodule/chain"
"github.com/filecoin-project/venus/app/submodule/network"
"github.com/filecoin-project/venus/app/submodule/syncer"
"github.com/filecoin-project/venus/app/submodule/wallet"
"github.com/filecoin-project/venus/pkg/repo"
Expand All @@ -23,7 +22,6 @@ type MiningModule struct { //nolint
Config miningConfig
ChainModule *chain2.ChainSubmodule
BlockStore *blockstore.BlockstoreSubmodule
NetworkModule *network.NetworkSubmodule
SyncModule *syncer.SyncerSubmodule
Wallet wallet.WalletSubmodule
proofVerifier ffiwrapper.Verifier
Expand All @@ -45,7 +43,6 @@ func NewMiningModule(
conf miningConfig,
chainModule *chain2.ChainSubmodule,
blockStore *blockstore.BlockstoreSubmodule,
networkModule *network.NetworkSubmodule,
syncModule *syncer.SyncerSubmodule,
wallet wallet.WalletSubmodule,
) *MiningModule {
Expand All @@ -54,7 +51,6 @@ func NewMiningModule(
Config: conf,
ChainModule: chainModule,
BlockStore: blockStore,
NetworkModule: networkModule,
SyncModule: syncModule,
Wallet: wallet,
proofVerifier: conf.Verifier(),
Expand Down
70 changes: 51 additions & 19 deletions app/submodule/network/network_submodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"os"
"time"

"github.com/filecoin-project/venus/pkg/net/helloprotocol"

"github.com/dchest/blake2b"
"github.com/ipfs/boxo/bitswap"
bsnet "github.com/ipfs/boxo/bitswap/network"
Expand Down Expand Up @@ -47,7 +45,9 @@ import (
"github.com/filecoin-project/venus/pkg/config"
"github.com/filecoin-project/venus/pkg/net"
filexchange "github.com/filecoin-project/venus/pkg/net/exchange"
"github.com/filecoin-project/venus/pkg/net/helloprotocol"
"github.com/filecoin-project/venus/pkg/net/peermgr"
"github.com/filecoin-project/venus/pkg/net/pubsub"
"github.com/filecoin-project/venus/pkg/repo"
appstate "github.com/filecoin-project/venus/pkg/state"
"github.com/filecoin-project/venus/venus-shared/types"
Expand Down Expand Up @@ -88,6 +88,8 @@ type NetworkSubmodule struct { //nolint

ScoreKeeper *net.ScoreKeeper

indexMsgRelayCancelFunc libp2pps.RelayCancelFunc

cfg networkConfig
}

Expand All @@ -112,6 +114,7 @@ func (networkSubmodule *NetworkSubmodule) Stop(ctx context.Context) {
if err := networkSubmodule.Router.(*dht.IpfsDHT).Close(); err != nil {
networkLogger.Errorf("error closing dht: %s", err.Error())
}
networkSubmodule.indexMsgRelayCancelFunc()
}

type networkConfig interface {
Expand Down Expand Up @@ -183,6 +186,11 @@ func NewNetworkSubmodule(ctx context.Context,
return nil, errors.Wrap(err, "failed to set up network")
}

indexMsgRelayCancelFunc, err := relayIndexerMessages(gsub, networkName, peerHost, chainStore)
if err != nil {
return nil, fmt.Errorf("failed to set up relay indexer messages: %w", err)
}

// set up bitswap
nwork := bsnet.NewFromIpfsHost(peerHost, router, bsnet.Prefix("/chain"))
bitswapOptions := []bitswap.Option{bitswap.ProvideEnabled(false)}
Expand Down Expand Up @@ -215,22 +223,23 @@ func NewNetworkSubmodule(ctx context.Context,
helloHandler := helloprotocol.NewHelloProtocolHandler(peerHost, peerMgr, exchangeClient, chainStore, messageStore, config.GenesisCid(), time.Duration(config.Repo().Config().NetworkParams.BlockDelay)*time.Second)
// build the network submdule
return &NetworkSubmodule{
NetworkName: networkName,
Host: peerHost,
RawHost: rawHost,
Router: router,
Pubsub: gsub,
Bitswap: bswap,
GraphExchange: gsync,
ExchangeClient: exchangeClient,
exchangeServer: exchangeServer,
Network: network,
DataTransfer: dt,
DataTransferHost: dtNet,
PeerMgr: peerMgr,
HelloHandler: helloHandler,
cfg: config,
ScoreKeeper: sk,
NetworkName: networkName,
Host: peerHost,
RawHost: rawHost,
Router: router,
Pubsub: gsub,
Bitswap: bswap,
GraphExchange: gsync,
ExchangeClient: exchangeClient,
exchangeServer: exchangeServer,
Network: network,
DataTransfer: dt,
DataTransferHost: dtNet,
PeerMgr: peerMgr,
HelloHandler: helloHandler,
cfg: config,
ScoreKeeper: sk,
indexMsgRelayCancelFunc: indexMsgRelayCancelFunc,
}, nil
}

Expand Down Expand Up @@ -341,7 +350,7 @@ func retrieveNetworkName(ctx context.Context, genCid cid.Cid, cborStore cbor.Ipl

// address determines if we are publically dialable. If so use public
// address, if not configure node to announce relay address.
func buildHost(ctx context.Context, config networkConfig, libP2pOpts []libp2p.Option, cfg *config.Config) (types.RawHost, error) {
func buildHost(_ context.Context, config networkConfig, libP2pOpts []libp2p.Option, cfg *config.Config) (types.RawHost, error) {
if config.IsRelay() {
publicAddr, err := ma.NewMultiaddr(cfg.Swarm.PublicRelayAddress)
if err != nil {
Expand Down Expand Up @@ -448,3 +457,26 @@ func connectionManager(low, high uint, grace time.Duration, protected []string,

return cm, nil
}

func relayIndexerMessages(ps *libp2pps.PubSub, nn string, h host.Host, chainReader *chain.Store) (libp2pps.RelayCancelFunc, error) {
topicName := types.IndexerIngestTopic(nn)

v := pubsub.NewIndexerMessageValidator(h.ID(), chainReader)

if err := ps.RegisterTopicValidator(topicName, v.Validate); err != nil {
return nil, fmt.Errorf("failed to register validator for topic %s, err: %w", topicName, err)
}

topicHandle, err := ps.Join(topicName)
if err != nil {
return nil, fmt.Errorf("failed to join pubsub topic %s: %w", topicName, err)
}
cancelFunc, err := topicHandle.Relay()
if err != nil {
return nil, fmt.Errorf("failed to relay to pubsub messages for topic %s: %w", topicName, err)
}

networkLogger.Infof("relaying messages for pubsub topic %s", topicName)

return cancelFunc, nil
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ require (
github.com/ipfs/go-unixfs v0.4.5
github.com/ipld/go-car v0.6.2
github.com/ipld/go-car/v2 v2.13.1
github.com/ipni/go-libipni v0.0.7
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-libp2p v0.35.0
github.com/libp2p/go-libp2p-kad-dht v0.25.2
Expand Down Expand Up @@ -145,6 +146,7 @@ require (
github.com/whyrusleeping/go-logging v0.0.1 // indirect
go.uber.org/mock v0.4.0 // indirect
gonum.org/v1/gonum v0.15.0 // indirect
google.golang.org/genproto v0.0.0-20230629202037-9506855d4529 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect
)

Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1269,8 +1269,9 @@ github.com/sirupsen/logrus v1.9.2/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
github.com/smartystreets/assertions v1.0.1/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs=
github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
github.com/smartystreets/assertions v1.13.0 h1:Dx1kYM01xsSqKPno3aqLnrwac2LetPvN23diwyr69Qs=
github.com/smartystreets/assertions v1.13.0/go.mod h1:wDmR7qL282YbGsPy6H/yAsesrxfxaaSlJazyFLYVFx8=
github.com/smartystreets/goconvey v0.0.0-20190222223459-a17d461953aa/go.mod h1:2RVY1rIf+2J2o/IM9+vPq9RzmHDSseB7FoXiSNIUsoU=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
Expand Down Expand Up @@ -2064,8 +2065,8 @@ google.golang.org/genproto v0.0.0-20220421151946-72621c1f0bd3/go.mod h1:8w6bsBMX
google.golang.org/genproto v0.0.0-20220429170224-98d788798c3e/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc h1:8DyZCyvI8mE1IdLy/60bS+52xfymkE72wv1asokgtao=
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:xZnkP7mREFX5MORlOPEzLMr+90PPZQ2QWzrVTWfAq64=
google.golang.org/genproto v0.0.0-20230629202037-9506855d4529 h1:9JucMWR7sPvCxUFd6UsOUNmA5kCcWOfORaT3tpAsKQs=
google.golang.org/genproto v0.0.0-20230629202037-9506855d4529/go.mod h1:xZnkP7mREFX5MORlOPEzLMr+90PPZQ2QWzrVTWfAq64=
google.golang.org/genproto/googleapis/api v0.0.0-20240515191416-fc5f0ca64291 h1:4HZJ3Xv1cmrJ+0aFo304Zn79ur1HMxptAE7aCPNLSqc=
google.golang.org/genproto/googleapis/api v0.0.0-20240515191416-fc5f0ca64291/go.mod h1:RGnPtTG7r4i8sPlNyDeikXF99hMM+hN6QMm4ooG9g2g=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 h1:AgADTJarZTBqgjiUzRgfaBchgYB3/WFTC80GPwsMcRI=
Expand Down
Loading
Loading