diff --git a/pkg/common/resources/tracked_resources.go b/pkg/common/resources/tracked_resources.go index 209e1c114..b39b9ebfc 100644 --- a/pkg/common/resources/tracked_resources.go +++ b/pkg/common/resources/tracked_resources.go @@ -84,14 +84,13 @@ func (tr *TrackedResource) Clone() *TrackedResource { // AggregateTrackedResource aggregates resource usage to TrackedResourceMap[instType]. // The time the given resource used is the delta between the resource createTime and currentTime. func (tr *TrackedResource) AggregateTrackedResource(instType string, - resource *Resource, bindTime time.Time) { + resource *Resource, bindTime time.Time, releaseTime time.Time) { if resource == nil { return } tr.Lock() defer tr.Unlock() - releaseTime := time.Now() timeDiff := int64(releaseTime.Sub(bindTime).Seconds()) aggregatedResourceTime, ok := tr.TrackedResourceMap[instType] if !ok { diff --git a/pkg/common/resources/tracked_resources_test.go b/pkg/common/resources/tracked_resources_test.go index 0e2043796..3f7d9906d 100644 --- a/pkg/common/resources/tracked_resources_test.go +++ b/pkg/common/resources/tracked_resources_test.go @@ -201,7 +201,7 @@ func TestTrackedResourceAggregateTrackedResource(t *testing.T) { for _, tt := range tests { t.Run(tt.caseName, func(t *testing.T) { original := NewTrackedResourceFromMap(tt.input.trackedResource) - original.AggregateTrackedResource(tt.input.instType, &Resource{tt.input.otherResource}, tt.input.bindTime) + original.AggregateTrackedResource(tt.input.instType, &Resource{tt.input.otherResource}, tt.input.bindTime, time.Now()) expected := NewTrackedResourceFromMap(tt.expectedTrackedResource) if !reflect.DeepEqual(original.TrackedResourceMap, expected.TrackedResourceMap) { diff --git a/pkg/events/event_publisher.go b/pkg/events/event_publisher.go index 216edb7ee..5cd70b1e7 100644 --- a/pkg/events/event_publisher.go +++ b/pkg/events/event_publisher.go @@ -19,6 +19,12 @@ package events import ( + "fmt" + "github.com/apache/yunikorn-core/pkg/common" + "github.com/apache/yunikorn-core/pkg/common/resources" + "github.com/apache/yunikorn-scheduler-interface/lib/go/si" + "strconv" + "strings" "sync/atomic" "time" @@ -29,18 +35,20 @@ import ( ) // stores the push event internal -var defaultPushEventInterval = 2 * time.Second +var defaultPushEventInterval = 1 * time.Second type EventPublisher struct { store *EventStore pushEventInterval time.Duration stop atomic.Bool + trackingAppMap map[string]*resources.TrackedResource // storing eventChannel } func CreateShimPublisher(store *EventStore) *EventPublisher { publisher := &EventPublisher{ store: store, pushEventInterval: defaultPushEventInterval, + trackingAppMap: make(map[string]*resources.TrackedResource), } publisher.stop.Store(false) return publisher @@ -58,6 +66,7 @@ func (sp *EventPublisher) StartService() { log.Log(log.Events).Debug("Sending eventChannel", zap.Int("number of messages", len(messages))) eventPlugin.SendEvent(messages) } + sp.AggregateAppTrackedResourceFromEvents(messages) } time.Sleep(sp.pushEventInterval) } @@ -68,6 +77,43 @@ func (sp *EventPublisher) Stop() { sp.stop.Store(true) } +func (sp *EventPublisher) AggregateAppTrackedResourceFromEvents(messages []*si.EventRecord) { + for _, message := range messages { + if message.Type == si.EventRecord_APP && message.EventChangeType == si.EventRecord_REMOVE { + log.Log(log.Events).Debug("aggregate resource usage", zap.String("message", fmt.Sprintf("%+v", message))) + // We need to clean up the trackingAppMap when an application is removed + if message.ReferenceID == "" { + log.Log(log.Events).Info("YK_APP_SUMMARY:", + zap.String("appID", message.ObjectID), + zap.Any("resourceUsage", sp.trackingAppMap[message.ObjectID].TrackedResourceMap), + ) + // This is an application removal event, remove the application from the trackingAppMap + delete(sp.trackingAppMap, message.ObjectID) + } else { + // This is an allocation removal event, aggregate the resources used by the allocation + if _, ok := sp.trackingAppMap[message.ObjectID]; !ok { + sp.trackingAppMap[message.ObjectID] = &resources.TrackedResource{ + TrackedResourceMap: make(map[string]map[string]int64), + } + } + + // The message is in the format of "instanceType:timestamp" + // Split the message to get the instance type and the timestamp for bind time + // Convert the string to an int64 + unixNano, err := strconv.ParseInt(strings.Split(message.Message, common.Separator)[1], 10, 64) + if err != nil { + log.Log(log.Events).Warn("Failed to parse the timestamp", zap.Error(err), zap.String("message", message.Message)) + return + } + instType := strings.Split(message.Message, common.Separator)[0] + // Convert Unix timestamp in nanoseconds to a time.Time object + bindTime := time.Unix(0, unixNano) + sp.trackingAppMap[message.ObjectID].AggregateTrackedResource(instType, resources.NewResourceFromProto(message.Resource), bindTime, time.Unix(0, message.TimestampNano)) + } + } + } +} + func (sp *EventPublisher) getEventStore() *EventStore { return sp.store } diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index 4d494f9ce..31c9be42f 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -78,19 +78,14 @@ type Application struct { tags map[string]string // application tags used in scheduling // Private mutable fields need protection - queuePath string - queue *Queue // queue the application is running in - pending *resources.Resource // pending resources from asks for the app - reservations map[string]*reservation // a map of reservations - requests map[string]*AllocationAsk // a map of asks - sortedRequests sortedRequests // list of requests pre-sorted - user security.UserGroup // owner of the application - allocatedResource *resources.Resource // total allocated resources - - usedResource *resources.TrackedResource // keep track of resource usage of the application - preemptedResource *resources.TrackedResource // keep track of preempted resource usage of the application - placeholderResource *resources.TrackedResource // keep track of placeholder resource usage of the application - + queuePath string + queue *Queue // queue the application is running in + pending *resources.Resource // pending resources from asks for the app + reservations map[string]*reservation // a map of reservations + requests map[string]*AllocationAsk // a map of asks + sortedRequests sortedRequests // list of requests pre-sorted + user security.UserGroup // owner of the application + allocatedResource *resources.Resource // total allocated resources maxAllocatedResource *resources.Resource // max allocated resources allocatedPlaceholder *resources.Resource // total allocated placeholder resources allocations map[string]*Allocation // list of all allocations @@ -118,29 +113,6 @@ type Application struct { sync.RWMutex } -func (sa *Application) GetApplicationSummary(rmID string) *ApplicationSummary { - sa.RLock() - defer sa.RUnlock() - state := sa.stateMachine.Current() - resourceUsage := sa.usedResource.Clone() - preemptedUsage := sa.preemptedResource.Clone() - placeHolderUsage := sa.placeholderResource.Clone() - appSummary := &ApplicationSummary{ - ApplicationID: sa.ApplicationID, - SubmissionTime: sa.SubmissionTime, - StartTime: sa.startTime, - FinishTime: sa.finishedTime, - User: sa.user.User, - Queue: sa.queuePath, - State: state, - RmID: rmID, - ResourceUsage: resourceUsage, - PreemptedResource: preemptedUsage, - PlaceholderResource: placeHolderUsage, - } - return appSummary -} - func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eventHandler handler.EventHandler, rmID string) *Application { app := &Application{ ApplicationID: siApp.ApplicationID, @@ -150,9 +122,6 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eve tags: siApp.Tags, pending: resources.NewResource(), allocatedResource: resources.NewResource(), - usedResource: resources.NewTrackedResource(), - preemptedResource: resources.NewTrackedResource(), - placeholderResource: resources.NewTrackedResource(), maxAllocatedResource: resources.NewResource(), allocatedPlaceholder: resources.NewResource(), requests: make(map[string]*AllocationAsk), @@ -1655,39 +1624,6 @@ func (sa *Application) decUserResourceUsage(resource *resources.Resource, remove ugm.GetUserManager().DecreaseTrackedResource(sa.queuePath, sa.ApplicationID, resource, sa.user, removeApp) } -// Track used and preempted resources -func (sa *Application) trackCompletedResource(info *Allocation) { - switch { - case info.IsPreempted(): - sa.updatePreemptedResource(info) - case info.IsPlaceholder(): - sa.updatePlaceholderResource(info) - default: - sa.updateUsedResource(info) - } -} - -// When the resource allocated with this allocation is to be removed, -// have the usedResource to aggregate the resource used by this allocation -func (sa *Application) updateUsedResource(info *Allocation) { - sa.usedResource.AggregateTrackedResource(info.GetInstanceType(), - info.GetAllocatedResource(), info.GetBindTime()) -} - -// When the placeholder allocated with this allocation is to be removed, -// have the placeholderResource to aggregate the resource used by this allocation -func (sa *Application) updatePlaceholderResource(info *Allocation) { - sa.placeholderResource.AggregateTrackedResource(info.GetInstanceType(), - info.GetAllocatedResource(), info.GetBindTime()) -} - -// When the resource allocated with this allocation is to be preempted, -// have the preemptedResource to aggregate the resource used by this allocation -func (sa *Application) updatePreemptedResource(info *Allocation) { - sa.preemptedResource.AggregateTrackedResource(info.GetInstanceType(), - info.GetAllocatedResource(), info.GetBindTime()) -} - func (sa *Application) ReplaceAllocation(uuid string) *Allocation { sa.Lock() defer sa.Unlock() @@ -1776,16 +1712,11 @@ func (sa *Application) removeAllocationInternal(uuid string, releaseType si.Term eventWarning = "Application state not changed while removing a placeholder allocation" } } - // Aggregate the resources used by this alloc to the application's resource tracker - sa.trackCompletedResource(alloc) sa.decUserResourceUsage(alloc.GetAllocatedResource(), removeApp) } else { sa.allocatedResource = resources.Sub(sa.allocatedResource, alloc.GetAllocatedResource()) - // Aggregate the resources used by this alloc to the application's resource tracker - sa.trackCompletedResource(alloc) - // When the resource trackers are zero we should not expect anything to come in later. if sa.hasZeroAllocations() { removeApp = true @@ -1835,8 +1766,6 @@ func (sa *Application) RemoveAllAllocations() []*Allocation { allocationsToRelease := make([]*Allocation, 0) for _, alloc := range sa.allocations { allocationsToRelease = append(allocationsToRelease, alloc) - // Aggregate the resources used by this alloc to the application's user resource tracker - sa.trackCompletedResource(alloc) sa.appEvents.sendRemoveAllocationEvent(alloc, si.TerminationType_STOPPED_BY_RM) } @@ -2019,29 +1948,6 @@ func (sa *Application) cleanupAsks() { sa.sortedRequests = nil } -func (sa *Application) cleanupTrackedResource() { - sa.usedResource = nil - sa.placeholderResource = nil - sa.preemptedResource = nil -} - -func (sa *Application) CleanupTrackedResource() { - sa.Lock() - defer sa.Unlock() - sa.cleanupTrackedResource() -} - -func (sa *Application) LogAppSummary(rmID string) { - if sa.startTime.IsZero() { - return - } - appSummary := sa.GetApplicationSummary(rmID) - appSummary.DoLogging() - appSummary.ResourceUsage = nil - appSummary.PreemptedResource = nil - appSummary.PlaceholderResource = nil -} - func (sa *Application) HasPlaceholderAllocation() bool { sa.RLock() defer sa.RUnlock() diff --git a/pkg/scheduler/objects/application_events.go b/pkg/scheduler/objects/application_events.go index 59348dd91..f75229aef 100644 --- a/pkg/scheduler/objects/application_events.go +++ b/pkg/scheduler/objects/application_events.go @@ -20,6 +20,7 @@ package objects import ( "fmt" + "strconv" "time" "golang.org/x/time/rate" @@ -89,7 +90,8 @@ func (evt *applicationEvents) sendRemoveAllocationEvent(alloc *Allocation, termi eventChangeDetail = si.EventRecord_ALLOC_REPLACED } - event := events.CreateAppEventRecord(evt.app.ApplicationID, common.Empty, alloc.GetUUID(), si.EventRecord_REMOVE, eventChangeDetail, alloc.GetAllocatedResource()) + // add bind time and instanceType to the event in the message field + event := events.CreateAppEventRecord(evt.app.ApplicationID, alloc.GetInstanceType()+common.Separator+strconv.FormatInt(alloc.bindTime.UnixNano(), 10), alloc.GetUUID(), si.EventRecord_REMOVE, eventChangeDetail, alloc.GetAllocatedResource()) evt.eventSystem.AddEvent(event) } diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go index d52b86cd9..fb00b8cef 100644 --- a/pkg/scheduler/objects/application_test.go +++ b/pkg/scheduler/objects/application_test.go @@ -1074,91 +1074,6 @@ func assertPlaceHolderResource(t *testing.T, appSummary *ApplicationSummary, mem assert.Equal(t, vcoresSecconds, detailedResource["vcores"]) } -func TestResourceUsageAggregation(t *testing.T) { - setupUGM() - - app := newApplication(appID1, "default", "root.a") - - // nothing allocated - if !resources.IsZero(app.GetAllocatedResource()) { - t.Error("new application has allocated resources") - } - // create an allocation and check the assignment - resMap := map[string]string{"memory": "100", "vcores": "10"} - res, err := resources.NewResourceFromConf(resMap) - assert.NilError(t, err, "failed to create resource with error") - alloc := newAllocation(appID1, "uuid-1", nodeID1, res) - alloc.SetInstanceType(instType1) - // Mock the time to be 3 seconds before - alloc.SetBindTime(time.Now().Add(-3 * time.Second)) - app.AddAllocation(alloc) - - if !resources.Equals(app.allocatedResource, res) { - t.Errorf("allocated resources is not updated correctly: %v", app.allocatedResource) - } - allocs := app.GetAllAllocations() - assert.Equal(t, len(allocs), 1) - assert.Assert(t, app.getPlaceholderTimer() == nil, "Placeholder timer should not be initialized as the allocation is not a placeholder") - assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1)) - - err = app.HandleApplicationEvent(RunApplication) - assert.NilError(t, err, "no error expected new to accepted (completed test)") - - appSummary := app.GetApplicationSummary("default") - appSummary.DoLogging() - assertResourceUsage(t, appSummary, 0, 0) - - // add more allocations to test the removals - alloc = newAllocation(appID1, "uuid-2", nodeID1, res) - alloc.SetInstanceType(instType1) - - // Mock the time to be 3 seconds before - alloc.SetBindTime(time.Now().Add(-3 * time.Second)) - app.AddAllocation(alloc) - assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) - - // remove one of the 2 - if alloc = app.RemoveAllocation("uuid-2", si.TerminationType_UNKNOWN_TERMINATION_TYPE); alloc == nil { - t.Error("returned allocations was nil allocation was not removed") - } - assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1)) - - appSummary = app.GetApplicationSummary("default") - appSummary.DoLogging() - assertResourceUsage(t, appSummary, 300, 30) - - alloc = newAllocation(appID1, "uuid-3", nodeID1, res) - alloc.SetInstanceType(instType1) - app.AddAllocation(alloc) - allocs = app.GetAllAllocations() - assert.Equal(t, len(allocs), 2) - assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) - - appSummary = app.GetApplicationSummary("default") - appSummary.DoLogging() - assertResourceUsage(t, appSummary, 300, 30) - - // try to remove a non existing alloc - if alloc = app.RemoveAllocation("does-not-exist", si.TerminationType_UNKNOWN_TERMINATION_TYPE); alloc != nil { - t.Errorf("returned allocations was not allocation was incorrectly removed: %v", alloc) - } - - // remove all left over allocations - if allocs = app.RemoveAllAllocations(); allocs == nil || len(allocs) != 2 { - t.Errorf("returned number of allocations was incorrect: %v", allocs) - } - allocs = app.GetAllAllocations() - assert.Equal(t, len(allocs), 0) - assertUserGroupResource(t, getTestUserGroup(), nil) - - err = app.HandleApplicationEvent(CompleteApplication) - assert.NilError(t, err, "no error expected accepted to completing (completed test)") - - appSummary = app.GetApplicationSummary("default") - appSummary.DoLogging() - assertResourceUsage(t, appSummary, 600, 60) -} - func TestRejected(t *testing.T) { terminatedTimeout = time.Millisecond * 100 defer func() { @@ -1377,10 +1292,6 @@ func TestReplaceAllocationTracking(t *testing.T) { app.RemoveAllocation("uuid-3", si.TerminationType_PLACEHOLDER_REPLACED) assert.Equal(t, "uuid-3", alloc.uuid) assert.Equal(t, false, app.HasPlaceholderAllocation()) - - // check placeholder resource usage - appSummary := app.GetApplicationSummary("default") - assertPlaceHolderResource(t, appSummary, 3000, 300) } func TestTimeoutPlaceholderSoftStyle(t *testing.T) { diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index 25d7f32bb..1f0ff2634 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -1483,8 +1483,6 @@ func (pc *PartitionContext) moveTerminatedApp(appID string) { log.Log(log.SchedPartition).Info("Removing terminated application from the application list", zap.String("appID", appID), zap.String("app status", app.CurrentState())) - app.LogAppSummary(pc.RmID) - app.CleanupTrackedResource() pc.Lock() defer pc.Unlock() delete(pc.applications, appID) diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index e62923c45..f1450a1ef 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -1880,7 +1880,7 @@ func assertPreemptedResource(t *testing.T, appSummary *objects.ApplicationSummar func TestPreemption(t *testing.T) { setupUGM() - partition, app1, app2, alloc1, alloc2 := setupPreemption(t) + partition, _, app2, alloc1, alloc2 := setupPreemption(t) res, err := resources.NewResourceFromConf(map[string]string{"vcore": "5"}) assert.NilError(t, err, "failed to create resource") @@ -1937,12 +1937,6 @@ func TestPreemption(t *testing.T) { assert.Equal(t, alloc.GetResult(), objects.Allocated, "result should be allocated") assert.Equal(t, alloc.GetAllocationKey(), allocID3, "expected ask alloc-3 to be allocated") assertUserGroupResourceMaxLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 10000}), getExpectedQueuesLimitsForPreemption()) - - appSummary := app1.GetApplicationSummary("default") - assertPreemptedResource(t, appSummary, -1, 5000) - - appSummary = app2.GetApplicationSummary("default") - assertPreemptedResource(t, appSummary, -1, 0) } // Preemption followed by a normal allocation