forked from fgeller/kt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcoder.go
222 lines (200 loc) · 6.3 KB
/
coder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
package main
import (
"context"
"encoding/base64"
"encoding/hex"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"github.com/heetch/avro"
"github.com/heetch/avro/avroregistry"
goavro "github.com/linkedin/goavro/v2"
)
// coder implements the common data required for encoding/decoding data.
// The string and int64 settings are bound to command-line flags by addFlags;
// the schema fields are filled in by loadSchemaFile or lazily by the Avro
// decoder returned from makeAvroDecoder.
type coder struct {
// topic is the Kafka topic name; it is the prefix of the schema registry
// subject built by makeAvroDecoder.
topic string
// registryURL is the Avro schema registry server URL (-registry flag).
registryURL string
// avscFile is the path to an AVSC schema file (-value-avro-schema flag).
avscFile string
// avroRecordName, when non-empty, replaces the "key"/"value" suffix of the
// registry subject (-value-avro-record-name flag).
avroRecordName string
// avroRegistry is the schema registry client used to resolve schemas and
// schema IDs. NOTE(review): it is not initialised anywhere in this file;
// presumably the command sets it up from registryURL — confirm.
avroRegistry *avroregistry.Registry
// avroSchema is the parsed Avro schema. It is set by loadSchemaFile or
// fetched from the registry on first use by makeAvroDecoder.
avroSchema *avro.Type
// avroSchemaID is the registry ID of avroSchema (-value-avro-schema-id
// flag). Zero means "not known yet"; makeAvroDecoder then resolves it.
avroSchemaID int64
}
// addFlags registers on flags the options shared by encoding and decoding.
//
// Three string options (-registry, -value-avro-schema and
// -value-avro-record-name) default to the empty string and one int64 option
// (-value-avro-schema-id) defaults to 0. Each option is bound directly to the
// corresponding field of c, so the fields are populated when flags is parsed.
func (c *coder) addFlags(flags *flag.FlagSet) {
	// All string options share the same empty default, so they are declared
	// as data and registered in one loop.
	stringOpts := []struct {
		target *string
		name   string
		usage  string
	}{
		{&c.registryURL, "registry", "The Avro schema registry server URL."},
		{&c.avscFile, "value-avro-schema", `Path to AVSC file to format the file. If it is set, then -valuecodec is set to "avro"`},
		{&c.avroRecordName, "value-avro-record-name", "Record name to use when using TopicRecordNameStrategy to find the schema subject in Schema Registry"},
	}
	for _, opt := range stringOpts {
		flags.StringVar(opt.target, opt.name, "", opt.usage)
	}

	flags.Int64Var(&c.avroSchemaID, "value-avro-schema-id", 0, "Use it to select schema to produce Avro formatted schemas.")
}
// decoderForType returns a function that turns the JSON representation of a
// key or value into the raw bytes it stands for, according to the codec named
// by typ:
//
//   - "json":   the raw JSON message is returned unchanged.
//   - "hex":    the message must be a JSON string holding hex digits.
//   - "base64": the message must be a JSON string in standard base64.
//   - "string": the message must be a JSON string; its bytes are returned.
//   - "avro":   the message is handed to the decoder built by makeAvroDecoder.
//
// keyOrValue is only consulted by the "avro" codec, where it is forwarded to
// makeAvroDecoder. Any other value of typ yields a nil function and an error.
func (c *coder) decoderForType(keyOrValue, typ string) (func(m json.RawMessage) ([]byte, error), error) {
	var dec func(s string) ([]byte, error)
	switch typ {
	case "json":
		// Easy case - we already have the JSON-marshaled data.
		return func(m json.RawMessage) ([]byte, error) {
			return m, nil
		}, nil
	case "hex":
		dec = hex.DecodeString
	case "base64":
		dec = base64.StdEncoding.DecodeString
	case "string":
		dec = func(s string) ([]byte, error) {
			return []byte(s), nil
		}
	case "avro":
		return c.makeAvroDecoder(keyOrValue), nil
	default:
		// The list must name every codec handled above, including avro.
		return nil, fmt.Errorf(`unsupported decoder %#v, only json, string, hex, base64 and avro are supported`, typ)
	}

	// The remaining codecs all operate on a JSON string: unquote it first,
	// then apply the codec-specific conversion.
	return func(m json.RawMessage) ([]byte, error) {
		var s string
		if err := json.Unmarshal(m, &s); err != nil {
			return nil, err
		}
		return dec(s)
	}, nil
}
// makeAvroDecoder returns a function that converts the Avro JSON (textual)
// form of a message into Avro binary, prefixed with the schema ID header
// appended by the registry encoder.
//
// The registry subject is "<topic>-<keyOrValue>", or "<topic>-<record name>"
// when c.avroRecordName is set. The schema and its ID are resolved on first
// use and stored on c so later calls do not hit the registry again:
//
//   - neither ID nor schema known: the "latest" schema of the subject is used;
//   - schema known but no ID: the ID is looked up for that schema;
//   - ID known but no schema: the schema is fetched by ID.
//
// The returned function reports an error, rather than panicking, when no
// schema registry client has been configured on c.
func (c *coder) makeAvroDecoder(keyOrValue string) func(m json.RawMessage) ([]byte, error) {
	// The goavro codec depends only on the schema, so it is built once and
	// reused across messages. codecSchema records which schema it was built
	// from, so that a replaced c.avroSchema triggers a rebuild.
	var (
		codec       *goavro.Codec
		codecSchema *avro.Type
	)

	return func(m json.RawMessage) ([]byte, error) {
		if c.avroRegistry == nil {
			return nil, fmt.Errorf("avro codec requires a schema registry, see the -registry flag")
		}

		// Check strategy to use.
		// Default subject follows TopicNameStrategy.
		subject := c.topic + "-" + keyOrValue
		if c.avroRecordName != "" {
			// Use TopicRecordNameStrategy.
			subject = c.topic + "-" + c.avroRecordName
		}
		enc := c.avroRegistry.Encoder(subject)
		ctx := context.Background()

		if c.avroSchemaID == 0 {
			if c.avroSchema == nil {
				sch, err := c.avroRegistry.Schema(ctx, subject, "latest")
				if err != nil {
					return nil, err
				}
				t, err := avro.ParseType(sch.Schema)
				if err != nil {
					return nil, err
				}
				// Cache the schema and its ID for the next calls.
				c.avroSchema = t
				c.avroSchemaID = sch.ID
			} else {
				id, err := enc.IDForSchema(ctx, c.avroSchema)
				if err != nil {
					return nil, err
				}
				c.avroSchemaID = id
			}
		}
		if c.avroSchema == nil {
			// Only got the schema ID, gather the schema from the registry.
			dec := c.avroRegistry.Decoder()
			sch, err := dec.SchemaForID(ctx, c.avroSchemaID)
			if err != nil {
				return nil, err
			}
			c.avroSchema = sch
		}

		if codec == nil || codecSchema != c.avroSchema {
			// Canonicalize the schema to remove default values and logical types
			// to work around https://github.com/linkedin/goavro/issues/198
			built, err := goavro.NewCodec(c.avroSchema.CanonicalString(0))
			if err != nil {
				return nil, err
			}
			codec, codecSchema = built, c.avroSchema
		}

		inNative, _, err := codec.NativeFromTextual(m)
		if err != nil {
			return nil, err
		}
		inBlob, err := codec.BinaryFromNative(nil, inNative)
		if err != nil {
			return nil, err
		}

		buf := make([]byte, 0, 512)
		buf = enc.AppendSchemaID(buf, c.avroSchemaID)
		return append(buf, inBlob...), nil
	}
}
// loadSchemaFile reads the AVSC file at path, parses it as an Avro type and
// stores the result in c.avroSchema.
//
// It returns a wrapped error when the file cannot be read or when its content
// is not a valid Avro schema; in both cases c.avroSchema is left untouched.
func (c *coder) loadSchemaFile(path string) error {
	raw, readErr := ioutil.ReadFile(path)
	if readErr != nil {
		return fmt.Errorf("failed to read AVSC file: %w", readErr)
	}

	parsed, parseErr := avro.ParseType(string(raw))
	if parseErr != nil {
		return fmt.Errorf("failed to parse AVSC file: %w", parseErr)
	}

	c.avroSchema = parsed
	return nil
}
// encoderForType returns a function that turns the raw bytes of a key or
// value into their JSON representation, according to the codec named by typ:
//
//   - "json":   the data must already be valid JSON and is returned as is.
//   - "hex":    the data is rendered as a JSON string of hex digits.
//   - "base64": the data is rendered as a JSON string in standard base64.
//   - "string": the data is rendered as a JSON string.
//   - "avro":   the data is decoded by encodeAvro.
//   - "none":   the data is dropped; the encoder always returns nil.
//
// For hex, base64 and string, nil data is rendered as nullJSON. keyOrValue is
// not used by any codec here. Any other value of typ yields a nil function
// and an error.
func (c *coder) encoderForType(keyOrValue, typ string) (func([]byte) (json.RawMessage, error), error) {
	var enc func([]byte) string
	switch typ {
	case "json":
		return func(data []byte) (json.RawMessage, error) {
			// Unmarshal into a RawMessage purely to validate the input while
			// keeping the parser's error for the message.
			if err := json.Unmarshal(data, new(json.RawMessage)); err != nil {
				return nil, fmt.Errorf("invalid JSON value %q: %w", data, err)
			}
			return json.RawMessage(data), nil
		}, nil
	case "hex":
		enc = hex.EncodeToString
	case "base64":
		enc = base64.StdEncoding.EncodeToString
	case "string":
		enc = func(data []byte) string {
			return string(data)
		}
	case "avro":
		return c.encodeAvro, nil
	case "none":
		return func([]byte) (json.RawMessage, error) {
			return nil, nil
		}, nil
	default:
		// The list must name every codec handled above, including none.
		return nil, fmt.Errorf(`unsupported encoder %#v, only json, string, hex, base64, avro and none are supported`, typ)
	}

	// The remaining codecs all produce a string that is then JSON-quoted.
	return func(data []byte) (json.RawMessage, error) {
		if data == nil {
			return nullJSON, nil
		}
		data1, err := json.Marshal(enc(data))
		if err != nil {
			// marshaling a string cannot fail but be defensive.
			return nil, err
		}
		return json.RawMessage(data1), nil
	}, nil
}
// encodeAvro converts a registry-framed Avro binary message into its Avro
// JSON (textual) form.
//
// The schema ID is read from the header of data, the matching schema is
// fetched from the registry, and the remaining bytes are decoded with it.
// An error is returned when no registry client is configured on c, when the
// schema ID header cannot be decoded, when the schema cannot be fetched, or
// when the payload does not match the schema. Underlying errors are wrapped
// with %w so callers can inspect them with errors.Is / errors.As.
func (c *coder) encodeAvro(data []byte) (json.RawMessage, error) {
	if c.avroRegistry == nil {
		return nil, fmt.Errorf("avro codec requires a schema registry, see the -registry flag")
	}
	dec := c.avroRegistry.Decoder()
	id, body := dec.DecodeSchemaID(data)
	if body == nil {
		return nil, fmt.Errorf("cannot decode schema id")
	}
	// TODO: cache the schema
	schema, err := dec.SchemaForID(context.Background(), id)
	if err != nil {
		return nil, fmt.Errorf("cannot get schema for id %d: %w", id, err)
	}
	// Canonicalize the schema to remove default values and logical types
	// to work around https://github.com/linkedin/goavro/issues/198
	codec, err := goavro.NewCodec(schema.CanonicalString(0))
	if err != nil {
		return nil, fmt.Errorf("cannot create codec from schema %s: %w", schema, err)
	}
	native, _, err := codec.NativeFromBinary(body)
	if err != nil {
		return nil, fmt.Errorf("cannot convert native from binary: %w", err)
	}
	textual, err := codec.TextualFromNative(nil, native)
	if err != nil {
		return nil, fmt.Errorf("cannot convert textual from native: %w", err)
	}
	return json.RawMessage(textual), nil
}