Add write ahead log to collector metrics exporter #631
Conversation
Codecov Report
@@ Coverage Diff @@
## main #631 +/- ##
==========================================
+ Coverage 66.47% 68.23% +1.75%
==========================================
Files 36 36
Lines 4301 4539 +238
==========================================
+ Hits 2859 3097 +238
+ Misses 1314 1289 -25
- Partials 128 153 +25
... and 2 files with indirect coverage changes
}

err = me.wal.TruncateFront(me.rWALIndex.Load())
if err != nil {
Are you testing `err` to see if it's non-retryable here?
Retryable errors from GCM are handled in the `continue` statement above (L423). If this line returns an error, then it was a problem with the WAL itself.
exporter/collector/metrics.go
Outdated
	return err
}

func (me *MetricsExporter) walRunner(ctx context.Context) {
The single-line comment about this function from the PR description could be added here.
Also consider giving the function a more descriptive name, such as `readAndExportLoop`.
...er/collector/integrationtest/testdata/fixtures/metrics/basic_counter_metrics_wal_expect.json
Outdated
Is there a better way to separate the WAL concern from the other parts of our export logic? For example, could you make a self-contained struct that deals with the WAL and has a small API that can be used by the exporter? This could take a couple of forms:
- a struct that has no-op implementations for the nil value (which would remove the repeated nil-checks in the exporter)
- a struct that wraps the "raw" exporter in a version that adds a WAL
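For illustration, a minimal sketch of the first option, with all names (`exporterWAL`, `Write`, `Close`) hypothetical rather than taken from this PR: a nil receiver behaves as a no-op, so the exporter can call the WAL unconditionally.

```go
package collector

// exporterWAL is a hypothetical self-contained wrapper around the write-ahead
// log. Methods are safe to call on a nil *exporterWAL, which removes the
// repeated `if me.wal != nil` checks from the exporter.
type exporterWAL struct {
	// would hold the underlying log handle, read/write indices, config, etc.
}

// Write appends a marshalled request to the WAL; on a nil receiver it does nothing.
func (w *exporterWAL) Write(data []byte) error {
	if w == nil {
		return nil
	}
	// ... append data to the underlying log ...
	return nil
}

// Close flushes and closes the WAL; on a nil receiver it does nothing.
func (w *exporterWAL) Close() error {
	if w == nil {
		return nil
	}
	// ... close the underlying log ...
	return nil
}
```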
exporter/collector/metrics.go
Outdated
func (me *MetricsExporter) setupWAL() error {
	walPath := filepath.Join(me.cfg.MetricConfig.WALConfig.Directory, "gcp_metrics_wal")
	me.walPath = walPath
	metricWal, err := wal.Open(walPath, nil)
Should we expose the WAL config options to users?
	return multierr.Combine(errs...)
}

// watchWAL watches the WAL directory for a write then returns to the
Is this approach commonly used? For example, I don't see it used (at least directly) in the WAL implementation for the PRW exporter.
This would make sense if WAL writes were coming from a different process, but is there a simpler way where we write to a channel after every successful write to the WAL?
The PRW implementation uses it in `readPrompbFromWAL()`, starting here. I just broke that into its own function for readability.
Thanks for the pointer to the PRW implementation - I could have sworn I looked for fsnotify calls there, but clearly I missed it.
The approach still doesn't make sense to me, but at least there's precedent for doing it! I'd still like to discuss a bit more as a team (especially in the context of in-memory WAL).
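For reference, a rough sketch of the in-process alternative being discussed, where the writer signals a channel after each successful WAL write instead of relying on a file watch; all names here are hypothetical, not from this PR:

```go
package collector

import "context"

type walSignaller struct {
	walWrites chan struct{} // buffered with capacity 1; acts as a "new data" signal
}

// writeToWAL appends an entry and nudges the reader if it is waiting.
func (s *walSignaller) writeToWAL(entry []byte) error {
	// ... append entry to the WAL ...
	select {
	case s.walWrites <- struct{}{}: // wake the reader
	default: // a signal is already pending; the reader will catch up
	}
	return nil
}

// readLoop drains the WAL, then blocks until a new write or cancellation.
func (s *walSignaller) readLoop(ctx context.Context) {
	for {
		// ... read and export everything currently in the WAL ...
		select {
		case <-s.walWrites:
		case <-ctx.Done():
			return
		}
	}
}
```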
func (me *MetricsExporter) walRunner(ctx context.Context) {
	defer me.goroutines.Done()
	for {
		select {
Do we need to handle `ctx.Done()`?
It looks like this context is the one passed to `NewGoogleCloudMetricsExporter()`. Is that one intended for initialization only? We're not even passing it to `go mExp.exportMetricDescriptorRunner()` on L235.
`shutdownC` is only closed if `Shutdown()` is called, so I think handling the main context closing is good to have, to prevent blocking on other (non-graceful?) terminations.
> so I think handling the main context closing is good to have, to prevent blocking on other (non-graceful?) terminations

That makes sense, but I wasn't sure of the collector's contract here. I figured it would eventually cancel that "initialization" context, which would cause this goroutine to exit, but that doesn't seem to be the case.
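For what it's worth, a trimmed-down sketch of what handling both signals could look like; the `MetricsExporter` struct here is reduced to just what the loop needs, and `readWALAndExport` is assumed to block until data arrives or an error occurs:

```go
package collector

import (
	"context"
	"sync"
)

// Trimmed-down stand-in for the exporter; only the fields the loop needs.
type MetricsExporter struct {
	goroutines sync.WaitGroup
	shutdownC  chan struct{}
}

func (me *MetricsExporter) readWALAndExport(ctx context.Context) {
	// ... read the next WAL entry and export it, blocking on watchWAL when empty ...
}

func (me *MetricsExporter) walRunner(ctx context.Context) {
	defer me.goroutines.Done()
	for {
		select {
		case <-me.shutdownC: // graceful shutdown via Shutdown()
			return
		case <-ctx.Done(): // the constructor's context was cancelled
			return
		default:
			me.readWALAndExport(ctx)
		}
	}
}
```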
}
me.wal = metricWal

// sync existing WAL indices
If there is a nonzero number of entries in the WAL at startup, does that mean we're recovering from a crash/unclean shutdown? In that case it seems useful to log how many entries we're about to try to recover.
(Crash recovery seems like the biggest benefit of a file- vs. memory-backed WAL.)
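A possible shape for that logging, assuming the `github.com/tidwall/wal` package this PR appears to use (where an empty log reports zero indices); the message wording is illustrative:

```go
package collector

import (
	"fmt"

	"github.com/tidwall/wal"
)

// logPendingWALEntries reports how many entries were already in the WAL when
// the exporter started, which usually indicates recovery from an unclean shutdown.
func logPendingWALEntries(w *wal.Log, logf func(string)) error {
	first, err := w.FirstIndex()
	if err != nil {
		return err
	}
	last, err := w.LastIndex()
	if err != nil {
		return err
	}
	if last != 0 && last >= first {
		logf(fmt.Sprintf("recovering %d WAL entries from a previous run", last-first+1))
	}
	return nil
}
```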
Sorry for the push spam, trying to debug some flaky CI test results that I can't reproduce as reliably locally or in a container.
internal/cloudmock/metrics.go
Outdated
@@ -104,10 +105,16 @@ func (f *fakeMetricServiceServer) CreateTimeSeries(
	var code codes.Code
	if strings.Contains(req.Name, "notfound") {
		code = codes.NotFound
	} else if strings.Contains(req.Name, "unavailable") {
Please add docs somewhere denoting these magic strings.
These are only used in this internal mock server for testing. But I added a comment for this function explaining that a lot better.
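For example, the documentation could be a short mapping comment along these lines; the function name and wording here are illustrative, not the actual comment that was added:

```go
package cloudmock

import (
	"strings"

	"google.golang.org/grpc/codes"
)

// statusCodeForRequest lets integration tests force specific error paths by
// embedding magic substrings in the request name:
//   - "notfound"    -> codes.NotFound (non-retryable)
//   - "unavailable" -> codes.Unavailable (retryable)
func statusCodeForRequest(name string) codes.Code {
	switch {
	case strings.Contains(name, "notfound"):
		return codes.NotFound
	case strings.Contains(name, "unavailable"):
		return codes.Unavailable
	default:
		return codes.OK
	}
}
```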
	failedPoints = int(summary.TotalPointCount - summary.SuccessPointCount)
	succeededPoints = int(summary.SuccessPointCount)
}
if !(s.Code() == codes.DeadlineExceeded || s.Code() == codes.Unavailable) {
Quick question: the retry of network errors blocks the other requests from being tried while this loop is sleeping too, right? Because we expect those to fail too, so we just wait?
Yes, but also to keep the ordering of the requests
	me.obs.log.Error(fmt.Sprintf("error reading WAL and exporting: %+v", err))
}

// Must have been ErrNotFound, start a file watch and block waiting for updates.
The above `if` block doesn't break or return. Won't we get here even if `err` is another error?
The idea was just to log errors that aren't `ErrNotFound`, since not found means we're at the end of the WAL and can be expected sometimes. Added a comment explaining this.
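A compressed sketch of that pattern, assuming `github.com/tidwall/wal` and a plain logger (names are illustrative):

```go
package collector

import (
	"errors"
	"log"

	"github.com/tidwall/wal"
)

// logUnlessEndOfWAL logs a read error only when it isn't wal.ErrNotFound;
// ErrNotFound just means the reader has caught up with the writer, after which
// the caller starts a file watch and blocks for updates.
func logUnlessEndOfWAL(err error) {
	if err != nil && !errors.Is(err, wal.ErrNotFound) {
		log.Printf("error reading WAL and exporting: %+v", err)
	}
}
```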
exporter/collector/README.md
Outdated
- `metric.wal.directory` (optional): Path to local write-ahead-log file.
- `metric.wal.max_backoff` (optional): Maximum duration to retry entries from
Should we say somewhere that the WAL is experimental and subject to change or similar?
I recall that in the collector, there's a convention of using an `experimental_` prefix to signal such things in configuration.
Do we think that this is going to be replaced with an enhanced version of `queued_retry` in future?
@aabmass I'm fine with that, but our exporter is beta, so I don't think we can change these settings once we publish them.
I think Punya is saying the experimental prefix would signal that the field is outside of the guarantees. But also it would be good to let users know it hasn't been battle-tested yet.
Ah yeah, I commented without seeing Punya's update.
I changed these to add the `experimental_` prefix, and also to the wal object... it looks a little weird like this:

metric:
  experimental_wal:
    experimental_directory: my/path

Do you think we could just make the `wal` subsection experimental and imply that everything under it is also subject to change? Or, just to be safe, mark each option experimental?

metric:
  experimental_wal:
    directory: my/path # subject to change
I think the second, less verbose option is fine. I see they are doing that in this processor as well.
ExpectFixturePath: "testdata/fixtures/metrics/basic_counter_metrics_wal_expect.json",
CompareFixturePath: "testdata/fixtures/metrics/basic_counter_metrics_expect.json",
I didn't really understand what CompareFixturePath is for. How would Expect and Compare files be different?
"expect" is what the input for a test converts into, and it verifies that a single test hasn't changed since its last fixture was checked in
"compare" is to cross-reference output fixtures between different tests that should have equal outcomes. In this case, we want to make sure that enabling WAL gives the same output as normal export, based on the same input fixture.
Ah, I see, so the files should be identical. I was confused by the name; maybe something like `ExpectIdenticalToFixturePath` would be better.
There is also a comment in the definition in `testcase.go`:
// CompareFixturePath is a second output fixture that should be equal to this test's output fixture.
// Used for cross-referencing multiple tests that should have the same output, without overwriting the same fixtures.
I think it might be good to follow up with a PR refactoring the WAL into its own struct, as `metrics.go` is getting pretty long and I think the sprinkled WAL logic could be encapsulated.
exporter/collector/metrics.go
Outdated
// A channel that receives metric descriptor and sends them to GCM once
metricDescriptorC chan *monitoringpb.CreateMetricDescriptorRequest
client *monitoring.MetricClient
exportFunc func(context.Context, *monitoringpb.CreateTimeSeriesRequest) error
Can you add a comment that this is "pluggable" only for testing purposes?
There's now both `MetricsExporter.export` and `MetricsExporter.exportFunc`, which is a bit confusing. I think for testing purposes it would be better to replace `client` with an interface so you can pass in a mock and not have to make this pluggable. Maybe for a follow-up PR.
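A rough sketch of what the interface-based approach could look like, assuming the generated `monitoringpb` and `gax` types; the interface and fake names are hypothetical:

```go
package collector

import (
	"context"

	gax "github.com/googleapis/gax-go/v2"
	monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)

// timeSeriesClient is the narrow surface the exporter needs; the real
// *monitoring.MetricClient should satisfy it as-is.
type timeSeriesClient interface {
	CreateTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest, opts ...gax.CallOption) error
}

// fakeTimeSeriesClient records requests in tests instead of calling GCM.
type fakeTimeSeriesClient struct {
	requests []*monitoringpb.CreateTimeSeriesRequest
}

func (f *fakeTimeSeriesClient) CreateTimeSeries(_ context.Context, req *monitoringpb.CreateTimeSeriesRequest, _ ...gax.CallOption) error {
	f.requests = append(f.requests, req)
	return nil
}
```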
exporter/collector/metrics.go
Outdated
runCtx, cancel := context.WithCancel(ctx)
defer cancel()
We're not using this cancel anywhere; I think this code is a no-op.
Sounds good, removed this and changed the `runCtx` to just `ctx`.
exporter/collector/metrics.go
Outdated
if me.wal != nil {
	if err := me.wal.Close(); err != nil {
		me.obs.log.Error(fmt.Sprintf("error closing WAL: %+v\n", err))
	}
Consider moving this into a defer in `runWALReadAndExportLoop()` so the goroutine can handle its own cleanup when it receives the shutdown signal.
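Something like the following, with the loop structure simplified and `readAndExport` passed in as a callback (names are illustrative, not the actual code):

```go
package collector

import (
	"context"
	"log"

	"github.com/tidwall/wal"
)

// runWALReadAndExportLoop owns the WAL handle for its lifetime and closes it
// itself when it receives either shutdown signal.
func runWALReadAndExportLoop(ctx context.Context, w *wal.Log, shutdownC <-chan struct{},
	readAndExport func(context.Context) error) {
	defer func() {
		if err := w.Close(); err != nil {
			log.Printf("error closing WAL: %+v", err)
		}
	}()
	for {
		select {
		case <-shutdownC:
			return
		case <-ctx.Done():
			return
		default:
			if err := readAndExport(ctx); err != nil {
				log.Printf("error reading WAL and exporting: %+v", err)
			}
		}
	}
}
```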
sgtm
I think I've addressed all the blockers; going to merge this now so we can get an initial release with it out.
This adds optional WAL buffering to the collector metrics exporter, heavily based on a similar implementation in the Prometheus remote write exporter.

The flow for this is:

- On startup, the exporter opens a WAL file (`gcp_metrics_wal`) in the configured directory
- `PushMetrics()` is changed to write byte-marshalled CreateTimeSeriesRequest objects to the WAL
- A `walRunner()` goroutine spawns a `readWALAndExport()` loop (the goroutine also handles shutdown signals)
- `readWALAndExport()` continuously reads the latest entry from the WAL and exports it to GCM; when it reaches the end of the WAL, `watchWAL()` opens a file watch to block for changes to the log file.

Added an integration test with WAL enabled to test this.
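For orientation, a condensed sketch of one read/export iteration of that loop, assuming the `tidwall/wal` API and proto-marshalled requests; retry, backoff, truncation details, and the `watchWAL()` blocking step are omitted, and names are illustrative:

```go
package collector

import (
	"context"

	"github.com/tidwall/wal"
	monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
	"google.golang.org/protobuf/proto"
)

// readAndExportOnce reads the entry at index, exports it to GCM, and returns
// the next index to read.
func readAndExportOnce(ctx context.Context, w *wal.Log, index uint64,
	export func(context.Context, *monitoringpb.CreateTimeSeriesRequest) error) (uint64, error) {
	data, err := w.Read(index)
	if err != nil {
		// wal.ErrNotFound means we've caught up; the real loop then blocks in watchWAL().
		return index, err
	}
	req := &monitoringpb.CreateTimeSeriesRequest{}
	if err := proto.Unmarshal(data, req); err != nil {
		return index, err
	}
	if err := export(ctx, req); err != nil {
		return index, err
	}
	// The real code also truncates already-exported entries from the front of
	// the WAL (see the TruncateFront call discussed earlier in this thread).
	return index + 1, nil
}
```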