diff --git a/k8s/api/tackle/v1alpha1/taskprofile.go b/k8s/api/tackle/v1alpha1/taskprofile.go new file mode 100644 index 000000000..1df4498fc --- /dev/null +++ b/k8s/api/tackle/v1alpha1/taskprofile.go @@ -0,0 +1,69 @@ +/* +Copyright 2019 Red Hat Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + meta "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// ProfileSelector +// tag:category=tag +// platform:target=kind +type ProfileSelector struct { + Match string `json:"match,omitempty"` + Name string `json:"name,omitempty"` + Capability string `json:"capability,omitempty"` +} + +// TaskProfileSpec defines the desired state of TaskProfile +type TaskProfileSpec struct { + // Addon selector. + Addon []ProfileSelector `json:"addon,omitempty"` + // Component selector. + Component []ProfileSelector `json:"component,omitempty"` +} + +// TaskProfileStatus defines the observed state of TaskProfile +type TaskProfileStatus struct { + // The most recent generation observed by the controller. + // +optional + ObservedGeneration int64 `json:"observedGeneration,omitempty"` +} + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +k8s:openapi-gen=true +// +kubebuilder:subresource:status +// +kubebuilder:printcolumn:name="READY",type=string,JSONPath=".status.conditions[?(@.type=='Ready')].status" +// +kubebuilder:printcolumn:name="AGE",type="date",JSONPath=".metadata.creationTimestamp" +type TaskProfile struct { + meta.TypeMeta `json:",inline"` + meta.ObjectMeta `json:"metadata,omitempty"` + Spec TaskProfileSpec `json:"spec,omitempty"` + Status TaskProfileStatus `json:"status,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +type TaskProfileList struct { + meta.TypeMeta `json:",inline"` + meta.ListMeta `json:"metadata,omitempty"` + Items []TaskProfile `json:"items"` +} + +func init() { + SchemeBuilder.Register(&TaskProfile{}, &TaskProfileList{}) +} diff --git a/k8s/api/tackle/v1alpha1/zz_generated.deepcopy.go b/k8s/api/tackle/v1alpha1/zz_generated.deepcopy.go index c90710941..0d2ad3abd 100644 --- a/k8s/api/tackle/v1alpha1/zz_generated.deepcopy.go +++ b/k8s/api/tackle/v1alpha1/zz_generated.deepcopy.go @@ -224,6 +224,21 @@ func (in *ComponentStatus) DeepCopy() *ComponentStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ProfileSelector) DeepCopyInto(out *ProfileSelector) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ProfileSelector. +func (in *ProfileSelector) DeepCopy() *ProfileSelector { + if in == nil { + return nil + } + out := new(ProfileSelector) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Tackle) DeepCopyInto(out *Tackle) { *out = *in @@ -280,3 +295,102 @@ func (in *TackleList) DeepCopyObject() runtime.Object { } return nil } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TaskProfile) DeepCopyInto(out *TaskProfile) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + out.Status = in.Status +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskProfile. +func (in *TaskProfile) DeepCopy() *TaskProfile { + if in == nil { + return nil + } + out := new(TaskProfile) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *TaskProfile) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TaskProfileList) DeepCopyInto(out *TaskProfileList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]TaskProfile, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskProfileList. +func (in *TaskProfileList) DeepCopy() *TaskProfileList { + if in == nil { + return nil + } + out := new(TaskProfileList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *TaskProfileList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TaskProfileSpec) DeepCopyInto(out *TaskProfileSpec) { + *out = *in + if in.Addon != nil { + in, out := &in.Addon, &out.Addon + *out = make([]ProfileSelector, len(*in)) + copy(*out, *in) + } + if in.Component != nil { + in, out := &in.Component, &out.Component + *out = make([]ProfileSelector, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskProfileSpec. +func (in *TaskProfileSpec) DeepCopy() *TaskProfileSpec { + if in == nil { + return nil + } + out := new(TaskProfileSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TaskProfileStatus) DeepCopyInto(out *TaskProfileStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskProfileStatus. +func (in *TaskProfileStatus) DeepCopy() *TaskProfileStatus { + if in == nil { + return nil + } + out := new(TaskProfileStatus) + in.DeepCopyInto(out) + return out +} diff --git a/task/manager.go b/task/manager.go index 0210c8aa4..9b95df466 100644 --- a/task/manager.go +++ b/task/manager.go @@ -19,6 +19,7 @@ import ( "github.com/konveyor/tackle2-hub/metrics" "github.com/konveyor/tackle2-hub/model" "github.com/konveyor/tackle2-hub/settings" + "github.com/konveyor/tackle2-hub/task/profile" "gopkg.in/yaml.v2" "gorm.io/gorm" core "k8s.io/api/core/v1" @@ -58,6 +59,21 @@ var ( Log = logr.WithName("task-scheduler") ) +// ProfileNotFound used to report profile referenced +// by a task but cannot be found. +type ProfileNotFound struct { + Name string +} + +func (e *ProfileNotFound) Error() (s string) { + return fmt.Sprintf("Profile: '%s' not-found.", e.Name) +} + +func (e *ProfileNotFound) Is(err error) (matched bool) { + _, matched = err.(*ProfileNotFound) + return +} + // AddonNotFound used to report addon referenced // by a task but cannot be found. type AddonNotFound struct { @@ -121,11 +137,6 @@ func (m *Manager) Run(ctx context.Context) { }() } -func (m *Manager) db() (db *gorm.DB) { - db = m.DB.Session(&gorm.Session{}) - return -} - // Pause. func (m *Manager) pause() { d := Unit * time.Duration(Settings.Frequency.Task) @@ -135,7 +146,8 @@ func (m *Manager) pause() { // startReady starts pending tasks. func (m *Manager) startReady() { list := []model.Task{} - db := m.db().Order("priority DESC, id") + db := m.DB.Session(&gorm.Session{}) + db = db.Order("priority DESC, id") result := db.Find( &list, "state IN ?", @@ -156,7 +168,8 @@ func (m *Manager) startReady() { task.State = Failed task.Terminated = &mark task.Error("Error", "Hub is disconnected.") - sErr := m.db().Save(task).Error + db := m.DB.Session(&gorm.Session{}) + sErr := db.Save(task).Error Log.Error(sErr, "") continue } @@ -171,7 +184,8 @@ func (m *Manager) startReady() { if m.postpone(ready, list) { ready.State = Postponed Log.Info("Task postponed.", "id", ready.ID) - sErr := m.db().Save(ready).Error + db := m.DB.Session(&gorm.Session{}) + sErr := db.Save(ready).Error Log.Error(sErr, "") continue } @@ -179,14 +193,16 @@ func (m *Manager) startReady() { metrics.TasksInitiated.Inc() } rt := Task{ready} - err := rt.Run(m.Client) + db := m.DB.Session(&gorm.Session{}) + err := rt.Run(db, m.Client) if err != nil { ready.State = Failed Log.Error(err, "") } else { Log.Info("Task started.", "id", ready.ID) } - err = m.db().Save(ready).Error + db = m.DB.Session(&gorm.Session{}) + err = db.Save(ready).Error Log.Error(err, "") default: // Ignored. @@ -199,7 +215,8 @@ func (m *Manager) startReady() { // updateRunning tasks to reflect pod state. func (m *Manager) updateRunning() { list := []model.Task{} - db := m.db().Order("priority DESC, id") + db := m.DB.Session(&gorm.Session{}) + db = db.Order("priority DESC, id") result := db.Find( &list, "state IN ?", @@ -217,7 +234,8 @@ func (m *Manager) updateRunning() { continue } rt := Task{&running} - pod, err := rt.Reflect(m.Client) + db := m.DB.Session(&gorm.Session{}) + pod, err := rt.Reflect(db, m.Client) if err != nil { Log.Error(err, "") continue @@ -234,7 +252,8 @@ func (m *Manager) updateRunning() { continue } } - err = m.db().Save(&running).Error + db = m.DB.Session(&gorm.Session{}) + err = db.Save(&running).Error if err != nil { Log.Error(result.Error, "") continue @@ -277,9 +296,11 @@ func (m *Manager) canceled(task *model.Task) { if err != nil { return } - err = m.db().Save(task).Error + db := m.DB.Session(&gorm.Session{}) + err = db.Save(task).Error Log.Error(err, "") - db := m.db().Model(&model.TaskReport{}) + db = m.DB.Session(&gorm.Session{}) + db = db.Model(&model.TaskReport{}) err = db.Delete("taskid", task.ID).Error Log.Error(err, "") return @@ -308,7 +329,8 @@ func (m *Manager) snapshotPod(task *Task, pod *core.Pod) (err error) { // podDescription builds pod resource description. func (m *Manager) podDescription(pod *core.Pod) (file *model.File, err error) { file = &model.File{Name: "pod.yaml"} - err = m.db().Create(file).Error + db := m.DB.Session(&gorm.Session{}) + err = db.Create(file).Error if err != nil { err = liberr.Wrap(err) return @@ -360,7 +382,8 @@ func (m *Manager) containerLog(pod *core.Pod, container string) (file *model.Fil _ = reader.Close() }() file = &model.File{Name: container + ".log"} - err = m.db().Create(file).Error + db := m.DB.Session(&gorm.Session{}) + err = db.Create(file).Error if err != nil { err = liberr.Wrap(err) return @@ -388,7 +411,7 @@ type Task struct { } // Run the specified task. -func (r *Task) Run(client k8s.Client) (err error) { +func (r *Task) Run(db *gorm.DB, client k8s.Client) (err error) { mark := time.Now() defer func() { if err != nil { @@ -401,6 +424,17 @@ func (r *Task) Run(client k8s.Client) (err error) { if err != nil { return } + tp, err := r.findProfile(client) + if err != nil { + return + } + if tp != nil { + p := profile.New(tp) + err = p.Apply(db, client, r.Task) + if err != nil { + return + } + } addon, err := r.findAddon(client) if err != nil { return @@ -453,7 +487,7 @@ func (r *Task) Run(client k8s.Client) (err error) { } // Reflect finds the associated pod and updates the task state. -func (r *Task) Reflect(client k8s.Client) (pod *core.Pod, err error) { +func (r *Task) Reflect(db *gorm.DB, client k8s.Client) (pod *core.Pod, err error) { pod = &core.Pod{} err = client.Get( context.TODO(), @@ -464,7 +498,7 @@ func (r *Task) Reflect(client k8s.Client) (pod *core.Pod, err error) { pod) if err != nil { if k8serr.IsNotFound(err) { - err = r.Run(client) + err = r.Run(db, client) } else { err = liberr.Wrap(err) } @@ -609,6 +643,32 @@ func (r *Task) podFailed(pod *core.Pod, client k8s.Client) { } } +// findProfile by name. +func (r *Task) findProfile(client k8s.Client) (profile *crd.TaskProfile, err error) { + if r.Profile == "" { + return + } + profile = &crd.TaskProfile{} + err = client.Get( + context.TODO(), + k8s.ObjectKey{ + Namespace: Settings.Hub.Namespace, + Name: r.Addon, + }, + profile) + if err != nil { + profile = nil + if k8serr.IsNotFound(err) { + err = &ProfileNotFound{r.Addon} + } else { + err = liberr.Wrap(err) + } + return + } + + return +} + // findAddon by name. func (r *Task) findAddon(client k8s.Client) (addon *crd.Addon, err error) { addon = &crd.Addon{} diff --git a/task/profile/profile.go b/task/profile/profile.go new file mode 100644 index 000000000..ec5ca439e --- /dev/null +++ b/task/profile/profile.go @@ -0,0 +1,89 @@ +package profile + +import ( + "encoding/json" + "fmt" + + crd "github.com/konveyor/tackle2-hub/k8s/api/tackle/v1alpha1" + "github.com/konveyor/tackle2-hub/model" + "gorm.io/gorm" + k8s "sigs.k8s.io/controller-runtime/pkg/client" +) + +type NotResolved struct { + Kind string + Name string +} + +func (e *NotResolved) Error() (s string) { + return fmt.Sprintf("%s: '%s' not-resolved.", e.Kind, e.Name) +} + +func (e *NotResolved) Is(err error) (matched bool) { + _, matched = err.(*NotResolved) + return +} + +func New(tp *crd.TaskProfile) (p *Profile) { + p = &Profile{} + p.TaskProfileSpec = tp.Spec + return +} + +type Profile struct { + crd.TaskProfileSpec +} + +func (p *Profile) Apply(db *gorm.DB, client k8s.Client, task *model.Task) (err error) { + err = p.setAddon(db, client, task) + if err != nil { + return + } + err = p.setComponent(db, client, task) + if err != nil { + return + } + return +} + +func (p *Profile) setAddon(db *gorm.DB, client k8s.Client, task *model.Task) (err error) { + for i := range p.Addon { + var selector Selector + var matched []string + resolver := &AddonResolver{} + resolver.client = client + selector, err = NewSelector(p.Addon[i], resolver) + if err != nil { + return + } + matched, err = selector.Match(db, task) + if err != nil { + return + } + if len(matched) == 0 { + err = &NotResolved{Kind: "addon"} + return + } + task.Addon = matched[0] + } + return +} + +func (p *Profile) setComponent(db *gorm.DB, client k8s.Client, task *model.Task) (err error) { + for i := range p.Component { + var selector Selector + var matched []string + resolver := &AddonResolver{} + resolver.client = client + selector, err = NewSelector(p.Component[i], resolver) + if err != nil { + return + } + matched, err = selector.Match(db, task) + if err != nil { + return + } + task.Components, _ = json.Marshal(matched) + } + return +} diff --git a/task/profile/resolver.go b/task/profile/resolver.go new file mode 100644 index 000000000..4d0e995da --- /dev/null +++ b/task/profile/resolver.go @@ -0,0 +1,27 @@ +package profile + +import k8s "sigs.k8s.io/controller-runtime/pkg/client" + +type Resolver interface { + Match(capability string) (names []string, err error) +} + +type BaseResolver struct { + client k8s.Client +} + +type AddonResolver struct { + BaseResolver +} + +func (r *AddonResolver) Match(capability string) (names []string, err error) { + return +} + +type ComponentResolver struct { + BaseResolver +} + +func (r *ComponentResolver) Match(capability string) (names []string, err error) { + return +} diff --git a/task/profile/selector.go b/task/profile/selector.go new file mode 100644 index 000000000..f07888c87 --- /dev/null +++ b/task/profile/selector.go @@ -0,0 +1,100 @@ +package profile + +import ( + "strings" + + crd "github.com/konveyor/tackle2-hub/k8s/api/tackle/v1alpha1" + "github.com/konveyor/tackle2-hub/model" + "gorm.io/gorm" +) + +func NewSelector(p crd.ProfileSelector, r Resolver) (selector Selector, err error) { + match := p.Match + parsed := ParsedSelector{} + part := strings.SplitN(match, "/", 1) + if len(part) > 1 { + parsed.ns = part[0] + match = part[1] + } + part = strings.SplitN(match, ":", 1) + if len(part) > 1 { + parsed.kind = part[0] + match = part[1] + } + part = strings.SplitN(match, "=", 1) + parsed.name = part[0] + if len(part) > 1 { + parsed.value = part[1] + } + switch parsed.kind { + case "tag": + selector = &TagSelector{ + BaseSelector: BaseSelector{ + ProfileSelector: p, + resolver: r, + parsed: parsed, + }} + default: + } + return +} + +type Selector interface { + Match(db *gorm.DB, task *model.Task) (matched []string, err error) +} + +type BaseSelector struct { + crd.ProfileSelector + resolver Resolver + parsed ParsedSelector +} + +type ParsedSelector struct { + ns string + kind string + name string + value string +} + +type TagSelector struct { + BaseSelector +} + +func (r *TagSelector) Match(db *gorm.DB, task *model.Task) (matched []string, err error) { + parsed := r.parsed + db = db.Session(&gorm.Session{}) + application := &model.Application{} + err = db.First(application, task.Application.ID).Error + if err != nil { + return + } + db = db.Session(&gorm.Session{}) + category := &model.TagCategory{} + err = db.First(category, "name=?", parsed.name).Error + if err != nil { + return + } + for _, ref := range application.Tags { + tag := &model.Tag{} + err = db.First(tag, ref.ID).Error + if err != nil { + return + } + if parsed.name == "" || tag.Name == parsed.name { + if r.Name != "" { + matched = append(matched, r.Name) + } + if r.Capability != "" { + var names []string + names, err = r.resolver.Match(r.Capability) + if err == nil { + matched = append(matched, names...) + } else { + return + } + } + + } + } + return +}