Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multiple instances support for grpc broker. #235

Merged
merged 7 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,5 @@ jobs:
make e2e-test
env:
container_tool: docker
SERVER_REPLICAS: 2
MESSAGE_DRIVER_TYPE: grpc
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ test/e2e/.consumer_name
test/e2e/.external_host_ip
test/e2e/report/*
unit-test-results.json
integration-test-results.json
*integration-test-results.json
test/e2e/setup/aro/aro-hcp
17 changes: 13 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ MQTT_IMAGE ?= docker.io/library/eclipse-mosquitto:2.0.18

# Test output files
unit_test_json_output ?= ${PWD}/unit-test-results.json
integration_test_json_output ?= ${PWD}/integration-test-results.json
mqtt_integration_test_json_output ?= ${PWD}/mqtt-integration-test-results.json
grpc_integration_test_json_output ?= ${PWD}/grpc-integration-test-results.json

# Prints a list of useful targets.
help:
Expand Down Expand Up @@ -218,11 +219,19 @@ test:
# make test-integration TESTFLAGS="-run TestAccounts" acts as TestAccounts* and run TestAccountsGet, TestAccountsPost, etc.
# make test-integration TESTFLAGS="-run TestAccountsGet" runs TestAccountsGet
# make test-integration TESTFLAGS="-short" skips long-run tests
test-integration:
OCM_ENV=testing gotestsum --jsonfile-timing-events=$(integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
./test/integration
test-integration: test-integration-mqtt test-integration-grpc
.PHONY: test-integration

test-integration-mqtt:
BROKER=mqtt OCM_ENV=testing gotestsum --jsonfile-timing-events=$(mqtt_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
./test/integration
.PHONY: test-integration-mqtt

test-integration-grpc:
BROKER=grpc OCM_ENV=testing gotestsum --jsonfile-timing-events=$(grpc_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h -run TestController \
./test/integration
.PHONY: test-integration-grpc

# Regenerate openapi client and models
generate:
rm -rf pkg/api/openapi
Expand Down
7 changes: 6 additions & 1 deletion cmd/maestro/servecmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"github.com/openshift-online/maestro/cmd/maestro/environments"
"github.com/openshift-online/maestro/cmd/maestro/server"
"github.com/openshift-online/maestro/pkg/config"
"github.com/openshift-online/maestro/pkg/controllers"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/db"
"github.com/openshift-online/maestro/pkg/dispatcher"
"github.com/openshift-online/maestro/pkg/event"
)
Expand Down Expand Up @@ -47,9 +49,11 @@ func runServer(cmd *cobra.Command, args []string) {
// For gRPC, create a gRPC broker to handle resource spec and status events.
// For MQTT/Kafka, create a message queue based event server to handle resource spec and status events.
var eventServer server.EventServer
var eventFilter controllers.EventFilter
if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" {
klog.Info("Setting up grpc broker")
eventServer = server.NewGRPCBroker(eventBroadcaster)
eventFilter = controllers.NewPredicatedEventFilter(eventServer.PredicateEvent)
} else {
klog.Info("Setting up message queue event server")
var statusDispatcher dispatcher.Dispatcher
Expand All @@ -67,12 +71,13 @@ func runServer(cmd *cobra.Command, args []string) {
// Set the status dispatcher for the healthcheck server
healthcheckServer.SetStatusDispatcher(statusDispatcher)
eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher)
eventFilter = controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(environments.Environment().Database.SessionFactory))
}

// Create the servers
apiserver := server.NewAPIServer(eventBroadcaster)
metricsServer := server.NewMetricsServer()
controllersServer := server.NewControllersServer(eventServer)
controllersServer := server.NewControllersServer(eventServer, eventFilter)

ctx, cancel := context.WithCancel(context.Background())

Expand Down
4 changes: 2 additions & 2 deletions cmd/maestro/server/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
"github.com/openshift-online/maestro/pkg/logger"
)

func NewControllersServer(eventServer EventServer) *ControllersServer {
func NewControllersServer(eventServer EventServer, eventFilter controllers.EventFilter) *ControllersServer {
s := &ControllersServer{
KindControllerManager: controllers.NewKindControllerManager(
db.NewAdvisoryLockFactory(env().Database.SessionFactory),
eventFilter,
env().Services.Events(),
),
StatusController: controllers.NewStatusController(
Expand Down
8 changes: 8 additions & 0 deletions cmd/maestro/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type EventServer interface {

// OnStatusUpdate handles status update events for a resource.
OnStatusUpdate(ctx context.Context, eventID, resourceID string) error

// returns true if the event should be processed by the current instance, otherwise false and an error if it occurs.
PredicateEvent(ctx context.Context, eventID string) (bool, error)
}

var _ EventServer = &MessageQueueEventServer{}
Expand Down Expand Up @@ -145,6 +148,11 @@ func (s *MessageQueueEventServer) OnStatusUpdate(ctx context.Context, eventID, r
)
}

// EventPredicate for the message queue event server is no-op, as the message queue server filter event based on advisory lock.
func (s *MessageQueueEventServer) PredicateEvent(ctx context.Context, eventID string) (bool, error) {
return true, nil
}

// handleStatusUpdate processes the resource status update from the agent.
// The resource argument contains the updated status.
// The function performs the following steps:
Expand Down
94 changes: 79 additions & 15 deletions cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
cetypes "github.com/cloudevents/sdk-go/v2/types"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"k8s.io/klog/v2"
pbv1 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1"
Expand Down Expand Up @@ -51,6 +53,7 @@ type GRPCBroker struct {
instanceID string
eventInstanceDao dao.EventInstanceDao
resourceService services.ResourceService
eventService services.EventService
statusEventService services.StatusEventService
bindAddress string
subscribers map[string]*subscriber // registered subscribers
Expand Down Expand Up @@ -79,6 +82,7 @@ func NewGRPCBroker(eventBroadcaster *event.EventBroadcaster) EventServer {
instanceID: env().Config.MessageBroker.ClientID,
eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory),
resourceService: env().Services.Resources(),
eventService: env().Services.Events(),
statusEventService: env().Services.StatusEvents(),
bindAddress: env().Config.HTTPServer.Hostname + ":" + config.BrokerBindPort,
subscribers: make(map[string]*subscriber),
Expand Down Expand Up @@ -183,32 +187,44 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
subscriberID, errChan := bkr.register(subReq.ClusterName, func(res *api.Resource) error {
evt, err := encodeResourceSpec(res)
if err != nil {
// return the error to requeue the event if encoding fails (e.g., due to invalid resource spec).
return fmt.Errorf("failed to encode resource %s to cloudevent: %v", res.ID, err)
}

klog.V(4).Infof("send the event to spec subscribers, %s", evt)

// WARNING: don't use "pbEvt, err := pb.ToProto(evt)" to convert cloudevent to protobuf
pbEvt := &pbv1.CloudEvent{}
if err = grpcprotocol.WritePBMessage(context.TODO(), binding.ToMessage(evt), pbEvt); err != nil {
return fmt.Errorf("failed to convert cloudevent to protobuf: %v", err)
// return the error to requeue the event if converting to protobuf fails (e.g., due to invalid cloudevent).
return fmt.Errorf("failed to convert cloudevent to protobuf for resource(%s): %v", res.ID, err)
}

// send the cloudevent to the subscriber
// TODO: error handling to address errors beyond network issues.
klog.V(4).Infof("sending the event to spec subscribers, %s", evt)
if err := subServer.Send(pbEvt); err != nil {
klog.Errorf("failed to send grpc event, %v", err)
// Return the error without wrapping, as it includes the gRPC error code and message for further handling.
// For unrecoverable errors, such as a connection closed by an intermediate proxy, push the error to subscriber's
// error channel to unregister the subscriber.
return err
}

return nil
})

select {
case err := <-errChan:
// When reaching this point, an unrecoverable error occurred while sending the event,
// such as the connection being closed. Unregister the subscriber to trigger agent reconnection.
klog.Errorf("unregister subscriber %s, error= %v", subscriberID, err)
bkr.unregister(subscriberID)
return err
case <-subServer.Context().Done():
// The context of the stream has been canceled or completed.
// This could happen if:
// - The client closed the connection or canceled the stream.
// - The server closed the stream, potentially due to a shutdown.
// Regardless of the reason, unregister the subscriber and stop processing.
// No error is returned here because the stream closure is expected.
bkr.unregister(subscriberID)
return nil
}
Expand Down Expand Up @@ -377,52 +393,67 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy
}

// handleRes publish the resource to the correct subscriber.
func (bkr *GRPCBroker) handleRes(resource *api.Resource) {
func (bkr *GRPCBroker) handleRes(resource *api.Resource) error {
bkr.mu.RLock()
defer bkr.mu.RUnlock()
for _, subscriber := range bkr.subscribers {
if subscriber.clusterName == resource.ConsumerName {
if err := subscriber.handler(resource); err != nil {
subscriber.errChan <- err
// check if the error is recoverable. For unrecoverable errors,
// such as a connection closed by an intermediate proxy, push
// the error to subscriber's error channel to unregister the subscriber.
st, ok := status.FromError(err)
if ok && st.Code() == codes.Unavailable {
// TODO: handle more error codes that can't be recovered
subscriber.errChan <- err
}
return err
}
}
}
return nil
}

// OnCreate is called by the controller when a resource is created on the maestro server.
func (bkr *GRPCBroker) OnCreate(ctx context.Context, id string) error {
resource, err := bkr.resourceService.Get(ctx, id)
if err != nil {
// if the resource is not found, it indicates the resource has been processed.
if err.Is404() {
return nil
}
return err
}

bkr.handleRes(resource)

return nil
return bkr.handleRes(resource)
}

// OnUpdate is called by the controller when a resource is updated on the maestro server.
func (bkr *GRPCBroker) OnUpdate(ctx context.Context, id string) error {
resource, err := bkr.resourceService.Get(ctx, id)
if err != nil {
// if the resource is not found, it indicates the resource has been processed.
if err.Is404() {
return nil
}
return err
}

bkr.handleRes(resource)

return nil
return bkr.handleRes(resource)
}

// OnDelete is called by the controller when a resource is deleted from the maestro server.
func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error {
resource, err := bkr.resourceService.Get(ctx, id)
if err != nil {
// if the resource is not found, it indicates the resource has been processed.
if err.Is404() {
return nil
}
return err
}

bkr.handleRes(resource)

return nil
return bkr.handleRes(resource)
}

// On StatusUpdate will be called on each new status event inserted into db.
Expand All @@ -442,6 +473,39 @@ func (bkr *GRPCBroker) OnStatusUpdate(ctx context.Context, eventID, resourceID s
)
}

// PredicateEvent checks if the event should be processed by the current instance
// by verifying the resource consumer name is in the subscriber list, ensuring the
// event will be only processed when the consumer is subscribed to the current broker.
func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool, error) {
evt, err := bkr.eventService.Get(ctx, eventID)
if err != nil {
return false, fmt.Errorf("failed to get event %s: %s", eventID, err.Error())
}

// fast return if the event is already reconciled
if evt.ReconciledDate != nil {
return false, nil
}

resource, svcErr := bkr.resourceService.Get(ctx, evt.SourceID)
if svcErr != nil {
// if the resource is not found, it indicates the resource has been handled by other instances.
if svcErr.Is404() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a log here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log added.

klog.V(10).Infof("The resource %s has been deleted, mark the event as reconciled", evt.SourceID)
now := time.Now()
evt.ReconciledDate = &now
if _, svcErr := bkr.eventService.Replace(ctx, evt); svcErr != nil {
return false, fmt.Errorf("failed to mark event with id (%s) as reconciled: %s", evt.ID, svcErr)
}
return false, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

who will mark the event as reconciled? it should be marked by the instance which handle this resource.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the event is marked by the instance who handle this event.
This was added to handle this case:

  1. resource created and then deleted, but no agent running.
  2. agent start, resync the resource.
  3. the resource is deleted, but the event will not marked as reconciled.
  4. if handle the event again. will get 404 when get resource from db.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@clyang82 added logic to mark event as reconciled here.

}
return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, svcErr.Error())
}

// check if the consumer is subscribed to the broker
return bkr.IsConsumerSubscribed(resource.ConsumerName), nil
}

// IsConsumerSubscribed returns true if the consumer is subscribed to the broker for resource spec.
func (bkr *GRPCBroker) IsConsumerSubscribed(consumerName string) bool {
bkr.mu.RLock()
Expand Down
Loading
Loading