Skip to content

Commit

Permalink
Bug fixes: 1. NPE when model is not found 2. Port is considered 0 whe…
Browse files Browse the repository at this point in the history
…n LLMServerPool is not initialized (#79)

* Fix null pointer error when model is not found

* Fix a race condition where LLMServerPool has not been initialized yet
  • Loading branch information
liu-cong authored Dec 10, 2024
1 parent 5372efb commit bce8d6f
Show file tree
Hide file tree
Showing 13 changed files with 143 additions and 250 deletions.
52 changes: 47 additions & 5 deletions pkg/ext-proc/backend/datastore.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package backend

import (
"fmt"
"math/rand"
"sync"

Expand All @@ -9,24 +10,65 @@ import (
"k8s.io/klog/v2"
)

// NewK8sDataStore builds a K8sDatastore whose llmServices and pods maps are
// empty, then applies the given options to it in the order they were passed.
func NewK8sDataStore(options ...K8sDatastoreOption) *K8sDatastore {
	// The zero value of poolMu is an unlocked RWMutex, so it needs no explicit init.
	ds := new(K8sDatastore)
	ds.llmServices = new(sync.Map)
	ds.pods = new(sync.Map)
	for i := range options {
		options[i](ds)
	}
	return ds
}

// K8sDatastore is a local cache of relevant data for the given LLMServerPool (currently all pulled from k8s-api).
type K8sDatastore struct {
LLMServerPool *v1alpha1.LLMServerPool
LLMServices *sync.Map
Pods *sync.Map
// poolMu is used to synchronize access to the llmServerPool.
poolMu sync.RWMutex
// llmServerPool is written by setLLMServerPool; getLLMServerPool returns an error while it is nil.
llmServerPool *v1alpha1.LLMServerPool
// llmServices is ranged over by FetchModelData, which reads its values as *v1alpha1.LLMService.
llmServices *sync.Map
// pods is used as a set by WithPods and the EndpointSlice reconciler: the key identifies the pod and the stored value is always true.
pods *sync.Map
}

// K8sDatastoreOption configures a K8sDatastore; NewK8sDataStore applies each option to the store it builds.
type K8sDatastoreOption func(*K8sDatastore)

// WithPods returns an option that replaces the store's pods map with a fresh
// one holding the Pod of every given PodMetrics. It can be used in tests to
// override the pods.
func WithPods(pods []*PodMetrics) K8sDatastoreOption {
	return func(ds *K8sDatastore) {
		ds.pods = new(sync.Map)
		for i := range pods {
			// The map is used as a set: only the key matters, the value is always true.
			ds.pods.Store(pods[i].Pod, true)
		}
	}
}

// setLLMServerPool replaces the cached LLMServerPool with pool while holding
// the write lock on poolMu.
func (ds *K8sDatastore) setLLMServerPool(pool *v1alpha1.LLMServerPool) {
	ds.poolMu.Lock()
	ds.llmServerPool = pool
	ds.poolMu.Unlock()
}

// getLLMServerPool returns the cached LLMServerPool, read under the read lock
// on poolMu. It returns a nil pool and an error while no pool has been set.
func (ds *K8sDatastore) getLLMServerPool() (*v1alpha1.LLMServerPool, error) {
	ds.poolMu.RLock()
	pool := ds.llmServerPool
	ds.poolMu.RUnlock()

	if pool != nil {
		return pool, nil
	}
	return nil, fmt.Errorf("LLMServerPool hasn't been initialized yet")
}

// GetPodIPs collects Status.PodIP from every value in the pods map and returns
// them as a slice (nil when the map is empty).
//
// NOTE(review): the type assertion below expects the map values to be
// *corev1.Pod, but the writers visible in this change (WithPods and
// EndpointSliceReconciler.updateDatastore) store the bool true as the value,
// which would make the assertion panic — confirm whether this method is still
// reachable and what it is meant to read.
func (ds *K8sDatastore) GetPodIPs() []string {
var ips []string
ds.Pods.Range(func(name, pod any) bool {
ds.pods.Range(func(name, pod any) bool {
ips = append(ips, pod.(*corev1.Pod).Status.PodIP)
return true
})
return ips
}

func (s *K8sDatastore) FetchModelData(modelName string) (returnModel *v1alpha1.Model) {
s.LLMServices.Range(func(k, v any) bool {
s.llmServices.Range(func(k, v any) bool {
service := v.(*v1alpha1.LLMService)
klog.V(3).Infof("Service name: %v", service.Name)
for _, model := range service.Spec.Models {
Expand Down
7 changes: 3 additions & 4 deletions pkg/ext-proc/backend/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ var ()

func TestRandomWeightedDraw(t *testing.T) {
tests := []struct {
name string
datastore K8sDatastore
model *v1alpha1.Model
want string
name string
model *v1alpha1.Model
want string
}{
{
name: "'random' distribution",
Expand Down
42 changes: 27 additions & 15 deletions pkg/ext-proc/backend/endpointslice_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

var (
Expand All @@ -28,18 +30,14 @@ type EndpointSliceReconciler struct {
}

func (c *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
klog.V(1).Info("reconciling EndpointSlice ", req.NamespacedName)
klog.V(2).Info("Reconciling EndpointSlice ", req.NamespacedName)

endpointSlice := &discoveryv1.EndpointSlice{}
if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil {
klog.Error(err, "unable to get LLMServerPool")
klog.Errorf("Unable to get EndpointSlice: %v", err)
return ctrl.Result{}, err
}

if !c.ownsEndPointSlice(endpointSlice.ObjectMeta.Labels) {
return ctrl.Result{}, nil
}

c.updateDatastore(endpointSlice)

return ctrl.Result{}, nil
Expand All @@ -50,9 +48,9 @@ func (c *EndpointSliceReconciler) updateDatastore(slice *discoveryv1.EndpointSli
for _, endpoint := range slice.Endpoints {
klog.V(4).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint)
if c.validPod(endpoint) {
pod := Pod{Name: *&endpoint.TargetRef.Name, Address: endpoint.Addresses[0] + ":" + fmt.Sprint(c.Datastore.LLMServerPool.Spec.TargetPort)}
pod := Pod{Name: *&endpoint.TargetRef.Name, Address: endpoint.Addresses[0] + ":" + fmt.Sprint(c.Datastore.llmServerPool.Spec.TargetPort)}
podMap[pod] = true
c.Datastore.Pods.Store(pod, true)
c.Datastore.pods.Store(pod, true)
}
}

Expand All @@ -63,23 +61,37 @@ func (c *EndpointSliceReconciler) updateDatastore(slice *discoveryv1.EndpointSli
return false
}
if _, ok := podMap[pod]; !ok {
c.Datastore.Pods.Delete(pod)
c.Datastore.pods.Delete(pod)
}
return true
}
c.Datastore.Pods.Range(removeOldPods)
c.Datastore.pods.Range(removeOldPods)
}

// SetupWithManager registers the reconciler with mgr as a controller for
// EndpointSlice objects. Two predicates are attached to the watch: the
// datastore must already hold an LLMServerPool, and the slice's
// serviceOwnerLabel must equal c.ServiceName.
func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
// llmServerPoolAvailable ignores its object argument and only checks whether
// the datastore has an LLMServerPool yet, logging a warning when it does not.
// NOTE(review): presumably an event rejected by a predicate is dropped rather
// than retried, so a slice seen before the pool is set would only be processed
// on a later event — confirm this is acceptable.
llmServerPoolAvailable := func(object client.Object) bool {
_, err := c.Datastore.getLLMServerPool()
if err != nil {
klog.Warningf("Skipping reconciling EndpointSlice because LLMServerPool is not available yet: %v", err)
}
return err == nil
}

// ownsEndPointSlice accepts only EndpointSlice objects whose serviceOwnerLabel
// matches the service this reconciler is configured for.
ownsEndPointSlice := func(object client.Object) bool {
// Check if the object is an EndpointSlice
endpointSlice, ok := object.(*discoveryv1.EndpointSlice)
if !ok {
return false
}

return endpointSlice.ObjectMeta.Labels[serviceOwnerLabel] == c.ServiceName
}

return ctrl.NewControllerManagedBy(mgr).
For(&discoveryv1.EndpointSlice{}).
For(&discoveryv1.EndpointSlice{}, builder.WithPredicates(predicate.NewPredicateFuncs(llmServerPoolAvailable), predicate.NewPredicateFuncs(ownsEndPointSlice))).
Complete(c)
}

// ownsEndPointSlice reports whether the serviceOwnerLabel entry in labels
// equals c.ServiceName. A missing label yields the empty string for the comparison.
func (c *EndpointSliceReconciler) ownsEndPointSlice(labels map[string]string) bool {
return labels[serviceOwnerLabel] == c.ServiceName
}

func (c *EndpointSliceReconciler) validPod(endpoint discoveryv1.Endpoint) bool {
validZone := c.Zone == "" || c.Zone != "" && *endpoint.Zone == c.Zone
return validZone && *endpoint.Conditions.Ready == true
Expand Down
40 changes: 17 additions & 23 deletions pkg/ext-proc/backend/endpointslice_reconcilier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ var (
func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) {
tests := []struct {
name string
datastore K8sDatastore
datastore *K8sDatastore
incomingSlice *discoveryv1.EndpointSlice
want K8sDatastore
wantPods *sync.Map
}{
{
name: "Add new pod",
datastore: K8sDatastore{
Pods: populateMap(basePod1, basePod2),
LLMServerPool: &v1alpha1.LLMServerPool{
datastore: &K8sDatastore{
pods: populateMap(basePod1, basePod2),
llmServerPool: &v1alpha1.LLMServerPool{
Spec: v1alpha1.LLMServerPoolSpec{
TargetPort: int32(8000),
},
Expand Down Expand Up @@ -66,15 +66,13 @@ func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) {
},
},
},
want: K8sDatastore{
Pods: populateMap(basePod1, basePod2, basePod3),
},
wantPods: populateMap(basePod1, basePod2, basePod3),
},
{
name: "New pod, but its not ready yet. Do not add.",
datastore: K8sDatastore{
Pods: populateMap(basePod1, basePod2),
LLMServerPool: &v1alpha1.LLMServerPool{
datastore: &K8sDatastore{
pods: populateMap(basePod1, basePod2),
llmServerPool: &v1alpha1.LLMServerPool{
Spec: v1alpha1.LLMServerPoolSpec{
TargetPort: int32(8000),
},
Expand Down Expand Up @@ -114,15 +112,13 @@ func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) {
},
},
},
want: K8sDatastore{
Pods: populateMap(basePod1, basePod2),
},
wantPods: populateMap(basePod1, basePod2),
},
{
name: "Existing pod not ready, new pod added, and is ready",
datastore: K8sDatastore{
Pods: populateMap(basePod1, basePod2),
LLMServerPool: &v1alpha1.LLMServerPool{
datastore: &K8sDatastore{
pods: populateMap(basePod1, basePod2),
llmServerPool: &v1alpha1.LLMServerPool{
Spec: v1alpha1.LLMServerPoolSpec{
TargetPort: int32(8000),
},
Expand Down Expand Up @@ -162,18 +158,16 @@ func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) {
},
},
},
want: K8sDatastore{
Pods: populateMap(basePod3, basePod2),
},
wantPods: populateMap(basePod3, basePod2),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
endpointSliceReconciler := &EndpointSliceReconciler{Datastore: &test.datastore, Zone: ""}
endpointSliceReconciler := &EndpointSliceReconciler{Datastore: test.datastore, Zone: ""}
endpointSliceReconciler.updateDatastore(test.incomingSlice)

if mapsEqual(endpointSliceReconciler.Datastore.Pods, test.want.Pods) {
t.Errorf("Unexpected output pod mismatch. \n Got %v \n Want: %v \n", endpointSliceReconciler.Datastore.Pods, test.want.Pods)
if mapsEqual(endpointSliceReconciler.Datastore.pods, test.wantPods) {
t.Errorf("Unexpected output pod mismatch. \n Got %v \n Want: %v \n", endpointSliceReconciler.Datastore.pods, test.wantPods)
}
})
}
Expand Down
97 changes: 0 additions & 97 deletions pkg/ext-proc/backend/llmlserverpool_reconciler_test.go

This file was deleted.

13 changes: 4 additions & 9 deletions pkg/ext-proc/backend/llmserverpool_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,20 @@ func (c *LLMServerPoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques
if req.NamespacedName.Name != c.ServerPoolName && req.NamespacedName.Namespace != c.Namespace {
return ctrl.Result{}, nil
}
klog.V(1).Info("reconciling LLMServerPool", req.NamespacedName)
klog.V(2).Infof("Reconciling LLMServerPool %v", req.NamespacedName)

serverPool := &v1alpha1.LLMServerPool{}
if err := c.Get(ctx, req.NamespacedName, serverPool); err != nil {
klog.Error(err, "unable to get LLMServerPool")
klog.Errorf("Unable to get LLMServerPool: %v", err)
return ctrl.Result{}, err
}

c.updateDatastore(serverPool)
klog.V(2).Infof("Updated LLMServerPool: %+v", serverPool)
c.Datastore.setLLMServerPool(serverPool)

return ctrl.Result{}, nil
}

// updateDatastore stores serverPool in the datastore when no pool is cached yet
// or when serverPool's ResourceVersion differs from the cached pool's;
// otherwise the cached pool is left in place.
func (c *LLMServerPoolReconciler) updateDatastore(serverPool *v1alpha1.LLMServerPool) {
// NOTE(review): the field is read and written here without any lock — confirm callers are serialized.
if c.Datastore.LLMServerPool == nil || serverPool.ObjectMeta.ResourceVersion != c.Datastore.LLMServerPool.ObjectMeta.ResourceVersion {
c.Datastore.LLMServerPool = serverPool
}
}

func (c *LLMServerPoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.LLMServerPool{}).
Expand Down
Loading

0 comments on commit bce8d6f

Please sign in to comment.