Skip to content

Commit

Permalink
CLOUDP-284792: Remove services from reconcilers (#2025)
Browse files Browse the repository at this point in the history
* CLOUDP-284792: Remove services from reconcilers

* Remove ResourceWithCredentials

* Rename searchIndexReconciler as searchIndexReconcileRequest

* Rename searchIndexesReconciler as searchIndexesReconcileRequest
  • Loading branch information
josvazg authored Jan 13, 2025
1 parent 49372c5 commit 15aa149
Show file tree
Hide file tree
Showing 15 changed files with 148 additions and 141 deletions.
22 changes: 11 additions & 11 deletions internal/controller/atlasdeployment/searchindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const (
IndexStatusActive = "READY"
)

type searchIndexReconciler struct {
type searchIndexReconcileRequest struct {
ctx *workflow.Context
deployment *akov2.AtlasDeployment
k8sClient client.Client
Expand All @@ -26,7 +26,7 @@ type searchIndexReconciler struct {
searchService searchindex.AtlasSearchIdxService
}

func (sr *searchIndexReconciler) Reconcile(spec *akov2.SearchIndex, previous *status.DeploymentSearchIndexStatus) workflow.Result {
func (sr *searchIndexReconcileRequest) Handle(spec *akov2.SearchIndex, previous *status.DeploymentSearchIndexStatus) workflow.Result {
var stateInAtlas, stateInAKO *searchindex.SearchIndex
name := ""

Expand Down Expand Up @@ -79,7 +79,7 @@ func (sr *searchIndexReconciler) Reconcile(spec *akov2.SearchIndex, previous *st
return sr.reconcileInternal(name, stateInAKO, stateInAtlas)
}

func (sr *searchIndexReconciler) reconcileInternal(indexName string, stateInAKO, stateInAtlas *searchindex.SearchIndex) workflow.Result {
func (sr *searchIndexReconcileRequest) reconcileInternal(indexName string, stateInAKO, stateInAtlas *searchindex.SearchIndex) workflow.Result {
sr.ctx.Log.Debugf("starting reconciliation for index '%s'", sr.indexName)
defer sr.ctx.Log.Debugf("finished reconciliation for index '%s'", sr.indexName)

Expand Down Expand Up @@ -107,7 +107,7 @@ func (sr *searchIndexReconciler) reconcileInternal(indexName string, stateInAKO,
}
}

func (sr *searchIndexReconciler) idle(index *searchindex.SearchIndex) workflow.Result {
func (sr *searchIndexReconcileRequest) idle(index *searchindex.SearchIndex) workflow.Result {
sr.ctx.Log.Debugf("[idle] index '%s'", index.Name)
msg := fmt.Sprintf("Atlas search index status: %s", index.GetStatus())
sr.ctx.EnsureStatusOption(status.AtlasDeploymentSetSearchIndexStatus(status.NewDeploymentSearchIndexStatus(
Expand All @@ -121,7 +121,7 @@ func (sr *searchIndexReconciler) idle(index *searchindex.SearchIndex) workflow.R
}

// Never set the ID (status.WithID()) on terminate. It may be empty and the AKO will lose track on this index
func (sr *searchIndexReconciler) terminate(index *searchindex.SearchIndex, err error) workflow.Result {
func (sr *searchIndexReconcileRequest) terminate(index *searchindex.SearchIndex, err error) workflow.Result {
msg := fmt.Errorf("error with processing index %s. err: %w", index.Name, err)
sr.ctx.Log.Debug(msg)
sr.ctx.EnsureStatusOption(status.AtlasDeploymentSetSearchIndexStatus(status.NewDeploymentSearchIndexStatus(
Expand All @@ -134,7 +134,7 @@ func (sr *searchIndexReconciler) terminate(index *searchindex.SearchIndex, err e
return terminate
}

func (sr *searchIndexReconciler) create(index *searchindex.SearchIndex) workflow.Result {
func (sr *searchIndexReconcileRequest) create(index *searchindex.SearchIndex) workflow.Result {
sr.ctx.Log.Debugf("[creating] index %s", index.Name)
defer sr.ctx.Log.Debugf("[creation finished] for index %s", index.Name)
akoIdx, err := sr.searchService.CreateIndex(sr.ctx.Context, sr.projectID, sr.deployment.GetDeploymentName(), index)
Expand All @@ -144,7 +144,7 @@ func (sr *searchIndexReconciler) create(index *searchindex.SearchIndex) workflow
return sr.progress(akoIdx)
}

func (sr *searchIndexReconciler) progress(index *searchindex.SearchIndex) workflow.Result {
func (sr *searchIndexReconcileRequest) progress(index *searchindex.SearchIndex) workflow.Result {
sr.ctx.Log.Debugf("index %s is progress: %s", index.Name, index.GetStatus())
msg := fmt.Sprintf("Atlas search index status: %s", index.GetStatus())
sr.ctx.EnsureStatusOption(status.AtlasDeploymentSetSearchIndexStatus(
Expand All @@ -157,7 +157,7 @@ func (sr *searchIndexReconciler) progress(index *searchindex.SearchIndex) workfl
return inProgress
}

func (sr *searchIndexReconciler) delete(index *searchindex.SearchIndex) workflow.Result {
func (sr *searchIndexReconcileRequest) delete(index *searchindex.SearchIndex) workflow.Result {
sr.ctx.Log.Debugf("[deleting] index %s", index.Name)
defer sr.ctx.Log.Debugf("[deletion finished] for index %s", index.Name)
if index.ID == nil {
Expand All @@ -172,13 +172,13 @@ func (sr *searchIndexReconciler) delete(index *searchindex.SearchIndex) workflow
return sr.deleted(index.Name)
}

func (sr *searchIndexReconciler) deleted(indexName string) workflow.Result {
func (sr *searchIndexReconcileRequest) deleted(indexName string) workflow.Result {
sr.ctx.EnsureStatusOption(status.AtlasDeploymentUnsetSearchIndexStatus(status.NewDeploymentSearchIndexStatus("",
status.WithName(indexName))))
return workflow.Deleted()
}

func (sr *searchIndexReconciler) compare(akoIdx, atlasIdx *searchindex.SearchIndex) workflow.Result {
func (sr *searchIndexReconcileRequest) compare(akoIdx, atlasIdx *searchindex.SearchIndex) workflow.Result {
sr.ctx.Log.Debugf("[syncing] index %s", akoIdx.Name)
defer sr.ctx.Log.Debugf("[update finished] for index %s", akoIdx.Name)

Expand All @@ -193,7 +193,7 @@ func (sr *searchIndexReconciler) compare(akoIdx, atlasIdx *searchindex.SearchInd
return sr.update(akoIdx, atlasIdx)
}

func (sr *searchIndexReconciler) update(akoIdx, atlasIdx *searchindex.SearchIndex) workflow.Result {
func (sr *searchIndexReconcileRequest) update(akoIdx, atlasIdx *searchindex.SearchIndex) workflow.Result {
sr.ctx.Log.Debugf("updating index %s...", akoIdx.Name)
convertedIdx, err := sr.searchService.UpdateIndex(sr.ctx.Context, sr.projectID, sr.deployment.GetDeploymentName(), akoIdx)
if err != nil {
Expand Down
30 changes: 15 additions & 15 deletions internal/controller/atlasdeployment/searchindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/searchindex/fake"
)

func Test_searchIndexReconciler(t *testing.T) {
func Test_searchIndexReconcileRequest(t *testing.T) {
t.Run("create: must reconcile index to create", func(t *testing.T) {
sch := runtime.NewScheme()
assert.Nil(t, akov2.AddToScheme(sch))
Expand Down Expand Up @@ -67,7 +67,7 @@ func Test_searchIndexReconciler(t *testing.T) {
},
}

reconciler := &searchIndexReconciler{
reconciler := &searchIndexReconcileRequest{
ctx: &workflow.Context{
Log: zap.S(),
OrgID: "testOrgID",
Expand Down Expand Up @@ -125,7 +125,7 @@ func Test_searchIndexReconciler(t *testing.T) {
},
}

reconciler := &searchIndexReconciler{
reconciler := &searchIndexReconcileRequest{
ctx: &workflow.Context{
Log: zap.S(),
OrgID: "testOrgID",
Expand Down Expand Up @@ -182,7 +182,7 @@ func Test_searchIndexReconciler(t *testing.T) {
Return(nil, &http.Response{StatusCode: http.StatusOK}, nil)
atlasSearch := searchindex.NewSearchIndexes(mockSearchAPI)

reconciler := &searchIndexReconciler{
reconciler := &searchIndexReconcileRequest{
ctx: &workflow.Context{
Log: zap.S(),
OrgID: "testOrgID",
Expand Down Expand Up @@ -233,7 +233,7 @@ func Test_searchIndexReconciler(t *testing.T) {
mockSearchAPI := mockadmin.NewAtlasSearchApi(t)
atlasSearch := searchindex.NewSearchIndexes(mockSearchAPI)

reconciler := &searchIndexReconciler{
reconciler := &searchIndexReconcileRequest{
ctx: &workflow.Context{
Log: zap.S(),
OrgID: "testOrgID",
Expand Down Expand Up @@ -289,7 +289,7 @@ func Test_searchIndexReconciler(t *testing.T) {
},
}

reconciler := &searchIndexReconciler{
reconciler := &searchIndexReconcileRequest{
ctx: &workflow.Context{
Log: zap.S(),
OrgID: "testOrgID",
Expand Down Expand Up @@ -347,7 +347,7 @@ func Test_searchIndexReconciler(t *testing.T) {
},
}

reconciler := &searchIndexReconciler{
reconciler := &searchIndexReconcileRequest{
ctx: &workflow.Context{
Log: zap.S(),
OrgID: "testOrgID",
Expand Down Expand Up @@ -398,7 +398,7 @@ func Test_searchIndexReconciler(t *testing.T) {
Status: status.AtlasDeploymentStatus{},
}

reconciler := &searchIndexReconciler{
reconciler := &searchIndexReconcileRequest{
ctx: &workflow.Context{
Log: zap.S(),
OrgID: "testOrgID",
Expand All @@ -415,7 +415,7 @@ func Test_searchIndexReconciler(t *testing.T) {
})

t.Run("must return InProgress if index status is anything but ACTIVE", func(t *testing.T) {
reconciler := &searchIndexReconciler{
reconciler := &searchIndexReconcileRequest{
ctx: &workflow.Context{
Log: zap.S(),
OrgID: "testOrgID",
Expand All @@ -432,7 +432,7 @@ func Test_searchIndexReconciler(t *testing.T) {
})

t.Run("update: must not call update API if indexes are equal", func(t *testing.T) {
reconciler := &searchIndexReconciler{
reconciler := &searchIndexReconcileRequest{
ctx: &workflow.Context{
Log: zap.S(),
OrgID: "testOrgID",
Expand Down Expand Up @@ -477,7 +477,7 @@ func Test_searchIndexReconciler(t *testing.T) {
Status: status.AtlasDeploymentStatus{},
}

reconciler := &searchIndexReconciler{
reconciler := &searchIndexReconcileRequest{
ctx: &workflow.Context{
Log: zap.S(),
OrgID: "testOrgID",
Expand Down Expand Up @@ -538,7 +538,7 @@ func Test_searchIndexReconciler(t *testing.T) {
Status: status.AtlasDeploymentStatus{},
}

reconciler := &searchIndexReconciler{
reconciler := &searchIndexReconcileRequest{
ctx: &workflow.Context{
Log: zap.S(),
OrgID: "testOrgID",
Expand Down Expand Up @@ -604,7 +604,7 @@ func Test_searchIndexReconciler(t *testing.T) {
Status: status.AtlasDeploymentStatus{},
}

reconciler := &searchIndexReconciler{
reconciler := &searchIndexReconcileRequest{
ctx: &workflow.Context{
Log: zap.S(),
OrgID: "testOrgID",
Expand Down Expand Up @@ -661,7 +661,7 @@ func Test_searchIndexReconciler(t *testing.T) {
Status: status.AtlasDeploymentStatus{},
}

reconciler := &searchIndexReconciler{
reconciler := &searchIndexReconcileRequest{
ctx: &workflow.Context{
Log: zap.S(),
OrgID: "testOrgID",
Expand Down Expand Up @@ -732,7 +732,7 @@ func Test_searchIndexReconciler(t *testing.T) {
Status: status.AtlasDeploymentStatus{},
}

reconciler := &searchIndexReconciler{
reconciler := &searchIndexReconcileRequest{
ctx: &workflow.Context{
Log: zap.S(),
OrgID: "testOrgID",
Expand Down
20 changes: 10 additions & 10 deletions internal/controller/atlasdeployment/searchindexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func verifyAllIndexesNamesAreUnique(indexes []akov2.SearchIndex) bool {
return true
}

type searchIndexesReconciler struct {
type searchIndexesReconcileRequest struct {
ctx *workflow.Context
deployment *akov2.AtlasDeployment
k8sClient client.Client
Expand All @@ -56,18 +56,18 @@ func handleSearchIndexes(ctx *workflow.Context, k8sClient client.Client, searchS
ctx.Log.Debug("starting indexes processing")
defer ctx.Log.Debug("finished indexes processing")

reconciler := &searchIndexesReconciler{
reconciler := &searchIndexesReconcileRequest{
ctx: ctx,
k8sClient: k8sClient,
deployment: deployment,
projectID: projectID,
searchService: searchService,
}

return reconciler.Reconcile()
return reconciler.Handle()
}

func (sr *searchIndexesReconciler) Reconcile() workflow.Result {
func (sr *searchIndexesReconcileRequest) Handle() workflow.Result {
if !verifyAllIndexesNamesAreUnique(sr.deployment.Spec.DeploymentSpec.SearchIndexes) {
return sr.terminate(api.SearchIndexesNamesAreNotUnique, fmt.Errorf("every index 'Name' must be unique"))
}
Expand Down Expand Up @@ -108,14 +108,14 @@ func (sr *searchIndexesReconciler) Reconcile() workflow.Result {

results := make([]workflow.Result, 0, len(allIndexes))
for indexName, val := range allIndexes {
results = append(results, (&searchIndexReconciler{
results = append(results, (&searchIndexReconcileRequest{
ctx: sr.ctx,
deployment: sr.deployment,
k8sClient: sr.k8sClient,
projectID: sr.projectID,
indexName: indexName,
searchService: sr.searchService,
}).Reconcile(val.spec, val.previous))
}).Handle(val.spec, val.previous))
}

allDeleted := true
Expand All @@ -132,7 +132,7 @@ func (sr *searchIndexesReconciler) Reconcile() workflow.Result {
return sr.idle()
}

func (sr *searchIndexesReconciler) terminate(reason workflow.ConditionReason, err error) workflow.Result {
func (sr *searchIndexesReconcileRequest) terminate(reason workflow.ConditionReason, err error) workflow.Result {
sr.ctx.Log.Error(err)
var errMsg string
if err != nil {
Expand All @@ -143,18 +143,18 @@ func (sr *searchIndexesReconciler) terminate(reason workflow.ConditionReason, er
return result
}

func (sr *searchIndexesReconciler) progress() workflow.Result {
func (sr *searchIndexesReconcileRequest) progress() workflow.Result {
result := workflow.InProgress(api.SearchIndexesNotReady, "not all indexes are in READY state")
sr.ctx.SetConditionFromResult(status.SearchIndexStatusReady, result)
return result
}

func (sr *searchIndexesReconciler) empty() workflow.Result {
func (sr *searchIndexesReconcileRequest) empty() workflow.Result {
sr.ctx.UnsetCondition(api.SearchIndexesReadyType)
return workflow.OK()
}

func (sr *searchIndexesReconciler) idle() workflow.Result {
func (sr *searchIndexesReconcileRequest) idle() workflow.Result {
sr.ctx.SetConditionTrue(api.SearchIndexesReadyType)
sr.ctx.EnsureStatusOption(status.AtlasDeploymentRemoveStatusesWithEmptyIDs())
return workflow.OK()
Expand Down
Loading

0 comments on commit 15aa149

Please sign in to comment.