Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support gRPC endpoints in core #1513

Open
wants to merge 62 commits into
base: v0.34.x-celestia
Choose a base branch
from
Open

Conversation

rach-id
Copy link
Member

@rach-id rach-id commented Oct 8, 2024

This is an implementation of a streaming API for blocks in core.

Helps close celestiaorg/celestia-app#3421 but not sure it entirely closes it.

It can easily be used:

package main

import (
	"context"
	"fmt"
	coregrpc "github.com/tendermint/tendermint/rpc/grpc"
)

func main() {
	client := coregrpc.StartBlockAPIGRPCClient("tcp://localhost:9090")

	blockStreamer, err := client.BlockByHeight(context.Background(), &coregrpc.BlockByHeightRequest{Height: 2})
	if err != nil {
		panic(err)
	}
	blockMeta, err := client.BlockMetaByHeight(context.Background(), &coregrpc.BlockMetaByHeightRequest{Height: 2})
	if err != nil {
		panic(err)
	}
	parts := make([]*core.Part, 0)
	for i := 0; i < int(blockMeta.BlockMeta.BlockID.PartSetHeader.Total); i++ {
		resp, err := blockStreamer.Recv()
		if err != nil {
			panic(err)
		}
		parts = append(parts, resp.BlockPart)
		if resp.IsLast && i < int(blockMeta.BlockMeta.BlockID.PartSetHeader.Total)-1 {
			panic("couldn't get all parts")
		} else if resp.IsLast {
			break
		}
	}

	h := types.NewPartSetFromHeader(types.PartSetHeader{
		Total: blockMeta.BlockMeta.BlockID.PartSetHeader.Total,
		Hash:  blockMeta.BlockMeta.BlockID.PartSetHeader.Hash,
	})

	for _, part := range parts {
		ok, err := h.AddPart(&types.Part{
			Index: part.Index,
			Bytes: part.Bytes,
			Proof: merkle.Proof{
				Total:    part.Proof.Total,
				Index:    part.Proof.Index,
				LeafHash: part.Proof.LeafHash,
				Aunts:    part.Proof.Aunts,
			},
		})
		if err != nil {
			panic(err)
		}
		if !ok {
			panic("not okey")
		}
	}
	pbb := new(core.Block)
	bz, err := io.ReadAll(h.GetReader())
	if err != nil {
		panic(err)
	}
	err = proto.Unmarshal(bz, pbb)
	if err != nil {
		panic(err)
	}
	block, err := types.BlockFromProto(pbb)
	if err != nil {
		panic(err)
	}
	fmt.Println(block)

	// get a commit
	commit, err := client.Commit(context.Background(), &coregrpc.CommitRequest{Height: 10})
	if err != nil {
		panic(err)
	}
	fmt.Println(commit)

	// listen for new heights
	streamer, err := client.SubscribeNewHeights(context.Background(), &coregrpc.SubscribeNewHeightsRequest{})
	if err != nil {
		panic(err)
	}
	for {
		resp, err := streamer.Recv()
		if err != nil {
			panic(err)
		}
		fmt.Println(resp)
	}
}

Ps: I didn't add the tests because I didn't find a direct way of mocking the environment without polluting the rest of the repo (exporting some methods, adding new helpers, etc). And I think since the implementation is simple, just querying the block/state stores for results, it's fine to leave it untested.

@rach-id rach-id requested review from walldiss and renaynay October 8, 2024 09:44
@rach-id rach-id self-assigned this Oct 8, 2024
@rach-id rach-id requested a review from a team as a code owner October 8, 2024 09:44
@rach-id rach-id requested review from staheri14 and ninabarbakadze and removed request for a team October 8, 2024 09:44
@rach-id rach-id marked this pull request as draft October 8, 2024 11:40
@rach-id
Copy link
Member Author

rach-id commented Oct 8, 2024

Just found out that once we enable the app grpc, these endpoints get overridden, I'll be looking into it

proto/tendermint/rpc/grpc/types.proto Outdated Show resolved Hide resolved
proto/tendermint/rpc/grpc/types.proto Outdated Show resolved Hide resolved
rpc/grpc/api.go Outdated Show resolved Hide resolved
rpc/grpc/api.go Outdated Show resolved Hide resolved
rpc/grpc/api.go Outdated Show resolved Hide resolved
evan-forbes
evan-forbes previously approved these changes Nov 19, 2024
rootulp
rootulp previously approved these changes Nov 19, 2024
Copy link
Collaborator

@rootulp rootulp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Left a few non-blocking questions / suggestions

rpc/grpc/api.go Outdated Show resolved Hide resolved
rpc/grpc/api.go Outdated
Comment on lines 116 to 117
env.Logger.Debug("couldn't cast event data to new block")
continue
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question] should this log an error and terminate instead of continuing?

Suggested change
env.Logger.Debug("couldn't cast event data to new block")
continue
env.Logger.Error("couldn't cast event data to new block")
return

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to have the gRPC logs polluting the output but it's a good point. I will handle this in a separate PR where I will be doing more testing and adding more logic for failures etc. I didn't want to include everything in the same PR to reduce complexity

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this case should never happen right? That you subscribe for an event of a certain type and you get an unexpected type back. Shouldn't this panic as it's an internal developer error

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Panicking would mean stopping the node. is this something we want to do?

That's a separate issue I want to handle in a subsequent PR. If the gRPC connection panics, the node goes down. So I am thinking of having recovers around here or add them to the cosmos-sdk.

rpc/grpc/api.go Outdated
defer ticker.Stop()
blockAPI.Lock()
defer blockAPI.Unlock()
for i := 1; i < 6; i++ {
Copy link
Collaborator

@rootulp rootulp Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question] Should we extract a constant for RETRY_ATTEMPTS = 5

Suggested change
for i := 1; i < 6; i++ {
for i := 0; i < RETRY_ATTEMPTS; i++ {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we could. I can extract it if you want

Copy link
Collaborator

@rootulp rootulp Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO an extracted constant is easier to read but this comment is an optional nit and not blocking.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Co-authored-by: Rootul P <rootulp@gmail.com>
@rach-id rach-id dismissed stale reviews from rootulp and evan-forbes via cee4b95 November 19, 2024 16:55

message NewHeightEvent {
int64 height = 1;
bytes hash = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this get used for?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the new blocks subscription. whenever there is a new block, this message is broadcasted to listeners

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I meant specifically the hash

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aaah, it was also a requirement from node as they preferred to have the hash too. It's not currently used in the implementation, but maybe they have some ideas around that to reduce the number of requests

rpc/grpc/api.go Outdated Show resolved Hide resolved
rpc/grpc/api.go Outdated
Comment on lines 116 to 117
env.Logger.Debug("couldn't cast event data to new block")
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this case should never happen right? That you subscribe for an event of a certain type and you get an unexpected type back. Shouldn't this panic as it's an internal developer error

rpc/grpc/api.go Outdated
}
}

func (blockAPI *BlockAPI) StartNewBlockEventListener(ctx context.Context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer this function returns an error that can be logged out in grpccore.StartGRPCServer(...). I don't think we should allow this service to be able to fail silently. You can still run this in a go routine but you should pass the error back in a channel and return the error if either this or the Serve method fails.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the issue with this is that the cosmos-sdk doesn't listen for errors after the server starts listening (for about 5sec or so). Making it listen would require more changes, which I tried to reduce given we want to reduce diff with upstream.

Current way it will log errors but not stop the node or anything.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having this error in the method is cleaner than adding logs for every return. There are also a few returns that don't error.

It also looks like this is called in cometbft not cosmos-sdk (I see it in node.go). Am I missing something?

Copy link
Member Author

@rach-id rach-id Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having this error in the method is cleaner than adding logs for every return. There are also a few returns that don't error.

So you mean we return an error from this method and also from the others and refactor the comsos-sdk to act when an error happens?

It also looks like this is called in cometbft not cosmos-sdk (I see it in node.go). Am I missing something?

this is called in cometBFT when you run plain comet (for example in e2e). But when you run an application, the cosmos-sdk overrides that server and calls this in there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like so long as the RPC addr and GRPC addr in the config.toml is populated it will run it in CometBFT as well.

Ideally all go routines shut down gracefully when the node is shut down or errors. I would want the gRPC which is now something that the DA network will depend on to fail noisily. This could be with the consensus node also spinning down or something else that notifies the operator that there is a problem

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stopping the node is on the table? I guess it's good to stop the node if bridge nodes are connected to full nodes. But if they're connected to validators, it shouldn't kill the node as it will stop a validator

proto/tendermint/rpc/grpc/types.proto Outdated Show resolved Hide resolved
select {
case <-ctx.Done():
return
case <-blockAPI.newBlockSubscription.Cancelled():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What causes a block subscription to be cancelled?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when it's manually cancelled and maybe other cases I am not aware of

@@ -21,6 +21,9 @@ type Config struct {
func StartGRPCServer(ln net.Listener) error {
grpcServer := grpc.NewServer()
RegisterBroadcastAPIServer(grpcServer, &broadcastAPI{})
api := NewBlockAPI()
go api.StartNewBlockEventListener(context.Background())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have context with a cancel and cancel this go routine whenever grpcServer.Serve() returns (as this implies that something has called Stop or GracefulStop)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where would that context come from? passed as a parameter?

thing is only cosmos-sdk will call this, and there is no context there to pass.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cosmos-sdk calls StartGRPCServer. I see it called also by the cometbft node OnStart. Is it being called twice?

Regardless what I was proposing is:

func RunGRPCServer(ln net.Listener) error {
	grpcServer := grpc.NewServer()
	RegisterBroadcastAPIServer(grpcServer, &broadcastAPI{})
	api := NewBlockAPI()
	RegisterBlockAPIServer(grpcServer, api)
	errCh := make(chan error, 2)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		errCh <- api.StartNewBlockEventListener(ctx)
	}()
	go func() {
		errCh <- grpcServer.Serve(ln)
	}()
	defer grpcServer.GracefulStop()
        // blocks until one errors or returns nil
	return <-errCh
}

This means that both processes get shut down when either of them errors or terminates

Copy link
Member Author

@rach-id rach-id Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cosmos-sdk calls StartGRPCServer. I see it called also by the cometbft node OnStart. Is it being called twice?

when starting comet, the OnStart gets called. When using the cosmos-sdk, that server is overriden a new one is created.

This means that both processes get shut down when either of them errors or terminates

so the issue is having them shutdown both instead of one failing and the other staying up, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes and just making sure all errors are collected and logged

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

types/part_set.go Show resolved Hide resolved
@rootulp rootulp requested review from rootulp and cmwaters and removed request for staheri14 December 13, 2024 15:54
}
data, ok := event.Data().(eventstypes.EventDataNewBlock)
if !ok {
env.Logger.Debug("couldn't cast event data to new block")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[optional]

Suggested change
env.Logger.Debug("couldn't cast event data to new block")
env.Logger.Error("couldn't cast event data to new block")

// SubscriptionCapacity the maximum number of pending blocks in the subscription.
const SubscriptionCapacity = 500

func (blockAPI *BlockAPI) retryNewBlocksSubscription(ctx context.Context) (bool, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question][nit] what does the bool return argument indicate? It seems like (isSuccess bool, err error) IMO the bool argument is unnecessary b/c either this function returns:

  1. true, no error
  2. false, an error

So the bool argument could be removed and this could just return an error if the retry failed or nil if the retry succeeded

Comment on lines +199 to +202
if blockAPI.heightListeners == nil {
// if this is nil, then there is no need to close anything
return
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is unnecessary because heightListeners is always non-nil due to:

func NewBlockAPI() *BlockAPI {
	return &BlockAPI{
		heightListeners:   make(map[chan NewHeightEvent]struct{}, 1000),
		subscriptionID:    fmt.Sprintf("block-api-subscription-%s", rand.Str(6)),
		subscriptionQuery: eventstypes.EventQueryNewBlock,
	}
}

and it's never explicitly set to nil so this check will never evaluate to true. Proposal to remove it

Suggested change
if blockAPI.heightListeners == nil {
// if this is nil, then there is no need to close anything
return
}

var err error
// stop the events subscription
if blockAPI.newBlockSubscription != nil {
err = core.GetEnvironment().EventBus.Unsubscribe(ctx, blockAPI.subscriptionID, blockAPI.subscriptionQuery)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this error isn't explicitly handled, is that intentional? I would expect

Suggested change
err = core.GetEnvironment().EventBus.Unsubscribe(ctx, blockAPI.subscriptionID, blockAPI.subscriptionQuery)
err = core.GetEnvironment().EventBus.Unsubscribe(ctx, blockAPI.subscriptionID, blockAPI.subscriptionQuery)
if err != nil {
// handle the error
}
blockAPI.newBlockSubscription = nil

opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
opts = append(opts, grpc.WithContextDialer(dialerFunc))
conn, err := grpc.Dial( //nolint:staticcheck
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be refactored to eliminate the nolint directive

Suggested change
conn, err := grpc.Dial( //nolint:staticcheck
conn, err := grpc.NewClient(

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Expose all celestia-node required endpoints through gRPC
6 participants