-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
…ing threading
- Loading branch information
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,42 +1,67 @@ | ||
package client | ||
|
||
import ( | ||
"errors" | ||
Check failure on line 4 in internal/client/tasks.go GitHub Actions / lint-test
Check failure on line 4 in internal/client/tasks.go GitHub Actions / lint-test
Check failure on line 4 in internal/client/tasks.go GitHub Actions / lint-test
|
||
"fmt" | ||
Check failure on line 5 in internal/client/tasks.go GitHub Actions / lint-test
Check failure on line 5 in internal/client/tasks.go GitHub Actions / lint-test
Check failure on line 5 in internal/client/tasks.go GitHub Actions / lint-test
|
||
|
||
"github.com/google/uuid" | ||
Check failure on line 7 in internal/client/tasks.go GitHub Actions / lint-test
Check failure on line 7 in internal/client/tasks.go GitHub Actions / lint-test
Check failure on line 7 in internal/client/tasks.go GitHub Actions / lint-test
|
||
"github.com/sirupsen/logrus" | ||
"golang.org/x/sync/semaphore" | ||
|
||
// "github.com/sirupsen/logrus" | ||
fleetdbapi "github.com/metal-toolbox/fleetdb/pkg/api/v1" | ||
Check failure on line 10 in internal/client/tasks.go GitHub Actions / lint-test
Check failure on line 10 in internal/client/tasks.go GitHub Actions / lint-test
Check failure on line 10 in internal/client/tasks.go GitHub Actions / lint-test
|
||
) | ||
|
||
func (c *Client) CreateConditionInventoryForAllServers(pageSize, inFlightPages int) error { | ||
// Start thread to start collecting servers | ||
serverCh, concLimiter, err := c.GatherServersNonBlocking(pageSize, inFlightPages) | ||
func (c *Client) CreateConditionInventoryForAllServers(pageSize int) error { | ||
// First page, use the response from it to figure out how many pages we have to loop through | ||
// Dont change page size | ||
servers, response, err := c.getServerPage(pageSize, 1) | ||
if err != nil { | ||
c.log.WithFields(logrus.Fields{ | ||
"pageSize": pageSize, | ||
"pageIndex": 1, | ||
}).Logger.Errorf("Failed to get list of servers: %s", err.Error()) | ||
return err | ||
} | ||
totalPages := response.TotalPages | ||
|
||
// Loop through servers and create conditions | ||
for server := range serverCh { | ||
err := c.CreateConditionInventory(server.UUID) | ||
// send first page of servers to the channel | ||
for i := range servers { | ||
err = c.CreateConditionInventory(servers[i].UUID) | ||
if err != nil { | ||
c.log.WithFields(logrus.Fields{ | ||
"server": server.UUID, | ||
}).Logger.Error("Failed to create condition") | ||
return err | ||
} | ||
|
||
concLimiter.Release(1) | ||
} | ||
|
||
return nil | ||
} | ||
c.log.WithFields(logrus.Fields{ | ||
"index": 1, | ||
"iterations": totalPages, | ||
"got": len(servers), | ||
}).Trace("Got server page") | ||
|
||
func (c *Client) GatherServersNonBlocking(pageSize, inFlightPages int) (chan *fleetdbapi.Server, *semaphore.Weighted, error) { | ||
serverCh := make(chan *fleetdbapi.Server) | ||
concLimiter := semaphore.NewWeighted(int64(inFlightPages * pageSize)) | ||
// Start the second page, and loop through rest the pages | ||
for i := 2; i <= totalPages; i++ { | ||
servers, _, err = c.getServerPage(pageSize, i) | ||
if err != nil { | ||
c.log.WithFields(logrus.Fields{ | ||
"pageSize": pageSize, | ||
"pageIndex": i, | ||
}).Logger.Errorf("Failed to get page of servers, attempting to continue: %s", err.Error()) | ||
|
||
go func() { | ||
c.gatherServers(pageSize, serverCh, concLimiter) | ||
}() | ||
continue | ||
} | ||
|
||
c.log.WithFields(logrus.Fields{ | ||
"index": i, | ||
"iterations": totalPages, | ||
"got": len(servers), | ||
}).Trace("Got server page") | ||
|
||
for i := range servers { | ||
err = c.CreateConditionInventory(servers[i].UUID) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
|
||
return serverCh, concLimiter, nil | ||
return nil | ||
} |