-
Notifications
You must be signed in to change notification settings - Fork 288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support gRPC endpoints in core #1513
base: v0.34.x-celestia
Are you sure you want to change the base?
Conversation
Just found out that once we enable the app grpc, these endpoints get overridden, I'll be looking into it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Left a few non-blocking questions / suggestions
rpc/grpc/api.go
Outdated
env.Logger.Debug("couldn't cast event data to new block") | ||
continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[question] should this log an error and terminate instead of continuing?
env.Logger.Debug("couldn't cast event data to new block") | |
continue | |
env.Logger.Error("couldn't cast event data to new block") | |
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't want to have the gRPC logs polluting the output but it's a good point. I will handle this in a separate PR where I will be doing more testing and adding more logic for failures etc. I didn't want to include everything in the same PR to reduce complexity
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But this case should never happen right? That you subscribe for an event of a certain type and you get an unexpected type back. Shouldn't this panic as it's an internal developer error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Panicking would mean stopping the node. is this something we want to do?
That's a separate issue I want to handle in a subsequent PR. If the gRPC connection panics, the node goes down. So I am thinking of having recovers around here or add them to the cosmos-sdk.
rpc/grpc/api.go
Outdated
defer ticker.Stop() | ||
blockAPI.Lock() | ||
defer blockAPI.Unlock() | ||
for i := 1; i < 6; i++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[question] Should we extract a constant for RETRY_ATTEMPTS = 5
for i := 1; i < 6; i++ { | |
for i := 0; i < RETRY_ATTEMPTS; i++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, we could. I can extract it if you want
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO an extracted constant is easier to read but this comment is an optional nit and not blocking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Co-authored-by: Rootul P <rootulp@gmail.com>
|
||
message NewHeightEvent { | ||
int64 height = 1; | ||
bytes hash = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does this get used for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for the new blocks subscription. whenever there is a new block, this message is broadcasted to listeners
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I meant specifically the hash
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aaah, it was also a requirement from node as they preferred to have the hash too. It's not currently used in the implementation, but maybe they have some ideas around that to reduce the number of requests
rpc/grpc/api.go
Outdated
env.Logger.Debug("couldn't cast event data to new block") | ||
continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But this case should never happen right? That you subscribe for an event of a certain type and you get an unexpected type back. Shouldn't this panic as it's an internal developer error
rpc/grpc/api.go
Outdated
} | ||
} | ||
|
||
func (blockAPI *BlockAPI) StartNewBlockEventListener(ctx context.Context) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer this function returns an error that can be logged out in grpccore.StartGRPCServer(...)
. I don't think we should allow this service to be able to fail silently. You can still run this in a go routine but you should pass the error back in a channel and return the error if either this or the Serve
method fails.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the issue with this is that the cosmos-sdk doesn't listen for errors after the server starts listening (for about 5sec or so). Making it listen would require more changes, which I tried to reduce given we want to reduce diff with upstream.
Current way it will log errors but not stop the node or anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
having this error in the method is cleaner than adding logs for every return. There are also a few return
s that don't error.
It also looks like this is called in cometbft not cosmos-sdk (I see it in node.go). Am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
having this error in the method is cleaner than adding logs for every return. There are also a few returns that don't error.
So you mean we return an error from this method and also from the others and refactor the comsos-sdk to act when an error happens?
It also looks like this is called in cometbft not cosmos-sdk (I see it in node.go). Am I missing something?
this is called in cometBFT when you run plain comet (for example in e2e). But when you run an application, the cosmos-sdk overrides that server and calls this in there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like so long as the RPC addr and GRPC addr in the config.toml is populated it will run it in CometBFT as well.
Ideally all go routines shut down gracefully when the node is shut down or errors. I would want the gRPC which is now something that the DA network will depend on to fail noisily. This could be with the consensus node also spinning down or something else that notifies the operator that there is a problem
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stopping the node is on the table? I guess it's good to stop the node if bridge nodes are connected to full nodes. But if they're connected to validators, it shouldn't kill the node as it will stop a validator
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-blockAPI.newBlockSubscription.Cancelled(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What causes a block subscription to be cancelled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when it's manually cancelled and maybe other cases I am not aware of
rpc/grpc/client_server.go
Outdated
@@ -21,6 +21,9 @@ type Config struct { | |||
func StartGRPCServer(ln net.Listener) error { | |||
grpcServer := grpc.NewServer() | |||
RegisterBroadcastAPIServer(grpcServer, &broadcastAPI{}) | |||
api := NewBlockAPI() | |||
go api.StartNewBlockEventListener(context.Background()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we have context with a cancel and cancel this go routine whenever grpcServer.Serve() returns (as this implies that something has called Stop or GracefulStop)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where would that context come from? passed as a parameter?
thing is only cosmos-sdk will call this, and there is no context there to pass.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cosmos-sdk calls StartGRPCServer
. I see it called also by the cometbft node OnStart
. Is it being called twice?
Regardless what I was proposing is:
func RunGRPCServer(ln net.Listener) error {
grpcServer := grpc.NewServer()
RegisterBroadcastAPIServer(grpcServer, &broadcastAPI{})
api := NewBlockAPI()
RegisterBlockAPIServer(grpcServer, api)
errCh := make(chan error, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
errCh <- api.StartNewBlockEventListener(ctx)
}()
go func() {
errCh <- grpcServer.Serve(ln)
}()
defer grpcServer.GracefulStop()
// blocks until one errors or returns nil
return <-errCh
}
This means that both processes get shut down when either of them errors or terminates
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cosmos-sdk calls StartGRPCServer. I see it called also by the cometbft node OnStart. Is it being called twice?
when starting comet, the OnStart
gets called. When using the cosmos-sdk, that server is overriden a new one is created.
This means that both processes get shut down when either of them errors or terminates
so the issue is having them shutdown both instead of one failing and the other staying up, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes and just making sure all errors are collected and logged
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This reverts commit 072b5eb.
} | ||
data, ok := event.Data().(eventstypes.EventDataNewBlock) | ||
if !ok { | ||
env.Logger.Debug("couldn't cast event data to new block") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[optional]
env.Logger.Debug("couldn't cast event data to new block") | |
env.Logger.Error("couldn't cast event data to new block") |
// SubscriptionCapacity the maximum number of pending blocks in the subscription. | ||
const SubscriptionCapacity = 500 | ||
|
||
func (blockAPI *BlockAPI) retryNewBlocksSubscription(ctx context.Context) (bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[question][nit] what does the bool return argument indicate? It seems like (isSuccess bool, err error)
IMO the bool argument is unnecessary b/c either this function returns:
- true, no error
- false, an error
So the bool argument could be removed and this could just return an error if the retry failed or nil if the retry succeeded
if blockAPI.heightListeners == nil { | ||
// if this is nil, then there is no need to close anything | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check is unnecessary because heightListeners is always non-nil due to:
func NewBlockAPI() *BlockAPI {
return &BlockAPI{
heightListeners: make(map[chan NewHeightEvent]struct{}, 1000),
subscriptionID: fmt.Sprintf("block-api-subscription-%s", rand.Str(6)),
subscriptionQuery: eventstypes.EventQueryNewBlock,
}
}
and it's never explicitly set to nil so this check will never evaluate to true. Proposal to remove it
if blockAPI.heightListeners == nil { | |
// if this is nil, then there is no need to close anything | |
return | |
} |
var err error | ||
// stop the events subscription | ||
if blockAPI.newBlockSubscription != nil { | ||
err = core.GetEnvironment().EventBus.Unsubscribe(ctx, blockAPI.subscriptionID, blockAPI.subscriptionQuery) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this error isn't explicitly handled, is that intentional? I would expect
err = core.GetEnvironment().EventBus.Unsubscribe(ctx, blockAPI.subscriptionID, blockAPI.subscriptionQuery) | |
err = core.GetEnvironment().EventBus.Unsubscribe(ctx, blockAPI.subscriptionID, blockAPI.subscriptionQuery) | |
if err != nil { | |
// handle the error | |
} | |
blockAPI.newBlockSubscription = nil |
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) | ||
} | ||
opts = append(opts, grpc.WithContextDialer(dialerFunc)) | ||
conn, err := grpc.Dial( //nolint:staticcheck |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be refactored to eliminate the nolint directive
conn, err := grpc.Dial( //nolint:staticcheck | |
conn, err := grpc.NewClient( |
This is an implementation of a streaming API for blocks in core.
Helps close celestiaorg/celestia-app#3421 but not sure it entirely closes it.
It can easily be used:
Ps: I didn't add the tests because I didn't find a direct way of mocking the environment without polluting the rest of the repo (exporting some methods, adding new helpers, etc). And I think since the implementation is simple, just querying the block/state stores for results, it's fine to leave it untested.