Allow unnesting of columns for eventhub target
Fixes: #442
iskakaushik committed Sep 29, 2023
1 parent b7589fd commit 0dbce43
Showing 8 changed files with 198 additions and 83 deletions.
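In short: when a JSON-typed column is listed in the new unnest_columns peer option, its top-level keys are promoted into the EventHub message body and the wrapping column is dropped. Below is a minimal, self-contained Go sketch of that transform using plain maps rather than the PeerDB types changed in this commit; the column and key names are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// unnest mirrors the behavior added in ToJSONWithOpts further down: for every
// column marked for unnesting, parse its JSON value, hoist its keys to the
// top level, and drop the original column.
func unnest(row map[string]interface{}, unnestColumns map[string]bool) (string, error) {
	for col := range unnestColumns {
		raw, ok := row[col].(string)
		if !ok {
			continue // column absent or not a JSON string
		}
		var nested map[string]interface{}
		if err := json.Unmarshal([]byte(raw), &nested); err != nil {
			return "", err
		}
		for k, v := range nested {
			row[k] = v
		}
		delete(row, col)
	}
	out, err := json.Marshal(row)
	return string(out), err
}

func main() {
	row := map[string]interface{}{
		"id":      1,
		"payload": `{"device":"sensor-1","temp":21.5}`,
	}
	msg, _ := unnest(row, map[string]bool{"payload": true})
	fmt.Println(msg) // {"device":"sensor-1","id":1,"temp":21.5}
}
```

Without the option, the same record serializes with payload as a single JSON string field; with it, EventHub consumers see device and temp as top-level fields.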
4 changes: 3 additions & 1 deletion flow/connectors/eventhub/eventhub.go
@@ -107,9 +107,11 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}

batchPerTopic := make(map[ScopedEventhub]*azeventhubs.EventDataBatch)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

startTime := time.Now()
for i, record := range batch.Records {
json, err := record.GetItems().ToJSON()
json, err := record.GetItems().ToJSONWithOpts(toJSONOpts)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
163 changes: 87 additions & 76 deletions flow/generated/protos/peers.pb.go

Large diffs are not rendered by default.

56 changes: 52 additions & 4 deletions flow/model/model.go
@@ -110,10 +110,11 @@ func (r *RecordItems) Len() int {
return len(r.values)
}

func (r *RecordItems) ToJSON() (string, error) {
func (r *RecordItems) toMap() (map[string]interface{}, error) {
if r.colToValIdx == nil {
return "", errors.New("colToValIdx is nil")
return nil, errors.New("colToValIdx is nil")
}

jsonStruct := make(map[string]interface{})
for col, idx := range r.colToValIdx {
v := r.values[idx]
@@ -129,18 +130,60 @@ func (r *RecordItems) ToJSON() (string, error) {
qvalue.QValueKindTime, qvalue.QValueKindTimeTZ:
jsonStruct[col], err = v.GoTimeConvert()
if err != nil {
return "", err
return nil, err
}
case qvalue.QValueKindNumeric:
bigRat, ok := v.Value.(*big.Rat)
if !ok {
return "", errors.New("expected *big.Rat value")
return nil, errors.New("expected *big.Rat value")
}
jsonStruct[col] = bigRat.FloatString(9)
default:
jsonStruct[col] = v.Value
}
}

return jsonStruct, nil
}

type ToJSONOptions struct {
UnnestColumns map[string]bool
}

func NewToJSONOptions(unnestCols []string) *ToJSONOptions {
unnestColumns := make(map[string]bool)
for _, col := range unnestCols {
unnestColumns[col] = true
}
return &ToJSONOptions{
UnnestColumns: unnestColumns,
}
}

func (r *RecordItems) ToJSONWithOpts(opts *ToJSONOptions) (string, error) {
jsonStruct, err := r.toMap()
if err != nil {
return "", err
}

for col, idx := range r.colToValIdx {
v := r.values[idx]
if v.Kind == qvalue.QValueKindJSON {
if _, ok := opts.UnnestColumns[col]; ok {
var unnestStruct map[string]interface{}
err := json.Unmarshal([]byte(v.Value.(string)), &unnestStruct)
if err != nil {
return "", err
}

for k, v := range unnestStruct {
jsonStruct[k] = v
}
delete(jsonStruct, col)
}
}
}

jsonBytes, err := json.Marshal(jsonStruct)
if err != nil {
return "", err
@@ -149,6 +192,11 @@ func (r *RecordItems) ToJSON() (string, error) {
return string(jsonBytes), nil
}

func (r *RecordItems) ToJSON() (string, error) {
unnestCols := make([]string, 0)
return r.ToJSONWithOpts(NewToJSONOptions(unnestCols))
}

type InsertRecord struct {
// Name of the source table
SourceTableName string
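A sketch of how the model.go additions above might be exercised from inside the model package. This assumes, based on the methods shown, that RecordItems' unexported fields are colToValIdx (map[string]int) and values ([]qvalue.QValue), that qvalue exposes a QValueKindString kind, and that the import path is as written; treat all of those as assumptions rather than part of this commit.

```go
package model

import (
	"fmt"

	"github.com/PeerDB-io/peer-flow/model/qvalue" // import path assumed
)

// Hypothetical in-package example (e.g. in a _test.go file) contrasting
// ToJSON with ToJSONWithOpts when one column is unnested.
func ExampleRecordItems_ToJSONWithOpts() {
	items := &RecordItems{
		colToValIdx: map[string]int{"id": 0, "payload": 1},
		values: []qvalue.QValue{
			{Kind: qvalue.QValueKindString, Value: "record-1"},
			{Kind: qvalue.QValueKindJSON, Value: `{"a":1,"b":"x"}`},
		},
	}

	// Default path: "payload" stays a single JSON-valued field.
	plain, _ := items.ToJSON()
	fmt.Println(plain)

	// Unnesting "payload": its keys "a" and "b" become top-level fields
	// and the "payload" column itself is removed.
	unnested, _ := items.ToJSONWithOpts(NewToJSONOptions([]string{"payload"}))
	fmt.Println(unnested)
}
```

The eventhub connector takes the second path: as shown in eventhub.go above, it builds the options once per sync batch from c.config.UnnestColumns and reuses them for every record.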
19 changes: 18 additions & 1 deletion nexus/analyzer/src/lib.rs
@@ -625,9 +625,25 @@ fn parse_db_options(
anyhow::bail!("metadata_db is required for eventhub group");
}

// split comma separated list of columns and trim
let unnest_columns = opts
.get("unnest_columns")
.map(|columns| {
columns
.split(',')
.map(|column| column.trim().to_string())
.collect::<Vec<_>>()
})
.unwrap_or_default();

let keys_to_ignore: HashSet<String> = vec!["metadata_db", "unnest_columns"]
.into_iter()
.map(|s| s.to_string())
.collect();

let mut eventhubs: HashMap<String, EventHubConfig> = HashMap::new();
for (key, _) in opts {
if key == "metadata_db" {
if keys_to_ignore.contains(&key) {
continue;
}

@@ -648,6 +664,7 @@ fn parse_db_options(
let eventhub_group_config = pt::peerdb_peers::EventHubGroupConfig {
eventhubs,
metadata_db,
unnest_columns,
};
let config = Config::EventhubGroupConfig(eventhub_group_config);
Some(config)
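On the analyzer side, unnest_columns arrives as a single comma-separated peer option; it is split and trimmed into a list before being placed on EventHubGroupConfig, and every other option key besides metadata_db and unnest_columns is still treated as an eventhub entry. A tiny Go rendering of that parsing step, purely to illustrate the behavior of the Rust change above:

```go
package main

import (
	"fmt"
	"strings"
)

// parseUnnestColumns mirrors the Rust analyzer change: split the raw option
// value on commas and trim whitespace around each column name.
func parseUnnestColumns(raw string) []string {
	parts := strings.Split(raw, ",")
	cols := make([]string, 0, len(parts))
	for _, p := range parts {
		cols = append(cols, strings.TrimSpace(p))
	}
	return cols
}

func main() {
	fmt.Println(parseUnnestColumns("payload, metadata ,attrs"))
	// [payload metadata attrs]
}
```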
2 changes: 2 additions & 0 deletions nexus/pt/src/peerdb_peers.rs
@@ -101,6 +101,8 @@ pub struct EventHubGroupConfig {
pub eventhubs: ::std::collections::HashMap<::prost::alloc::string::String, EventHubConfig>,
#[prost(message, optional, tag="2")]
pub metadata_db: ::core::option::Option<PostgresConfig>,
#[prost(string, repeated, tag="3")]
pub unnest_columns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
18 changes: 18 additions & 0 deletions nexus/pt/src/peerdb_peers.serde.rs
@@ -546,13 +546,19 @@ impl serde::Serialize for EventHubGroupConfig {
if self.metadata_db.is_some() {
len += 1;
}
if !self.unnest_columns.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_peers.EventHubGroupConfig", len)?;
if !self.eventhubs.is_empty() {
struct_ser.serialize_field("eventhubs", &self.eventhubs)?;
}
if let Some(v) = self.metadata_db.as_ref() {
struct_ser.serialize_field("metadataDb", v)?;
}
if !self.unnest_columns.is_empty() {
struct_ser.serialize_field("unnestColumns", &self.unnest_columns)?;
}
struct_ser.end()
}
}
@@ -566,12 +572,15 @@ impl<'de> serde::Deserialize<'de> for EventHubGroupConfig {
"eventhubs",
"metadata_db",
"metadataDb",
"unnest_columns",
"unnestColumns",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
Eventhubs,
MetadataDb,
UnnestColumns,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
@@ -596,6 +605,7 @@ impl<'de> serde::Deserialize<'de> for EventHubGroupConfig {
match value {
"eventhubs" => Ok(GeneratedField::Eventhubs),
"metadataDb" | "metadata_db" => Ok(GeneratedField::MetadataDb),
"unnestColumns" | "unnest_columns" => Ok(GeneratedField::UnnestColumns),
_ => Ok(GeneratedField::__SkipField__),
}
}
@@ -617,6 +627,7 @@ impl<'de> serde::Deserialize<'de> for EventHubGroupConfig {
{
let mut eventhubs__ = None;
let mut metadata_db__ = None;
let mut unnest_columns__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::Eventhubs => {
@@ -633,6 +644,12 @@ impl<'de> serde::Deserialize<'de> for EventHubGroupConfig {
}
metadata_db__ = map.next_value()?;
}
GeneratedField::UnnestColumns => {
if unnest_columns__.is_some() {
return Err(serde::de::Error::duplicate_field("unnestColumns"));
}
unnest_columns__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
@@ -641,6 +658,7 @@ impl<'de> serde::Deserialize<'de> for EventHubGroupConfig {
Ok(EventHubGroupConfig {
eventhubs: eventhubs__.unwrap_or_default(),
metadata_db: metadata_db__,
unnest_columns: unnest_columns__.unwrap_or_default(),
})
}
}
1 change: 1 addition & 0 deletions protos/peers.proto
@@ -60,6 +60,7 @@ message EventHubGroupConfig {
// event hub peer name to event hub config
map<string, EventHubConfig> eventhubs = 1;
PostgresConfig metadata_db = 2;
repeated string unnest_columns = 3;
}

message S3Config {
18 changes: 17 additions & 1 deletion ui/grpc_generated/peers.ts
@@ -132,6 +132,7 @@ export interface EventHubGroupConfig {
/** event hub peer name to event hub config */
eventhubs: { [key: string]: EventHubConfig };
metadataDb: PostgresConfig | undefined;
unnestColumns: string[];
}

export interface EventHubGroupConfig_EventhubsEntry {
@@ -949,7 +950,7 @@ export const EventHubConfig = {
};

function createBaseEventHubGroupConfig(): EventHubGroupConfig {
return { eventhubs: {}, metadataDb: undefined };
return { eventhubs: {}, metadataDb: undefined, unnestColumns: [] };
}

export const EventHubGroupConfig = {
@@ -960,6 +961,9 @@ export const EventHubGroupConfig = {
if (message.metadataDb !== undefined) {
PostgresConfig.encode(message.metadataDb, writer.uint32(18).fork()).ldelim();
}
for (const v of message.unnestColumns) {
writer.uint32(26).string(v!);
}
return writer;
},

@@ -987,6 +991,13 @@ export const EventHubGroupConfig = {

message.metadataDb = PostgresConfig.decode(reader, reader.uint32());
continue;
case 3:
if (tag !== 26) {
break;
}

message.unnestColumns.push(reader.string());
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
@@ -1005,6 +1016,7 @@ export const EventHubGroupConfig = {
}, {})
: {},
metadataDb: isSet(object.metadataDb) ? PostgresConfig.fromJSON(object.metadataDb) : undefined,
unnestColumns: Array.isArray(object?.unnestColumns) ? object.unnestColumns.map((e: any) => String(e)) : [],
};
},

@@ -1022,6 +1034,9 @@ export const EventHubGroupConfig = {
if (message.metadataDb !== undefined) {
obj.metadataDb = PostgresConfig.toJSON(message.metadataDb);
}
if (message.unnestColumns?.length) {
obj.unnestColumns = message.unnestColumns;
}
return obj;
},

@@ -1042,6 +1057,7 @@ export const EventHubGroupConfig = {
message.metadataDb = (object.metadataDb !== undefined && object.metadataDb !== null)
? PostgresConfig.fromPartial(object.metadataDb)
: undefined;
message.unnestColumns = object.unnestColumns?.map((e) => e) || [];
return message;
},
};
