diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 8d4aca79..080e59f5 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -65,4 +65,5 @@ jobs: make e2e-test env: container_tool: docker + SERVER_REPLICAS: 2 MESSAGE_DRIVER_TYPE: grpc diff --git a/cmd/maestro/servecmd/cmd.go b/cmd/maestro/servecmd/cmd.go index 1b3611df..d582e060 100755 --- a/cmd/maestro/servecmd/cmd.go +++ b/cmd/maestro/servecmd/cmd.go @@ -12,7 +12,9 @@ import ( "github.com/openshift-online/maestro/cmd/maestro/environments" "github.com/openshift-online/maestro/cmd/maestro/server" "github.com/openshift-online/maestro/pkg/config" + "github.com/openshift-online/maestro/pkg/controllers" "github.com/openshift-online/maestro/pkg/dao" + "github.com/openshift-online/maestro/pkg/db" "github.com/openshift-online/maestro/pkg/dispatcher" "github.com/openshift-online/maestro/pkg/event" ) @@ -47,9 +49,14 @@ func runServer(cmd *cobra.Command, args []string) { // For gRPC, create a gRPC broker to handle resource spec and status events. // For MQTT/Kafka, create a message queue based event server to handle resource spec and status events. var eventServer server.EventServer + var eventHandler controllers.EventHandler if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" { klog.Info("Setting up grpc broker") eventServer = server.NewGRPCBroker(eventBroadcaster) + eventHandler = controllers.NewPredicatedEventHandler(eventServer.PredicateEvent, + environments.Environment().Services.Events(), + dao.NewEventInstanceDao(&environments.Environment().Database.SessionFactory), + dao.NewInstanceDao(&environments.Environment().Database.SessionFactory)) } else { klog.Info("Setting up message queue event server") var statusDispatcher dispatcher.Dispatcher @@ -67,12 +74,14 @@ func runServer(cmd *cobra.Command, args []string) { // Set the status dispatcher for the healthcheck server healthcheckServer.SetStatusDispatcher(statusDispatcher) eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher) + eventHandler = controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(environments.Environment().Database.SessionFactory), + environments.Environment().Services.Events()) } // Create the servers apiserver := server.NewAPIServer(eventBroadcaster) metricsServer := server.NewMetricsServer() - controllersServer := server.NewControllersServer(eventServer) + controllersServer := server.NewControllersServer(eventServer, eventHandler) ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/maestro/server/controllers.go b/cmd/maestro/server/controllers.go index d7d9cb68..178a9761 100755 --- a/cmd/maestro/server/controllers.go +++ b/cmd/maestro/server/controllers.go @@ -10,10 +10,10 @@ import ( "github.com/openshift-online/maestro/pkg/logger" ) -func NewControllersServer(eventServer EventServer) *ControllersServer { +func NewControllersServer(eventServer EventServer, eventHandler controllers.EventHandler) *ControllersServer { s := &ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - db.NewAdvisoryLockFactory(env().Database.SessionFactory), + eventHandler, env().Services.Events(), ), StatusController: controllers.NewStatusController( diff --git a/cmd/maestro/server/event_server.go b/cmd/maestro/server/event_server.go index cae4207c..6bbec1ef 100644 --- a/cmd/maestro/server/event_server.go +++ b/cmd/maestro/server/event_server.go @@ -29,16 +29,19 @@ type EventServer interface { Start(ctx context.Context) // OnCreate handles the creation of a resource. - OnCreate(ctx context.Context, resourceID string) error + OnCreate(ctx context.Context, eventID, resourceID string) error // OnUpdate handles updates to a resource. - OnUpdate(ctx context.Context, resourceID string) error + OnUpdate(ctx context.Context, eventID, resourceID string) error // OnDelete handles the deletion of a resource. - OnDelete(ctx context.Context, resourceID string) error + OnDelete(ctx context.Context, eventID, resourceID string) error // OnStatusUpdate handles status update events for a resource. OnStatusUpdate(ctx context.Context, eventID, resourceID string) error + + // returns true if the event should be processed by the current instance, otherwise false and an error. + PredicateEvent(ctx context.Context, eventID string) (bool, error) } var _ EventServer = &MessageQueueEventServer{} @@ -114,17 +117,17 @@ func (s *MessageQueueEventServer) startSubscription(ctx context.Context) { } // OnCreate will be called on each new resource creation event inserted into db. -func (s *MessageQueueEventServer) OnCreate(ctx context.Context, resourceID string) error { +func (s *MessageQueueEventServer) OnCreate(ctx context.Context, eventID, resourceID string) error { return s.sourceClient.OnCreate(ctx, resourceID) } // OnUpdate will be called on each new resource update event inserted into db. -func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, resourceID string) error { +func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, eventID, resourceID string) error { return s.sourceClient.OnUpdate(ctx, resourceID) } // OnDelete will be called on each new resource deletion event inserted into db. -func (s *MessageQueueEventServer) OnDelete(ctx context.Context, resourceID string) error { +func (s *MessageQueueEventServer) OnDelete(ctx context.Context, eventID, resourceID string) error { return s.sourceClient.OnDelete(ctx, resourceID) } @@ -171,6 +174,11 @@ func (s *MessageQueueEventServer) OnStatusUpdate(ctx context.Context, eventID, r return err } +// EventPredicate for the message queue event server is no-op, as the message queue server filter event based on lock. +func (s *MessageQueueEventServer) PredicateEvent(ctx context.Context, eventID string) (bool, error) { + return true, nil +} + // handleStatusUpdate processes the resource status update from the agent. // The resource argument contains the updated status. // The function performs the following steps: diff --git a/cmd/maestro/server/grpc_broker.go b/cmd/maestro/server/grpc_broker.go index 2dee39e4..6daa7cec 100644 --- a/cmd/maestro/server/grpc_broker.go +++ b/cmd/maestro/server/grpc_broker.go @@ -51,6 +51,7 @@ type GRPCBroker struct { instanceID string eventInstanceDao dao.EventInstanceDao resourceService services.ResourceService + eventService services.EventService statusEventService services.StatusEventService bindAddress string subscribers map[string]*subscriber // registered subscribers @@ -79,6 +80,7 @@ func NewGRPCBroker(eventBroadcaster *event.EventBroadcaster) EventServer { instanceID: env().Config.MessageBroker.ClientID, eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory), resourceService: env().Services.Resources(), + eventService: env().Services.Events(), statusEventService: env().Services.StatusEvents(), bindAddress: env().Config.HTTPServer.Hostname + ":" + config.BrokerBindPort, subscribers: make(map[string]*subscriber), @@ -389,40 +391,49 @@ func (bkr *GRPCBroker) handleRes(resource *api.Resource) { } } +// handleResEvent publish the resource to the correct subscriber and add the event instance record. +func (bkr *GRPCBroker) handleResEvent(ctx context.Context, eventID string, resource *api.Resource) error { + bkr.handleRes(resource) + + // add the event instance record to mark the event has been processed by the current instance + if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{ + EventID: eventID, + InstanceID: bkr.instanceID, + }); err != nil { + return fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error()) + } + + return nil +} + // OnCreate is called by the controller when a resource is created on the maestro server. -func (bkr *GRPCBroker) OnCreate(ctx context.Context, id string) error { - resource, err := bkr.resourceService.Get(ctx, id) +func (bkr *GRPCBroker) OnCreate(ctx context.Context, eventID, resourceID string) error { + resource, err := bkr.resourceService.Get(ctx, resourceID) if err != nil { return err } - bkr.handleRes(resource) - - return nil + return bkr.handleResEvent(ctx, eventID, resource) } // OnUpdate is called by the controller when a resource is updated on the maestro server. -func (bkr *GRPCBroker) OnUpdate(ctx context.Context, id string) error { - resource, err := bkr.resourceService.Get(ctx, id) +func (bkr *GRPCBroker) OnUpdate(ctx context.Context, eventID, resourceID string) error { + resource, err := bkr.resourceService.Get(ctx, resourceID) if err != nil { return err } - bkr.handleRes(resource) - - return nil + return bkr.handleResEvent(ctx, eventID, resource) } // OnDelete is called by the controller when a resource is deleted from the maestro server. -func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error { - resource, err := bkr.resourceService.Get(ctx, id) +func (bkr *GRPCBroker) OnDelete(ctx context.Context, eventID, resourceID string) error { + resource, err := bkr.resourceService.Get(ctx, resourceID) if err != nil { return err } - bkr.handleRes(resource) - - return nil + return bkr.handleResEvent(ctx, eventID, resource) } // On StatusUpdate will be called on each new status event inserted into db. @@ -469,6 +480,36 @@ func (bkr *GRPCBroker) OnStatusUpdate(ctx context.Context, eventID, resourceID s return err } +// PredicateEvent checks if the event should be processed by the current instance +// by verifying the resource consumer name is in the subscriber list, ensuring the +// event will be only processed when the consumer is subscribed to the current broker. +func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool, error) { + evt, err := bkr.eventService.Get(ctx, eventID) + if err != nil { + return false, fmt.Errorf("failed to get event %s: %s", eventID, err.Error()) + } + resource, err := bkr.resourceService.Get(ctx, evt.SourceID) + if err != nil { + return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, err.Error()) + } + + if bkr.IsConsumerSubscribed(resource.ConsumerName) { + return true, nil + } + + // if the consumer is not subscribed to the broker, then add the event instance record + // to indicate the event has been processed by the instance + if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{ + EventID: eventID, + InstanceID: bkr.instanceID, + }); err != nil { + return false, fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error()) + } + klog.V(10).Infof("The consumer %s is not subscribed to the broker, added the event instance record", resource.ConsumerName) + + return false, nil +} + // IsConsumerSubscribed returns true if the consumer is subscribed to the broker for resource spec. func (bkr *GRPCBroker) IsConsumerSubscribed(consumerName string) bool { bkr.mu.RLock() diff --git a/pkg/controllers/event_handler.go b/pkg/controllers/event_handler.go new file mode 100644 index 00000000..c41a2d8f --- /dev/null +++ b/pkg/controllers/event_handler.go @@ -0,0 +1,170 @@ +package controllers + +import ( + "context" + "fmt" + "time" + + "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/dao" + "github.com/openshift-online/maestro/pkg/db" + "github.com/openshift-online/maestro/pkg/services" + "k8s.io/klog/v2" +) + +// EventHandler defines the actions to handle an event at various stages of its lifecycle. +type EventHandler interface { + // ShouldHandleEvent determines whether the event should be processed. + // Returns true if the event should be handled, false and an error otherwise. + ShouldHandleEvent(ctx context.Context, id string) (bool, error) + + // DeferredAction schedules any deferred actions that need to be executed + // after the event is processed successfully or unsuccessfully. + DeferredAction(ctx context.Context, id string) + + // PostProcess is called after the event is processed to perform any cleanup + // or additional actions required for the event. + PostProcess(ctx context.Context, event *api.Event) error +} + +// LockBasedEventHandler is an implementation of EventHandler that uses a locking mechanism to control event processing. +// It leverages a lock factory to create advisory locks for each event ID, ensuring non-blocking, thread-safe access. +// - ShouldHandleEvent acquires the lock for the event ID and returns true if the lock is successful. +// - DeferredAction releases the lock for the event ID. +// - PostProcess updates the event with a reconciled date after processing. +type LockBasedEventHandler struct { + lockFactory db.LockFactory + locks map[string]string + events services.EventService +} + +func NewLockBasedEventHandler(lockFactory db.LockFactory, events services.EventService) EventHandler { + return &LockBasedEventHandler{ + lockFactory: lockFactory, + locks: make(map[string]string), + events: events, + } +} + +func (h *LockBasedEventHandler) ShouldHandleEvent(ctx context.Context, id string) (bool, error) { + // lock the Event with a fail-fast advisory lock context. + // this allows concurrent processing of many events by one or many controller managers. + // allow the lock to be released by the handler goroutine and allow this function to continue. + // subsequent events will be locked by their own distinct IDs. + lockOwnerID, acquired, err := h.lockFactory.NewNonBlockingLock(ctx, id, db.Events) + // store the lock owner ID for deferred action + h.locks[id] = lockOwnerID + if err != nil { + return false, fmt.Errorf("error obtaining the event lock: %v", err) + } + + if !acquired { + logger.V(4).Infof("Event %s is processed by another worker", id) + return false, nil + } + + return true, nil +} + +func (h *LockBasedEventHandler) DeferredAction(ctx context.Context, id string) { + if ownerID, exists := h.locks[id]; exists { + h.lockFactory.Unlock(ctx, ownerID) + delete(h.locks, id) + } +} + +func (h *LockBasedEventHandler) PostProcess(ctx context.Context, event *api.Event) error { + // update the event with the reconciled date + if event != nil { + now := time.Now() + event.ReconciledDate = &now + if _, svcErr := h.events.Replace(ctx, event); svcErr != nil { + return fmt.Errorf("error updating event with id(%s): %s", event.ID, svcErr) + } + } + + return nil +} + +// eventHandlerPredicate is a function type for filtering events based on their ID. +type eventHandlerPredicate func(ctx context.Context, eventID string) (bool, error) + +// PredicatedEventHandler is an implementation of EventHandler that filters events using a predicate function. +// - ShouldHandleEvent uses the predicate to determine if the event should be processed by ID. +// - DeferredAction is a no-op as no locking is performed. +// - PostProcess updates the event with the reconciled date and checks if it's processed by all instances. +// If all instances have processed the event, it marks the event as reconciled. +type PredicatedEventHandler struct { + predicate eventHandlerPredicate + events services.EventService + eventInstanceDao dao.EventInstanceDao + instanceDao dao.InstanceDao +} + +func NewPredicatedEventHandler(predicate eventHandlerPredicate, events services.EventService, eventInstanceDao dao.EventInstanceDao, instanceDao dao.InstanceDao) EventHandler { + return &PredicatedEventHandler{ + predicate: predicate, + events: events, + eventInstanceDao: eventInstanceDao, + instanceDao: instanceDao, + } +} + +func (h *PredicatedEventHandler) ShouldHandleEvent(ctx context.Context, id string) (bool, error) { + return h.predicate(ctx, id) +} + +func (h *PredicatedEventHandler) DeferredAction(ctx context.Context, id string) { + // no-op +} + +func (h *PredicatedEventHandler) PostProcess(ctx context.Context, event *api.Event) error { + // check the event and alive instances + // if the event is handled by all alive instances, mark the event as reconciled + activeInstances, err := h.instanceDao.FindReadyIDs(ctx) + if err != nil { + return fmt.Errorf("error finding ready instances: %v", err) + } + + eventInstances, err := h.eventInstanceDao.GetInstancesByEventID(ctx, event.ID) + if err != nil { + return fmt.Errorf("error finding processed server instances for event %s: %v", event.ID, err) + } + + // check if all instances have processed the event + if !compareStrings(activeInstances, eventInstances) { + klog.V(10).Infof("Event %s is not processed by all instances, handled by %v, active instances %v", event.ID, eventInstances, activeInstances) + return fmt.Errorf("event %s is not processed by all instances", event.ID) + } + + // update the event with the reconciled date + now := time.Now() + event.ReconciledDate = &now + if _, svcErr := h.events.Replace(ctx, event); svcErr != nil { + return fmt.Errorf("error updating event with id(%s): %s", event.ID, svcErr) + } + + return nil +} + +// compareStrings compares two string slices and returns true if they are equal +func compareStrings(a, b []string) bool { + if len(a) != len(b) { + return false + } + + for _, v := range a { + found := false + for _, vv := range b { + if v == vv { + found = true + break + } + } + if !found { + return false + } + } + + return true +} diff --git a/pkg/controllers/event_handler_test.go b/pkg/controllers/event_handler_test.go new file mode 100644 index 00000000..ebcd4a58 --- /dev/null +++ b/pkg/controllers/event_handler_test.go @@ -0,0 +1,151 @@ +package controllers + +import ( + "context" + "testing" + + . "github.com/onsi/gomega" + "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/dao/mocks" + dbmocks "github.com/openshift-online/maestro/pkg/db/mocks" + "github.com/openshift-online/maestro/pkg/services" +) + +func TestLockingEventHandler(t *testing.T) { + RegisterTestingT(t) + + source := "my-event-source" + ctx := context.Background() + eventsDao := mocks.NewEventDao() + events := services.NewEventService(eventsDao) + eventHandler := NewLockBasedEventHandler(dbmocks.NewMockAdvisoryLockFactory(), events) + + _, _ = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "1"}, + Source: source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + + _, _ = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "2"}, + Source: source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + + shouldProcess, err := eventHandler.ShouldHandleEvent(ctx, "1") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + + lockingEventHandler, ok := eventHandler.(*LockBasedEventHandler) + Expect(ok).To(BeTrue()) + Expect(lockingEventHandler.locks).To(HaveLen(1)) + + eventHandler.DeferredAction(ctx, "1") + Expect(lockingEventHandler.locks).To(HaveLen(0)) + + event, err := eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) + + err = eventHandler.PostProcess(ctx, event) + Expect(err).To(BeNil()) + + event, err = eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).NotTo(BeNil()) + + shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "2") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + Expect(lockingEventHandler.locks).To(HaveLen(1)) + + eventHandler.DeferredAction(ctx, "2") + Expect(lockingEventHandler.locks).To(HaveLen(0)) + + event, err = eventsDao.Get(ctx, "2") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) + + shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "3") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + Expect(lockingEventHandler.locks).To(HaveLen(1)) +} + +func TestPredicatedEventHandler(t *testing.T) { + RegisterTestingT(t) + + currentInstanceID := "test-instance" + source := "my-event-source" + ctx := context.Background() + eventsDao := mocks.NewEventDao() + events := services.NewEventService(eventsDao) + eventInstancesDao := mocks.NewEventInstanceDaoMock() + instancesDao := mocks.NewInstanceDao() + eventServer := &exampleEventServer{eventDao: eventsDao} + eventHandler := NewPredicatedEventHandler(eventServer.PredicateEvent, events, eventInstancesDao, instancesDao) + + _, _ = instancesDao.Create(ctx, &api.ServerInstance{ + Meta: api.Meta{ID: currentInstanceID}, + Ready: true, + }) + + _, _ = instancesDao.Create(ctx, &api.ServerInstance{ + Meta: api.Meta{ID: "another-instance"}, + Ready: false, + }) + + _, _ = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "1"}, + Source: source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + + _, _ = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "2"}, + Source: source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + + shouldProcess, err := eventHandler.ShouldHandleEvent(ctx, "1") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + + _, err = eventInstancesDao.Create(ctx, &api.EventInstance{ + EventID: "1", + InstanceID: currentInstanceID, + }) + Expect(err).To(BeNil()) + + eventHandler.DeferredAction(ctx, "1") + + event, err := eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) + + err = eventHandler.PostProcess(ctx, event) + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).NotTo(BeNil()) + + event, err = eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).NotTo(BeNil()) + + shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "2") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + + eventHandler.DeferredAction(ctx, "2") + + event, err = eventsDao.Get(ctx, "2") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) + + shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "3") + Expect(err).NotTo(BeNil()) + Expect(shouldProcess).To(BeFalse()) +} diff --git a/pkg/controllers/framework.go b/pkg/controllers/framework.go index 608c56be..7c69819c 100755 --- a/pkg/controllers/framework.go +++ b/pkg/controllers/framework.go @@ -6,7 +6,6 @@ import ( "time" "github.com/openshift-online/maestro/pkg/api" - "github.com/openshift-online/maestro/pkg/db" maestrologger "github.com/openshift-online/maestro/pkg/logger" "github.com/openshift-online/maestro/pkg/services" @@ -45,7 +44,7 @@ var logger = maestrologger.NewOCMLogger(context.Background()) // events sync will help us to handle unexpected errors (e.g. sever restart), it ensures we will not miss any events var defaultEventsSyncPeriod = 10 * time.Hour -type ControllerHandlerFunc func(ctx context.Context, id string) error +type ControllerHandlerFunc func(ctx context.Context, eventID, sourceID string) error type ControllerConfig struct { Source string @@ -53,18 +52,18 @@ type ControllerConfig struct { } type KindControllerManager struct { - controllers map[string]map[api.EventType][]ControllerHandlerFunc - lockFactory db.LockFactory - events services.EventService - eventsQueue workqueue.RateLimitingInterface + controllers map[string]map[api.EventType][]ControllerHandlerFunc + eventHandler EventHandler + events services.EventService + eventsQueue workqueue.RateLimitingInterface } -func NewKindControllerManager(lockFactory db.LockFactory, events services.EventService) *KindControllerManager { +func NewKindControllerManager(eventHandler EventHandler, events services.EventService) *KindControllerManager { return &KindControllerManager{ - controllers: map[string]map[api.EventType][]ControllerHandlerFunc{}, - lockFactory: lockFactory, - events: events, - eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "event-controller"), + controllers: map[string]map[api.EventType][]ControllerHandlerFunc{}, + eventHandler: eventHandler, + events: events, + eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "event-controller"), } } @@ -108,35 +107,34 @@ func (km *KindControllerManager) add(source string, ev api.EventType, fns []Cont } func (km *KindControllerManager) handleEvent(id string) error { - ctx := context.Background() - // lock the Event with a fail-fast advisory lock context. - // this allows concurrent processing of many events by one or many controller managers. - // allow the lock to be released by the handler goroutine and allow this function to continue. - // subsequent events will be locked by their own distinct IDs. - lockOwnerID, acquired, err := km.lockFactory.NewNonBlockingLock(ctx, id, db.Events) - // Ensure that the transaction related to this lock always end. - defer km.lockFactory.Unlock(ctx, lockOwnerID) + reqContext := context.WithValue(context.Background(), EventID, id) + + // check if the event should be processed by this instance + shouldProcess, err := km.eventHandler.ShouldHandleEvent(reqContext, id) + defer km.eventHandler.DeferredAction(reqContext, id) if err != nil { - return fmt.Errorf("error obtaining the event lock: %v", err) + return fmt.Errorf("error filtering event with id (%s): %s", id, err) } - if !acquired { - logger.Infof("Event %s is processed by another worker, continue to process the next", id) + // if the event should not be processed by this instance, we can ignore it + if !shouldProcess { + logger.Infof("Event with id (%s) should not be processed by this instance", id) return nil } - reqContext := context.WithValue(ctx, EventID, id) - event, svcErr := km.events.Get(reqContext, id) if svcErr != nil { if svcErr.Is404() { // the event is already deleted, we can ignore it + logger.V(4).Infof("Event with id (%s) is not found", id) return nil } - return fmt.Errorf("error getting event with id(%s): %s", id, svcErr) + return fmt.Errorf("error getting event with id (%s): %s", id, svcErr) } if event.ReconciledDate != nil { + // the event is already reconciled, we can ignore it + logger.V(4).Infof("Event with id (%s) is already reconciled", id) return nil } @@ -153,20 +151,13 @@ func (km *KindControllerManager) handleEvent(id string) error { } for _, fn := range handlerFns { - err := fn(reqContext, event.SourceID) + err := fn(reqContext, id, event.SourceID) if err != nil { - return fmt.Errorf("error handing event %s, %s, %s: %s", event.Source, event.EventType, id, err) + return fmt.Errorf("error handing event %s-%s (%s): %s", event.Source, event.EventType, id, err) } } - // all handlers successfully executed - now := time.Now() - event.ReconciledDate = &now - _, svcErr = km.events.Replace(reqContext, event) - if svcErr != nil { - return fmt.Errorf("error updating event with id(%s): %s", id, svcErr) - } - return nil + return km.eventHandler.PostProcess(reqContext, event) } func (km *KindControllerManager) runWorker() { diff --git a/pkg/controllers/framework_test.go b/pkg/controllers/framework_test.go index 3365073b..620674a1 100755 --- a/pkg/controllers/framework_test.go +++ b/pkg/controllers/framework_test.go @@ -6,6 +6,7 @@ import ( . "github.com/onsi/gomega" "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/dao" "github.com/openshift-online/maestro/pkg/dao/mocks" dbmocks "github.com/openshift-online/maestro/pkg/db/mocks" "github.com/openshift-online/maestro/pkg/services" @@ -23,35 +24,53 @@ func newExampleControllerConfig(ctrl *exampleController) *ControllerConfig { } type exampleController struct { - addCounter int - updateCounter int - deleteCounter int + instanceID string + eventInstancesDao dao.EventInstanceDao + addCounter int + updateCounter int + deleteCounter int } -func (d *exampleController) OnAdd(ctx context.Context, id string) error { +func (d *exampleController) OnAdd(ctx context.Context, eventID, resourceID string) error { d.addCounter++ - return nil + _, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{ + EventID: eventID, + InstanceID: d.instanceID, + }) + return err } -func (d *exampleController) OnUpdate(ctx context.Context, id string) error { +func (d *exampleController) OnUpdate(ctx context.Context, eventID, resourceID string) error { d.updateCounter++ - return nil + _, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{ + EventID: eventID, + InstanceID: d.instanceID, + }) + return err } -func (d *exampleController) OnDelete(ctx context.Context, id string) error { +func (d *exampleController) OnDelete(ctx context.Context, eventID, resourceID string) error { d.deleteCounter++ - return nil + _, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{ + EventID: eventID, + InstanceID: d.instanceID, + }) + return err } -func TestControllerFramework(t *testing.T) { +func TestControllerFrameworkWithLockBasedEventHandler(t *testing.T) { RegisterTestingT(t) ctx := context.Background() eventsDao := mocks.NewEventDao() events := services.NewEventService(eventsDao) - mgr := NewKindControllerManager(dbmocks.NewMockAdvisoryLockFactory(), events) + eventInstancesDao := mocks.NewEventInstanceDaoMock() + mgr := NewKindControllerManager(NewLockBasedEventHandler(dbmocks.NewMockAdvisoryLockFactory(), events), events) - ctrl := &exampleController{} + ctrl := &exampleController{ + instanceID: "instance-1", + eventInstancesDao: eventInstancesDao, + } config := newExampleControllerConfig(ctrl) mgr.Add(config) @@ -87,3 +106,78 @@ func TestControllerFramework(t *testing.T) { eve, _ := eventsDao.Get(ctx, "1") Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") } + +type exampleEventServer struct { + eventDao dao.EventDao +} + +func (e *exampleEventServer) PredicateEvent(ctx context.Context, eventID string) (bool, error) { + _, err := e.eventDao.Get(ctx, eventID) + if err != nil { + return false, err + } + return true, nil +} + +func TestControllerFrameworkWithPredicatedEventHandler(t *testing.T) { + RegisterTestingT(t) + + currentInstanceID := "test-instance" + ctx := context.Background() + eventsDao := mocks.NewEventDao() + events := services.NewEventService(eventsDao) + eventServer := &exampleEventServer{eventDao: eventsDao} + eventInstancesDao := mocks.NewEventInstanceDaoMock() + instancesDao := mocks.NewInstanceDao() + eventHandler := NewPredicatedEventHandler(eventServer.PredicateEvent, events, eventInstancesDao, instancesDao) + mgr := NewKindControllerManager(eventHandler, events) + + ctrl := &exampleController{ + instanceID: currentInstanceID, + eventInstancesDao: eventInstancesDao, + } + config := newExampleControllerConfig(ctrl) + mgr.Add(config) + + _, _ = instancesDao.Create(ctx, &api.ServerInstance{ + Meta: api.Meta{ID: currentInstanceID}, + Ready: true, + }) + + _, _ = instancesDao.Create(ctx, &api.ServerInstance{ + Meta: api.Meta{ID: "another-instance"}, + Ready: false, + }) + + _, _ = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "1"}, + Source: config.Source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + + _, _ = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "2"}, + Source: config.Source, + SourceID: "any id", + EventType: api.UpdateEventType, + }) + + _, _ = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "3"}, + Source: config.Source, + SourceID: "any id", + EventType: api.DeleteEventType, + }) + + mgr.handleEvent("1") + mgr.handleEvent("2") + mgr.handleEvent("3") + + Expect(ctrl.addCounter).To(Equal(1)) + Expect(ctrl.updateCounter).To(Equal(1)) + Expect(ctrl.deleteCounter).To(Equal(1)) + + eve, _ := eventsDao.Get(ctx, "1") + Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") +} diff --git a/pkg/dao/event_instances.go b/pkg/dao/event_instance.go similarity index 69% rename from pkg/dao/event_instances.go rename to pkg/dao/event_instance.go index c7722b16..3c8b0d40 100644 --- a/pkg/dao/event_instances.go +++ b/pkg/dao/event_instance.go @@ -11,6 +11,7 @@ import ( type EventInstanceDao interface { Get(ctx context.Context, eventID, instanceID string) (*api.EventInstance, error) + GetInstancesByEventID(ctx context.Context, eventID string) ([]string, error) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) } @@ -35,6 +36,19 @@ func (d *sqlEventInstanceDao) Get(ctx context.Context, eventID, instanceID strin return &eventInstance, nil } +func (d *sqlEventInstanceDao) GetInstancesByEventID(ctx context.Context, eventID string) ([]string, error) { + g2 := (*d.sessionFactory).New(ctx) + var eventInstances []api.EventInstance + if err := g2.Model(&api.EventInstance{}).Where("event_id = ?", eventID).Find(&eventInstances).Error; err != nil { + return nil, err + } + instanceIDs := make([]string, len(eventInstances)) + for i, eventInstance := range eventInstances { + instanceIDs[i] = eventInstance.InstanceID + } + return instanceIDs, nil +} + func (d *sqlEventInstanceDao) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) { g2 := (*d.sessionFactory).New(ctx) if err := g2.Omit(clause.Associations).Create(eventInstance).Error; err != nil { diff --git a/pkg/dao/instance.go b/pkg/dao/instance.go index 21e732ca..7f220cea 100644 --- a/pkg/dao/instance.go +++ b/pkg/dao/instance.go @@ -19,6 +19,7 @@ type InstanceDao interface { Delete(ctx context.Context, id string) error DeleteByIDs(ctx context.Context, ids []string) error FindByIDs(ctx context.Context, ids []string) (api.ServerInstanceList, error) + FindReadyIDs(ctx context.Context) ([]string, error) FindByUpdatedTime(ctx context.Context, updatedTime time.Time) (api.ServerInstanceList, error) All(ctx context.Context) (api.ServerInstanceList, error) } @@ -110,6 +111,19 @@ func (d *sqlInstanceDao) FindByIDs(ctx context.Context, ids []string) (api.Serve return instances, nil } +func (d *sqlInstanceDao) FindReadyIDs(ctx context.Context) ([]string, error) { + g2 := (*d.sessionFactory).New(ctx) + instances := api.ServerInstanceList{} + if err := g2.Where("ready = ?", true).Find(&instances).Error; err != nil { + return nil, err + } + ids := make([]string, len(instances)) + for i, instance := range instances { + ids[i] = instance.ID + } + return ids, nil +} + func (d *sqlInstanceDao) FindByUpdatedTime(ctx context.Context, updatedTime time.Time) (api.ServerInstanceList, error) { g2 := (*d.sessionFactory).New(ctx) instances := api.ServerInstanceList{} diff --git a/pkg/dao/mocks/event.go b/pkg/dao/mocks/event.go index 738e110f..60833a0d 100755 --- a/pkg/dao/mocks/event.go +++ b/pkg/dao/mocks/event.go @@ -35,7 +35,13 @@ func (d *eventDaoMock) Create(ctx context.Context, event *api.Event) (*api.Event } func (d *eventDaoMock) Replace(ctx context.Context, event *api.Event) (*api.Event, error) { - return nil, errors.NotImplemented("Event").AsError() + for i, e := range d.events { + if e.ID == event.ID { + d.events[i] = event + return event, nil + } + } + return nil, gorm.ErrRecordNotFound } func (d *eventDaoMock) Delete(ctx context.Context, id string) error { diff --git a/pkg/dao/mocks/event_instance.go b/pkg/dao/mocks/event_instance.go new file mode 100644 index 00000000..34a1468a --- /dev/null +++ b/pkg/dao/mocks/event_instance.go @@ -0,0 +1,57 @@ +package mocks + +import ( + "context" + "fmt" + "sync" + + "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/dao" +) + +var _ dao.EventInstanceDao = &eventInstanceDaoMock{} + +type eventInstanceDaoMock struct { + mux sync.RWMutex + eventInstances api.EventInstanceList +} + +func NewEventInstanceDaoMock() *eventInstanceDaoMock { + return &eventInstanceDaoMock{} +} + +func (d *eventInstanceDaoMock) Get(ctx context.Context, eventID, instanceID string) (*api.EventInstance, error) { + d.mux.RLock() + defer d.mux.RUnlock() + + for _, ei := range d.eventInstances { + if ei.EventID == eventID && ei.InstanceID == instanceID { + return ei, nil + } + } + + return nil, fmt.Errorf("event instance not found") +} + +func (d *eventInstanceDaoMock) GetInstancesByEventID(ctx context.Context, eventID string) ([]string, error) { + d.mux.RLock() + defer d.mux.RUnlock() + + var instanceIDs []string + for _, ei := range d.eventInstances { + if ei.EventID == eventID { + instanceIDs = append(instanceIDs, ei.InstanceID) + } + } + + return instanceIDs, nil +} + +func (d *eventInstanceDaoMock) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) { + d.mux.Lock() + defer d.mux.Unlock() + + d.eventInstances = append(d.eventInstances, eventInstance) + + return eventInstance, nil +} diff --git a/pkg/dao/mocks/instance.go b/pkg/dao/mocks/instance.go index b872b337..776fb763 100644 --- a/pkg/dao/mocks/instance.go +++ b/pkg/dao/mocks/instance.go @@ -124,6 +124,18 @@ func (d *instanceDaoMock) FindByIDs(ctx context.Context, ids []string) (api.Serv return nil, errors.NotImplemented("Instance").AsError() } +func (d *instanceDaoMock) FindReadyIDs(ctx context.Context) ([]string, error) { + d.mux.RLock() + defer d.mux.RUnlock() + ids := make([]string, 0, len(d.instances)) + for _, instance := range d.instances { + if instance.Ready { + ids = append(ids, instance.ID) + } + } + return ids, nil +} + func (d *instanceDaoMock) FindByUpdatedTime(ctx context.Context, updatedTime time.Time) (api.ServerInstanceList, error) { d.mux.RLock() defer d.mux.RUnlock() diff --git a/test/helper.go b/test/helper.go index ef374d39..3c1f1719 100755 --- a/test/helper.go +++ b/test/helper.go @@ -233,7 +233,7 @@ func (helper *Helper) startEventBroadcaster() { func (helper *Helper) StartControllerManager(ctx context.Context) { helper.ControllerManager = &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory), + controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory), helper.Env().Services.Events()), helper.Env().Services.Events(), ), StatusController: controllers.NewStatusController( diff --git a/test/integration/controller_test.go b/test/integration/controller_test.go index a6ce56cc..541242ab 100755 --- a/test/integration/controller_test.go +++ b/test/integration/controller_test.go @@ -32,13 +32,13 @@ func TestControllerRacing(t *testing.T) { // the event with create type. Due to the event lock, each create event // should be only processed once. var proccessedEvent, processedStatusEvent []string - onUpsert := func(ctx context.Context, id string) error { + onUpsert := func(ctx context.Context, eventID, resourceID string) error { events, err := eventDao.All(ctx) if err != nil { return err } for _, evt := range events { - if evt.SourceID != id { + if evt.SourceID != resourceID { continue } if evt.EventType != api.CreateEventType { @@ -48,7 +48,7 @@ func TestControllerRacing(t *testing.T) { if evt.ReconciledDate != nil { continue } - proccessedEvent = append(proccessedEvent, id) + proccessedEvent = append(proccessedEvent, resourceID) } return nil @@ -80,7 +80,7 @@ func TestControllerRacing(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), + controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()), h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -151,7 +151,7 @@ func TestControllerReconcile(t *testing.T) { processedEventTimes := 0 // this handler will return an error at the first time to simulate an error happened when handing an event, // and then, the controller will requeue this event, at that time, we handle this event successfully. - onUpsert := func(ctx context.Context, id string) error { + onUpsert := func(ctx context.Context, eventID, resourceID string) error { processedEventTimes = processedEventTimes + 1 if processedEventTimes == 1 { return fmt.Errorf("failed to process the event") @@ -176,7 +176,7 @@ func TestControllerReconcile(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), + controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()), h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -295,9 +295,9 @@ func TestControllerSync(t *testing.T) { } var proccessedEvents []string - onUpsert := func(ctx context.Context, id string) error { + onUpsert := func(ctx context.Context, eventID, resourceID string) error { // we just record the processed event - proccessedEvents = append(proccessedEvents, id) + proccessedEvents = append(proccessedEvents, resourceID) return nil } @@ -307,7 +307,7 @@ func TestControllerSync(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), + controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()), h.Env().Services.Events(), ), StatusController: controllers.NewStatusController(