Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: Add remote API with write client; add remote handler. #1658

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

bwplotka
Copy link
Member

@bwplotka bwplotka commented Oct 21, 2024

This proposes Remote Write API directly in client_golang main module. No dependency added (some new small packages are added under the internal vendored code).

Old attempt: #1656

Both API (client) and handler supports 1.0 and 2.0 Proto messages, but I intentionally only host writev2.Request generated Go code as we plan to deprecate 1.0 and users should use writev2 now. Both API and handler also will work against protobuf message types that use custom generators e.g. with gogoproto or anything else.

TODO:

  • Add examples, documentation
  • Open PR that uses it on Prometheus

//
// It is not safe to use the returned API from multiple goroutines, create a
// separate *API for each goroutine.
func NewAPI(c api.Client, opts ...APIOption) (*API, error) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could consider removing error from here and from APIOption as we don't have any option with validation or so. However I added it for future compatibility (we won't be able to easily extend it without breaking compatibility). Note: Currently api/ is experimenta... 🙈 I think I will remove error here and hope for the best, YAGNI.

Suggested change
func NewAPI(c api.Client, opts ...APIOption) (*API, error) {
func NewAPI(c api.Client, opts ...APIOption) *API {

}

// DesymbolizeLabels decodes label references, with given symbols to labels.
func DesymbolizeLabels(labelRefs []uint32, symbols, buf []string) []string {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I just ran into DesymbolizeLabels not being public in prometheus codebase here open-telemetry/opentelemetry-collector-contrib#35751 (comment)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you use labels.Labels or some other struct (for labels in Otel)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// NewRemoteWriteHandler returns HTTP handler that receives Remote Write 2.0
// protocol https://prometheus.io/docs/specs/remote_write_spec_2_0/.
func NewRemoteWriteHandler(logger *slog.Logger, store writeStorage) http.Handler {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's bit inconsistent with NewRemoteAPI in terms of options, let's unify one to one or another 🤔

// WriteProtoFullName represents the fully qualified name of the protobuf message
// to use in Remote write 1.0 and 2.0 protocols.
// See https://prometheus.io/docs/specs/remote_write_spec_2_0/#protocol.
type WriteProtoFullName protoreflect.FullName
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Change to string, bit too hard to use

const (
// WriteProtoFullNameV1 represents the `prometheus.WriteRequest` protobuf
// message introduced in the https://prometheus.io/docs/specs/remote_write_spec/.
// DEPRECATED: Use WriteProtoFullNameV2 instead.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For receivers who wants to support it for longer, deprecation might cause some friction, but maybe worth it? 🤔

@bwplotka
Copy link
Member Author

Example use of handler in sink project: https://github.com/bwplotka/sink/blob/main/go/sink/main.go#L51

Copy link
Member

@cstyan cstyan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both API (client) and handler supports 1.0 and 2.0 Proto messages, but I intentionally only host writev2.Request generated Go code as we plan to deprecate 1.0 and users should use writev2 now. Both API and handler also will work against protobuf message types that use custom generators e.g. with gogoproto or anything else.

Given that the spec is only just finalized recently and (afaik) still no receivers like Mimir support v2, we should include v1 request code imo.

}

func (r *API) attemptWrite(ctx context.Context, compr Compression, proto WriteProtoFullName, payload []byte, attempt int) (WriteResponseStats, error) {
u := r.client.URL("api/v1/write", nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be configurable imo, not all remote write receivers use the same endpoint

Comment on lines 244 to 247
rs, err := parseWriteResponseStats(resp)
if err != nil {
r.opts.logger.Warn("parsing rw write statistics failed; partial or no stats", "err", err)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we only try to get write response stats/log a warning if the request type was v2?

return &handler{logger: logger, store: store}
}

func parseProtoMsg(contentType string) (WriteProtoFullName, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a comment with an example of what the string might look like would be useful for people who haven't read or fully understood the v2 spec

Comment on lines +1 to +7
// Copyright (c) The EfficientGo Authors.
// Licensed under the Apache License 2.0.

// Initially copied from Cortex project.

// Package backoff implements backoff timers which increases wait time on every retry, incredibly useful
// in distributed system timeout functionalities.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we be pulling this via go.mod as opposed to having copies of the files? or is this the way we include dependencies for the client library?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We try as hard as we can to avoid dependencies. I'm in favor of copying other libraries code (respecting copyright) if possible

}

// Read the request body.
body, err := io.ReadAll(r.Body)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be out of scope for Prometheus but it would be nice to limit the request body size to protect the handler. https://github.com/cortexproject/cortex/blob/master/pkg/util/http.go#L180

return
}

decompressed, err := snappy.Decode(nil, body)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar here. It would be nice if we can check snappy decoded length and see if it exceeds the max size https://github.com/cortexproject/cortex/blob/master/pkg/util/http.go#L247

return &handler{logger: logger, store: store}
}

func parseProtoMsg(contentType string) (WriteProtoFullName, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this something we can expose?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to exposing this

h.logger.Error("Error decompressing remote write request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can expose a hook for the logic of decoding and decompressing part so that downstream project can just use their existing code without many adjustment.

func decodeRequestBody(r *http.Request) (body []byte, err error)

The main logic that can benefit us is handling v1 and v2 proto and response headers. It would be nice to still reuse the other code.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup some requestValidatorDecoder interface that can be overridden downstream would be cool to have.

}

req.Header.Add("Content-Encoding", string(compr))
req.Header.Set("Content-Type", contentTypeHeader(proto))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe there should be an option to specify custom headers here too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can do that with tripperware though 🤔


err = fmt.Errorf("server returned HTTP status %s: %s", resp.Status, body)
if resp.StatusCode/100 == 5 ||
(r.opts.retryOnRateLimit && resp.StatusCode == http.StatusTooManyRequests) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should retryOn429 be configurable here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure!

h.logger.Error("Error decompressing remote write request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup some requestValidatorDecoder interface that can be overridden downstream would be cool to have.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe more of a future comment, but we can probably have some common/model codec here too, since we already have the types, like https://github.com/prometheus/prometheus/blob/main/prompb/io/prometheus/write/v2/codec.go, that would translate to common/model types. Would be good for downstream clients, that can choose to not import prometheus at all.

return &handler{logger: logger, store: store}
}

func parseProtoMsg(contentType string) (WriteProtoFullName, error) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to exposing this

code = http.StatusInternalServerError
}
if code/5 == 100 { // 5xx
h.logger.Error("Error while remote writing the v2 request", "err", storeErr.Error())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

store.Store can ingest both I think

Suggested change
h.logger.Error("Error while remote writing the v2 request", "err", storeErr.Error())
h.logger.Error("Error while storing the remote write request", "err", storeErr.Error())

// Write writes given, non-empty, protobuf message to a remote storage.
// The https://github.com/planetscale/vtprotobuf methods will be used if your msg
// supports those (e.g. SizeVT() and MarshalToSizedBufferVT(...)), for efficiency.
func (r *API) Write(ctx context.Context, msg proto.Message) (_ WriteResponseStats, err error) {
Copy link

@saswatamcode saswatamcode Jan 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could be doing something wrong but I sort of hit a snag with this. proto.Message implements a protoreflect.ProtoMessage interface, which basically means the type would need a ProtoReflect() protoreflect.Message method.

This is absent in the case of both v1 and v2 in Prometheus prompb, which are generated with protoc-gen-gogofast I think.
The method is added when you generate code with buf here (which afaiu uses protoc-gen-go).

However, since for v1, we still need to import WriteRequest from prometheus, it can't be passed into this.

To make this method usable for both v1/v2, maybe we could also import v1 proto with buf here? 🙂

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, good finding! I think the solution here would be to use any or generics that try ProtoReflect, VT Unmarshal or old gogo unmarshal. This way we support all common deserialization methods. Alternatively we could do our own method so anyone can implement.

In your case v1 won't help you because Thanos likely needs gogo (or want to upgrade slowly here). Any if v1 help you can generate yourself too. Still, more relaxed interface here would be way to go. Wanna try it?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup! Will try to add something

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying here: #1710

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for merging. Follow up on moving to exp #1711

* Address remaining feedback

Signed-off-by: Saswata Mukherjee <[email protected]>

* Make Write message type more flexible

Signed-off-by: Saswata Mukherjee <[email protected]>

---------

Signed-off-by: Saswata Mukherjee <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants