Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add generic objects to the topology #8

Merged
merged 12 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions controller/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,24 @@ type RuntimeObject interface {
metav1.Object
}

type cacheMap map[schema.GroupKind]map[string]RuntimeObject
type Store map[schema.GroupKind]map[string]RuntimeObject

type cacheStore struct {
mu sync.RWMutex
store cacheMap
store Store
}

func newCacheStore() *cacheStore {
return &cacheStore{
store: make(cacheMap),
store: make(Store),
}
}

func (c *cacheStore) List() cacheMap {
func (c *cacheStore) List() Store {
c.mu.RLock()
defer c.mu.RUnlock()

cm := make(cacheMap, len(c.store))
cm := make(Store, len(c.store))
for gk, objs := range c.store {
if _, ok := cm[gk]; !ok {
cm[gk] = map[string]RuntimeObject{}
Expand Down
20 changes: 18 additions & 2 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ import (
"k8s.io/client-go/tools/cache"
)

type RuntimeLinkFunc func(objs Store) machinery.LinkFunc

type ControllerOptions struct {
client *dynamic.DynamicClient
informers map[string]InformerBuilder
callback CallbackFunc
policyKinds []schema.GroupKind
objectKinds []schema.GroupKind
objectLinks []RuntimeLinkFunc
}

type ControllerOptionFunc func(*ControllerOptions)
Expand All @@ -42,7 +46,19 @@ func WithCallback(callback CallbackFunc) ControllerOptionFunc {

func WithPolicyKinds(policyKinds ...schema.GroupKind) ControllerOptionFunc {
return func(o *ControllerOptions) {
o.policyKinds = policyKinds
o.policyKinds = append(o.policyKinds, policyKinds...)
}
}

func WithObjectKinds(objectKinds ...schema.GroupKind) ControllerOptionFunc {
return func(o *ControllerOptions) {
o.objectKinds = append(o.objectKinds, objectKinds...)
}
}

func WithObjectLinks(objectLinks ...RuntimeLinkFunc) ControllerOptionFunc {
return func(o *ControllerOptions) {
o.objectLinks = append(o.objectLinks, objectLinks...)
}
}

Expand All @@ -59,7 +75,7 @@ func NewController(f ...ControllerOptionFunc) *Controller {
controller := &Controller{
client: opts.client,
cache: newCacheStore(),
topology: NewGatewayAPITopology(opts.policyKinds...),
topology: NewGatewayAPITopology(opts.policyKinds, opts.objectKinds, opts.objectLinks),
informers: map[string]cache.SharedInformer{},
callback: opts.callback,
}
Expand Down
49 changes: 46 additions & 3 deletions controller/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,51 @@ func (t *EventType) String() string {
return [...]string{"create", "update", "delete"}[*t]
}

type InformerBuilderOptions struct {
LabelSelector string
FieldSelector string
}

type InformerBuilderOptionsFunc func(*InformerBuilderOptions)

func FilterResourcesByLabel(selector string) InformerBuilderOptionsFunc {
return func(o *InformerBuilderOptions) {
o.LabelSelector = selector
}
}

func FilterResourcesByField(selector string) InformerBuilderOptionsFunc {
return func(o *InformerBuilderOptions) {
o.FieldSelector = selector
}
}

type InformerBuilder func(controller *Controller) cache.SharedInformer

func For[T RuntimeObject](resource schema.GroupVersionResource, namespace string) InformerBuilder {
func For[T RuntimeObject](resource schema.GroupVersionResource, namespace string, options ...InformerBuilderOptionsFunc) InformerBuilder {
o := &InformerBuilderOptions{}
for _, f := range options {
f(o)
}
return func(controller *Controller) cache.SharedInformer {
informer := cache.NewSharedInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if o.LabelSelector != "" {
options.LabelSelector = o.LabelSelector
}
if o.FieldSelector != "" {
options.FieldSelector = o.FieldSelector
}
return controller.client.Resource(resource).Namespace(namespace).List(context.Background(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if o.LabelSelector != "" {
options.LabelSelector = o.LabelSelector
}
if o.FieldSelector != "" {
options.FieldSelector = o.FieldSelector
}
return controller.client.Resource(resource).Namespace(namespace).Watch(context.Background(), options)
},
},
Expand All @@ -56,12 +91,12 @@ func For[T RuntimeObject](resource schema.GroupVersionResource, namespace string
controller.delete(obj)
},
})
informer.SetTransform(restructure[T])
informer.SetTransform(Restructure[T])
return informer
}
}

func restructure[T any](obj any) (any, error) {
func Restructure[T any](obj any) (any, error) {
unstructuredObj, ok := obj.(*unstructured.Unstructured)
if !ok {
return nil, fmt.Errorf("unexpected object type: %T", obj)
Expand All @@ -72,3 +107,11 @@ func restructure[T any](obj any) (any, error) {
}
return o, nil
}

func Destruct[T any](obj T) (*unstructured.Unstructured, error) {
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&obj)
if err != nil {
return nil, err
}
return &unstructured.Unstructured{Object: u}, nil
}
48 changes: 46 additions & 2 deletions controller/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,24 @@ import (
"github.com/kuadrant/policy-machinery/machinery"
)

func NewGatewayAPITopology(policyKinds ...schema.GroupKind) *GatewayAPITopology {
func NewGatewayAPITopology(policyKinds, objectKinds []schema.GroupKind, objectLinks []RuntimeLinkFunc) *GatewayAPITopology {
return &GatewayAPITopology{
topology: machinery.NewTopology(),
policyKinds: policyKinds,
objectKinds: objectKinds,
objectLinks: objectLinks,
}
}

type GatewayAPITopology struct {
mu sync.RWMutex
topology *machinery.Topology
policyKinds []schema.GroupKind
objectKinds []schema.GroupKind
objectLinks []RuntimeLinkFunc
}

func (t *GatewayAPITopology) Refresh(objs cacheMap) {
func (t *GatewayAPITopology) Refresh(objs Store) {
t.mu.Lock()
defer t.mu.Unlock()

Expand Down Expand Up @@ -60,6 +64,10 @@ func (t *GatewayAPITopology) Refresh(objs cacheMap) {
return service, true
})

linkFuncs := lo.Map(t.objectLinks, func(linkFunc RuntimeLinkFunc, _ int) machinery.LinkFunc {
return linkFunc(objs)
})

opts := []machinery.GatewayAPITopologyOptionsFunc{
machinery.WithGatewayClasses(gatewayClasses...),
machinery.WithGateways(gateways...),
Expand All @@ -68,6 +76,7 @@ func (t *GatewayAPITopology) Refresh(objs cacheMap) {
machinery.ExpandGatewayListeners(),
machinery.ExpandHTTPRouteRules(),
machinery.ExpandServicePorts(),
machinery.WithGatewayAPITopologyLinks(linkFuncs...),
}

for i := range t.policyKinds {
Expand All @@ -80,6 +89,19 @@ func (t *GatewayAPITopology) Refresh(objs cacheMap) {
opts = append(opts, machinery.WithGatewayAPITopologyPolicies(policies...))
}

for i := range t.objectKinds {
objectKind := t.objectKinds[i]
objects := lo.FilterMap(lo.Values(objs[objectKind]), func(obj RuntimeObject, _ int) (machinery.Object, bool) {
object, ok := obj.(machinery.Object)
if ok {
return object, ok
}
return &Object{obj}, true
})

opts = append(opts, machinery.WithGatewayAPITopologyObjects(objects...))
}

t.topology = machinery.NewGatewayAPITopology(opts...)
}

Expand All @@ -92,3 +114,25 @@ func (t *GatewayAPITopology) Get() *machinery.Topology {
topology := *t.topology
return &topology
}

type Object struct {
RuntimeObject RuntimeObject
}

func (g *Object) GroupVersionKind() schema.GroupVersionKind {
return g.RuntimeObject.GetObjectKind().GroupVersionKind()
}

func (g *Object) SetGroupVersionKind(schema.GroupVersionKind) {}

func (g *Object) GetNamespace() string {
return g.RuntimeObject.GetNamespace()
}

func (g *Object) GetName() string {
return g.RuntimeObject.GetName()
}

func (g *Object) GetURL() string {
return machinery.UrlFromObject(g)
}
8 changes: 5 additions & 3 deletions examples/color_policy/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,21 @@ func TestKuadrantMergeBasedOnTopology(t *testing.T) {

machinery.SaveToOutputDir(t, topology.ToDot(), "../../tests/out", ".dot")

gateways := topology.Targetables(func(o machinery.Object) bool {
targetables := topology.Targetables()

gateways := targetables.Items(func(o machinery.Object) bool {
_, ok := o.(*machinery.Gateway)
return ok
})
httpRouteRules := topology.Targetables(func(o machinery.Object) bool {
httpRouteRules := targetables.Items(func(o machinery.Object) bool {
_, ok := o.(*machinery.HTTPRouteRule)
return ok
})

effectivePoliciesByPath := make(map[string]ColorPolicy)

for _, httpRouteRule := range httpRouteRules {
for _, path := range topology.Paths(gateways[0], httpRouteRule) {
for _, path := range targetables.Paths(gateways[0], httpRouteRule) {
// Gather all policies in the path sorted from the least specific (gateway) to the most specific (httprouterule)
// Since in this example there are no targetables with more than one policy attached to it, we can safely just
// flat the slices of policies; otherwise we would need to ensure that the policies at the same level are sorted
Expand Down
8 changes: 5 additions & 3 deletions examples/json_patch/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,19 +121,21 @@ func TestJSONPatchMergeBasedOnTopology(t *testing.T) {

machinery.SaveToOutputDir(t, topology.ToDot(), "../../tests/out", ".dot")

gateways := topology.Targetables(func(o machinery.Object) bool {
targetables := topology.Targetables()

gateways := targetables.Items(func(o machinery.Object) bool {
_, ok := o.(*machinery.Gateway)
return ok
})
httpRouteRules := topology.Targetables(func(o machinery.Object) bool {
httpRouteRules := targetables.Items(func(o machinery.Object) bool {
_, ok := o.(*machinery.HTTPRouteRule)
return ok
})

effectivePoliciesByPath := make(map[string]ColorPolicy)

for _, httpRouteRule := range httpRouteRules {
for _, path := range topology.Paths(gateways[0], httpRouteRule) {
for _, path := range targetables.Paths(gateways[0], httpRouteRule) {
// Gather all policies in the path sorted from the least specific (gateway) to the most specific (httprouterule)
// Since in this example there are no targetables with more than one policy attached to it, we can safely just
// flat the slices of policies; otherwise we would need to ensure that the policies at the same level are sorted
Expand Down
40 changes: 40 additions & 0 deletions examples/kuadrant/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,23 @@ $(CONTROLLER_GEN):
.PHONY: controller-gen
controller-gen: $(CONTROLLER_GEN) ## Download controller-gen locally if necessary.

HELM_VERSION = v3.15.0
HELM = $(PROJECT_PATH)/bin/helm
$(HELM):
@{ \
set -e ;\
mkdir -p $(dir $(HELM)) ;\
OS=$(shell go env GOOS) && ARCH=$(shell go env GOARCH) && \
curl -sL -o helm.tar.gz https://get.helm.sh/helm-$(HELM_VERSION)-$${OS}-$${ARCH}.tar.gz ;\
tar -zxvf helm.tar.gz ;\
mv $${OS}-$${ARCH}/helm $(HELM) ;\
chmod +x $(HELM) ;\
rm -rf $${OS}-$${ARCH} helm.tar.gz ;\
}

.PHONY: helm
helm: $(HELM) ## Download helm locally if necessary.

##@ Development

.PHONY: generate
Expand All @@ -66,4 +83,27 @@ install: manifests ## Install CRDs into a cluster.

.PHONY: run
run: generate ## Run the controller.
ifneq ($(PROVIDERS),)
go run *.go --gateway-providers $(PROVIDERS)
else
go run *.go
endif

##@ Testing

.PHONY: install-envoy-gateway
install-envoy-gateway: helm ## Install Envoy Gateway.
$(HELM) install eg oci://docker.io/envoyproxy/gateway-helm --version v1.0.2 -n envoy-gateway-system --create-namespace
kubectl wait --timeout=5m -n envoy-gateway-system deployment/envoy-gateway --for=condition=Available

.PHONY: install-istio
install-istio: helm ## Install Istio.
$(HELM) repo add istio https://istio-release.storage.googleapis.com/charts
$(HELM) repo update
kubectl create namespace istio-system
$(HELM) install istio-base istio/base -n istio-system --set defaultRevision=default
$(HELM) install istiod istio/istiod -n istio-system --wait

.PHONY: install-kuadrant
install-kuadrant: ## Install Kuadrant CRDs.
kubectl apply -f config/crds
8 changes: 4 additions & 4 deletions examples/kuadrant/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Kuadrant Controller

Practical example of using the [Policy Machinery](https://github.com/kuadrant/policy-machinery) to implment a custom controller.
Practical example of using the [Policy Machinery](https://github.com/kuadrant/policy-machinery) to implement a custom controller.

<br/>

The examples defines 4 kinds of policies:
The example defines 4 kinds of policies:
- **DNSPolicy:** can target Gateways and Listeners
- **TLSPolicy:** can target Gateways and Listeners
- **AuthPolicy:** can target Gateways, Listeners, HTTPRoutes, and HTTPRouteRules; support for Defaults & Overrides and 2 merge strategies (`atomic` or `merge`)
Expand Down Expand Up @@ -35,15 +35,15 @@ Install the CRDs:
make install
```

Run the operator (holds the shell):
Run the controller (holds the shell):

```sh
make run
```

### Create the resources

> **Note:** After each step below, check out the state of the topology (`topology.dot`) and the operator logs for the new effective policies in place.
> **Note:** After each step below, check out the state of the topology (`topology.dot`) and the controller logs for the new effective policies in place.

1. Create a Gateway:

Expand Down
Loading