Skip to content

Commit

Permalink
feat: handle resources reconcilation
Browse files Browse the repository at this point in the history
Signed-off-by: minhthong582000 <[email protected]>
  • Loading branch information
minhthong582000 committed Nov 3, 2024
1 parent 028a2e6 commit 8791a16
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 10 deletions.
2 changes: 1 addition & 1 deletion gitops/example/nginx/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ spec:
resources:
requests:
cpu: 100m
memory: 128Mi
memory: 256Mi
limits:
cpu: 100m
memory: 256Mi
92 changes: 83 additions & 9 deletions gitops/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type Controller struct {
// Every time a new event detected by informer, it will be added to the queue
queue workqueue.RateLimitingInterface

// appRefreshQueue is used to reconcile the application periodically after created
appRefreshQueue workqueue.RateLimitingInterface

gitUtil git.GitClient

k8sUtil k8sutil.K8s
Expand Down Expand Up @@ -73,6 +76,10 @@ func NewController(
workqueue.DefaultControllerRateLimiter(),
"application",
),
appRefreshQueue: workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(),
"application-reconcile",
),
gitUtil: gitUtil,
k8sUtil: k8sUtil,
eventRecorder: recorder,
Expand Down Expand Up @@ -107,6 +114,11 @@ func (c *Controller) Run(numWorkers int, stopCh <-chan struct{}) error {
go wait.Until(c.worker, 1*time.Second, stopCh)
}

for i := 0; i < numWorkers; i++ {
// Wait every 1 second to process the next item in the queue
go wait.Until(c.applicationRefreshWorker, 1*time.Second, stopCh)
}

<-stopCh

return nil
Expand All @@ -117,6 +129,11 @@ func (c *Controller) worker() {
}
}

func (c *Controller) applicationRefreshWorker() {
for c.processNextAppRefreshItem() {
}
}

func (c *Controller) processNextItem() bool {
ctx := context.Background()

Expand Down Expand Up @@ -167,12 +184,9 @@ func (c *Controller) processNextItem() bool {
return fmt.Errorf("error getting deployment info: %s", err)
}

err = c.createResources(ctx, app)
if err != nil {
c.queue.AddRateLimited(obj)
return fmt.Errorf("error creating resources: %s", err)
}

// Hand it over to the refresh queue on creation
// TODO: use a cache instead of depending on Informer
c.requestAppRefresh(app.GetName(), app.GetNamespace())
c.queue.Forget(obj)

return nil
Expand All @@ -188,6 +202,61 @@ func (c *Controller) processNextItem() bool {
return true
}

func (c *Controller) processNextAppRefreshItem() bool {
ctx := context.Background()

appKey, shutdown := c.appRefreshQueue.Get()
if shutdown {
return false
}

log.Info("Processing application refresh " + appKey.(string))

// We wrap this block in a func so we can defer c.workqueue.Done.
app, err := func(appKey string) (*v1alpha1.Application, error) {
defer c.appRefreshQueue.Done(appKey)

// Split the key into namespace and name
ns, name, err := cache.SplitMetaNamespaceKey(appKey)
if err != nil {
// Since we can't process the item, we stop processing it
c.appRefreshQueue.Forget(appKey)
return nil, fmt.Errorf("error splitting key: %s", err)
}

app, err := c.appClientSet.ThongdepzaiV1alpha1().Applications(ns).Get(ctx, name, metav1.GetOptions{})
if err != nil {
// This means the application is deleted while processing
if apierrors.IsNotFound(err) {
c.appRefreshQueue.Forget(appKey)
return app, nil
}

// If there is another type of error, requeue the item
c.appRefreshQueue.AddRateLimited(appKey)
return nil, fmt.Errorf("error getting deployment info: %s", err)
}

err = c.createResources(ctx, app)
if err != nil {
c.appRefreshQueue.AddRateLimited(appKey)
return nil, fmt.Errorf("error creating resources: %s", err)
}
c.queue.Forget(appKey)

return app, nil
}(appKey.(string))

if err != nil {
utilruntime.HandleError(err)
c.updateAppStatus(ctx, app, &v1alpha1.ApplicationStatus{
HealthStatus: v1alpha1.HealthStatusDegraded,
})
}

return true
}

func (c *Controller) createResources(ctx context.Context, app *v1alpha1.Application) error {
repoPath := path.Join(os.TempDir(), app.Name, strings.Replace(app.Spec.Repository, "/", "_", -1))
log.Println(repoPath)
Expand Down Expand Up @@ -339,11 +408,11 @@ func (c *Controller) handleUdate(old, new interface{}) {

// Compare old and new spec
if equality.Semantic.DeepEqual(oldApp.Spec, newApp.Spec) {
log.Debugf("No changes in spec, skipping")
return
log.Debugf("No changes in application spec: %s", newApp.Name)
}

c.queue.AddRateLimited(new)
// TODO: use a cache instead of depending on Informer
c.requestAppRefresh(newApp.GetName(), newApp.GetNamespace())
}

// updateAppStatus to safely update the status of an application.
Expand All @@ -370,3 +439,8 @@ func (c *Controller) updateAppStatus(ctx context.Context, app *v1alpha1.Applicat

return nil
}

func (c *Controller) requestAppRefresh(appName string, namespace string) {
key := namespace + "/" + appName
c.appRefreshQueue.AddRateLimited(key)
}

0 comments on commit 8791a16

Please sign in to comment.