Skip to content

Commit

Permalink
Remove context and handle cancellation internally instead
Browse files Browse the repository at this point in the history
  • Loading branch information
ycombinator committed Oct 12, 2023
1 parent da06f8c commit 67f80f6
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,17 +207,14 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f
}

loggingObserver := newLoggingProgressObserver(e.log, e.config.HTTPTransportSettings.Timeout)
reportCtx, reportCancel := context.WithCancel(ctx)
dp := newDownloadProgressReporter(sourceURI, e.config.HTTPTransportSettings.Timeout, fileSize, loggingObserver)
dp.Report(reportCtx)
dp.Report()
_, err = io.Copy(destinationFile, io.TeeReader(resp.Body, dp))
if err != nil {
reportCancel()
dp.ReportFailed(err)
// return path, file already exists and needs to be cleaned up
return fullPath, errors.New(err, "copying fetched package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI))
}
reportCancel()
dp.ReportComplete()

return fullPath, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package http

import (
"context"
"time"

"github.com/elastic/elastic-agent-libs/atomic"
Expand All @@ -21,6 +20,7 @@ type downloadProgressReporter struct {
started time.Time

progressObservers []progressObserver
done chan struct{}
}

func newDownloadProgressReporter(sourceURI string, timeout time.Duration, length int, progressObservers ...progressObserver) *downloadProgressReporter {
Expand All @@ -35,6 +35,7 @@ func newDownloadProgressReporter(sourceURI string, timeout time.Duration, length
warnTimeout: time.Duration(float64(timeout) * warningProgressIntervalPercentage),
length: float64(length),
progressObservers: progressObservers,
done: make(chan struct{}),
}
}

Expand All @@ -44,9 +45,10 @@ func (dp *downloadProgressReporter) Write(b []byte) (int, error) {
return n, nil
}

// Report periodically reports download progress to registered observers. Callers MUST cancel
// the context passed to this method to avoid resource leaks.
func (dp *downloadProgressReporter) Report(ctx context.Context) {
// Report periodically reports download progress to registered observers. Callers MUST call
// either ReportComplete or ReportFailed when they no longer need the downloadProgressReporter
// to avoid resource leaks.
func (dp *downloadProgressReporter) Report() {
started := time.Now()
dp.started = started
sourceURI := dp.sourceURI
Expand All @@ -63,7 +65,7 @@ func (dp *downloadProgressReporter) Report(ctx context.Context) {
defer t.Stop()
for {
select {
case <-ctx.Done():
case <-dp.done:
return
case <-t.C:
now := time.Now()
Expand All @@ -83,6 +85,9 @@ func (dp *downloadProgressReporter) Report(ctx context.Context) {
}()
}

// ReportComplete reports the completion of a download to registered observers. Callers MUST call
// either ReportComplete or ReportFailed when they no longer need the downloadProgressReporter
// to avoid resource leaks.
func (dp *downloadProgressReporter) ReportComplete() {
now := time.Now()
timePast := now.Sub(dp.started)
Expand All @@ -92,8 +97,13 @@ func (dp *downloadProgressReporter) ReportComplete() {
for _, obs := range dp.progressObservers {
obs.ReportCompleted(dp.sourceURI, timePast, bytesPerSecond)
}

dp.close()
}

// ReportFailed reports the failure of a download to registered observers. Callers MUST call
// either ReportFailed or ReportComplete when they no longer need the downloadProgressReporter
// to avoid resource leaks.
func (dp *downloadProgressReporter) ReportFailed(err error) {
now := time.Now()
timePast := now.Sub(dp.started)
Expand All @@ -107,4 +117,10 @@ func (dp *downloadProgressReporter) ReportFailed(err error) {
for _, obs := range dp.progressObservers {
obs.ReportFailed(dp.sourceURI, timePast, downloaded, dp.length, percentComplete, bytesPerSecond, err)
}

dp.close()
}

func (dp *downloadProgressReporter) close() {
dp.done <- struct{}{}
}

0 comments on commit 67f80f6

Please sign in to comment.