@@ -19,18 +19,26 @@ package pod
19
19
import (
20
20
"context"
21
21
"encoding/json"
22
+ "fmt"
22
23
"net/http"
23
24
24
25
admissionv1 "k8s.io/api/admission/v1"
26
+ appsv1 "k8s.io/api/apps/v1"
25
27
v1 "k8s.io/api/core/v1"
28
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
+ "k8s.io/apimachinery/pkg/runtime"
30
+ "k8s.io/apimachinery/pkg/runtime/schema"
31
+ "k8s.io/apimachinery/pkg/util/strategicpatch"
26
32
kubeclientset "k8s.io/client-go/kubernetes"
33
+ "k8s.io/client-go/kubernetes/scheme"
27
34
"k8s.io/klog/v2"
28
35
ctrl "sigs.k8s.io/controller-runtime"
29
36
"sigs.k8s.io/controller-runtime/pkg/client"
30
37
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
31
38
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
32
39
33
40
"github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh"
41
+ "github.com/KusionStack/controller-mesh/pkg/manager/controllers/rollout"
34
42
)
35
43
36
44
type MutatingHandler struct {
@@ -90,3 +98,87 @@ func (h *MutatingHandler) InjectDecoder(d *admission.Decoder) error {
90
98
h .Decoder = d
91
99
return nil
92
100
}
101
+
102
+ func (h * MutatingHandler ) revisionRollOut (ctx context.Context , pod * v1.Pod ) (err error ) {
103
+ podRevision := pod .Labels [appsv1 .ControllerRevisionHashLabelKey ]
104
+ sts := & appsv1.StatefulSet {}
105
+ if pod .OwnerReferences == nil || len (pod .OwnerReferences ) == 0 {
106
+ return fmt .Errorf ("illegal owner reference" )
107
+ }
108
+ if pod .OwnerReferences [0 ].Kind != "StatefulSet" {
109
+ return fmt .Errorf ("illegal owner reference kind %s" , pod .OwnerReferences [0 ].Kind )
110
+ }
111
+
112
+ sts , err = h .directKubeClient .AppsV1 ().StatefulSets (pod .Namespace ).Get (ctx , pod .OwnerReferences [0 ].Name , metav1.GetOptions {})
113
+ if err != nil {
114
+ klog .Error (err )
115
+ return err
116
+ }
117
+ if sts .Spec .UpdateStrategy .Type != appsv1 .OnDeleteStatefulSetStrategyType {
118
+ return nil
119
+ }
120
+ expectState := rollout .GetExpectedRevision (sts )
121
+ if expectState .UpdateRevision == "" || expectState .PodRevision == nil || expectState .PodRevision [pod .Name ] == "" {
122
+ return
123
+ }
124
+ expectedRevision := expectState .PodRevision [pod .Name ]
125
+ if expectedRevision == podRevision {
126
+ return
127
+ }
128
+ // Do not use manager client get ControllerRevision. (To avoid Informer cache)
129
+ expectRevision , err := h .directKubeClient .AppsV1 ().ControllerRevisions (pod .Namespace ).Get (ctx , expectedRevision , metav1.GetOptions {})
130
+ if err != nil {
131
+ return fmt .Errorf ("cannot find old ControllerRevision %s" , expectedRevision )
132
+ }
133
+
134
+ createRevision , err := h .directKubeClient .AppsV1 ().ControllerRevisions (pod .Namespace ).Get (ctx , podRevision , metav1.GetOptions {})
135
+ if err != nil {
136
+ return fmt .Errorf ("cannot find ControllerRevision %s by pod %s/%s" , podRevision , pod .Namespace , pod .Name )
137
+ }
138
+
139
+ expectedSts := & appsv1.StatefulSet {}
140
+ createdSts := & appsv1.StatefulSet {}
141
+
142
+ applyPatch (expectedSts , & expectRevision .Data .Raw )
143
+ applyPatch (createdSts , & createRevision .Data .Raw )
144
+
145
+ expectedPo := & v1.Pod {
146
+ Spec : expectedSts .Spec .Template .Spec ,
147
+ }
148
+ createdPo := & v1.Pod {
149
+ Spec : createdSts .Spec .Template .Spec ,
150
+ }
151
+
152
+ expectedBt , _ := runtime .Encode (patchCodec , expectedPo )
153
+ createdBt , _ := runtime .Encode (patchCodec , createdPo )
154
+ currentBt , _ := runtime .Encode (patchCodec , pod )
155
+
156
+ patch , err := strategicpatch .CreateTwoWayMergePatch (createdBt , expectedBt , expectedPo )
157
+ if err != nil {
158
+ return err
159
+ }
160
+ originBt , err := strategicpatch .StrategicMergePatch (currentBt , patch , pod )
161
+ if err != nil {
162
+ return err
163
+ }
164
+ newPod := & v1.Pod {}
165
+ if err = json .Unmarshal (originBt , newPod ); err != nil {
166
+ return err
167
+ }
168
+ pod .Spec = newPod .Spec
169
+ pod .Labels [appsv1 .ControllerRevisionHashLabelKey ] = expectedRevision
170
+ return
171
+ }
172
+
173
+ var patchCodec = scheme .Codecs .LegacyCodec (schema.GroupVersion {Group : "apps" , Version : "v1" }, schema.GroupVersion {Version : "v1" })
174
+
175
+ func applyPatch (target runtime.Object , podPatch * []byte ) error {
176
+ patched , err := strategicpatch .StrategicMergePatch ([]byte (runtime .EncodeOrDie (patchCodec , target )), * podPatch , target )
177
+ if err != nil {
178
+ return err
179
+ }
180
+ if err = json .Unmarshal (patched , target ); err != nil {
181
+ return err
182
+ }
183
+ return nil
184
+ }
0 commit comments