From c7c7d4b8064ca66e3e20c6448e2e792fb1d537ea Mon Sep 17 00:00:00 2001 From: Luna Stadler Date: Sun, 5 Mar 2023 22:37:00 +0100 Subject: [PATCH] Stream JSON instead of reading it into memory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Not entirely sure if this code is correct, but testing/quick indicates it might be. Shall test with some more "real" data sometime™️. --- prom-revisionist.go | 50 +++++++++------- token_writer.go | 137 +++++++++++++++++++++++++++++++++++++++++++ token_writer_test.go | 90 ++++++++++++++++++++++++++++ 3 files changed, 257 insertions(+), 20 deletions(-) create mode 100644 token_writer.go create mode 100644 token_writer_test.go diff --git a/prom-revisionist.go b/prom-revisionist.go index 4cab143..c1bb26e 100644 --- a/prom-revisionist.go +++ b/prom-revisionist.go @@ -4,6 +4,7 @@ import ( "bytes" "compress/gzip" "compress/zlib" + "encoding/json" "flag" "fmt" "io" @@ -12,7 +13,6 @@ import ( "net/url" "os" "regexp" - "strings" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" @@ -196,27 +196,30 @@ func main() { } } - buf := new(bytes.Buffer) - _, err = io.Copy(buf, in) - if err != nil { - log.Printf("could not write body: %s", err) - return - } + dec := json.NewDecoder(in) + tw := NewTokenWriter(out, dec) + token, err := dec.Token() + for err == nil { + switch tok := token.(type) { + case string: + replace, ok := rev.config.RenameLabelsReverse[tok] + if ok { + token = replace + } + } + err = tw.Write(token) + if err != nil { + break + } - res := buf.String() - for from, to := range rev.config.RenameLabels { - // TODO: rewrite by using streaming in some way - res = strings.Replace(res, `"`+to+`"`, `"`+from+`"`, -1) + token, err = dec.Token() } - - buf.Reset() - _, err = buf.WriteString(res) - if err != nil { - log.Printf("could not rewrite: %s", err) + if err != nil && err != io.EOF { + log.Printf("could not decode response: %s", err) return } - in = buf + return } _, err = io.Copy(out, in) @@ -308,9 +311,10 @@ type RewriteConfig struct { MatchRaw string `yaml:"match"` WithRaw string `yaml:"with"` } `yaml:"wrap"` - RenameMetrics map[string]string `yaml:"rename-metrics"` - RenameLabels map[string]string `yaml:"rename-labels"` - RewriteMatchers []struct { + RenameMetrics map[string]string `yaml:"rename-metrics"` + RenameLabels map[string]string `yaml:"rename-labels"` + RenameLabelsReverse map[string]string `yaml:"-"` + RewriteMatchers []struct { From *labels.Matcher `yaml:"-"` To *labels.Matcher `yaml:"-"` FromRaw string `yaml:"from"` @@ -358,6 +362,12 @@ func (r *RewriteConfig) Parse() error { } } + // reverse label renames to easily convert them back in the response + r.RenameLabelsReverse = make(map[string]string, len(r.RenameLabels)) + for key, val := range r.RenameLabels { + r.RenameLabelsReverse[val] = key + } + for j, matcher := range r.RewriteMatchers { matchers, err := parser.ParseMetricSelector(matcher.FromRaw) if err != nil { diff --git a/token_writer.go b/token_writer.go new file mode 100644 index 0000000..cbd71f2 --- /dev/null +++ b/token_writer.go @@ -0,0 +1,137 @@ +package main + +import ( + "encoding/json" + "fmt" + "io" +) + +type tokenWriter struct { + w io.Writer + dec *json.Decoder + + writers []TokenWriter +} + +func NewTokenWriter(w io.Writer, dec *json.Decoder) *tokenWriter { + return &tokenWriter{ + w: w, + dec: dec, + writers: []TokenWriter{simpleWriter{}}, + } +} + +func (tw *tokenWriter) Write(token json.Token) error { + if len(tw.writers) == 0 { + return fmt.Errorf("no writer for token %#v", token) + } + + curTw := tw.writers[len(tw.writers)-1] + if curTw.Done(token) { + tw.writers = tw.writers[:len(tw.writers)-1] + curTw = tw.writers[len(tw.writers)-1] + } + + nextTw, err := curTw.Write(tw.w, token, tw.dec.More()) + if err != nil { + return fmt.Errorf("could not write: %w", err) + } + if nextTw != nil { + tw.writers = append(tw.writers, nextTw) + } + + return nil +} + +type TokenWriter interface { + Done(json.Token) bool + Write(io.Writer, json.Token, bool) (TokenWriter, error) +} + +type simpleWriter struct{} + +func (sw simpleWriter) Done(json.Token) bool { return false } +func (sw simpleWriter) Write(w io.Writer, token json.Token, _ bool) (TokenWriter, error) { + var err error + var writer TokenWriter + + switch tok := token.(type) { + case bool: + _, err = fmt.Fprintf(w, "%v", tok) + case string: + _, err = fmt.Fprintf(w, "%q", tok) + case json.Number: + _, err = w.Write([]byte(tok)) + case float64: + _, err = fmt.Fprintf(w, "%f", tok) + case nil: + _, err = w.Write([]byte("null")) + case json.Delim: + if tok == '{' { + writer = &objectWriter{} + } else if tok == '[' { + writer = &arrayWriter{} + } + _, err = w.Write([]byte(tok.String())) + default: + err = fmt.Errorf("unhandled value %#v of type %T", token, token) + } + + return writer, err +} + +type objectWriter struct { + simpleWriter + + n int +} + +func (ow *objectWriter) Done(token json.Token) bool { + return token == json.Delim('}') +} + +func (ow *objectWriter) Write(w io.Writer, token json.Token, more bool) (TokenWriter, error) { + tw, err := ow.simpleWriter.Write(w, token, more) + if err != nil { + return nil, err + } + if tw != nil { + return tw, err + } + + if ow.n%2 == 0 && more { + _, err = w.Write([]byte(":")) + } else if more { + _, err = w.Write([]byte(",")) + } + ow.n += 1 + + return tw, err +} + +type arrayWriter struct { + simpleWriter + + n int +} + +func (aw *arrayWriter) Done(token json.Token) bool { + return token == json.Delim(']') +} + +func (aw *arrayWriter) Write(w io.Writer, token json.Token, more bool) (TokenWriter, error) { + tw, err := aw.simpleWriter.Write(w, token, more) + if err != nil { + return nil, err + } + if tw != nil { + return tw, nil + } + + if more { + _, err = w.Write([]byte(",")) + } + aw.n += 1 + + return tw, err +} diff --git a/token_writer_test.go b/token_writer_test.go new file mode 100644 index 0000000..2cf457d --- /dev/null +++ b/token_writer_test.go @@ -0,0 +1,90 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "math/rand" + "reflect" + "testing" + "testing/quick" + + "github.com/stretchr/testify/require" +) + +type testStruct struct { + Nested struct { + A safeString + B float64 + C bool + D *bool + + DoubleNested struct { + Stuff safeString + // Recursive *testStruct + } + } + + Array []safeString + StructArray []struct { + X safeString + Y float64 + Z float64 + + Contents struct { + Name safeString + } + } +} + +type safeString string + +func (s safeString) Generate(rand *rand.Rand, size int) reflect.Value { + buf := make([]byte, size) + _, err := rand.Read(buf) + if err != nil { + panic(err) + } + return reflect.ValueOf(safeString(fmt.Sprintf("%x", buf))) +} + +func TestTokenWriter(t *testing.T) { + plain := func(ts testStruct) testStruct { + return ts + } + writeAndParse := func(ts testStruct) testStruct { + data, err := json.Marshal(ts) + require.NoError(t, err, "marshal") + + buf := new(bytes.Buffer) + dec := json.NewDecoder(bytes.NewBuffer(data)) + tw := NewTokenWriter(buf, dec) + token, err := dec.Token() + for err == nil { + err = tw.Write(token) + if err != nil { + err = fmt.Errorf("token write: %w", err) + continue + } + token, err = dec.Token() + } + if err != nil && err != io.EOF { + require.NoError(t, err, "token write") + } + + var res testStruct + err = json.Unmarshal(buf.Bytes(), &res) + if synErr, ok := err.(*json.SyntaxError); err != nil && ok { + err = fmt.Errorf("syntax error: %w << %s >>", err, buf.Bytes()[synErr.Offset-10:synErr.Offset+10]) + } + require.NoError(t, err, "unmarshal") + + require.Equal(t, ts, res) + + return res + } + + err := quick.CheckEqual(plain, writeAndParse, nil) + require.NoError(t, err) +}