Add write ahead log to collector metrics exporter #631

Merged: 5 commits into GoogleCloudPlatform:main on Jun 23, 2023

Conversation

@damemi (Contributor) commented May 23, 2023

This adds optional WAL buffering to the collector metrics exporter, heavily based on a similar implementation in the Prometheus remote write exporter.

The flow for this is:

  • If WAL is enabled, set it up in the user-specified directory under gcp_metrics_wal
  • Update PushMetrics() to write byte-marshalled CreateTimeSeriesRequest objects to the WAL
    • If WAL is disabled, just export the requests as normal
  • Start a walRunner() goroutine that spawns a readWALAndExport() loop (the goroutine also handles shutdown signals)
  • readWALAndExport() continuously reads the latest entry from the WAL and exports it to GCM:
    • If the export is successful, move the read index forward and continue (truncating the log file)
    • If the export fails for DeadlineExceeded or Unavailable, retry forever until success or non-retryable failure
    • If the export fails for any other reason, move the read index forward and continue (truncating the log file and skipping this entry)
  • If there are no entries found in the WAL, watchWAL() opens a file watch to block for changes to the log file.

Added an integration test with WAL enabled to test this
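
As a rough illustration of the write path above, here is a minimal sketch. It assumes the tidwall/wal library (suggested by the wal.Open/TruncateFront calls quoted later in the review) and uses hypothetical field names (wWALIndex, exportFunc); it is not the PR's exact code.

package sketch

import (
	"context"

	monitoringpb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
	"github.com/tidwall/wal"
	"google.golang.org/protobuf/proto"
)

type metricsExporterSketch struct {
	wal        *wal.Log // nil when WAL buffering is disabled
	wWALIndex  uint64   // next WAL write index (hypothetical name)
	exportFunc func(context.Context, *monitoringpb.CreateTimeSeriesRequest) error
}

// pushTimeSeries either appends the byte-marshalled request to the WAL (to be
// picked up by the background read/export loop) or, when the WAL is disabled,
// exports it directly.
func (m *metricsExporterSketch) pushTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest) error {
	if m.wal == nil {
		return m.exportFunc(ctx, req) // WAL disabled: export as normal
	}
	b, err := proto.Marshal(req)
	if err != nil {
		return err
	}
	m.wWALIndex++ // tidwall/wal expects sequential indices starting at 1
	return m.wal.Write(m.wWALIndex, b)
}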

codecov bot commented May 24, 2023

Codecov Report

Merging #631 (dc30e0f) into main (2050f26) will increase coverage by 1.75%.
The diff coverage is 65.28%.

@@            Coverage Diff             @@
##             main     #631      +/-   ##
==========================================
+ Coverage   66.47%   68.23%   +1.75%     
==========================================
  Files          36       36              
  Lines        4301     4539     +238     
==========================================
+ Hits         2859     3097     +238     
+ Misses       1314     1289      -25     
- Partials      128      153      +25     
Impacted Files Coverage Δ
exporter/collector/config.go 45.04% <ø> (ø)
exporter/collector/metrics.go 72.14% <59.29%> (+4.84%) ⬆️
...er/collector/integrationtest/testcases/testcase.go 81.51% <100.00%> (ø)
...tor/integrationtest/testcases/testcases_metrics.go 100.00% <100.00%> (ø)

... and 2 files with indirect coverage changes


}

err = me.wal.TruncateFront(me.rWALIndex.Load())
if err != nil {
Contributor:

Are you testing err to see if it's non-retryable here?

Contributor Author (@damemi):

Retryable errors from GCM are handled in the continue statement above (L423). If this line returns an error, then it was a problem with the WAL itself.

return err
}

func (me *MetricsExporter) walRunner(ctx context.Context) {
@psx95 (Contributor) commented May 24, 2023:

The single-line comment about this function from the PR description could be added here.

Contributor:

Also consider giving the function a more descriptive name such as readAndExportLoop.

@punya (Contributor) left a comment:

Is there a better way to separate the WAL concern from the other parts of our export logic? For example, could you make a self-contained struct that deals with the WAL and has a small API that can be used by the exporter? This could take a couple of forms:

  • a struct that has no-op implementations for the nil value (which would remove the repeated nil-checks in the exporter)
  • a struct that wraps the "raw" exporter in a version that adds a WAL
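
A rough sketch of the first option, assuming the tidwall/wal library underneath; the type name and method are hypothetical, not part of the PR:

package sketch

import "github.com/tidwall/wal"

// walBuffer's nil value is a no-op, so the exporter can call it unconditionally
// instead of repeating nil-checks.
type walBuffer struct {
	log *wal.Log
}

// append reports whether the payload was buffered; on a nil receiver it does
// nothing and the caller exports directly.
func (w *walBuffer) append(data []byte) (buffered bool, err error) {
	if w == nil {
		return false, nil
	}
	last, err := w.log.LastIndex()
	if err != nil {
		return false, err
	}
	return true, w.log.Write(last+1, data)
}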

func (me *MetricsExporter) setupWAL() error {
walPath := filepath.Join(me.cfg.MetricConfig.WALConfig.Directory, "gcp_metrics_wal")
me.walPath = walPath
metricWal, err := wal.Open(walPath, nil)
Contributor:

Should we expose the WAL config options to users?

return multierr.Combine(errs...)
}

// watchWAL watches the WAL directory for a write then returns to the
Contributor:

Is this approach commonly used? For example, I don't see it used (at least directly) in the WAL implementation for the PRW exporter.

This would make sense if WAL writes were coming from a different process, but is there a simpler way where we write to a channel after every successful write to the WAL?

Contributor Author (@damemi):

The PRW implementation uses it in readPrompbFromWAL(), starting here. I just broke that out into its own function for readability.

Contributor:

Thanks for the pointer to the PRW implementation - I could have sworn I looked for fsnotify calls there, but clearly I missed it.

The approach still doesn't make sense to me, but at least there's precedent for doing it! I'd still like to discuss a bit more as a team (especially in the context of in-memory WAL).
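
For reference, here is a hedged sketch of what a file-watch-based wait can look like with fsnotify (the library being discussed here); names and details are illustrative rather than the exporter's actual code:

package sketch

import (
	"context"

	"github.com/fsnotify/fsnotify"
)

// watchWALDir blocks until something is written under walPath, the watcher
// reports an error, or ctx is cancelled.
func watchWALDir(ctx context.Context, walPath string) error {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return err
	}
	defer watcher.Close()
	if err := watcher.Add(walPath); err != nil {
		return err
	}
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case event := <-watcher.Events:
			if event.Op&fsnotify.Write == fsnotify.Write {
				return nil // a new WAL entry is available; resume reading
			}
		case err := <-watcher.Errors:
			return err
		}
	}
}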

func (me *MetricsExporter) walRunner(ctx context.Context) {
defer me.goroutines.Done()
for {
select {
Contributor:

Do we need to handle ctx.Done()?

Contributor:

It looks like this context is the one passed to NewGoogleCloudMetricsExporter(). Is that one intended for initialization only? We're not even passing it to go mExp.exportMetricDescriptorRunner() on L235.

Contributor Author (@damemi):

shutdownC is only closed if Shutdown() is called, so I think handling the main context closing is good to have, to prevent blocking on other (non-graceful?) terminations.

Contributor:

> so I think handling the main context closing is good to have, to prevent blocking on other (non-graceful?) terminations

That makes sense, but I wasn't sure of the collector's contract here. I figured it would eventually cancel that "initialization" context which would cause this goroutine to exit but that doesn't seem to be the case.
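
For illustration, a small sketch of the shutdown handling being discussed; shutdownC and readAndExport are hypothetical stand-ins for the PR's actual members, and the loop exits on either the graceful shutdown channel or a cancelled context:

package sketch

import (
	"context"
	"log"
)

func runWALLoop(ctx context.Context, shutdownC <-chan struct{}, readAndExport func(context.Context) error) {
	for {
		select {
		case <-shutdownC: // graceful Shutdown() was called
			return
		case <-ctx.Done(): // collector cancelled the context
			return
		default:
			if err := readAndExport(ctx); err != nil {
				log.Printf("error reading WAL and exporting: %+v", err)
			}
		}
	}
}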

}
me.wal = metricWal

// sync existing WAL indices
Contributor:

If there's a nonzero number of entries in the WAL at startup, does that mean we're recovering from a crash/unclean shutdown? In that case it seems useful to log how many entries we're about to try to recover.

(Crash recovery seems like the biggest benefit of file- vs memory-backed WAL.)
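
A sketch of what that startup logging could look like, assuming tidwall/wal's FirstIndex/LastIndex; the function name is illustrative:

package sketch

import (
	"log"

	"github.com/tidwall/wal"
)

// logRecoveredEntries reports how many entries a previous run left behind in
// an already-opened WAL.
func logRecoveredEntries(w *wal.Log) error {
	first, err := w.FirstIndex()
	if err != nil {
		return err
	}
	last, err := w.LastIndex()
	if err != nil {
		return err
	}
	if last != 0 && last >= first {
		log.Printf("recovering %d WAL entries from a previous run", last-first+1)
	}
	return nil
}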

@damemi force-pushed the metrics-wal branch 2 times, most recently from 28ed369 to 4030aaa on May 30, 2023 12:14
@damemi force-pushed the metrics-wal branch 9 times, most recently from 7654b46 to e69ca21 on June 13, 2023 15:38
@damemi (Contributor Author) commented Jun 13, 2023

Sorry for the push spam; I'm trying to debug some flaky CI test results that I can't reproduce reliably locally or in a container.

@damemi force-pushed the metrics-wal branch 3 times, most recently from 49f6b65 to dade9ab on June 13, 2023 17:54
@@ -104,10 +105,16 @@ func (f *fakeMetricServiceServer) CreateTimeSeries(
var code codes.Code
if strings.Contains(req.Name, "notfound") {
code = codes.NotFound
} else if strings.Contains(req.Name, "unavailable") {
Contributor:

Please add docs somewhere denoting these magic strings.

Contributor Author (@damemi):

These are only used in this internal mock server for testing, but I added a comment to this function explaining that better.

@damemi force-pushed the metrics-wal branch 2 times, most recently from 41f1009 to 7955f6a on June 14, 2023 15:42
@damemi force-pushed the metrics-wal branch 8 times, most recently from ee4023f to 6a0e0d7 on June 16, 2023 14:19
failedPoints = int(summary.TotalPointCount - summary.SuccessPointCount)
succeededPoints = int(summary.SuccessPointCount)
}
if !(s.Code() == codes.DeadlineExceeded || s.Code() == codes.Unavailable) {
Contributor:

Quick question: retries of network errors block the other requests from being tried while this loop is sleeping too, right? Because we expect those to fail too, so we just wait?

Contributor Author (@damemi):

Yes, but also to keep the ordering of the requests
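
A sketch of the behavior described in this thread, with hypothetical helpers: DeadlineExceeded and Unavailable retry in place with backoff (deliberately blocking later entries so ordering is preserved), while any other error is returned so the caller can skip the entry. maxBackoff mirrors the proposed max_backoff option.

package sketch

import (
	"context"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func exportWithRetry(ctx context.Context, export func(context.Context) error, maxBackoff time.Duration) error {
	backoff := time.Second
	for {
		err := export(ctx)
		if err == nil {
			return nil
		}
		s := status.Convert(err)
		if s.Code() != codes.DeadlineExceeded && s.Code() != codes.Unavailable {
			return err // non-retryable: caller moves the read index forward
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff): // wait before retrying the same entry
		}
		if backoff *= 2; backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}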

me.obs.log.Error(fmt.Sprintf("error reading WAL and exporting: %+v", err))
}

// Must have been ErrNotFound, start a file watch and block waiting for updates.
Contributor:

The above if block doesn't break or return. Won't we get here even if err is another error?

Contributor Author (@damemi):

The idea was just to log errors that aren't ErrNotFound, since ErrNotFound means we're at the end of the WAL and is expected sometimes. Added a comment explaining this.
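
A sketch of that check, assuming tidwall/wal's ErrNotFound sentinel (illustrative, not the PR's exact code):

package sketch

import (
	"errors"
	"log"

	"github.com/tidwall/wal"
)

// handleReadErr logs only unexpected errors; ErrNotFound just means we reached
// the end of the WAL, and either way the caller falls through to the file watch.
func handleReadErr(err error) {
	if err != nil && !errors.Is(err, wal.ErrNotFound) {
		log.Printf("error reading WAL and exporting: %+v", err)
	}
}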

@damemi force-pushed the metrics-wal branch 2 times, most recently from 3d3ccb0 to 41a4fc6 on June 22, 2023 16:27
Comment on lines 183 to 184
- `metric.wal.directory` (optional): Path to local write-ahead-log file.
- `metric.wal.max_backoff` (optional): Maximum duration to retry entries from
Contributor:

Should we say somewhere that the WAL is experimental and subject to change or similar?

Contributor:

I recall that in the collector, there's a convention of using an experimental_ prefix to signal such things in configuration.

Do we think that this is going to be replaced with an enhanced version of queued_retry in future?

@damemi (Contributor Author) commented Jun 22, 2023:

@aabmass I'm fine with that, but our exporter is beta so I don't think we can change these settings once we publish them

Contributor:

I think Punya is saying the experimental prefix would signal that the field is outside of the guarantees. But also it would be good to let users know it hasn't been battle tested yet

Contributor Author (@damemi):

Ah yeah, I commented without seeing Punya's update.

I changed these to add the experimental_ prefix, and also to the wal object... it looks a little weird like this:

metric:
  experimental_wal:
    experimental_directory: my/path

Do you think we could just make the wal subsection experimental and imply that everything under it is also subject to change? Or, just to be safe, mark each option experimental?

metric:
  experimental_wal:
    directory: my/path #subject to change

Contributor:

I think the second, less verbose option is fine. I see they are doing that in this processor as well.

Comment on lines +237 to +249
ExpectFixturePath: "testdata/fixtures/metrics/basic_counter_metrics_wal_expect.json",
CompareFixturePath: "testdata/fixtures/metrics/basic_counter_metrics_expect.json",
Contributor:

I didn't really understand what CompareFixturePath is for. How would Expect and Compare files be different?

Contributor Author (@damemi):

"expect" is what the input for a test converts into, and it verifies that a single test hasn't changed since its last fixture was checked in

"compare" is to cross-reference output fixtures between different tests that should have equal outcomes. In this case, we want to make sure that enabling WAL gives the same output as normal export, based on the same input fixture.

Contributor:

Ah, I see, so the files should be identical. I was confused by the name; maybe something like ExpectIdenticalToFixturePath would be better.

Contributor Author (@damemi):

There is also a comment on the definition in testcase.go:

// CompareFixturePath is a second output fixture that should be equal to this test's output fixture.
// Used for cross-referencing multiple tests that should have the same output, without overwriting the same fixtures.

@aabmass (Contributor) left a comment:

I think it might be good to follow up this PR with refactoring the WAL into its own struct, as metrics.go is getting pretty long and the sprinkled WAL logic could be encapsulated.

// A channel that receives metric descriptor and sends them to GCM once
metricDescriptorC chan *monitoringpb.CreateMetricDescriptorRequest
client *monitoring.MetricClient
exportFunc func(context.Context, *monitoringpb.CreateTimeSeriesRequest) error
Contributor:

Can you add a comment that this is "pluggable" only for testing purposes?

There's now both MetricsExporter.export and MetricsExporter.exportFunc which is a bit confusing. I think for testing purposes it would be better to replace client with an interface so you can pass in a mock and not have to make this pluggable. Maybe for a follow-up PR.
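
A sketch of that follow-up idea: replace the concrete client with a narrow interface that a test fake can satisfy, assuming MetricClient's CreateTimeSeries has this shape (the interface and fake are hypothetical):

package sketch

import (
	"context"

	monitoringpb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
	gax "github.com/googleapis/gax-go/v2"
)

// timeSeriesClient is the narrow surface the exporter actually needs.
type timeSeriesClient interface {
	CreateTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest, opts ...gax.CallOption) error
}

// fakeClient is a test double that records the requests it receives.
type fakeClient struct {
	requests []*monitoringpb.CreateTimeSeriesRequest
}

func (f *fakeClient) CreateTimeSeries(_ context.Context, req *monitoringpb.CreateTimeSeriesRequest, _ ...gax.CallOption) error {
	f.requests = append(f.requests, req)
	return nil
}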

Comment on lines 576 to 577
runCtx, cancel := context.WithCancel(ctx)
defer cancel()
Contributor:

We're not using this cancel anywhere; I think this code is a no-op.

Contributor Author (@damemi):

Sounds good, removed this and changed runCtx to just ctx.

Comment on lines 142 to 145
if me.wal != nil {
if err := me.wal.Close(); err != nil {
me.obs.log.Error(fmt.Sprintf("error closing WAL: %+v\n", err))
}
Contributor:

Consider moving this into a defer in runWALReadAndExportLoop() so the goroutine can handle its own cleanup when it receives the shutdown signal.

Contributor Author (@damemi):

sgtm
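
A sketch of that suggestion, with illustrative names: the goroutine that owns the WAL closes it on its own way out, so Shutdown() only needs to signal termination.

package sketch

import (
	"context"
	"log"

	"github.com/tidwall/wal"
)

func runWALReadAndExportLoop(ctx context.Context, w *wal.Log, readAndExport func(context.Context) error) {
	// The loop owns the WAL, so it also cleans it up when it exits.
	defer func() {
		if err := w.Close(); err != nil {
			log.Printf("error closing WAL: %+v", err)
		}
	}()
	for ctx.Err() == nil {
		if err := readAndExport(ctx); err != nil {
			log.Printf("error reading WAL and exporting: %+v", err)
		}
	}
}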

@damemi (Contributor Author) commented Jun 23, 2023

I think I've addressed all the blockers; going to merge this now so we can get an initial release out with it.

@damemi merged commit 182fb27 into GoogleCloudPlatform:main on Jun 23, 2023