Skip to content

Commit

Permalink
Add back context
Browse files Browse the repository at this point in the history
  • Loading branch information
ycombinator committed Oct 12, 2023
1 parent c81477f commit d84ac77
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f

loggingObserver := newLoggingProgressObserver(e.log, e.config.HTTPTransportSettings.Timeout)
dp := newDownloadProgressReporter(sourceURI, e.config.HTTPTransportSettings.Timeout, fileSize, loggingObserver)
dp.Report()
dp.Report(ctx)
_, err = io.Copy(destinationFile, io.TeeReader(resp.Body, dp))
if err != nil {
dp.ReportFailed(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package http

import (
"context"
"time"

"github.com/elastic/elastic-agent-libs/atomic"
Expand Down Expand Up @@ -45,10 +46,10 @@ func (dp *downloadProgressReporter) Write(b []byte) (int, error) {
return n, nil
}

// Report periodically reports download progress to registered observers. Callers MUST call
// either ReportComplete or ReportFailed when they no longer need the downloadProgressReporter
// to avoid resource leaks.
func (dp *downloadProgressReporter) Report() {
// Report periodically reports download progress to registered observers. Callers MUST either
// cancel the context provided to this method OR call either ReportComplete or ReportFailed when
// they no longer need the downloadProgressReporter to avoid resource leaks.
func (dp *downloadProgressReporter) Report(ctx context.Context) {
started := time.Now()
dp.started = started
sourceURI := dp.sourceURI
Expand All @@ -65,6 +66,8 @@ func (dp *downloadProgressReporter) Report() {
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-dp.done:
return
case <-t.C:
Expand All @@ -89,7 +92,7 @@ func (dp *downloadProgressReporter) Report() {
// either ReportComplete or ReportFailed when they no longer need the downloadProgressReporter
// to avoid resource leaks.
func (dp *downloadProgressReporter) ReportComplete() {
defer dp.close()
defer close(dp.done)

// If there are no observers to report progress to, there is nothing to do!
if len(dp.progressObservers) == 0 {
Expand All @@ -110,7 +113,7 @@ func (dp *downloadProgressReporter) ReportComplete() {
// either ReportFailed or ReportComplete when they no longer need the downloadProgressReporter
// to avoid resource leaks.
func (dp *downloadProgressReporter) ReportFailed(err error) {
defer dp.close()
defer close(dp.done)

// If there are no observers to report progress to, there is nothing to do!
if len(dp.progressObservers) == 0 {
Expand All @@ -130,7 +133,3 @@ func (dp *downloadProgressReporter) ReportFailed(err error) {
obs.ReportFailed(dp.sourceURI, timePast, downloaded, dp.length, percentComplete, bytesPerSecond, err)
}
}

func (dp *downloadProgressReporter) close() {
dp.done <- struct{}{}
}

0 comments on commit d84ac77

Please sign in to comment.