Skip to content

Commit

Permalink
Fix flaky aggregator shutdown test. (aws#619)
Browse files Browse the repository at this point in the history
* Fix flaky aggregator shutdown test.
  • Loading branch information
adam-mateen authored Oct 21, 2022
1 parent 23ac4f1 commit 0e786fc
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 8 deletions.
6 changes: 5 additions & 1 deletion plugins/outputs/cloudwatch/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,16 @@ func newDurationAggregator(durationInSeconds time.Duration,

func (durationAgg *durationAggregator) aggregating() {
durationAgg.wg.Add(1)
// sleep for some time until next round duration from now.
// Sleep to align the interval to the wall clock.
// This initial sleep is not interrupted if the aggregator is shut down.
now := time.Now()
time.Sleep(now.Truncate(durationAgg.aggregationDuration).Add(durationAgg.aggregationDuration).Sub(now))
durationAgg.ticker = time.NewTicker(durationAgg.aggregationDuration)
defer durationAgg.ticker.Stop()
for {
// select{} gives no priority to any case.
// If a new metric is pending AND the shutdownChan is closed when an
// iteration of this loop begins, Go picks one of the ready cases at random.
select {
case m := <-durationAgg.aggregationChan:
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html
Expand Down
25 changes: 18 additions & 7 deletions plugins/outputs/cloudwatch/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,30 @@ func TestAggregator_ShutdownBehavior(t *testing.T) {
// verify the remaining metrics can be read after shutdown
// the metrics should be available immediately after the shutdown even before aggregation period
aggregationInterval := 2 * time.Second
tags := map[string]string{"d1key": "d1value", "d2key": "d2value", aggregationIntervalTagKey: aggregationInterval.String()}
tags := map[string]string{
"d1key": "d1value",
"d2key": "d2value",
aggregationIntervalTagKey: aggregationInterval.String()}
fields := map[string]interface{}{"value": 1}
timestamp := time.Now()
m := metric.New(metricName, tags, fields, timestamp)
aggregator.AddMetric(m)

//give some time to aggregation to do the work
time.Sleep(time.Second * 2)

// The Aggregator creates a new durationAggregator for each metric.
// And there is a delay when each new durationAggregator begins.
// So submit a metric and wait for the first aggregation to occur.
assertMetricContent(t, metricChan, 3*aggregationInterval, m, expectedFieldContent{
"value", 1, 1, 1, 1, "", []float64{1.0488088481701516}, []float64{1}})
assertNoMetricsInChan(t, metricChan)
// Now submit the same metric and it should be routed to the existing
// durationAggregator without delay.
timestamp = time.Now()
m = metric.New(metricName, tags, fields, timestamp)
aggregator.AddMetric(m)
// Shutdown before the 2nd aggregationInterval completes.
close(shutdownChan)
wg.Wait()

assertMetricContent(t, metricChan, 1*time.Second, m, expectedFieldContent{"value", 1, 1, 1, 1, "", []float64{1.0488088481701516}, []float64{1}})
assertMetricContent(t, metricChan, 1*time.Second, m, expectedFieldContent{
"value", 1, 1, 1, 1, "", []float64{1.0488088481701516}, []float64{1}})
assertNoMetricsInChan(t, metricChan)
}

Expand Down

0 comments on commit 0e786fc

Please sign in to comment.