Skip to content

Commit

Permalink
Don't allow multiple build queues (#7928) (#11222)
Browse files Browse the repository at this point in the history
Co-authored-by: Pawel Winogrodzki <[email protected]>
  • Loading branch information
dmcilvaney and PawelWMS authored Nov 26, 2024
1 parent de68b92 commit 4828b0c
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 6 deletions.
7 changes: 6 additions & 1 deletion toolkit/tools/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,12 @@ func buildAllNodes(stopOnFailure, canUseCache bool, packagesToRebuild, testsToRe
logger.Log.Debugf("Found %d unblocked nodes: %v.", len(nodesToBuild), nodesToBuild)

// Each node that is ready to build must be converted into a build request and submitted to the worker pool.
newRequests := schedulerutils.ConvertNodesToRequests(pkgGraph, graphMutex, nodesToBuild, packagesToRebuild, testsToRerun, buildState, canUseCache)
newRequests, requestError := schedulerutils.ConvertNodesToRequests(pkgGraph, graphMutex, nodesToBuild, packagesToRebuild, testsToRerun, buildState, canUseCache)
if requestError != nil {
err = fmt.Errorf("failed to convert nodes to requests:\n%w", requestError)
stopBuilding = true
break
}
for _, req := range newRequests {
buildState.RecordBuildRequest(req)
// Decide which priority the build should be. Generally we want to get any remote or prebuilt nodes out of the
Expand Down
22 changes: 22 additions & 0 deletions toolkit/tools/scheduler/schedulerutils/graphbuildstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ func (g *GraphBuildState) ActiveBuildFromSRPM(srpmFileName string) *BuildRequest
return nil
}

// IsSRPMBuildActive returns true if a given SRPM is currently queued for building.
func (g *GraphBuildState) IsSRPMBuildActive(srpmFileName string) bool {
return g.ActiveBuildFromSRPM(srpmFileName) != nil
}

// ActiveSRPMs returns a list of all SRPMs, which are currently being built.
func (g *GraphBuildState) ActiveSRPMs() (builtSRPMs []string) {
for _, buildRequest := range g.activeBuilds {
Expand All @@ -146,6 +151,23 @@ func (g *GraphBuildState) ActiveTests() (testedSRPMs []string) {
return
}

// ActiveTestFromSRPM returns a test request for the queried SRPM file
// or nil if the SRPM is not among the active builds.
func (g *GraphBuildState) ActiveTestFromSRPM(srpmFileName string) *BuildRequest {
for _, buildRequest := range g.activeBuilds {
if buildRequest.Node.Type == pkggraph.TypeTest && buildRequest.Node.SRPMFileName() == srpmFileName {
return buildRequest
}
}

return nil
}

// IsSRPMTestActive returns true if a given SRPM is currently queued for testing.
func (g *GraphBuildState) IsSRPMTestActive(srpmFileName string) bool {
return g.ActiveTestFromSRPM(srpmFileName) != nil
}

// BuildFailures returns a slice of all failed builds.
func (g *GraphBuildState) BuildFailures() []*BuildResult {
return g.failures
Expand Down
42 changes: 37 additions & 5 deletions toolkit/tools/scheduler/schedulerutils/preparerequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package schedulerutils

import (
"fmt"
"sync"

"github.com/microsoft/azurelinux/toolkit/tools/internal/logger"
Expand All @@ -27,7 +28,7 @@ import (
// and are queued for building in the testNodesToRequests() function.
// At this point the partner build nodes for these test nodes have either already finished building or are being built,
// thus the check for active and cached SRPMs inside testNodesToRequests().
func ConvertNodesToRequests(pkgGraph *pkggraph.PkgGraph, graphMutex *sync.RWMutex, nodesToBuild []*pkggraph.PkgNode, packagesToRebuild, testsToRerun []*pkgjson.PackageVer, buildState *GraphBuildState, isCacheAllowed bool) (requests []*BuildRequest) {
func ConvertNodesToRequests(pkgGraph *pkggraph.PkgGraph, graphMutex *sync.RWMutex, nodesToBuild []*pkggraph.PkgNode, packagesToRebuild, testsToRerun []*pkgjson.PackageVer, buildState *GraphBuildState, isCacheAllowed bool) (requests []*BuildRequest, err error) {
timestamp.StartEvent("generate requests", nil)
defer timestamp.StopEvent(nil)

Expand Down Expand Up @@ -57,13 +58,23 @@ func ConvertNodesToRequests(pkgGraph *pkggraph.PkgGraph, graphMutex *sync.RWMute
requests = append(requests, req)
}

requests = append(requests, buildNodesToRequests(pkgGraph, buildState, packagesToRebuild, testsToRerun, buildNodes, isCacheAllowed)...)
requests = append(requests, testNodesToRequests(pkgGraph, buildState, testsToRerun, testNodes)...)
newBuildReqs, err := buildNodesToRequests(pkgGraph, buildState, packagesToRebuild, testsToRerun, buildNodes, isCacheAllowed)
if err != nil {
err = fmt.Errorf("failed to convert build nodes to requests:\n%w", err)
return
}
requests = append(requests, newBuildReqs...)
newTestReqs, err := testNodesToRequests(pkgGraph, buildState, testsToRerun, testNodes)
if err != nil {
err = fmt.Errorf("failed to convert test nodes to requests:\n%w", err)
return
}
requests = append(requests, newTestReqs...)

return
}

func buildNodesToRequests(pkgGraph *pkggraph.PkgGraph, buildState *GraphBuildState, packagesToRebuild, testsToRerun []*pkgjson.PackageVer, buildNodesLists map[string][]*pkggraph.PkgNode, isCacheAllowed bool) (requests []*BuildRequest) {
func buildNodesToRequests(pkgGraph *pkggraph.PkgGraph, buildState *GraphBuildState, packagesToRebuild, testsToRerun []*pkgjson.PackageVer, buildNodesLists map[string][]*pkggraph.PkgNode, isCacheAllowed bool) (requests []*BuildRequest, err error) {
for _, buildNodes := range buildNodesLists {
// Check if any of the build nodes is a delta node and mark it. We will use this to determine if the
// build is a delta build that might have pre-built .rpm files available.
Expand All @@ -76,6 +87,17 @@ func buildNodesToRequests(pkgGraph *pkggraph.PkgGraph, buildState *GraphBuildSta
}

defaultNode := buildNodes[0]

// Check if we already queued up this build node for building.
if buildState.IsSRPMBuildActive(defaultNode.SRPMFileName()) || buildState.IsNodeProcessed(defaultNode) {
err = fmt.Errorf("unexpected duplicate build for (%s)", defaultNode.SRPMFileName())
// Temporarily ignore the error, this state is unexpected but not fatal. Error return will be
// restored later once the underlying cause of this error is fixed.
logger.Log.Warnf(err.Error())
err = nil
continue
}

req := buildRequest(pkgGraph, buildState, packagesToRebuild, defaultNode, buildNodes, isCacheAllowed, hasADeltaNode)
requests = append(requests, req)

Expand Down Expand Up @@ -152,13 +174,23 @@ func partnerTestNodesToRequest(pkgGraph *pkggraph.PkgGraph, buildState *GraphBui
// which have already been queued to build or finished building.
//
// NOTE: the caller must guarantee the build state does not change while this function is running.
func testNodesToRequests(pkgGraph *pkggraph.PkgGraph, buildState *GraphBuildState, testsToRerun []*pkgjson.PackageVer, testNodesLists map[string][]*pkggraph.PkgNode) (requests []*BuildRequest) {
func testNodesToRequests(pkgGraph *pkggraph.PkgGraph, buildState *GraphBuildState, testsToRerun []*pkgjson.PackageVer, testNodesLists map[string][]*pkggraph.PkgNode) (requests []*BuildRequest, err error) {
const isDelta = false

for _, testNodes := range testNodesLists {
defaultTestNode := testNodes[0]
srpmFileName := defaultTestNode.SRPMFileName()

// Check if we already queued up this build node for building.
if buildState.IsSRPMBuildActive(srpmFileName) || buildState.IsNodeProcessed(defaultTestNode) {
err = fmt.Errorf("unexpected duplicate test for (%s)", srpmFileName)
// Temporarily ignore the error, this state is unexpected but not fatal. Error return will be
// restored later once the underlying cause of this error is fixed.
logger.Log.Warnf(err.Error())
err = nil
continue
}

buildUsedCache := buildState.IsSRPMCached(srpmFileName)
if buildRequest := buildState.ActiveBuildFromSRPM(srpmFileName); buildRequest != nil {
buildUsedCache = buildRequest.UseCache
Expand Down

0 comments on commit 4828b0c

Please sign in to comment.