Skip to content

Commit

Permalink
feat(api): request to start a new build
Browse files Browse the repository at this point in the history
  • Loading branch information
kpetremann committed Mar 27, 2024
1 parent cc1f6cd commit a294243
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 11 deletions.
21 changes: 18 additions & 3 deletions cmd/data-aggregation-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ var (
builtBy = "unknown"
)

func dispatchSingleRequest(incoming chan struct{}) chan bool {
outgoing := make(chan bool)

go func() {
defer close(outgoing)
for range incoming {
log.Info().Msg("Received new build request.")
outgoing <- true
}
}()

return outgoing
}

func run() error {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
Expand Down Expand Up @@ -67,10 +81,11 @@ func run() error {
deviceRepo := device.NewSafeRepository()
reports := report.NewRepository()

// TODO: be able to close goroutine when the context is closed (graceful shutdown)
go job.StartBuildLoop(&deviceRepo, &reports)
newBuildRequest := make(chan struct{})
triggerNewBuild := dispatchSingleRequest(newBuildRequest)

if err := router.NewManager(&deviceRepo, &reports).ListenAndServe(ctx, config.Cfg.API.ListenAddress, config.Cfg.API.ListenPort); err != nil {
go job.StartBuildLoop(&deviceRepo, &reports, triggerNewBuild)
if err := router.NewManager(&deviceRepo, &reports, newBuildRequest).ListenAndServe(ctx, config.Cfg.API.ListenAddress, config.Cfg.API.ListenPort); err != nil {
return fmt.Errorf("webserver error: %w", err)
}

Expand Down
13 changes: 13 additions & 0 deletions internal/api/router/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,16 @@ func (m *Manager) getLastSuccessfulReport(w http.ResponseWriter, _ *http.Request
w.Header().Set(contentType, applicationJSON)
_, _ = w.Write(out)
}

// triggerBuild enables the user to trigger a new build.
//
// It only accepts one build request at a time.
func (m *Manager) triggerBuild(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
w.Header().Set(contentType, applicationJSON)
select {
case m.restartRequest <- struct{}{}:
_, _ = w.Write([]byte("{\"message\": \"new build request received\""))
default:
_, _ = w.Write([]byte("{\"message\": \"a build request is already pending\""))
}
}
11 changes: 7 additions & 4 deletions internal/api/router/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,20 @@ type DevicesRepository interface {
}

type Manager struct {
devices DevicesRepository
reports *report.Repository
devices DevicesRepository
reports *report.Repository
restartRequest chan<- struct{}
}

// NewManager creates and initializes a new API manager.
func NewManager(deviceRepo DevicesRepository, reports *report.Repository) *Manager {
return &Manager{devices: deviceRepo, reports: reports}
func NewManager(deviceRepo DevicesRepository, reports *report.Repository, restartRequest chan<- struct{}) *Manager {
return &Manager{devices: deviceRepo, reports: reports, restartRequest: restartRequest}
}

// ListenAndServe starts to serve Web API requests.
func (m *Manager) ListenAndServe(ctx context.Context, address string, port int) error {
defer func() {
close(m.restartRequest)
log.Warn().Msg("Shutdown.")
}()

Expand All @@ -57,6 +59,7 @@ func (m *Manager) ListenAndServe(ctx context.Context, address string, port int)
router.GET("/v1/report/last", withAuth.Wrap(m.getLastReport))
router.GET("/v1/report/last/complete", withAuth.Wrap(m.getLastCompleteReport))
router.GET("/v1/report/last/successful", withAuth.Wrap(m.getLastSuccessfulReport))
router.POST("/v1/build/trigger", withAuth.Wrap(m.triggerBuild))

listenSocket := fmt.Sprint(address, ":", port)
log.Info().Msgf("Start webserver - listening on %s", listenSocket)
Expand Down
13 changes: 11 additions & 2 deletions internal/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ func RunBuild(reportCh chan report.Message) (map[string]*device.Device, report.S
}

// StartBuildLoop starts the build in an infinite loop.
func StartBuildLoop(deviceRepo router.DevicesRepository, reports *report.Repository) {
//
// Closing the triggerNewBuild channel will stop the loop.
func StartBuildLoop(deviceRepo router.DevicesRepository, reports *report.Repository, triggerNewBuild <-chan bool) {
metricsRegistry := metrics.NewRegistry()
for {
var wg sync.WaitGroup
Expand Down Expand Up @@ -189,6 +191,13 @@ func StartBuildLoop(deviceRepo router.DevicesRepository, reports *report.Reposit
close(reportCh)
wg.Wait()

time.Sleep(config.Cfg.Build.Interval)
select {
case <-time.After(config.Cfg.Build.Interval):
case c := <-triggerNewBuild:
if !c {
log.Info().Msg("triggerNewBuild channel closed, stopping build loop")
return
}
}
}
}
4 changes: 2 additions & 2 deletions internal/report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func logMessage(msg Message) {
// It ends when the channel is closed.
// This function is concurrent-safe.
func (r *Report) Watch(messageChan <-chan Message) {
log.Info().Msg("Starting report dispatcher")
log.Info().Msg("starting report dispatcher")
r.StartTime = time.Now()
for msg := range messageChan {
logMessage(msg)
Expand All @@ -52,7 +52,7 @@ func (r *Report) Watch(messageChan <-chan Message) {
r.mutex.Unlock()
}
r.EndTime = time.Now()
log.Info().Msg("Stopping report dispatcher")
log.Info().Msg("stopping report dispatcher")
}

func (r *Report) ToJSON() ([]byte, error) {
Expand Down

0 comments on commit a294243

Please sign in to comment.