Skip to content

Commit

Permalink
refactor: First code for a more generic informer (#42)
Browse files Browse the repository at this point in the history
Signed-off-by: jannfis <[email protected]>
  • Loading branch information
jannfis authored Mar 21, 2024
1 parent 824c104 commit e135aa8
Show file tree
Hide file tree
Showing 15 changed files with 933 additions and 116 deletions.
4 changes: 2 additions & 2 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Agent struct {
appManager *application.ApplicationManager
mode types.AgentMode
queues *queue.SendRecvQueues
emitter *event.Event
emitter *event.EventSource
watchLock sync.RWMutex
}

Expand Down Expand Up @@ -122,7 +122,7 @@ func (a *Agent) Start(ctx context.Context) error {
_ = a.maintainConnection()
}

a.emitter = event.NewEventEmitter(fmt.Sprintf("agent://%s", "agent-managed"))
a.emitter = event.NewEventSource(fmt.Sprintf("agent://%s", "agent-managed"))

// Wait for the informer to be synced
err := a.informer.EnsureSynced(waitForSyncedDuration)
Expand Down
42 changes: 23 additions & 19 deletions agent/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,31 +109,35 @@ func (a *Agent) receiver(stream eventstreamapi.EventStream_SubscribeClient) erro
return nil
}
}
ev, incomingApp, err := event.ApplicationFromWire(rcvd.Event)
ev, err := event.FromWire(rcvd.Event)
if err != nil {
logCtx.Errorf("Could not unwrap event: %v", err)
return nil
}
logCtx.Debugf("Received a new event from stream")
switch ev.Type() {
case event.ApplicationCreated:
_, err := a.createApplication(incomingApp)
if err != nil {
logCtx.Errorf("Error creating application: %v", err)
}
case event.ApplicationSpecUpdated:
_, err = a.updateApplication(incomingApp)
if err != nil {
logCtx.Errorf("Error updating application: %v", err)
}
case event.ApplicationDeleted:
err = a.deleteApplication(incomingApp)
if err != nil {
logCtx.Errorf("Error deleting application: %v", err)
}
default:
logCtx.Warnf("Received an unknown event: %s. Protocol mismatch?", ev.Type())
err = a.processIncomingEvent(ev)
if err != nil {
logCtx.WithError(err).Errorf("Unable to process incoming event")
}
// switch ev.Type() {
// case event.Create:
// _, err := a.createApplication(incomingApp)
// if err != nil {
// logCtx.Errorf("Error creating application: %v", err)
// }
// case event.SpecUpdate:
// _, err = a.updateApplication(incomingApp)
// if err != nil {
// logCtx.Errorf("Error updating application: %v", err)
// }
// case event.Delete:
// err = a.deleteApplication(incomingApp)
// if err != nil {
// logCtx.Errorf("Error deleting application: %v", err)
// }
// default:
// logCtx.Warnf("Received an unknown event: %s. Protocol mismatch?", ev.Type())
// }

return nil
}
Expand Down
46 changes: 46 additions & 0 deletions agent/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/jannfis/argocd-agent/internal/event"
"github.com/jannfis/argocd-agent/pkg/types"
"github.com/sirupsen/logrus"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -16,6 +17,51 @@ Inbound events are those coming through our gRPC interface, e.g. those that
were received from a server.
*/

func (a *Agent) processIncomingEvent(ev *event.Event) error {
var err error
switch ev.Target() {
case event.TargetApplication:
err = a.processIncomingApplication(ev)
case event.TargetAppProject:
default:
err = fmt.Errorf("unknown event target: %s", ev.Target())
}

return err
}

func (a *Agent) processIncomingApplication(ev *event.Event) error {
logCtx := log().WithFields(logrus.Fields{
"method": "processIncomingEvents",
})
incomingApp, err := ev.Application()
if err != nil {
return err
}

switch ev.Type() {
case event.Create:
_, err = a.createApplication(incomingApp)
if err != nil {
logCtx.Errorf("Error creating application: %v", err)
}
case event.SpecUpdate:
_, err = a.updateApplication(incomingApp)
if err != nil {
logCtx.Errorf("Error updating application: %v", err)
}
case event.Delete:
err = a.deleteApplication(incomingApp)
if err != nil {
logCtx.Errorf("Error deleting application: %v", err)
}
default:
logCtx.Warnf("Received an unknown event: %s. Protocol mismatch?", ev.Type())
}

return err
}

// createApplication creates an Application upon an event in the agent's work
// queue.
func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.Application, error) {
Expand Down
10 changes: 5 additions & 5 deletions agent/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (a *Agent) addAppCreationToQueue(app *v1alpha1.Application) {
return
}

q.Add(a.emitter.NewApplicationEvent(event.ApplicationCreated, app))
q.Add(a.emitter.NewApplicationEvent(event.Create, app))
logCtx.WithField("sendq_len", q.Len()).WithField("sendq_name", a.remote.ClientID()).Debugf("Added app create event to send queue")
}

Expand Down Expand Up @@ -92,12 +92,12 @@ func (a *Agent) addAppUpdateToQueue(old *v1alpha1.Application, new *v1alpha1.App

// Depending on what mode the agent operates in, we sent a different type
// of event.
eventType := ""
var eventType event.EventType
switch a.mode {
case types.AgentModeAutonomous:
eventType = event.ApplicationStatusUpdate
eventType = event.StatusUpdate
case types.AgentModeManaged:
eventType = event.ApplicationSpecUpdated
eventType = event.SpecUpdate
}

q.Add(a.emitter.NewApplicationEvent(eventType, new))
Expand Down Expand Up @@ -130,6 +130,6 @@ func (a *Agent) addAppDeletionToQueue(app *v1alpha1.Application) {
logCtx.Error("Default queue disappeared!")
return
}
q.Add(a.emitter.NewApplicationEvent(event.ApplicationDeleted, app))
q.Add(a.emitter.NewApplicationEvent(event.Delete, app))
logCtx.WithField("sendq_len", q.Len()).Debugf("Added app delete event to send queue")
}
143 changes: 78 additions & 65 deletions internal/event/event.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package event

import (
"fmt"

"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"

_ "github.com/cloudevents/sdk-go/binding/format/protobuf/v2"
Expand All @@ -9,96 +11,107 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
)

type EventType int32
const cloudEventSpecVersion = "1.0"

const (
EventUnknown EventType = iota
// EventPing is an empty event request to check whether the connection is still intact
EventPing
// EventAppAdded is an event to let the peer know about a new application
EventAppAdded
// EventAppDeleted is an event to let the peer know about the deletion of an app
EventAppDeleted
// EventAppSpecUpdated is an event to update an application's spec field
EventAppSpecUpdated
// EventAppStatusUpdated is an event to update an application's status field
EventAppStatusUpdated
// EventAppOperationUpdated is an event to update an application's operation field
EventAppOperationUpdated
)
type EventType string
type EventTarget string

const cloudEventSpecVersion = "1.0"
const TypePrefix = "io.argoproj.argocd-agent.event"

const TypePrefix = "io.argoproj.argocd-agent"
const (
Ping EventType = TypePrefix + ".ping"
Pong EventType = TypePrefix + ".pong"
Create EventType = TypePrefix + ".create"
Delete EventType = TypePrefix + ".delete"
Update EventType = TypePrefix + ".update"
SpecUpdate EventType = TypePrefix + ".spec-update"
StatusUpdate EventType = TypePrefix + ".status-update"
OperationUpdate EventType = TypePrefix + ".operation-update"
)

const (
Ping = TypePrefix + ".ping"
Pong = TypePrefix + ".pong"
ApplicationCreated = TypePrefix + ".application.create"
ApplicationDeleted = TypePrefix + ".application.delete"
ApplicationSpecUpdated = TypePrefix + ".application.spec-update"
ApplicationStatusUpdate = TypePrefix + ".application.status-update"
ApplicationOperationUpdate = TypePrefix + ".application.operation-update"
TargetUnknown EventTarget = "unknown"
TargetApplication EventTarget = "application"
TargetAppProject EventTarget = "appproject"
)

// type LegacyEvent struct {
// Type EventType
// Application *v1alpha1.Application
// AppProject *v1alpha1.AppProject // Forward compatibility
// ApplicationSet *v1alpha1.ApplicationSet // Forward compatibility
// Created *time.Time
// Processed *time.Time
// Event cloudevents.Event
// }
func (t EventType) String() string {
return string(t)
}

type Event struct {
func (t EventTarget) String() string {
return string(t)
}

type EventSource struct {
source string
}

func (et EventType) String() string {
switch et {
case EventUnknown:
return "unknown"
case EventAppAdded:
return "add"
case EventAppDeleted:
return "delete"
case EventAppSpecUpdated:
return "update_spec"
case EventAppOperationUpdated:
return "update_operation"
case EventAppStatusUpdated:
return "update_status"
default:
return "unknown"
}
type Event struct {
event *cloudevents.Event
target EventTarget
}

func NewEventEmitter(source string) *Event {
ev := &Event{}
func NewEventSource(source string) *EventSource {
ev := &EventSource{}
ev.source = source
return ev
}

func (ev Event) NewApplicationEvent(evType string, app *v1alpha1.Application) *cloudevents.Event {
func (evs EventSource) NewApplicationEvent(evType EventType, app *v1alpha1.Application) *cloudevents.Event {
cev := cloudevents.NewEvent()
cev.SetSource(ev.source)
cev.SetSource(evs.source)
cev.SetSpecVersion(cloudEventSpecVersion)
cev.SetType(evType)
cev.SetType(evType.String())
cev.SetDataSchema(TargetApplication.String())
// TODO: Handle this error situation?
_ = cev.SetData(cloudevents.ApplicationJSON, app)
return &cev
}

func ApplicationFromWire(pev *pb.CloudEvent) (*cloudevents.Event, *v1alpha1.Application, error) {
app := v1alpha1.Application{}
ev, err := format.FromProto(pev)
// FromWire validates an event from the wire in protobuf format, converts it
// into an Event object and returns it. If the event on the wire is invalid,
// or could not be converted for another reason, FromWire returns an error.
func FromWire(pev *pb.CloudEvent) (*Event, error) {
raw, err := format.FromProto(pev)
if err != nil {
return nil, nil, err
return nil, err
}
err = ev.DataAs(&app)
if err != nil {
return nil, nil, err
ev := &Event{}
var target EventTarget
if ev.target = Target(raw); ev.target == "" {
return nil, fmt.Errorf("unknown event target: %s", target)
}
return ev, &app, nil
ev.event = raw
return ev, nil
}

func Target(raw *cloudevents.Event) EventTarget {
switch raw.DataSchema() {
case TargetApplication.String():
return TargetApplication
case TargetAppProject.String():
return TargetAppProject
}
return ""
}

func (ev Event) Target() EventTarget {
return ev.target
}

func (ev Event) Type() EventType {
return EventType(ev.event.Type())
}

func (ev Event) Application() (*v1alpha1.Application, error) {
app := &v1alpha1.Application{}
err := ev.event.DataAs(app)
return app, err
}

func (ev Event) AppProject() (*v1alpha1.AppProject, error) {
proj := &v1alpha1.AppProject{}
err := ev.event.DataAs(proj)
return proj, err
}
1 change: 1 addition & 0 deletions internal/event/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package event
Loading

0 comments on commit e135aa8

Please sign in to comment.