Skip to content

Commit

Permalink
Merge pull request #17 from nais/soft-delete
Browse files Browse the repository at this point in the history
Refactor to handle changes implemented by …
  • Loading branch information
christeredvartsen authored Jul 2, 2024
2 parents 3fcb630 + 953f322 commit 964c02e
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 69 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/google/uuid v1.6.0
github.com/grafana/grafana-openapi-client-go v0.0.0-20240430202104-3ad0f7e4ee52
github.com/joho/godotenv v1.5.1
github.com/nais/api v0.0.0-20240614093353-fe1f73775e6a
github.com/nais/api v0.0.0-20240702091139-fe19bb3460df
github.com/nais/dependencytrack v0.0.0-20240610080458-c76185696c11
github.com/prometheus/client_golang v1.19.1
github.com/sethvargo/go-envconfig v1.0.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3N
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
github.com/nais/api v0.0.0-20240614093353-fe1f73775e6a h1:tL5JOFS+gUWimotAAcH6zygtb3Ef1C6hrq2WvGk3l4c=
github.com/nais/api v0.0.0-20240614093353-fe1f73775e6a/go.mod h1:X0Jp7pnYDRxKWfi46CJxyPc0f09uAaSh3d4idCTsLtY=
github.com/nais/api v0.0.0-20240702091139-fe19bb3460df h1:SUv7kJfxqMgK3/O4jE4XeCDimsxtQ1m3f92T0uVnE+s=
github.com/nais/api v0.0.0-20240702091139-fe19bb3460df/go.mod h1:BBRdhHlB5TK0cAShs650TvA7PVh9LUU9V1qgxinZ4EU=
github.com/nais/dependencytrack v0.0.0-20240610080458-c76185696c11 h1:2Cy7cjTM7PyiaUi3+aLsRSg6T0R6AqlduPNmT9LBMSY=
github.com/nais/dependencytrack v0.0.0-20240610080458-c76185696c11/go.mod h1:BdPp1+tyE1hSMZigr7aGmhcTqaksf9Pxxbo4ZAi4On8=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
Expand Down
111 changes: 54 additions & 57 deletions internal/reconcilers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package reconcilers
import (
"context"
"errors"
"fmt"
"slices"
"time"

Expand All @@ -23,19 +24,18 @@ import (
type ctxKey int

const (
ctxCorrelationID ctxKey = iota
reconcilerTimeout = time.Minute * 15
ctxCorrelationID ctxKey = iota
)

const reconcilerTimeout = time.Minute * 15

type Manager struct {
apiclient *apiclient.APIClient
reconcilers []Reconciler
// Reconcilers to enable during registration
reconcilersToEnable []string
log logrus.FieldLogger
pubsubSubscription *pubsub.Subscription
syncQueueChan <-chan Input
syncQueueChan <-chan ReconcileRequest
syncQueue Queue
inFlight InFlight

Expand Down Expand Up @@ -126,17 +126,18 @@ func (m *Manager) ListenForEvents(ctx context.Context) {
switch event {
case protoapi.EventTypes_EVENT_TEAM_DELETED,
protoapi.EventTypes_EVENT_TEAM_UPDATED:
input := Input{
CorrelationID: correlationID.String(),
}

var obj interface {
proto.Message
GetSlug() string
}

input := ReconcileRequest{
CorrelationID: correlationID.String(),
}

if event == protoapi.EventTypes_EVENT_TEAM_DELETED {
obj = &protoapi.EventTeamDeleted{}
input.Delete = true
} else {
obj = &protoapi.EventTeamUpdated{}
}
Expand All @@ -147,8 +148,7 @@ func (m *Manager) ListenForEvents(ctx context.Context) {
}

input.TeamSlug = obj.GetSlug()
err := m.syncQueue.Add(input)
if err != nil {
if err := m.syncQueue.Add(input); err != nil {
msg.Nack()
m.log.WithError(err).Error("error while adding team to queue")
return
Expand Down Expand Up @@ -218,37 +218,35 @@ func (m *Manager) Close() {

// syncTeam will mark the team as "in flight" and start the reconciliation process. If the team is already in flight, it
// will be added to the back of the queue. Based on the input, the team will either be deleted or reconciled.
func (m *Manager) syncTeam(ctx context.Context, input Input) {
log := m.log.WithField("team", input.TeamSlug)
func (m *Manager) syncTeam(ctx context.Context, req ReconcileRequest) {
log := m.log.WithField("team", req.TeamSlug)

if !m.inFlight.Set(input.TeamSlug) {
if !m.inFlight.Set(req.TeamSlug) {
log.Info("already in flight - adding to back of queue")
time.Sleep(10 * time.Second)
if err := m.syncQueue.Add(input); err != nil {
if err := m.syncQueue.Add(req); err != nil {
log.WithError(err).Error("failed while re-queueing team that is in flight")
}
return
}

defer m.inFlight.Remove(input.TeamSlug)
defer m.inFlight.Remove(req.TeamSlug)

resp, err := m.apiclient.Teams().Get(ctx, &protoapi.GetTeamRequest{Slug: input.TeamSlug})
resp, err := m.apiclient.Teams().Get(ctx, &protoapi.GetTeamRequest{Slug: req.TeamSlug})
if err != nil {
if status.Code(err) == codes.NotFound {
log.Info("team not found, team will not be requeued")
return
}

log.WithError(err).Error("error while getting team")
if err := m.syncQueue.Add(input); err != nil {
log.WithError(err).Error("failed while re-queueing team that is in flight")
log.WithError(err).Error("error while getting team, requeuing")
if err := m.syncQueue.Add(req); err != nil {
log.WithError(err).Error("failed while requeuing team")
}

return
}

team := resp.Team

ctx, cancel := context.WithTimeout(ctx, reconcilerTimeout)
defer cancel()

Expand All @@ -258,10 +256,11 @@ func (m *Manager) syncTeam(ctx context.Context, input Input) {
return
}

if input.Delete {
m.deleteTeam(ctx, reconcilers, team, input)
team := resp.Team
if team.DeleteKeyConfirmedAt != nil {
m.deleteTeam(ctx, reconcilers, team, req)
} else {
m.reconcileTeam(ctx, reconcilers, team, input)
m.reconcileTeam(ctx, reconcilers, team, req)
}
}

Expand All @@ -286,17 +285,17 @@ func (m *Manager) enabledReconcilers(ctx context.Context) ([]Reconciler, error)

// deleteTeam will pass the team through to all enabled reconcilers, effectively deleting the team from all configured
// external systems.
func (m *Manager) deleteTeam(ctx context.Context, reconcilers []Reconciler, naisTeam *protoapi.Team, input Input) {
func (m *Manager) deleteTeam(ctx context.Context, reconcilers []Reconciler, naisTeam *protoapi.Team, req ReconcileRequest) {
teamStart := time.Now()
log := m.log.WithField("team", input.TeamSlug)
log := m.log.WithField("team", req.TeamSlug)

if input.CorrelationID != "" {
log = log.WithField("correlation_id", input.CorrelationID)
ctx = context.WithValue(ctx, ctxCorrelationID, input.CorrelationID)
if req.CorrelationID != "" {
log = log.WithField("correlation_id", req.CorrelationID)
ctx = context.WithValue(ctx, ctxCorrelationID, req.CorrelationID)
}

if input.TraceID != "" {
log = log.WithField("trace_id", input.TraceID)
if req.TraceID != "" {
log = log.WithField("trace_id", req.TraceID)
}

log.WithField("time", teamStart).Debugf("start team deletion process")
Expand All @@ -318,18 +317,18 @@ func (m *Manager) deleteTeam(ctx context.Context, reconcilers []Reconciler, nais
log.WithError(err).Errorf("error during team deletion")

req := &protoapi.SetReconcilerErrorForTeamRequest{
CorrelationId: input.CorrelationID,
CorrelationId: req.CorrelationID,
ReconcilerName: r.Name(),
ErrorMessage: err.Error(),
TeamSlug: input.TeamSlug,
TeamSlug: req.TeamSlug,
}
if _, err := m.apiclient.Reconcilers().SetReconcilerErrorForTeam(ctx, req); err != nil {
log.WithError(err).Errorf("error while adding deletion error")
}
} else {
req := &protoapi.RemoveReconcilerErrorForTeamRequest{
ReconcilerName: r.Name(),
TeamSlug: input.TeamSlug,
TeamSlug: req.TeamSlug,
}
if _, err := m.apiclient.Reconcilers().RemoveReconcilerErrorForTeam(ctx, req); err != nil {
log.WithError(err).Errorf("error while removing deletion error")
Expand All @@ -354,7 +353,7 @@ func (m *Manager) deleteTeam(ctx context.Context, reconcilers []Reconciler, nais

if successfulDelete {
req := &protoapi.DeleteTeamRequest{
Slug: input.TeamSlug,
Slug: req.TeamSlug,
}
if _, err := m.apiclient.Teams().Delete(ctx, req); err != nil {
log.WithError(err).Errorf("error while deleting team")
Expand All @@ -377,7 +376,7 @@ func (m *Manager) deleteTeam(ctx context.Context, reconcilers []Reconciler, nais

// reconcileTeam will pass the team through to all enabled reconcilers, effectively synchronizing the team to all
// configured external systems.
func (m *Manager) reconcileTeam(ctx context.Context, reconcilers []Reconciler, naisTeam *protoapi.Team, input Input) {
func (m *Manager) reconcileTeam(ctx context.Context, reconcilers []Reconciler, naisTeam *protoapi.Team, input ReconcileRequest) {
teamStart := time.Now()
log := m.log.WithField("team", input.TeamSlug)

Expand Down Expand Up @@ -465,37 +464,35 @@ func (m *Manager) scheduleAllTeams(ctx context.Context, correlationID uuid.UUID)
return nil
}

teams, err := getTeams(ctx, m.apiclient.Teams())
if err != nil {
return err
}

m.log.WithField("num_teams", len(teams)).Debugf("fetched teams from API")

for _, team := range teams {
err := m.syncQueue.Add(Input{
CorrelationID: correlationID.String(),
TeamSlug: team.Slug,
})
if err != nil {
m.log.WithField("team", team.Slug).WithError(err).Errorf("error while adding team to queue")
}
if err := m.scheduleTeams(ctx, correlationID); err != nil {
m.log.WithError(err).Error("error while scheduling teams for reconciliation")
}

return nil
}

// getTeams retrieves all teams from the NAIS API
func getTeams(ctx context.Context, client protoapi.TeamsClient) ([]*protoapi.Team, error) {
func (m *Manager) scheduleTeams(ctx context.Context, correlationID uuid.UUID) error {
it := iterator.New(ctx, 100, func(limit, offset int64) (*protoapi.ListTeamsResponse, error) {
return client.List(ctx, &protoapi.ListTeamsRequest{Limit: limit, Offset: offset})
return m.apiclient.Teams().List(ctx, &protoapi.ListTeamsRequest{Limit: limit, Offset: offset})
})

teams := make([]*protoapi.Team, 0)
num := 0
for it.Next() {
teams = append(teams, it.Value())
err := m.syncQueue.Add(ReconcileRequest{
CorrelationID: correlationID.String(),
TeamSlug: it.Value().Slug,
})
if err != nil {
return fmt.Errorf("error while adding team to queue: %w", err)
}
num++
}
if err := it.Err(); err != nil {
return fmt.Errorf("error while fetching active teams for reconciliation: %w", err)
}
return teams, it.Err()

m.log.WithField("num_teams", num).Debugf("added teams to reconcile queue")
return nil
}

// getReconcilers retrieves all reconcilers from the NAIS API
Expand Down
15 changes: 7 additions & 8 deletions internal/reconcilers/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,41 +7,40 @@ import (

const reconcilerQueueSize = 4096

type Input struct {
type ReconcileRequest struct {
CorrelationID string
TraceID string
TeamSlug string
Delete bool
}

type Queue interface {
Add(Input) error
Add(ReconcileRequest) error
Close()
}

type queue struct {
queue chan Input
queue chan ReconcileRequest
closed bool
lock sync.Mutex
}

func NewQueue() (Queue, <-chan Input) {
ch := make(chan Input, reconcilerQueueSize)
func NewQueue() (Queue, <-chan ReconcileRequest) {
ch := make(chan ReconcileRequest, reconcilerQueueSize)
return &queue{
queue: ch,
closed: false,
}, ch
}

func (q *queue) Add(input Input) error {
func (q *queue) Add(req ReconcileRequest) error {
q.lock.Lock()
defer q.lock.Unlock()

if q.closed {
return fmt.Errorf("team reconciler channel is closed")
}

q.queue <- input
q.queue <- req
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/reconcilers/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func Test_Queue(t *testing.T) {
input := reconcilers.Input{
input := reconcilers.ReconcileRequest{
TeamSlug: "test-team",
CorrelationID: uuid.New().String(),
}
Expand Down

0 comments on commit 964c02e

Please sign in to comment.