Skip to content

Commit

Permalink
Stream JSON instead of reading it into memory
Browse files Browse the repository at this point in the history
Not entirely sure if this code is correct, but testing/quick indicates
it might be.  Shall test with some more "real" data sometime™️.
  • Loading branch information
heyLu committed Mar 6, 2023
1 parent 935b743 commit c7c7d4b
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 20 deletions.
50 changes: 30 additions & 20 deletions prom-revisionist.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"compress/gzip"
"compress/zlib"
"encoding/json"
"flag"
"fmt"
"io"
Expand All @@ -12,7 +13,6 @@ import (
"net/url"
"os"
"regexp"
"strings"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
Expand Down Expand Up @@ -196,27 +196,30 @@ func main() {
}
}

buf := new(bytes.Buffer)
_, err = io.Copy(buf, in)
if err != nil {
log.Printf("could not write body: %s", err)
return
}
dec := json.NewDecoder(in)
tw := NewTokenWriter(out, dec)
token, err := dec.Token()
for err == nil {
switch tok := token.(type) {
case string:
replace, ok := rev.config.RenameLabelsReverse[tok]
if ok {
token = replace
}
}
err = tw.Write(token)
if err != nil {
break
}

res := buf.String()
for from, to := range rev.config.RenameLabels {
// TODO: rewrite by using streaming in some way
res = strings.Replace(res, `"`+to+`"`, `"`+from+`"`, -1)
token, err = dec.Token()
}

buf.Reset()
_, err = buf.WriteString(res)
if err != nil {
log.Printf("could not rewrite: %s", err)
if err != nil && err != io.EOF {
log.Printf("could not decode response: %s", err)
return
}

in = buf
return
}

_, err = io.Copy(out, in)
Expand Down Expand Up @@ -308,9 +311,10 @@ type RewriteConfig struct {
MatchRaw string `yaml:"match"`
WithRaw string `yaml:"with"`
} `yaml:"wrap"`
RenameMetrics map[string]string `yaml:"rename-metrics"`
RenameLabels map[string]string `yaml:"rename-labels"`
RewriteMatchers []struct {
RenameMetrics map[string]string `yaml:"rename-metrics"`
RenameLabels map[string]string `yaml:"rename-labels"`
RenameLabelsReverse map[string]string `yaml:"-"`
RewriteMatchers []struct {
From *labels.Matcher `yaml:"-"`
To *labels.Matcher `yaml:"-"`
FromRaw string `yaml:"from"`
Expand Down Expand Up @@ -358,6 +362,12 @@ func (r *RewriteConfig) Parse() error {
}
}

// reverse label renames to easily convert them back in the response
r.RenameLabelsReverse = make(map[string]string, len(r.RenameLabels))
for key, val := range r.RenameLabels {
r.RenameLabelsReverse[val] = key
}

for j, matcher := range r.RewriteMatchers {
matchers, err := parser.ParseMetricSelector(matcher.FromRaw)
if err != nil {
Expand Down
137 changes: 137 additions & 0 deletions token_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package main

import (
"encoding/json"
"fmt"
"io"
)

// tokenWriter writes JSON tokens, handed to it one at a time via Write, to w.
// It keeps a stack of per-level writers so that separators between elements
// of nested arrays and objects are emitted by the writer for that level.
type tokenWriter struct {
	// w is the destination every token and separator is written to.
	w io.Writer
	// dec is only asked whether more elements follow (dec.More()) when a
	// token is written; presumably it is the decoder the caller reads the
	// tokens from, verify against callers.
	dec *json.Decoder

	// writers is the stack of active writers; the last element handles the
	// next token. Writers are pushed when a token opens a nested level and
	// popped when a token closes it.
	writers []TokenWriter
}

// NewTokenWriter returns a tokenWriter that writes to w and consults dec for
// look-ahead. The writer stack starts with a single simpleWriter, which
// handles tokens outside of any array or object.
func NewTokenWriter(w io.Writer, dec *json.Decoder) *tokenWriter {
	tw := new(tokenWriter)
	tw.w = w
	tw.dec = dec
	tw.writers = []TokenWriter{simpleWriter{}}
	return tw
}

// Write emits a single JSON token using the writer on top of the stack.
//
// If the token closes the level the top writer is responsible for, that
// writer is popped first and the closing token is written by its parent, so
// the parent can follow it with a separator. If writing the token opens a
// nested level, the writer returned for it is pushed onto the stack.
//
// It returns an error when the stack is empty or when the underlying write
// fails.
func (tw *tokenWriter) Write(token json.Token) error {
	depth := len(tw.writers)
	if depth == 0 {
		return fmt.Errorf("no writer for token %#v", token)
	}

	top := tw.writers[depth-1]
	if top.Done(token) {
		// The current level is finished: drop it and let the parent write
		// the closing delimiter.
		tw.writers = tw.writers[:depth-1]
		top = tw.writers[len(tw.writers)-1]
	}

	// Look ahead so the writer knows whether a separator has to follow.
	more := tw.dec.More()

	switch child, err := top.Write(tw.w, token, more); {
	case err != nil:
		return fmt.Errorf("could not write: %w", err)
	case child != nil:
		tw.writers = append(tw.writers, child)
	}

	return nil
}

// TokenWriter writes the JSON tokens of one nesting level to an io.Writer.
//
// Done reports whether the given token closes the level the writer is
// responsible for. Write emits the token; its bool argument tells the writer
// whether further elements follow in the enclosing array or object, and the
// returned TokenWriter is non-nil when the token opened a nested array or
// object that needs a writer of its own.
type TokenWriter interface {
	Done(json.Token) bool
	Write(io.Writer, json.Token, bool) (TokenWriter, error)
}

// simpleWriter writes a bare token without any separator. It is embedded by
// objectWriter and arrayWriter, which add the separators.
type simpleWriter struct{}

// Done always reports false: a simpleWriter has no closing delimiter.
func (sw simpleWriter) Done(json.Token) bool { return false }

// Write emits token as JSON text. The bool argument is ignored.
//
// Strings, booleans and float64 values are encoded with json.Marshal so the
// output is always valid JSON: control characters are escaped the JSON way
// (Go's %q would produce escapes such as \x01 or \a that JSON does not know)
// and floats keep their full precision instead of being cut to six decimals.
// Note that json.Marshal also escapes <, > and & as \u00XX sequences, which
// is equivalent JSON.
//
// For an opening '{' or '[' the writer for the nested level is returned. An
// error is returned for token types that are not part of json.Token, for
// values json.Marshal rejects, and for failed writes.
func (sw simpleWriter) Write(w io.Writer, token json.Token, _ bool) (TokenWriter, error) {
	var (
		err  error
		next TokenWriter
	)

	switch tok := token.(type) {
	case nil:
		_, err = io.WriteString(w, "null")
	case bool, string, float64:
		var data []byte
		if data, err = json.Marshal(tok); err == nil {
			_, err = w.Write(data)
		}
	case json.Number:
		// A json.Number already holds the literal text of the number.
		_, err = io.WriteString(w, string(tok))
	case json.Delim:
		switch tok {
		case '{':
			next = &objectWriter{}
		case '[':
			next = &arrayWriter{}
		}
		_, err = io.WriteString(w, tok.String())
	default:
		err = fmt.Errorf("unhandled value %#v of type %T", token, token)
	}

	return next, err
}

// objectWriter writes the keys and values of one JSON object, inserting ":"
// after keys and "," after values.
type objectWriter struct {
	simpleWriter

	// n counts the keys and values completed so far; an even count means
	// the token just written was a key. Tokens that open a nested level are
	// not counted; the nested value is counted once its closing delimiter
	// is written through this writer.
	n int
}

// Done reports whether token is the '}' that closes the object.
func (ow *objectWriter) Done(token json.Token) bool {
	return token == json.Delim('}')
}

// Write emits token followed by the separator the object needs at this
// position. No separator is written when more is false or when the token
// opens a nested level, in which case the nested writer is returned.
func (ow *objectWriter) Write(w io.Writer, token json.Token, more bool) (TokenWriter, error) {
	next, err := ow.simpleWriter.Write(w, token, more)
	if err != nil {
		return nil, err
	}
	if next != nil {
		return next, nil
	}

	if more {
		sep := ","
		if ow.n%2 == 0 {
			sep = ":"
		}
		_, err = io.WriteString(w, sep)
	}
	ow.n++

	return nil, err
}

// arrayWriter writes the elements of one JSON array, inserting "," between
// them.
type arrayWriter struct {
	simpleWriter

	// n counts the elements completed so far.
	n int
}

// Done reports whether token is the ']' that closes the array.
func (aw *arrayWriter) Done(token json.Token) bool {
	return token == json.Delim(']')
}

// Write emits token, followed by "," when more elements follow. No separator
// is written when the token opens a nested level, in which case the nested
// writer is returned.
func (aw *arrayWriter) Write(w io.Writer, token json.Token, more bool) (TokenWriter, error) {
	next, err := aw.simpleWriter.Write(w, token, more)
	if err != nil {
		return nil, err
	}
	if next != nil {
		return next, nil
	}

	if more {
		_, err = io.WriteString(w, ",")
	}
	aw.n++

	return nil, err
}
90 changes: 90 additions & 0 deletions token_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"io"
"math/rand"
"reflect"
"testing"
"testing/quick"

"github.com/stretchr/testify/require"
)

// testStruct is the value type used by TestTokenWriter as input for
// testing/quick. It combines nested structs, strings, floats, booleans, a
// pointer, and arrays of strings and of structs.
type testStruct struct {
	// Nested is a struct nested directly in the top-level value.
	Nested struct {
		A safeString
		B float64
		C bool
		// D is a pointer to a bool.
		D *bool

		// DoubleNested is a struct nested one level deeper.
		DoubleNested struct {
			Stuff safeString
			// Recursive *testStruct
		}
	}

	// Array is a slice of strings.
	Array []safeString
	// StructArray is a slice of structs that themselves contain a struct.
	StructArray []struct {
		X safeString
		Y float64
		Z float64

		Contents struct {
			Name safeString
		}
	}
}

// safeString is a string type with its own Generate method, so that values
// produced for it consist of hexadecimal digits only.
type safeString string

// Generate returns a safeString holding the lowercase hexadecimal encoding of
// size bytes read from rand. Its signature matches the Generate method of
// testing/quick's Generator interface. It panics if reading from rand fails.
func (s safeString) Generate(rand *rand.Rand, size int) reflect.Value {
	raw := make([]byte, size)
	if _, err := rand.Read(raw); err != nil {
		panic(err)
	}

	hexed := fmt.Sprintf("%x", raw)
	return reflect.ValueOf(safeString(hexed))
}

// TestTokenWriter uses testing/quick to check that feeding the token stream of
// a marshalled testStruct through the token writer produces JSON that
// unmarshals back to the original value.
func TestTokenWriter(t *testing.T) {
	// plain is the reference function: the value itself.
	plain := func(ts testStruct) testStruct {
		return ts
	}
	// writeAndParse marshals ts, copies the JSON token by token through the
	// token writer and unmarshals the copy again.
	writeAndParse := func(ts testStruct) testStruct {
		data, err := json.Marshal(ts)
		require.NoError(t, err, "marshal")

		buf := new(bytes.Buffer)
		dec := json.NewDecoder(bytes.NewBuffer(data))
		tw := NewTokenWriter(buf, dec)
		token, err := dec.Token()
		for err == nil {
			err = tw.Write(token)
			if err != nil {
				err = fmt.Errorf("token write: %w", err)
				break
			}
			token, err = dec.Token()
		}
		// io.EOF is the regular end of the token stream.
		if err != nil && err != io.EOF {
			require.NoError(t, err, "token write")
		}

		var res testStruct
		err = json.Unmarshal(buf.Bytes(), &res)
		if synErr, ok := err.(*json.SyntaxError); err != nil && ok {
			// Show the output around the error position. The window is
			// clamped to the buffer so an error near either end of the
			// output is reported instead of causing a slice panic.
			out := buf.Bytes()
			lo, hi := synErr.Offset-10, synErr.Offset+10
			if lo < 0 {
				lo = 0
			}
			if hi > int64(len(out)) {
				hi = int64(len(out))
			}
			if lo > hi {
				lo = hi
			}
			err = fmt.Errorf("syntax error: %w << %s >>", err, out[lo:hi])
		}
		require.NoError(t, err, "unmarshal")

		require.Equal(t, ts, res)

		return res
	}

	err := quick.CheckEqual(plain, writeAndParse, nil)
	require.NoError(t, err)
}

0 comments on commit c7c7d4b

Please sign in to comment.