Skip to content

Commit

Permalink
Allow unnesting of columns for eventhub target (#455)
Browse files Browse the repository at this point in the history
Fixes: #442
Fixes: #454
  • Loading branch information
iskakaushik authored Sep 29, 2023
1 parent b7589fd commit 7f238f2
Show file tree
Hide file tree
Showing 9 changed files with 346 additions and 98 deletions.
4 changes: 3 additions & 1 deletion flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}

batchPerTopic := make(map[ScopedEventhub]*azeventhubs.EventDataBatch)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

startTime := time.Now()
for i, record := range batch.Records {
json, err := record.GetItems().ToJSON()
json, err := record.GetItems().ToJSONWithOpts(toJSONOpts)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
Expand Down
5 changes: 2 additions & 3 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,8 @@ func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedE

_, err = hubClient.Get(ctx, resourceGroup, namespace, name.Eventhub, nil)

// TODO (kaushik): make these configurable.
partitionCount := int64(3)
retention := int64(1)
partitionCount := int64(cfg.PartitionCount)
retention := int64(cfg.MessageRetentionInDays)
if err != nil {
opts := armeventhub.Eventhub{
Properties: &armeventhub.Properties{
Expand Down
209 changes: 122 additions & 87 deletions flow/generated/protos/peers.pb.go

Large diffs are not rendered by default.

56 changes: 52 additions & 4 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,11 @@ func (r *RecordItems) Len() int {
return len(r.values)
}

func (r *RecordItems) ToJSON() (string, error) {
func (r *RecordItems) toMap() (map[string]interface{}, error) {
if r.colToValIdx == nil {
return "", errors.New("colToValIdx is nil")
return nil, errors.New("colToValIdx is nil")
}

jsonStruct := make(map[string]interface{})
for col, idx := range r.colToValIdx {
v := r.values[idx]
Expand All @@ -129,18 +130,60 @@ func (r *RecordItems) ToJSON() (string, error) {
qvalue.QValueKindTime, qvalue.QValueKindTimeTZ:
jsonStruct[col], err = v.GoTimeConvert()
if err != nil {
return "", err
return nil, err
}
case qvalue.QValueKindNumeric:
bigRat, ok := v.Value.(*big.Rat)
if !ok {
return "", errors.New("expected *big.Rat value")
return nil, errors.New("expected *big.Rat value")
}
jsonStruct[col] = bigRat.FloatString(9)
default:
jsonStruct[col] = v.Value
}
}

return jsonStruct, nil
}

type ToJSONOptions struct {
UnnestColumns map[string]bool
}

func NewToJSONOptions(unnestCols []string) *ToJSONOptions {
unnestColumns := make(map[string]bool)
for _, col := range unnestCols {
unnestColumns[col] = true
}
return &ToJSONOptions{
UnnestColumns: unnestColumns,
}
}

func (r *RecordItems) ToJSONWithOpts(opts *ToJSONOptions) (string, error) {
jsonStruct, err := r.toMap()
if err != nil {
return "", err
}

for col, idx := range r.colToValIdx {
v := r.values[idx]
if v.Kind == qvalue.QValueKindJSON {
if _, ok := opts.UnnestColumns[col]; ok {
var unnestStruct map[string]interface{}
err := json.Unmarshal([]byte(v.Value.(string)), &unnestStruct)
if err != nil {
return "", err
}

for k, v := range unnestStruct {
jsonStruct[k] = v
}
delete(jsonStruct, col)
}
}
}

jsonBytes, err := json.Marshal(jsonStruct)
if err != nil {
return "", err
Expand All @@ -149,6 +192,11 @@ func (r *RecordItems) ToJSON() (string, error) {
return string(jsonBytes), nil
}

func (r *RecordItems) ToJSON() (string, error) {
unnestCols := make([]string, 0)
return r.ToJSONWithOpts(NewToJSONOptions(unnestCols))
}

type InsertRecord struct {
// Name of the source table
SourceTableName string
Expand Down
37 changes: 36 additions & 1 deletion nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,22 @@ fn parse_db_options(
.map(|s| s.to_string())
.unwrap_or_default();

// partition_count default to 3 if not set, parse as int
let partition_count = opts
.get("partition_count")
.map(|s| s.to_string())
.unwrap_or_else(|| "3".to_string())
.parse::<u32>()
.context("unable to parse partition_count as valid int")?;

// message_retention_in_days default to 7 if not set, parse as int
let message_retention_in_days = opts
.get("message_retention_in_days")
.map(|s| s.to_string())
.unwrap_or_else(|| "7".to_string())
.parse::<u32>()
.context("unable to parse message_retention_in_days as valid int")?;

let eventhub_config = EventHubConfig {
namespace: opts
.get("namespace")
Expand All @@ -578,6 +594,8 @@ fn parse_db_options(
.to_string(),
metadata_db,
subscription_id,
partition_count,
message_retention_in_days,
};
let config = Config::EventhubConfig(eventhub_config);
Some(config)
Expand Down Expand Up @@ -625,9 +643,25 @@ fn parse_db_options(
anyhow::bail!("metadata_db is required for eventhub group");
}

// split comma separated list of columns and trim
let unnest_columns = opts
.get("unnest_columns")
.map(|columns| {
columns
.split(',')
.map(|column| column.trim().to_string())
.collect::<Vec<_>>()
})
.unwrap_or_default();

let keys_to_ignore: HashSet<String> = vec!["metadata_db", "unnest_columns"]
.into_iter()
.map(|s| s.to_string())
.collect();

let mut eventhubs: HashMap<String, EventHubConfig> = HashMap::new();
for (key, _) in opts {
if key == "metadata_db" {
if keys_to_ignore.contains(&key) {
continue;
}

Expand All @@ -648,6 +682,7 @@ fn parse_db_options(
let eventhub_group_config = pt::peerdb_peers::EventHubGroupConfig {
eventhubs,
metadata_db,
unnest_columns,
};
let config = Config::EventhubGroupConfig(eventhub_group_config);
Some(config)
Expand Down
8 changes: 8 additions & 0 deletions nexus/pt/src/peerdb_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ pub struct EventHubConfig {
/// if this is empty PeerDB uses `AZURE_SUBSCRIPTION_ID` environment variable.
#[prost(string, tag="5")]
pub subscription_id: ::prost::alloc::string::String,
/// defaults to 3
#[prost(uint32, tag="6")]
pub partition_count: u32,
/// defaults to 7
#[prost(uint32, tag="7")]
pub message_retention_in_days: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand All @@ -101,6 +107,8 @@ pub struct EventHubGroupConfig {
pub eventhubs: ::std::collections::HashMap<::prost::alloc::string::String, EventHubConfig>,
#[prost(message, optional, tag="2")]
pub metadata_db: ::core::option::Option<PostgresConfig>,
#[prost(string, repeated, tag="3")]
pub unnest_columns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
58 changes: 58 additions & 0 deletions nexus/pt/src/peerdb_peers.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,12 @@ impl serde::Serialize for EventHubConfig {
if !self.subscription_id.is_empty() {
len += 1;
}
if self.partition_count != 0 {
len += 1;
}
if self.message_retention_in_days != 0 {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_peers.EventHubConfig", len)?;
if !self.namespace.is_empty() {
struct_ser.serialize_field("namespace", &self.namespace)?;
Expand All @@ -405,6 +411,12 @@ impl serde::Serialize for EventHubConfig {
if !self.subscription_id.is_empty() {
struct_ser.serialize_field("subscriptionId", &self.subscription_id)?;
}
if self.partition_count != 0 {
struct_ser.serialize_field("partitionCount", &self.partition_count)?;
}
if self.message_retention_in_days != 0 {
struct_ser.serialize_field("messageRetentionInDays", &self.message_retention_in_days)?;
}
struct_ser.end()
}
}
Expand All @@ -423,6 +435,10 @@ impl<'de> serde::Deserialize<'de> for EventHubConfig {
"metadataDb",
"subscription_id",
"subscriptionId",
"partition_count",
"partitionCount",
"message_retention_in_days",
"messageRetentionInDays",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -432,6 +448,8 @@ impl<'de> serde::Deserialize<'de> for EventHubConfig {
Location,
MetadataDb,
SubscriptionId,
PartitionCount,
MessageRetentionInDays,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand Down Expand Up @@ -459,6 +477,8 @@ impl<'de> serde::Deserialize<'de> for EventHubConfig {
"location" => Ok(GeneratedField::Location),
"metadataDb" | "metadata_db" => Ok(GeneratedField::MetadataDb),
"subscriptionId" | "subscription_id" => Ok(GeneratedField::SubscriptionId),
"partitionCount" | "partition_count" => Ok(GeneratedField::PartitionCount),
"messageRetentionInDays" | "message_retention_in_days" => Ok(GeneratedField::MessageRetentionInDays),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand All @@ -483,6 +503,8 @@ impl<'de> serde::Deserialize<'de> for EventHubConfig {
let mut location__ = None;
let mut metadata_db__ = None;
let mut subscription_id__ = None;
let mut partition_count__ = None;
let mut message_retention_in_days__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::Namespace => {
Expand Down Expand Up @@ -515,6 +537,22 @@ impl<'de> serde::Deserialize<'de> for EventHubConfig {
}
subscription_id__ = Some(map.next_value()?);
}
GeneratedField::PartitionCount => {
if partition_count__.is_some() {
return Err(serde::de::Error::duplicate_field("partitionCount"));
}
partition_count__ =
Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
;
}
GeneratedField::MessageRetentionInDays => {
if message_retention_in_days__.is_some() {
return Err(serde::de::Error::duplicate_field("messageRetentionInDays"));
}
message_retention_in_days__ =
Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
;
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
Expand All @@ -526,6 +564,8 @@ impl<'de> serde::Deserialize<'de> for EventHubConfig {
location: location__.unwrap_or_default(),
metadata_db: metadata_db__,
subscription_id: subscription_id__.unwrap_or_default(),
partition_count: partition_count__.unwrap_or_default(),
message_retention_in_days: message_retention_in_days__.unwrap_or_default(),
})
}
}
Expand All @@ -546,13 +586,19 @@ impl serde::Serialize for EventHubGroupConfig {
if self.metadata_db.is_some() {
len += 1;
}
if !self.unnest_columns.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_peers.EventHubGroupConfig", len)?;
if !self.eventhubs.is_empty() {
struct_ser.serialize_field("eventhubs", &self.eventhubs)?;
}
if let Some(v) = self.metadata_db.as_ref() {
struct_ser.serialize_field("metadataDb", v)?;
}
if !self.unnest_columns.is_empty() {
struct_ser.serialize_field("unnestColumns", &self.unnest_columns)?;
}
struct_ser.end()
}
}
Expand All @@ -566,12 +612,15 @@ impl<'de> serde::Deserialize<'de> for EventHubGroupConfig {
"eventhubs",
"metadata_db",
"metadataDb",
"unnest_columns",
"unnestColumns",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
Eventhubs,
MetadataDb,
UnnestColumns,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand All @@ -596,6 +645,7 @@ impl<'de> serde::Deserialize<'de> for EventHubGroupConfig {
match value {
"eventhubs" => Ok(GeneratedField::Eventhubs),
"metadataDb" | "metadata_db" => Ok(GeneratedField::MetadataDb),
"unnestColumns" | "unnest_columns" => Ok(GeneratedField::UnnestColumns),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand All @@ -617,6 +667,7 @@ impl<'de> serde::Deserialize<'de> for EventHubGroupConfig {
{
let mut eventhubs__ = None;
let mut metadata_db__ = None;
let mut unnest_columns__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::Eventhubs => {
Expand All @@ -633,6 +684,12 @@ impl<'de> serde::Deserialize<'de> for EventHubGroupConfig {
}
metadata_db__ = map.next_value()?;
}
GeneratedField::UnnestColumns => {
if unnest_columns__.is_some() {
return Err(serde::de::Error::duplicate_field("unnestColumns"));
}
unnest_columns__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
Expand All @@ -641,6 +698,7 @@ impl<'de> serde::Deserialize<'de> for EventHubGroupConfig {
Ok(EventHubGroupConfig {
eventhubs: eventhubs__.unwrap_or_default(),
metadata_db: metadata_db__,
unnest_columns: unnest_columns__.unwrap_or_default(),
})
}
}
Expand Down
5 changes: 5 additions & 0 deletions protos/peers.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,17 @@ message EventHubConfig {
PostgresConfig metadata_db = 4;
// if this is empty PeerDB uses `AZURE_SUBSCRIPTION_ID` environment variable.
string subscription_id = 5;
// defaults to 3
uint32 partition_count = 6;
// defaults to 7
uint32 message_retention_in_days = 7;
}

message EventHubGroupConfig {
// event hub peer name to event hub config
map<string, EventHubConfig> eventhubs = 1;
PostgresConfig metadata_db = 2;
repeated string unnest_columns = 3;
}

message S3Config {
Expand Down
Loading

0 comments on commit 7f238f2

Please sign in to comment.