Skip to content

Commit

Permalink
Revert "chore(workflows): stub out event handlers (#15313)"
Browse files Browse the repository at this point in the history
This reverts commit 86ab654.
  • Loading branch information
justinkaseman committed Nov 21, 2024
1 parent 2c79077 commit 93b05ee
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 282 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ func Test_SecretsWorker(t *testing.T) {
contractReader,
fetcherFn,
wfRegistryAddr.Hex(),
nil,
nil,
syncer.WithTicker(giveTicker.C),
)

Expand Down
64 changes: 0 additions & 64 deletions core/services/workflows/syncer/engine_registry.go

This file was deleted.

141 changes: 6 additions & 135 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,170 +3,41 @@ package syncer
import (
"context"
"encoding/hex"
"errors"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)

var ErrNotImplemented = errors.New("not implemented")

// WorkflowRegistryrEventType is the type of event that is emitted by the WorkflowRegistry
type WorkflowRegistryEventType string

var (
// ForceUpdateSecretsEvent is emitted when a request to force update a workflows secrets is made
ForceUpdateSecretsEvent WorkflowRegistryEventType = "WorkflowForceUpdateSecretsRequestedV1"

// WorkflowRegisteredEvent is emitted when a workflow is registered
WorkflowRegisteredEvent WorkflowRegistryEventType = "WorkflowRegisteredV1"

// WorkflowUpdatedEvent is emitted when a workflow is updated
WorkflowUpdatedEvent WorkflowRegistryEventType = "WorkflowUpdatedV1"

// WorkflowPausedEvent is emitted when a workflow is paused
WorkflowPausedEvent WorkflowRegistryEventType = "WorkflowPausedV1"

// WorkflowActivatedEvent is emitted when a workflow is activated
WorkflowActivatedEvent WorkflowRegistryEventType = "WorkflowActivatedV1"

// WorkflowDeletedEvent is emitted when a workflow is deleted
WorkflowDeletedEvent WorkflowRegistryEventType = "WorkflowDeletedV1"
)

// WorkflowRegistryForceUpdateSecretsRequestedV1 is a chain agnostic definition of the WorkflowRegistry
// ForceUpdateSecretsRequested event.
type WorkflowRegistryForceUpdateSecretsRequestedV1 struct {
SecretsURLHash []byte
Owner []byte
WorkflowName string
}

type WorkflowRegistryWorkflowRegisteredV1 struct {
WorkflowID [32]byte
WorkflowOwner []byte
DonID uint32
Status uint8
WorkflowName string
BinaryURL string
ConfigURL string
SecretsURL string
}

type WorkflowRegistryWorkflowUpdatedV1 struct {
OldWorkflowID [32]byte
WorkflowOwner []byte
DonID uint32
NewWorkflowID [32]byte
WorkflowName string
BinaryURL string
ConfigURL string
SecretsURL string
}

type WorkflowRegistryWorkflowPausedV1 struct {
WorkflowID [32]byte
WorkflowOwner []byte
DonID uint32
WorkflowName string
}

type WorkflowRegistryWorkflowActivatedV1 struct {
WorkflowID [32]byte
WorkflowOwner []byte
DonID uint32
WorkflowName string
}

type WorkflowRegistryWorkflowDeletedV1 struct {
WorkflowID [32]byte
WorkflowOwner []byte
DonID uint32
WorkflowName string
}

// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding
// method that handles the event.
type eventHandler struct {
lggr logger.Logger
orm WorkflowSecretsDS
fetcher FetcherFunc
workflowStore store.Store
capRegistry core.CapabilitiesRegistry
engineRegistry *engineRegistry
lggr logger.Logger
orm ORM
fetcher FetcherFunc
}

// newEventHandler returns a new eventHandler instance.
func newEventHandler(
lggr logger.Logger,
orm ORM,
gateway FetcherFunc,
workflowStore store.Store,
capRegistry core.CapabilitiesRegistry,
engineRegistry *engineRegistry,
) *eventHandler {
return &eventHandler{
lggr: lggr,
orm: orm,
fetcher: gateway,
workflowStore: workflowStore,
capRegistry: capRegistry,
engineRegistry: engineRegistry,
lggr: lggr,
orm: orm,
fetcher: gateway,
}
}

func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) error {
switch event.EventType {
case ForceUpdateSecretsEvent:
return h.forceUpdateSecretsEvent(ctx, event)
case WorkflowRegisteredEvent:
return h.workflowRegisteredEvent(ctx, event)
case WorkflowUpdatedEvent:
return h.workflowUpdatedEvent(ctx, event)
case WorkflowPausedEvent:
return h.workflowPausedEvent(ctx, event)
case WorkflowActivatedEvent:
return h.workflowActivatedEvent(ctx, event)
default:
return fmt.Errorf("event type unsupported: %v", event.EventType)
}
}

// workflowRegisteredEvent handles the WorkflowRegisteredEvent event type.
// TODO: Implement this method
func (h *eventHandler) workflowRegisteredEvent(
_ context.Context,
_ WorkflowRegistryEvent,
) error {
return ErrNotImplemented
}

// workflowUpdatedEvent handles the WorkflowUpdatedEvent event type.
func (h *eventHandler) workflowUpdatedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
) error {
return ErrNotImplemented
}

// workflowPausedEvent handles the WorkflowPausedEvent event type.
func (h *eventHandler) workflowPausedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
) error {
return ErrNotImplemented
}

// workflowActivatedEvent handles the WorkflowActivatedEvent event type.
func (h *eventHandler) workflowActivatedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
) error {
return ErrNotImplemented
}

// forceUpdateSecretsEvent handles the ForceUpdateSecretsEvent event type.
func (h *eventHandler) forceUpdateSecretsEvent(
ctx context.Context,
Expand Down
10 changes: 5 additions & 5 deletions core/services/workflows/syncer/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Test_Handler(t *testing.T) {
}
mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil)
mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(int64(1), nil)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil)
h := newEventHandler(lggr, mockORM, fetcher)
err = h.Handle(ctx, giveEvent)
require.NoError(t, err)
})
Expand All @@ -52,7 +52,7 @@ func Test_Handler(t *testing.T) {
return []byte("contents"), nil
}

h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil)
h := newEventHandler(lggr, mockORM, fetcher)
err := h.Handle(ctx, giveEvent)
require.Error(t, err)
require.Contains(t, err.Error(), "event type unsupported")
Expand All @@ -61,7 +61,7 @@ func Test_Handler(t *testing.T) {
t.Run("fails to get secrets url", func(t *testing.T) {
mockORM := mocks.NewORM(t)
ctx := testutils.Context(t)
h := newEventHandler(lggr, mockORM, nil, nil, nil, nil)
h := newEventHandler(lggr, mockORM, nil)
giveURL := "https://original-url.com"
giveBytes, err := crypto.Keccak256([]byte(giveURL))
require.NoError(t, err)
Expand Down Expand Up @@ -101,7 +101,7 @@ func Test_Handler(t *testing.T) {
return nil, assert.AnError
}
mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil)
h := newEventHandler(lggr, mockORM, fetcher)
err = h.Handle(ctx, giveEvent)
require.Error(t, err)
require.ErrorIs(t, err, assert.AnError)
Expand All @@ -128,7 +128,7 @@ func Test_Handler(t *testing.T) {
}
mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil)
mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(0, assert.AnError)
h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil)
h := newEventHandler(lggr, mockORM, fetcher)
err = h.Handle(ctx, giveEvent)
require.Error(t, err)
require.ErrorIs(t, err, assert.AnError)
Expand Down
58 changes: 0 additions & 58 deletions core/services/workflows/syncer/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 93b05ee

Please sign in to comment.