Skip to content

Commit 4e94216

Browse files
committed
refactors code
1 parent 34257e2 commit 4e94216

File tree

1 file changed

+61
-28
lines changed

1 file changed

+61
-28
lines changed

pkg/processor/procbuiltin/unwrap.go

+61-28
Original file line numberDiff line numberDiff line change
@@ -123,28 +123,12 @@ Example of an OpenCDC record:
123123
// openCDCUnwrapper unwraps an OpenCDC record from the payload, by unmarhsalling rec.Payload.After into type Record.
124124
type openCDCUnwrapper struct{}
125125

126-
// Unwrap replaces the whole record.payload with record.payload.after.payload except position.
127-
func (o *openCDCUnwrapper) Unwrap(rec record.Record) (record.Record, error) {
128-
var structData record.StructuredData
129-
data := rec.Payload.After
130-
switch d := data.(type) {
131-
case record.RawData:
132-
// unmarshal raw data to structured
133-
err := json.Unmarshal(data.Bytes(), &structData)
134-
if err != nil {
135-
return record.Record{}, cerrors.Errorf("failed to unmarshal raw data as JSON: %w", unwrapProcType, err)
136-
}
137-
case record.StructuredData:
138-
structData = d
139-
default:
140-
return record.Record{}, cerrors.Errorf("unexpected data type %T", unwrapProcType, data)
141-
}
142-
143-
// get record.payload.after.operation
126+
// UnwrapOperation extracts operation from a structuredData record.
127+
func (o *openCDCUnwrapper) UnwrapOperation(structData record.StructuredData) (record.Operation, error) {
144128
var operation record.Operation
145129
op, ok := structData["operation"]
146130
if !ok {
147-
return record.Record{}, cerrors.Errorf("record payload after doesn't contain operation")
131+
return operation, cerrors.Errorf("record payload after doesn't contain operation")
148132
}
149133

150134
switch o := op.(type) {
@@ -153,15 +137,18 @@ func (o *openCDCUnwrapper) Unwrap(rec record.Record) (record.Record, error) {
153137
case string:
154138
err := operation.UnmarshalText([]byte(o))
155139
if err != nil {
156-
return record.Record{}, cerrors.Errorf("couldn't unmarshal record operation")
140+
return operation, cerrors.Errorf("couldn't unmarshal record operation")
157141
}
158142
}
143+
return operation, nil
144+
}
159145

160-
// get record.payload.after.metadata
146+
// UnwrapMetadata extracts metadata from a structuredData record.
147+
func (o *openCDCUnwrapper) UnwrapMetadata(structData record.StructuredData) (record.Metadata, error) {
161148
var metadata record.Metadata
162149
meta, ok := structData["metadata"]
163150
if !ok {
164-
return record.Record{}, cerrors.Errorf("record payload after doesn't contain metadata")
151+
return metadata, cerrors.Errorf("record payload after doesn't contain metadata")
165152
}
166153

167154
switch m := meta.(type) {
@@ -173,12 +160,15 @@ func (o *openCDCUnwrapper) Unwrap(rec record.Record) (record.Record, error) {
173160
metadata[k] = fmt.Sprint(v)
174161
}
175162
}
163+
return metadata, nil
164+
}
176165

177-
// get record.payload.after.key
166+
// UnwrapKey extracts key from a structuredData record.
167+
func (o *openCDCUnwrapper) UnwrapKey(structData record.StructuredData) (record.Data, error) {
178168
var key record.Data
179169
ky, ok := structData["key"]
180170
if !ok {
181-
return record.Record{}, cerrors.Errorf("record payload after doesn't contain key")
171+
return key, cerrors.Errorf("record payload after doesn't contain key")
182172
}
183173

184174
switch k := ky.(type) {
@@ -188,11 +178,15 @@ func (o *openCDCUnwrapper) Unwrap(rec record.Record) (record.Record, error) {
188178
key = record.RawData{Raw: []byte(k)}
189179
}
190180

191-
// get record.payload.after.payload.after
181+
return key, nil
182+
}
183+
184+
// UnwrapPayload extracts payload from a structuredData record.
185+
func (o *openCDCUnwrapper) UnwrapPayload(structData record.StructuredData) (record.Change, error) {
192186
var payload record.Change
193187
pl, ok := structData["payload"]
194188
if !ok {
195-
return record.Record{}, cerrors.Errorf("record payload doesn't contain payload")
189+
return payload, cerrors.Errorf("record payload doesn't contain payload")
196190
}
197191

198192
switch p := pl.(type) {
@@ -201,12 +195,12 @@ func (o *openCDCUnwrapper) Unwrap(rec record.Record) (record.Record, error) {
201195
case map[string]interface{}:
202196
afterData, ok := p["after"]
203197
if !ok {
204-
return record.Record{}, cerrors.Errorf("record payload after doesn't contain payload.after")
198+
return payload, cerrors.Errorf("record payload after doesn't contain payload.after")
205199
}
206200

207201
data, ok := afterData.(map[string]interface{})
208202
if !ok {
209-
return record.Record{}, cerrors.Errorf("record payload after payload.after is not a map")
203+
return payload, cerrors.Errorf("record payload after payload.after is not a map")
210204
}
211205

212206
convertedData := make(record.StructuredData, len(data))
@@ -219,6 +213,45 @@ func (o *openCDCUnwrapper) Unwrap(rec record.Record) (record.Record, error) {
219213
After: convertedData,
220214
}
221215
}
216+
return payload, nil
217+
}
218+
219+
// Unwrap replaces the whole record.payload with record.payload.after.payload except position.
220+
func (o *openCDCUnwrapper) Unwrap(rec record.Record) (record.Record, error) {
221+
var structData record.StructuredData
222+
data := rec.Payload.After
223+
switch d := data.(type) {
224+
case record.RawData:
225+
// unmarshal raw data to structured
226+
err := json.Unmarshal(data.Bytes(), &structData)
227+
if err != nil {
228+
return record.Record{}, cerrors.Errorf("failed to unmarshal raw data as JSON: %w", unwrapProcType, err)
229+
}
230+
case record.StructuredData:
231+
structData = d
232+
default:
233+
return record.Record{}, cerrors.Errorf("unexpected data type %T", unwrapProcType, data)
234+
}
235+
236+
operation, err := o.UnwrapOperation(structData)
237+
if err != nil {
238+
return record.Record{}, err
239+
}
240+
241+
metadata, err := o.UnwrapMetadata(structData)
242+
if err != nil {
243+
return record.Record{}, err
244+
}
245+
246+
key, err := o.UnwrapKey(structData)
247+
if err != nil {
248+
return record.Record{}, err
249+
}
250+
251+
payload, err := o.UnwrapPayload(structData)
252+
if err != nil {
253+
return record.Record{}, err
254+
}
222255

223256
return record.Record{
224257
Key: key,

0 commit comments

Comments
 (0)