Skip to content

Commit e437915

Browse files
samirketemaNassor
authored andcommitted
fix: opencdc unwrap
Co-authored-by: Nassor <[email protected]>
1 parent 2e2fde6 commit e437915

File tree

2 files changed

+301
-3
lines changed

2 files changed

+301
-3
lines changed

pkg/processor/procbuiltin/unwrap.go

+169-1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ const (
4343

4444
FormatDebezium = "debezium"
4545
FormatKafkaConnect = "kafka-connect"
46+
FormatOpenCDC = "opencdc"
4647
)
4748

4849
func init() {
@@ -60,6 +61,8 @@ func Unwrap(config processor.Config) (processor.Interface, error) {
6061
proc.unwrapper = &debeziumUnwrapper{}
6162
case FormatKafkaConnect:
6263
proc.unwrapper = &kafkaConnectUnwrapper{}
64+
case FormatOpenCDC:
65+
proc.unwrapper = &openCDCUnwrapper{}
6366
default:
6467
return nil, cerrors.Errorf("%s: %q is not a valid format", unwrapProcType, format)
6568
}
@@ -93,6 +96,172 @@ func (p *unwrapProcessor) Process(_ context.Context, in record.Record) (record.R
9396
return out, nil
9497
}
9598

99+
/*
100+
Example of an OpenCDC record:
101+
{
102+
"key": "NWQ0N2UwZGQtNTkxYi00MGEyLTk3YzMtYzc1MDY0MWU3NTc1",
103+
"metadata": {
104+
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
105+
"opencdc.readAt": "1706028881541916000",
106+
"opencdc.version": "v1"
107+
},
108+
"operation": "create",
109+
"payload": {
110+
"after": {
111+
"event_id": 2041181862,
112+
"msg": "string 4c88f20f-aa77-4f4b-9354-e4fdb1989a52",
113+
"pg_generator": false,
114+
"sensor_id": 54434691,
115+
"triggered": false
116+
},
117+
"before": null
118+
},
119+
"position": "ZWIwNmJiMmMtNWNhMS00YjUyLWE2ZmMtYzc0OTFlZDQ3OTYz"
120+
}
121+
*/
122+
123+
// openCDCUnwrapper unwraps an OpenCDC record from the payload, by unmarhsalling rec.Payload.After into type Record.
124+
type openCDCUnwrapper struct{}
125+
126+
// UnwrapOperation extracts operation from a structuredData record.
127+
func (o *openCDCUnwrapper) UnwrapOperation(structData record.StructuredData) (record.Operation, error) {
128+
var operation record.Operation
129+
op, ok := structData["operation"]
130+
if !ok {
131+
return operation, cerrors.Errorf("record payload after doesn't contain operation")
132+
}
133+
134+
switch o := op.(type) {
135+
case record.Operation:
136+
operation = o
137+
case string:
138+
err := operation.UnmarshalText([]byte(o))
139+
if err != nil {
140+
return operation, cerrors.Errorf("couldn't unmarshal record operation")
141+
}
142+
}
143+
return operation, nil
144+
}
145+
146+
// UnwrapMetadata extracts metadata from a structuredData record.
147+
func (o *openCDCUnwrapper) UnwrapMetadata(structData record.StructuredData) (record.Metadata, error) {
148+
var metadata record.Metadata
149+
meta, ok := structData["metadata"]
150+
if !ok {
151+
return metadata, cerrors.Errorf("record payload after doesn't contain metadata")
152+
}
153+
154+
switch m := meta.(type) {
155+
case record.Metadata:
156+
metadata = m
157+
case map[string]interface{}:
158+
metadata = make(record.Metadata, len(m))
159+
for k, v := range m {
160+
metadata[k] = fmt.Sprint(v)
161+
}
162+
}
163+
return metadata, nil
164+
}
165+
166+
// UnwrapKey extracts key from a structuredData record.
167+
func (o *openCDCUnwrapper) UnwrapKey(structData record.StructuredData) (record.Data, error) {
168+
var key record.Data
169+
ky, ok := structData["key"]
170+
if !ok {
171+
return key, cerrors.Errorf("record payload after doesn't contain key")
172+
}
173+
174+
switch k := ky.(type) {
175+
case record.Data:
176+
key = k
177+
case string:
178+
key = record.RawData{Raw: []byte(k)}
179+
}
180+
181+
return key, nil
182+
}
183+
184+
// UnwrapPayload extracts payload from a structuredData record.
185+
func (o *openCDCUnwrapper) UnwrapPayload(structData record.StructuredData) (record.Change, error) {
186+
var payload record.Change
187+
pl, ok := structData["payload"]
188+
if !ok {
189+
return payload, cerrors.Errorf("record payload doesn't contain payload")
190+
}
191+
192+
switch p := pl.(type) {
193+
case record.Change:
194+
payload = p
195+
case map[string]interface{}:
196+
afterData, ok := p["after"]
197+
if !ok {
198+
return payload, cerrors.Errorf("record payload after doesn't contain payload.after")
199+
}
200+
201+
data, ok := afterData.(map[string]interface{})
202+
if !ok {
203+
return payload, cerrors.Errorf("record payload after payload.after is not a map")
204+
}
205+
206+
convertedData := make(record.StructuredData, len(data))
207+
for k, v := range data {
208+
convertedData[k] = v
209+
}
210+
211+
payload = record.Change{
212+
Before: nil,
213+
After: convertedData,
214+
}
215+
}
216+
return payload, nil
217+
}
218+
219+
// Unwrap replaces the whole record.payload with record.payload.after.payload except position.
220+
func (o *openCDCUnwrapper) Unwrap(rec record.Record) (record.Record, error) {
221+
var structData record.StructuredData
222+
data := rec.Payload.After
223+
switch d := data.(type) {
224+
case record.RawData:
225+
// unmarshal raw data to structured
226+
err := json.Unmarshal(data.Bytes(), &structData)
227+
if err != nil {
228+
return record.Record{}, cerrors.Errorf("failed to unmarshal raw data as JSON: %w", unwrapProcType, err)
229+
}
230+
case record.StructuredData:
231+
structData = d
232+
default:
233+
return record.Record{}, cerrors.Errorf("unexpected data type %T", unwrapProcType, data)
234+
}
235+
236+
operation, err := o.UnwrapOperation(structData)
237+
if err != nil {
238+
return record.Record{}, err
239+
}
240+
241+
metadata, err := o.UnwrapMetadata(structData)
242+
if err != nil {
243+
return record.Record{}, err
244+
}
245+
246+
key, err := o.UnwrapKey(structData)
247+
if err != nil {
248+
return record.Record{}, err
249+
}
250+
251+
payload, err := o.UnwrapPayload(structData)
252+
if err != nil {
253+
return record.Record{}, err
254+
}
255+
256+
return record.Record{
257+
Key: key,
258+
Position: rec.Position,
259+
Metadata: metadata,
260+
Payload: payload,
261+
Operation: operation,
262+
}, nil
263+
}
264+
96265
/*
97266
Example of a kafka-connect record:
98267
{
@@ -186,7 +355,6 @@ Example of a debezium record:
186355
"schema": {} // will be ignored
187356
}
188357
*/
189-
190358
// debeziumUnwrapper unwraps a debezium record from the payload.
191359
type debeziumUnwrapper struct {
192360
kafkaConnectUnwrapper kafkaConnectUnwrapper

pkg/processor/procbuiltin/unwrap_test.go

+132-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
"github.com/matryer/is"
2525
)
2626

27-
const DebeziumRecord = `{
27+
const DebeziumRecordPayload = `{
2828
"payload": {
2929
"after": {
3030
"description": "test1",
@@ -42,6 +42,26 @@ const DebeziumRecord = `{
4242
"schema": {}
4343
}`
4444

45+
const OpenCDCRecordPayload = `{
46+
"position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0",
47+
"operation": "create",
48+
"metadata": {
49+
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
50+
"opencdc.readAt": "1706028953595546000",
51+
"opencdc.version": "v1"
52+
},
53+
"key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh",
54+
"payload": {
55+
"after": {
56+
"event_id": 1747353650,
57+
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
58+
"pg_generator": false,
59+
"sensor_id": 1250383582,
60+
"triggered": false
61+
}
62+
}
63+
}`
64+
4565
func TestUnwrap_Config(t *testing.T) {
4666
tests := []struct {
4767
name string
@@ -108,7 +128,7 @@ func TestUnwrap_Process(t *testing.T) {
108128
Payload: record.Change{
109129
Before: nil,
110130
After: record.RawData{
111-
Raw: []byte(DebeziumRecord),
131+
Raw: []byte(DebeziumRecordPayload),
112132
},
113133
},
114134
},
@@ -401,6 +421,115 @@ func TestUnwrap_Process(t *testing.T) {
401421
},
402422
wantErr: false,
403423
},
424+
{
425+
name: "opencdc with structured data",
426+
config: processor.Config{
427+
Settings: map[string]string{"format": "opencdc"},
428+
},
429+
record: record.Record{
430+
Key: record.RawData{Raw: []byte("one-key")},
431+
Operation: record.OperationCreate,
432+
Metadata: map[string]string{
433+
"conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka",
434+
"kafka.topic": "stream-78lpnchx7tzpyqz-generator",
435+
"opencdc.createdAt": "1706028953595000000",
436+
"opencdc.readAt": "1706028953606997000",
437+
"opencdc.version": "v1",
438+
},
439+
Payload: record.Change{
440+
Before: nil,
441+
After: record.StructuredData{
442+
"position": []byte("NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0"),
443+
"operation": record.OperationCreate,
444+
"metadata": record.Metadata{
445+
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
446+
"opencdc.readAt": "1706028953595546000",
447+
"opencdc.version": "v1",
448+
},
449+
"key": record.RawData{
450+
Raw: []byte("MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh"),
451+
},
452+
"payload": record.Change{
453+
Before: nil,
454+
After: record.StructuredData{
455+
"event_id": 1747353650,
456+
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
457+
"pg_generator": false,
458+
"sensor_id": 1250383582,
459+
"triggered": false,
460+
},
461+
},
462+
},
463+
},
464+
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
465+
},
466+
want: record.Record{
467+
Operation: record.OperationCreate,
468+
Metadata: record.Metadata{
469+
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
470+
"opencdc.readAt": "1706028953595546000",
471+
"opencdc.version": "v1",
472+
},
473+
Payload: record.Change{
474+
Before: nil,
475+
After: record.StructuredData{
476+
"event_id": 1747353650,
477+
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
478+
"pg_generator": false,
479+
"sensor_id": 1250383582,
480+
"triggered": false,
481+
},
482+
},
483+
Key: record.RawData{Raw: []byte("MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh")},
484+
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
485+
},
486+
wantErr: false,
487+
},
488+
{
489+
name: "opencdc with raw data",
490+
config: processor.Config{
491+
Settings: map[string]string{"format": "opencdc"},
492+
},
493+
record: record.Record{
494+
Key: record.RawData{Raw: []byte("one-key-raw-data")},
495+
Operation: record.OperationCreate,
496+
Metadata: map[string]string{
497+
"conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka",
498+
"kafka.topic": "stream-78lpnchx7tzpyqz-generator",
499+
"opencdc.createdAt": "1706028953595000000",
500+
"opencdc.readAt": "1706028953606997000",
501+
"opencdc.version": "v1",
502+
},
503+
Payload: record.Change{
504+
Before: nil,
505+
After: record.RawData{
506+
Raw: []byte(OpenCDCRecordPayload),
507+
},
508+
},
509+
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
510+
},
511+
want: record.Record{
512+
Operation: record.OperationCreate,
513+
Metadata: record.Metadata{
514+
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
515+
"opencdc.readAt": "1706028953595546000",
516+
"opencdc.version": "v1",
517+
},
518+
Payload: record.Change{
519+
Before: nil,
520+
After: record.StructuredData{
521+
"event_id": float64(1747353650),
522+
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
523+
"pg_generator": false,
524+
"sensor_id": float64(1250383582),
525+
"triggered": false,
526+
},
527+
},
528+
Key: record.RawData{Raw: []byte("MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh")},
529+
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
530+
},
531+
wantErr: false,
532+
},
404533
}
405534

406535
for _, tt := range tests {
@@ -413,6 +542,7 @@ func TestUnwrap_Process(t *testing.T) {
413542
if (err != nil) != tt.wantErr {
414543
t.Fatalf("process() error = %v, wantErr = %v", err, tt.wantErr)
415544
}
545+
416546
if diff := cmp.Diff(tt.want, got); diff != "" {
417547
t.Errorf("process() diff = %s", diff)
418548
}

0 commit comments

Comments
 (0)