Skip to content

Commit

Permalink
refactor postgres tojson
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jan 17, 2024
1 parent 8d25c74 commit cc30a3c
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 16 deletions.
20 changes: 16 additions & 4 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,10 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
for record := range req.Records.GetRecords() {
switch typedRecord := record.(type) {
case *model.InsertRecord:
itemsJSON, err := typedRecord.Items.ToJSONForPostgres()
itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
UnnestColumns: map[string]struct{}{},
HStoreAsJSON: false,
})
if err != nil {
return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err)
}
Expand All @@ -300,11 +303,17 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
})
tableNameRowsMapping[typedRecord.DestinationTableName] += 1
case *model.UpdateRecord:
newItemsJSON, err := typedRecord.NewItems.ToJSONForPostgres()
newItemsJSON, err := typedRecord.NewItems.ToJSONWithOptions(&model.ToJSONOptions{
UnnestColumns: map[string]struct{}{},
HStoreAsJSON: false,
})
if err != nil {
return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err)
}
oldItemsJSON, err := typedRecord.OldItems.ToJSONForPostgres()
oldItemsJSON, err := typedRecord.OldItems.ToJSONWithOptions(&model.ToJSONOptions{
UnnestColumns: map[string]struct{}{},
HStoreAsJSON: false,
})
if err != nil {
return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err)
}
Expand All @@ -321,7 +330,10 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
})
tableNameRowsMapping[typedRecord.DestinationTableName] += 1
case *model.DeleteRecord:
itemsJSON, err := typedRecord.Items.ToJSONForPostgres()
itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
UnnestColumns: map[string]struct{}{},
HStoreAsJSON: false,
})
if err != nil {
return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err)
}
Expand Down
22 changes: 10 additions & 12 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (r *RecordItems) Len() int {
return len(r.Values)
}

func (r *RecordItems) toMap(isPostgres bool) (map[string]interface{}, error) {
func (r *RecordItems) toMap(hstoreAsJSON bool) (map[string]interface{}, error) {
if r.ColToValIdx == nil {
return nil, errors.New("colToValIdx is nil")
}
Expand Down Expand Up @@ -163,7 +163,7 @@ func (r *RecordItems) toMap(isPostgres bool) (map[string]interface{}, error) {
return nil, fmt.Errorf("expected string value for hstore column %s for value %T", col, v.Value)
}

if isPostgres {
if !hstoreAsJSON {
jsonStruct[col] = hstoreVal
} else {
jsonVal, err := hstore_util.ParseHstore(hstoreVal)
Expand Down Expand Up @@ -250,22 +250,22 @@ func (r *RecordItems) toMap(isPostgres bool) (map[string]interface{}, error) {

type ToJSONOptions struct {
UnnestColumns map[string]struct{}
IsPostgres bool
HStoreAsJSON bool
}

func NewToJSONOptions(unnestCols []string, isPostgres bool) *ToJSONOptions {
func NewToJSONOptions(unnestCols []string, hstoreAsJSON bool) *ToJSONOptions {
unnestColumns := make(map[string]struct{}, len(unnestCols))
for _, col := range unnestCols {
unnestColumns[col] = struct{}{}
}
return &ToJSONOptions{
UnnestColumns: unnestColumns,
IsPostgres: isPostgres,
HStoreAsJSON: hstoreAsJSON,
}
}

func (r *RecordItems) ToJSONWithOpts(opts *ToJSONOptions) (string, error) {
jsonStruct, err := r.toMap(opts.IsPostgres)
jsonStruct, err := r.toMap(opts.HStoreAsJSON)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -296,12 +296,10 @@ func (r *RecordItems) ToJSONWithOpts(opts *ToJSONOptions) (string, error) {
return string(jsonBytes), nil
}

// this is introduced because we shouldn't do
// some transformations which we do in toMap(),
// for postgres SyncRecords
func (r *RecordItems) ToJSONForPostgres() (string, error) {
unnestCols := make([]string, 0)
return r.ToJSONWithOpts(NewToJSONOptions(unnestCols, true))
// a separate method like gives flexibility
// for us to handle some data types differently
func (r *RecordItems) ToJSONWithOptions(options *ToJSONOptions) (string, error) {
return r.ToJSONWithOpts(options)
}

func (r *RecordItems) ToJSON() (string, error) {
Expand Down

0 comments on commit cc30a3c

Please sign in to comment.