-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
multiple instances support for grpc broker. #235
Changes from all commits
2d0074d
1cf6559
9172a13
29902c2
eb7c030
5ed379f
9a92eab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -65,4 +65,5 @@ jobs: | |
make e2e-test | ||
env: | ||
container_tool: docker | ||
SERVER_REPLICAS: 2 | ||
MESSAGE_DRIVER_TYPE: grpc |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,7 +13,9 @@ import ( | |
cetypes "github.com/cloudevents/sdk-go/v2/types" | ||
"github.com/google/uuid" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/keepalive" | ||
"google.golang.org/grpc/status" | ||
"google.golang.org/protobuf/types/known/emptypb" | ||
"k8s.io/klog/v2" | ||
pbv1 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1" | ||
|
@@ -51,6 +53,7 @@ type GRPCBroker struct { | |
instanceID string | ||
eventInstanceDao dao.EventInstanceDao | ||
resourceService services.ResourceService | ||
eventService services.EventService | ||
statusEventService services.StatusEventService | ||
bindAddress string | ||
subscribers map[string]*subscriber // registered subscribers | ||
|
@@ -79,6 +82,7 @@ func NewGRPCBroker(eventBroadcaster *event.EventBroadcaster) EventServer { | |
instanceID: env().Config.MessageBroker.ClientID, | ||
eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory), | ||
resourceService: env().Services.Resources(), | ||
eventService: env().Services.Events(), | ||
statusEventService: env().Services.StatusEvents(), | ||
bindAddress: env().Config.HTTPServer.Hostname + ":" + config.BrokerBindPort, | ||
subscribers: make(map[string]*subscriber), | ||
|
@@ -183,32 +187,44 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv | |
subscriberID, errChan := bkr.register(subReq.ClusterName, func(res *api.Resource) error { | ||
evt, err := encodeResourceSpec(res) | ||
if err != nil { | ||
// return the error to requeue the event if encoding fails (e.g., due to invalid resource spec). | ||
return fmt.Errorf("failed to encode resource %s to cloudevent: %v", res.ID, err) | ||
} | ||
|
||
klog.V(4).Infof("send the event to spec subscribers, %s", evt) | ||
|
||
// WARNING: don't use "pbEvt, err := pb.ToProto(evt)" to convert cloudevent to protobuf | ||
pbEvt := &pbv1.CloudEvent{} | ||
if err = grpcprotocol.WritePBMessage(context.TODO(), binding.ToMessage(evt), pbEvt); err != nil { | ||
return fmt.Errorf("failed to convert cloudevent to protobuf: %v", err) | ||
// return the error to requeue the event if converting to protobuf fails (e.g., due to invalid cloudevent). | ||
return fmt.Errorf("failed to convert cloudevent to protobuf for resource(%s): %v", res.ID, err) | ||
} | ||
|
||
// send the cloudevent to the subscriber | ||
// TODO: error handling to address errors beyond network issues. | ||
klog.V(4).Infof("sending the event to spec subscribers, %s", evt) | ||
if err := subServer.Send(pbEvt); err != nil { | ||
klog.Errorf("failed to send grpc event, %v", err) | ||
// Return the error without wrapping, as it includes the gRPC error code and message for further handling. | ||
// For unrecoverable errors, such as a connection closed by an intermediate proxy, push the error to subscriber's | ||
// error channel to unregister the subscriber. | ||
return err | ||
} | ||
|
||
return nil | ||
}) | ||
|
||
select { | ||
case err := <-errChan: | ||
// When reaching this point, an unrecoverable error occurred while sending the event, | ||
// such as the connection being closed. Unregister the subscriber to trigger agent reconnection. | ||
klog.Errorf("unregister subscriber %s, error= %v", subscriberID, err) | ||
bkr.unregister(subscriberID) | ||
return err | ||
case <-subServer.Context().Done(): | ||
// The context of the stream has been canceled or completed. | ||
// This could happen if: | ||
// - The client closed the connection or canceled the stream. | ||
// - The server closed the stream, potentially due to a shutdown. | ||
// Regardless of the reason, unregister the subscriber and stop processing. | ||
// No error is returned here because the stream closure is expected. | ||
bkr.unregister(subscriberID) | ||
return nil | ||
} | ||
|
@@ -377,52 +393,67 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy | |
} | ||
|
||
// handleRes publish the resource to the correct subscriber. | ||
func (bkr *GRPCBroker) handleRes(resource *api.Resource) { | ||
func (bkr *GRPCBroker) handleRes(resource *api.Resource) error { | ||
bkr.mu.RLock() | ||
defer bkr.mu.RUnlock() | ||
for _, subscriber := range bkr.subscribers { | ||
if subscriber.clusterName == resource.ConsumerName { | ||
if err := subscriber.handler(resource); err != nil { | ||
subscriber.errChan <- err | ||
// check if the error is recoverable. For unrecoverable errors, | ||
// such as a connection closed by an intermediate proxy, push | ||
// the error to subscriber's error channel to unregister the subscriber. | ||
st, ok := status.FromError(err) | ||
if ok && st.Code() == codes.Unavailable { | ||
// TODO: handle more error codes that can't be recovered | ||
subscriber.errChan <- err | ||
} | ||
return err | ||
} | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// OnCreate is called by the controller when a resource is created on the maestro server. | ||
func (bkr *GRPCBroker) OnCreate(ctx context.Context, id string) error { | ||
resource, err := bkr.resourceService.Get(ctx, id) | ||
if err != nil { | ||
// if the resource is not found, it indicates the resource has been processed. | ||
if err.Is404() { | ||
return nil | ||
} | ||
return err | ||
} | ||
|
||
bkr.handleRes(resource) | ||
|
||
return nil | ||
return bkr.handleRes(resource) | ||
} | ||
|
||
// OnUpdate is called by the controller when a resource is updated on the maestro server. | ||
func (bkr *GRPCBroker) OnUpdate(ctx context.Context, id string) error { | ||
resource, err := bkr.resourceService.Get(ctx, id) | ||
if err != nil { | ||
// if the resource is not found, it indicates the resource has been processed. | ||
if err.Is404() { | ||
return nil | ||
} | ||
return err | ||
} | ||
|
||
bkr.handleRes(resource) | ||
|
||
return nil | ||
return bkr.handleRes(resource) | ||
} | ||
|
||
// OnDelete is called by the controller when a resource is deleted from the maestro server. | ||
func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error { | ||
resource, err := bkr.resourceService.Get(ctx, id) | ||
if err != nil { | ||
// if the resource is not found, it indicates the resource has been processed. | ||
if err.Is404() { | ||
return nil | ||
} | ||
return err | ||
} | ||
|
||
bkr.handleRes(resource) | ||
|
||
return nil | ||
return bkr.handleRes(resource) | ||
} | ||
|
||
// On StatusUpdate will be called on each new status event inserted into db. | ||
|
@@ -442,6 +473,39 @@ func (bkr *GRPCBroker) OnStatusUpdate(ctx context.Context, eventID, resourceID s | |
) | ||
} | ||
|
||
// PredicateEvent checks if the event should be processed by the current instance | ||
// by verifying the resource consumer name is in the subscriber list, ensuring the | ||
// event will be only processed when the consumer is subscribed to the current broker. | ||
func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool, error) { | ||
evt, err := bkr.eventService.Get(ctx, eventID) | ||
if err != nil { | ||
return false, fmt.Errorf("failed to get event %s: %s", eventID, err.Error()) | ||
} | ||
|
||
// fast return if the event is already reconciled | ||
if evt.ReconciledDate != nil { | ||
return false, nil | ||
} | ||
|
||
resource, svcErr := bkr.resourceService.Get(ctx, evt.SourceID) | ||
if svcErr != nil { | ||
// if the resource is not found, it indicates the resource has been handled by other instances. | ||
if svcErr.Is404() { | ||
klog.V(10).Infof("The resource %s has been deleted, mark the event as reconciled", evt.SourceID) | ||
now := time.Now() | ||
evt.ReconciledDate = &now | ||
if _, svcErr := bkr.eventService.Replace(ctx, evt); svcErr != nil { | ||
return false, fmt.Errorf("failed to mark event with id (%s) as reconciled: %s", evt.ID, svcErr) | ||
} | ||
return false, nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. who will mark the event as reconciled? it should be marked by the instance which handle this resource. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, the event is marked by the instance who handle this event.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @clyang82 added logic to mark event as reconciled here. |
||
} | ||
return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, svcErr.Error()) | ||
} | ||
|
||
// check if the consumer is subscribed to the broker | ||
return bkr.IsConsumerSubscribed(resource.ConsumerName), nil | ||
} | ||
|
||
// IsConsumerSubscribed returns true if the consumer is subscribed to the broker for resource spec. | ||
func (bkr *GRPCBroker) IsConsumerSubscribed(consumerName string) bool { | ||
bkr.mu.RLock() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a log here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log added.