Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: OpenCDC unwrap #1343

Merged
merged 5 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 175 additions & 1 deletion pkg/processor/procbuiltin/unwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (

FormatDebezium = "debezium"
FormatKafkaConnect = "kafka-connect"
FormatOpenCDC = "opencdc"
)

func init() {
Expand All @@ -60,6 +61,8 @@ func Unwrap(config processor.Config) (processor.Interface, error) {
proc.unwrapper = &debeziumUnwrapper{}
case FormatKafkaConnect:
proc.unwrapper = &kafkaConnectUnwrapper{}
case FormatOpenCDC:
proc.unwrapper = &openCDCUnwrapper{}
default:
return nil, cerrors.Errorf("%s: %q is not a valid format", unwrapProcType, format)
}
Expand Down Expand Up @@ -93,6 +96,178 @@ func (p *unwrapProcessor) Process(_ context.Context, in record.Record) (record.R
return out, nil
}

/*
Example of an OpenCDC record:
{
"key": "NWQ0N2UwZGQtNTkxYi00MGEyLTk3YzMtYzc1MDY0MWU3NTc1",
"metadata": {
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028881541916000",
"opencdc.version": "v1"
},
"operation": "create",
"payload": {
"after": {
"event_id": 2041181862,
"msg": "string 4c88f20f-aa77-4f4b-9354-e4fdb1989a52",
"pg_generator": false,
"sensor_id": 54434691,
"triggered": false
},
"before": null
},
"position": "ZWIwNmJiMmMtNWNhMS00YjUyLWE2ZmMtYzc0OTFlZDQ3OTYz"
}
*/

// openCDCUnwrapper unwraps an OpenCDC record from the payload, by unmarhsalling rec.Payload.After into type Record.
type openCDCUnwrapper struct{}

// UnwrapOperation extracts operation from a structuredData record.
func (o *openCDCUnwrapper) UnwrapOperation(structData record.StructuredData) (record.Operation, error) {
var operation record.Operation
op, ok := structData["operation"]
if !ok {
return operation, cerrors.Errorf("record payload after doesn't contain operation")
}

switch opType := op.(type) {
case record.Operation:
operation = opType
case string:
if err := operation.UnmarshalText([]byte(opType)); err != nil {
return operation, cerrors.Errorf("couldn't unmarshal record operation")
}
default:
return operation, cerrors.Errorf("expected a record.Operation or a string, got %T", opType)
}
return operation, nil
}

// UnwrapMetadata extracts metadata from a structuredData record.
func (o *openCDCUnwrapper) UnwrapMetadata(structData record.StructuredData) (record.Metadata, error) {
var metadata record.Metadata
meta, ok := structData["metadata"]
if !ok {
return metadata, cerrors.Errorf("record payload after doesn't contain metadata")
}

switch m := meta.(type) {
case record.Metadata:
metadata = m
case map[string]interface{}:
metadata = make(record.Metadata, len(m))
for k, v := range m {
metadata[k] = fmt.Sprint(v)
}
default:
return metadata, cerrors.Errorf("expected a record.Metadata or a map[string]interface{}, got %T", m)
}
return metadata, nil
}

// UnwrapKey extracts key from a structuredData record.
func (o *openCDCUnwrapper) UnwrapKey(structData record.StructuredData) (record.Data, error) {
var key record.Data
ky, ok := structData["key"]
if !ok {
return key, cerrors.Errorf("record payload after doesn't contain key")
}

switch k := ky.(type) {
case record.Data:
key = k
case string:
key = record.RawData{Raw: []byte(k)}
default:
return key, cerrors.Errorf("expected a record.Data or a string, got %T", k)
}

return key, nil
}

// UnwrapPayload extracts payload from a structuredData record.
func (o *openCDCUnwrapper) UnwrapPayload(structData record.StructuredData) (record.Change, error) {
var payload record.Change
pl, ok := structData["payload"]
if !ok {
return payload, cerrors.Errorf("record payload doesn't contain payload")
}

switch p := pl.(type) {
case record.Change:
payload = p
case map[string]interface{}:
afterData, ok := p["after"]
if !ok {
return payload, cerrors.Errorf("record payload after doesn't contain payload.after")
}

data, ok := afterData.(map[string]interface{})
if !ok {
return payload, cerrors.Errorf("record payload after payload.after is not a map")
}

convertedData := make(record.StructuredData, len(data))
for k, v := range data {
convertedData[k] = v
}

payload = record.Change{
Before: nil,
After: convertedData,
}
default:
return payload, cerrors.Errorf("expected a record.Change or a map[string]interface{}, got %T", p)
}
return payload, nil
}

// Unwrap replaces the whole record.payload with record.payload.after.payload except position.
func (o *openCDCUnwrapper) Unwrap(rec record.Record) (record.Record, error) {
var structData record.StructuredData
data := rec.Payload.After
switch d := data.(type) {
case record.RawData:
// unmarshal raw data to structured
if err := json.Unmarshal(data.Bytes(), &structData); err != nil {
return record.Record{}, cerrors.Errorf("failed to unmarshal raw data as JSON: %w", unwrapProcType, err)
}
case record.StructuredData:
structData = d
default:
return record.Record{}, cerrors.Errorf("unexpected data type %T", unwrapProcType, data)
}

operation, err := o.UnwrapOperation(structData)
if err != nil {
return record.Record{}, err
}

metadata, err := o.UnwrapMetadata(structData)
if err != nil {
return record.Record{}, err
}

key, err := o.UnwrapKey(structData)
if err != nil {
return record.Record{}, err
}

payload, err := o.UnwrapPayload(structData)
if err != nil {
return record.Record{}, err
}

return record.Record{
Key: key,
Position: rec.Position,
Metadata: metadata,
Payload: payload,
Operation: operation,
}, nil
}

/*
Example of a kafka-connect record:
{
Expand Down Expand Up @@ -186,7 +361,6 @@ Example of a debezium record:
"schema": {} // will be ignored
}
*/

// debeziumUnwrapper unwraps a debezium record from the payload.
type debeziumUnwrapper struct {
kafkaConnectUnwrapper kafkaConnectUnwrapper
Expand Down
134 changes: 132 additions & 2 deletions pkg/processor/procbuiltin/unwrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/matryer/is"
)

const DebeziumRecord = `{
const DebeziumRecordPayload = `{
"payload": {
"after": {
"description": "test1",
Expand All @@ -42,6 +42,26 @@ const DebeziumRecord = `{
"schema": {}
}`

const OpenCDCRecordPayload = `{
"position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0",
"operation": "create",
"metadata": {
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1"
},
"key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh",
"payload": {
"after": {
"event_id": 1747353650,
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
"pg_generator": false,
"sensor_id": 1250383582,
"triggered": false
}
}
}`

func TestUnwrap_Config(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -108,7 +128,7 @@ func TestUnwrap_Process(t *testing.T) {
Payload: record.Change{
Before: nil,
After: record.RawData{
Raw: []byte(DebeziumRecord),
Raw: []byte(DebeziumRecordPayload),
},
},
},
Expand Down Expand Up @@ -401,6 +421,115 @@ func TestUnwrap_Process(t *testing.T) {
},
wantErr: false,
},
{
name: "opencdc with structured data",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
record: record.Record{
Key: record.RawData{Raw: []byte("one-key")},
Operation: record.OperationCreate,
Metadata: map[string]string{
"conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka",
"kafka.topic": "stream-78lpnchx7tzpyqz-generator",
"opencdc.createdAt": "1706028953595000000",
"opencdc.readAt": "1706028953606997000",
"opencdc.version": "v1",
},
Payload: record.Change{
Before: nil,
After: record.StructuredData{
"position": []byte("NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0"),
"operation": record.OperationCreate,
"metadata": record.Metadata{
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1",
},
"key": record.RawData{
Raw: []byte("MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh"),
},
"payload": record.Change{
Before: nil,
After: record.StructuredData{
"event_id": 1747353650,
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
"pg_generator": false,
"sensor_id": 1250383582,
"triggered": false,
},
},
},
},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
want: record.Record{
Operation: record.OperationCreate,
Metadata: record.Metadata{
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1",
},
Payload: record.Change{
Before: nil,
After: record.StructuredData{
"event_id": 1747353650,
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
"pg_generator": false,
"sensor_id": 1250383582,
"triggered": false,
},
},
Key: record.RawData{Raw: []byte("MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh")},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
wantErr: false,
},
{
name: "opencdc with raw data",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
record: record.Record{
Key: record.RawData{Raw: []byte("one-key-raw-data")},
Operation: record.OperationCreate,
Metadata: map[string]string{
"conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka",
"kafka.topic": "stream-78lpnchx7tzpyqz-generator",
"opencdc.createdAt": "1706028953595000000",
"opencdc.readAt": "1706028953606997000",
"opencdc.version": "v1",
},
Payload: record.Change{
Before: nil,
After: record.RawData{
Raw: []byte(OpenCDCRecordPayload),
},
},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
want: record.Record{
Operation: record.OperationCreate,
Metadata: record.Metadata{
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1",
},
Payload: record.Change{
Before: nil,
After: record.StructuredData{
"event_id": float64(1747353650),
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
"pg_generator": false,
"sensor_id": float64(1250383582),
"triggered": false,
},
},
Key: record.RawData{Raw: []byte("MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh")},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
wantErr: false,
},
}

for _, tt := range tests {
Expand All @@ -413,6 +542,7 @@ func TestUnwrap_Process(t *testing.T) {
if (err != nil) != tt.wantErr {
t.Fatalf("process() error = %v, wantErr = %v", err, tt.wantErr)
}

if diff := cmp.Diff(tt.want, got); diff != "" {
t.Errorf("process() diff = %s", diff)
}
Expand Down
Loading