forked from raystack/entropy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdriver_sync.go
88 lines (72 loc) · 2.39 KB
/
driver_sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package dagger
import (
"context"
"encoding/json"
"github.com/goto/entropy/core/module"
"github.com/goto/entropy/core/resource"
"github.com/goto/entropy/modules"
"github.com/goto/entropy/modules/flink"
"github.com/goto/entropy/modules/kubernetes"
"github.com/goto/entropy/pkg/errors"
)
// Sync performs one reconciliation pass for a dagger resource.
//
// When the module data still lists pending steps, the first one is removed
// from the queue and applied through releaseSync. The returned state stays in
// StatusPending, carries the shortened queue as ModuleData and has NextSyncAt
// set to dd.timeNow() so that another pass is scheduled right away.
//
// When no steps are pending, the output is refreshed through refreshOutput and
// the state is returned as StatusCompleted with ModuleData cleared.
//
// Failures while reading the stored output, the config or the flink
// dependency output are reported as errors.ErrInternal. Errors coming from
// readTransientData, releaseSync and refreshOutput are returned unchanged.
func (dd *daggerDriver) Sync(ctx context.Context, exr module.ExpandedResource) (*resource.State, error) {
	modData, err := readTransientData(exr)
	if err != nil {
		return nil, err
	}

	out, err := readOutputData(exr)
	if err != nil {
		// The message is passed as an argument rather than as the format
		// string so that a literal '%' in it is not treated as a verb.
		return nil, errors.ErrInternal.WithCausef("%s", err.Error())
	}

	conf, err := readConfig(exr, exr.Spec.Configs, dd.conf)
	if err != nil {
		return nil, errors.ErrInternal.WithCausef("%s", err.Error())
	}

	// A missing dependency entry yields a zero value whose Output is empty,
	// which json.Unmarshal rejects, so it is reported here as invalid state.
	var flinkOut flink.Output
	if err := json.Unmarshal(exr.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil {
		return nil, errors.ErrInternal.WithMsgf("invalid flink state").WithCausef("%s", err.Error())
	}

	finalState := resource.State{
		Status: resource.StatusPending,
		Output: exr.Resource.State.Output,
	}

	if len(modData.PendingSteps) > 0 {
		// Pop the head of the queue; only one step is handled per pass.
		pendingStep := modData.PendingSteps[0]
		modData.PendingSteps = modData.PendingSteps[1:]

		switch pendingStep {
		case stepReleaseCreate, stepReleaseUpdate, stepReleaseStop, stepKafkaReset, stepSavepointCreate:
			isCreate := pendingStep == stepReleaseCreate
			if err := dd.releaseSync(ctx, exr.Resource, isCreate, *conf, flinkOut.KubeCluster); err != nil {
				return nil, err
			}

		default:
			return nil, errors.ErrInternal.WithMsgf("unknown step: '%s'", pendingStep)
		}

		// Enqueue the resource for another sync as soon as possible. The next
		// pass either runs the following step or, when the queue is empty,
		// falls through to the completion path below.
		immediately := dd.timeNow()
		finalState.NextSyncAt = &immediately
		finalState.ModuleData = modules.MustJSON(modData)

		return &finalState, nil
	}

	finalOut, err := dd.refreshOutput(ctx, exr.Resource, *conf, *out, flinkOut.KubeCluster)
	if err != nil {
		return nil, err
	}

	finalState.Output = finalOut
	finalState.Status = resource.StatusCompleted
	finalState.ModuleData = nil

	return &finalState, nil
}
// releaseSync obtains the release configuration for r from getHelmRelease and
// hands it to kubeDeploy together with kubeOut.Configs.
//
// isCreate is forwarded to kubeDeploy unchanged (presumably it selects between
// installing a new release and updating an existing one; confirm in
// kubeDeploy).
//
// An error from getHelmRelease is returned as-is. An error from kubeDeploy is
// reported as errors.ErrInternal with the original message as its cause.
func (dd *daggerDriver) releaseSync(ctx context.Context, r resource.Resource,
	isCreate bool, conf Config, kubeOut kubernetes.Output,
) error {
	rc, err := dd.getHelmRelease(r, conf, kubeOut)
	if err != nil {
		return err
	}

	if err := dd.kubeDeploy(ctx, isCreate, kubeOut.Configs, *rc); err != nil {
		// The message is passed as an argument rather than as the format
		// string so that a literal '%' in it is not treated as a verb.
		return errors.ErrInternal.WithCausef("%s", err.Error())
	}

	return nil
}