diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 9810fe6672..a3b7dc8180 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -3,6 +3,7 @@ package connectors import ( "context" "errors" + "fmt" connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery" conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub" @@ -141,7 +142,9 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne case *protos.Peer_SnowflakeConfig: return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig()) case *protos.Peer_EventhubConfig: - return conneventhub.NewEventHubConnector(ctx, config.GetEventhubConfig()) + return nil, fmt.Errorf("use eventhub group config instead") + case *protos.Peer_EventhubGroupConfig: + return conneventhub.NewEventHubConnector(ctx, config.GetEventhubGroupConfig()) default: return nil, ErrUnsupportedFunctionality } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 158323e1a0..e1b33f04c4 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -11,7 +11,6 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azidentity" azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" - "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/connectors/utils/metrics" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -23,7 +22,7 @@ import ( type EventHubConnector struct { ctx context.Context - config *protos.EventHubConfig + config *protos.EventHubGroupConfig pgMetadata *PostgresMetadataStore tableSchemas map[string]*protos.TableSchema creds *azidentity.DefaultAzureCredential @@ -33,7 +32,7 @@ type EventHubConnector struct { // NewEventHubConnector creates a new EventHubConnector. func NewEventHubConnector( ctx context.Context, - config *protos.EventHubConfig, + config *protos.EventHubGroupConfig, ) (*EventHubConnector, error) { defaultAzureCreds, err := azidentity.NewDefaultAzureCredential(nil) if err != nil { @@ -41,8 +40,7 @@ func NewEventHubConnector( return nil, err } - hubManager := NewEventHubManager(ctx, defaultAzureCreds, config.GetNamespace()) - + hubManager := NewEventHubManager(ctx, defaultAzureCreds, config) pgMetadata, err := NewPostgresMetadataStore(ctx, config.GetMetadataDb()) if err != nil { log.Errorf("failed to create postgres metadata store: %v", err) @@ -255,7 +253,7 @@ func (c *EventHubConnector) sendEventBatch( wg.Done() }() - hub, err := c.hubManager.GetOrCreateHub(tblName) + hub, err := c.hubManager.GetOrCreateHubClient(tblName) if err != nil { once.Do(func() { firstErr = err }) return @@ -301,74 +299,33 @@ func (c *EventHubConnector) sendEventBatch( func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { // create topics for each table - // key is the source table and value is the destination topic name. + // key is the source table and value is the "eh_peer.eh_topic" that ought to be used. tableMap := req.GetTableNameMapping() for _, table := range tableMap { - err := c.ensureEventHub(c.ctx, table, req.FlowJobName) + // parse peer name and topic name. + peerName, topicName, err := parsePeerAndTopicName(table) if err != nil { log.WithFields(log.Fields{ "flowName": req.FlowJobName, "table": table, - }).Errorf("failed to get event hub properties: %v", err) + }).Errorf("failed to parse peer and topic name: %v", err) return nil, err } - } - return nil, nil -} - -func (c *EventHubConnector) ensureEventHub(ctx context.Context, name string, flowName string) error { - hubClient, err := c.getEventHubMgmtClient() - if err != nil { - return err - } - - namespace := c.config.GetNamespace() - resourceGroup := c.config.GetResourceGroup() - _, err = hubClient.Get(ctx, resourceGroup, namespace, name, nil) - - // TODO (kaushik): make these configurable. - partitionCount := int64(3) - retention := int64(1) - if err != nil { - opts := armeventhub.Eventhub{ - Properties: &armeventhub.Properties{ - PartitionCount: &partitionCount, - MessageRetentionInDays: &retention, - }, - } - - _, err := hubClient.CreateOrUpdate(ctx, resourceGroup, namespace, name, opts, nil) + err = c.hubManager.EnsureEventHubExists(c.ctx, peerName, topicName) if err != nil { - log.Errorf("failed to create event hub: %v", err) - return err + log.WithFields(log.Fields{ + "flowName": req.FlowJobName, + "table": table, + }).Errorf("failed to ensure event hub exists: %v", err) + return nil, err } - - log.WithFields(log.Fields{ - "flowName": flowName, - }).Infof("event hub %s created", name) - } else { - log.Infof("event hub %s already exists", name) - } - - return nil -} - -func (c *EventHubConnector) getEventHubMgmtClient() (*armeventhub.EventHubsClient, error) { - subID, err := utils.GetAzureSubscriptionID() - if err != nil { - log.Errorf("failed to get azure subscription id: %v", err) - return nil, err } - hubClient, err := armeventhub.NewEventHubsClient(subID, c.creds, nil) - if err != nil { - log.Errorf("failed to get event hub client: %v", err) - return nil, err - } - - return hubClient, nil + return &protos.CreateRawTableOutput{ + TableIdentifier: "n/a", + }, nil } func (c *EventHubConnector) SetupNormalizedTables( @@ -380,6 +337,14 @@ func (c *EventHubConnector) SetupNormalizedTables( }, nil } +func parsePeerAndTopicName(s string) (string, string, error) { + parts := strings.Split(s, ".") + if len(parts) != 2 { + return "", "", fmt.Errorf("invalid peer and topic name %s", s) + } + return parts[0], parts[1], nil +} + func eventDataFromString(s string) *azeventhubs.EventData { return &azeventhubs.EventData{ Body: []byte(s), diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 96117ceeb7..9d75db7dd3 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -7,40 +7,62 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub" + "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/generated/protos" cmap "github.com/orcaman/concurrent-map/v2" + log "github.com/sirupsen/logrus" ) type EventHubManager struct { - ctx context.Context - creds *azidentity.DefaultAzureCredential - namespace string - hubs cmap.ConcurrentMap[string, *azeventhubs.ProducerClient] + ctx context.Context + creds *azidentity.DefaultAzureCredential + // eventhub peer name -> config + peerConfig cmap.ConcurrentMap[string, *protos.EventHubConfig] + // eventhub name -> client + hubs cmap.ConcurrentMap[string, *azeventhubs.ProducerClient] + // eventhub name -> peer name + peerNames cmap.ConcurrentMap[string, string] } func NewEventHubManager( ctx context.Context, creds *azidentity.DefaultAzureCredential, - namespace string, + groupConfig *protos.EventHubGroupConfig, ) *EventHubManager { hubs := cmap.New[*azeventhubs.ProducerClient]() + peerConfig := cmap.New[*protos.EventHubConfig]() + + for name, config := range groupConfig.Eventhubs { + peerConfig.Set(name, config) + } + return &EventHubManager{ - ctx: ctx, - creds: creds, - namespace: namespace, - hubs: hubs, + ctx: ctx, + creds: creds, + hubs: hubs, } } -func (m *EventHubManager) GetOrCreateHub(name string) (*azeventhubs.ProducerClient, error) { - hub, ok := m.hubs.Get(name) +func (m *EventHubManager) GetOrCreateHubClient(name string) (*azeventhubs.ProducerClient, error) { + peerName, ok := m.peerNames.Get(name) + if !ok { + return nil, fmt.Errorf("eventhub '%s' has not configured", name) + } - namespace := m.namespace + ehConfig, ok := m.peerConfig.Get(peerName) + if !ok { + return nil, fmt.Errorf("eventhub '%s' not configured", name) + } + + namespace := ehConfig.Namespace // if the namespace isn't fully qualified, add the `.servicebus.windows.net` // check by counting the number of '.' in the namespace if strings.Count(namespace, ".") < 2 { namespace = fmt.Sprintf("%s.servicebus.windows.net", namespace) } + hub, ok := m.hubs.Get(name) if !ok { opts := &azeventhubs.ProducerClientOptions{} hub, err := azeventhubs.NewProducerClient(namespace, name, m.creds, opts) @@ -65,7 +87,7 @@ func (m *EventHubManager) Close() error { } func (m *EventHubManager) CreateEventDataBatch(name string) (*azeventhubs.EventDataBatch, error) { - hub, err := m.GetOrCreateHub(name) + hub, err := m.GetOrCreateHubClient(name) if err != nil { return nil, err } @@ -78,3 +100,67 @@ func (m *EventHubManager) CreateEventDataBatch(name string) (*azeventhubs.EventD return batch, nil } + +// EnsureEventHubExists ensures that the eventhub exists. +func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, peerName string, eventhub string) error { + cfg, ok := m.peerConfig.Get(peerName) + if !ok { + return fmt.Errorf("eventhub peer '%s' not configured", peerName) + } + + hubClient, err := m.getEventHubMgmtClient(cfg.SubscriptionId) + if err != nil { + return fmt.Errorf("failed to get event hub client: %v", err) + } + + namespace := cfg.Namespace + resourceGroup := cfg.ResourceGroup + + name := eventhub + _, err = hubClient.Get(ctx, resourceGroup, namespace, name, nil) + + m.peerNames.Set(name, peerName) + + // TODO (kaushik): make these configurable. + partitionCount := int64(3) + retention := int64(1) + if err != nil { + opts := armeventhub.Eventhub{ + Properties: &armeventhub.Properties{ + PartitionCount: &partitionCount, + MessageRetentionInDays: &retention, + }, + } + + _, err := hubClient.CreateOrUpdate(ctx, resourceGroup, namespace, name, opts, nil) + if err != nil { + log.Errorf("failed to create event hub: %v", err) + return err + } + + log.Infof("event hub %s created", name) + } else { + log.Infof("event hub %s already exists", name) + } + + return nil +} + +func (m *EventHubManager) getEventHubMgmtClient(subID string) (*armeventhub.EventHubsClient, error) { + if subID == "" { + envSubID, err := utils.GetAzureSubscriptionID() + if err != nil { + log.Errorf("failed to get azure subscription id: %v", err) + return nil, err + } + subID = envSubID + } + + hubClient, err := armeventhub.NewEventHubsClient(subID, m.creds, nil) + if err != nil { + log.Errorf("failed to get event hub client: %v", err) + return nil, err + } + + return hubClient, nil +} diff --git a/flow/generated/protos/peers.pb.go b/flow/generated/protos/peers.pb.go index 41a872107f..5483970e4a 100644 --- a/flow/generated/protos/peers.pb.go +++ b/flow/generated/protos/peers.pb.go @@ -23,13 +23,14 @@ const ( type DBType int32 const ( - DBType_BIGQUERY DBType = 0 - DBType_SNOWFLAKE DBType = 1 - DBType_MONGO DBType = 2 - DBType_POSTGRES DBType = 3 - DBType_EVENTHUB DBType = 4 - DBType_S3 DBType = 5 - DBType_SQLSERVER DBType = 6 + DBType_BIGQUERY DBType = 0 + DBType_SNOWFLAKE DBType = 1 + DBType_MONGO DBType = 2 + DBType_POSTGRES DBType = 3 + DBType_EVENTHUB DBType = 4 + DBType_S3 DBType = 5 + DBType_SQLSERVER DBType = 6 + DBType_EVENTHUB_GROUP DBType = 7 ) // Enum value maps for DBType. @@ -42,15 +43,17 @@ var ( 4: "EVENTHUB", 5: "S3", 6: "SQLSERVER", + 7: "EVENTHUB_GROUP", } DBType_value = map[string]int32{ - "BIGQUERY": 0, - "SNOWFLAKE": 1, - "MONGO": 2, - "POSTGRES": 3, - "EVENTHUB": 4, - "S3": 5, - "SQLSERVER": 6, + "BIGQUERY": 0, + "SNOWFLAKE": 1, + "MONGO": 2, + "POSTGRES": 3, + "EVENTHUB": 4, + "S3": 5, + "SQLSERVER": 6, + "EVENTHUB_GROUP": 7, } ) @@ -495,6 +498,8 @@ type EventHubConfig struct { ResourceGroup string `protobuf:"bytes,2,opt,name=resource_group,json=resourceGroup,proto3" json:"resource_group,omitempty"` Location string `protobuf:"bytes,3,opt,name=location,proto3" json:"location,omitempty"` MetadataDb *PostgresConfig `protobuf:"bytes,4,opt,name=metadata_db,json=metadataDb,proto3" json:"metadata_db,omitempty"` + // if this is empty PeerDB uses `AZURE_SUBSCRIPTION_ID` environment variable. + SubscriptionId string `protobuf:"bytes,5,opt,name=subscription_id,json=subscriptionId,proto3" json:"subscription_id,omitempty"` } func (x *EventHubConfig) Reset() { @@ -557,6 +562,69 @@ func (x *EventHubConfig) GetMetadataDb() *PostgresConfig { return nil } +func (x *EventHubConfig) GetSubscriptionId() string { + if x != nil { + return x.SubscriptionId + } + return "" +} + +type EventHubGroupConfig struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // event hub peer name to event hub config + Eventhubs map[string]*EventHubConfig `protobuf:"bytes,1,rep,name=eventhubs,proto3" json:"eventhubs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + MetadataDb *PostgresConfig `protobuf:"bytes,2,opt,name=metadata_db,json=metadataDb,proto3" json:"metadata_db,omitempty"` +} + +func (x *EventHubGroupConfig) Reset() { + *x = EventHubGroupConfig{} + if protoimpl.UnsafeEnabled { + mi := &file_peers_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *EventHubGroupConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EventHubGroupConfig) ProtoMessage() {} + +func (x *EventHubGroupConfig) ProtoReflect() protoreflect.Message { + mi := &file_peers_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EventHubGroupConfig.ProtoReflect.Descriptor instead. +func (*EventHubGroupConfig) Descriptor() ([]byte, []int) { + return file_peers_proto_rawDescGZIP(), []int{5} +} + +func (x *EventHubGroupConfig) GetEventhubs() map[string]*EventHubConfig { + if x != nil { + return x.Eventhubs + } + return nil +} + +func (x *EventHubGroupConfig) GetMetadataDb() *PostgresConfig { + if x != nil { + return x.MetadataDb + } + return nil +} + type S3Config struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -568,7 +636,7 @@ type S3Config struct { func (x *S3Config) Reset() { *x = S3Config{} if protoimpl.UnsafeEnabled { - mi := &file_peers_proto_msgTypes[5] + mi := &file_peers_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -581,7 +649,7 @@ func (x *S3Config) String() string { func (*S3Config) ProtoMessage() {} func (x *S3Config) ProtoReflect() protoreflect.Message { - mi := &file_peers_proto_msgTypes[5] + mi := &file_peers_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -594,7 +662,7 @@ func (x *S3Config) ProtoReflect() protoreflect.Message { // Deprecated: Use S3Config.ProtoReflect.Descriptor instead. func (*S3Config) Descriptor() ([]byte, []int) { - return file_peers_proto_rawDescGZIP(), []int{5} + return file_peers_proto_rawDescGZIP(), []int{6} } func (x *S3Config) GetUrl() string { @@ -619,7 +687,7 @@ type SqlServerConfig struct { func (x *SqlServerConfig) Reset() { *x = SqlServerConfig{} if protoimpl.UnsafeEnabled { - mi := &file_peers_proto_msgTypes[6] + mi := &file_peers_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -632,7 +700,7 @@ func (x *SqlServerConfig) String() string { func (*SqlServerConfig) ProtoMessage() {} func (x *SqlServerConfig) ProtoReflect() protoreflect.Message { - mi := &file_peers_proto_msgTypes[6] + mi := &file_peers_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -645,7 +713,7 @@ func (x *SqlServerConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use SqlServerConfig.ProtoReflect.Descriptor instead. func (*SqlServerConfig) Descriptor() ([]byte, []int) { - return file_peers_proto_rawDescGZIP(), []int{6} + return file_peers_proto_rawDescGZIP(), []int{7} } func (x *SqlServerConfig) GetServer() string { @@ -699,13 +767,14 @@ type Peer struct { // *Peer_EventhubConfig // *Peer_S3Config // *Peer_SqlserverConfig + // *Peer_EventhubGroupConfig Config isPeer_Config `protobuf_oneof:"config"` } func (x *Peer) Reset() { *x = Peer{} if protoimpl.UnsafeEnabled { - mi := &file_peers_proto_msgTypes[7] + mi := &file_peers_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -718,7 +787,7 @@ func (x *Peer) String() string { func (*Peer) ProtoMessage() {} func (x *Peer) ProtoReflect() protoreflect.Message { - mi := &file_peers_proto_msgTypes[7] + mi := &file_peers_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -731,7 +800,7 @@ func (x *Peer) ProtoReflect() protoreflect.Message { // Deprecated: Use Peer.ProtoReflect.Descriptor instead. func (*Peer) Descriptor() ([]byte, []int) { - return file_peers_proto_rawDescGZIP(), []int{7} + return file_peers_proto_rawDescGZIP(), []int{8} } func (x *Peer) GetName() string { @@ -804,6 +873,13 @@ func (x *Peer) GetSqlserverConfig() *SqlServerConfig { return nil } +func (x *Peer) GetEventhubGroupConfig() *EventHubGroupConfig { + if x, ok := x.GetConfig().(*Peer_EventhubGroupConfig); ok { + return x.EventhubGroupConfig + } + return nil +} + type isPeer_Config interface { isPeer_Config() } @@ -836,6 +912,10 @@ type Peer_SqlserverConfig struct { SqlserverConfig *SqlServerConfig `protobuf:"bytes,9,opt,name=sqlserver_config,json=sqlserverConfig,proto3,oneof"` } +type Peer_EventhubGroupConfig struct { + EventhubGroupConfig *EventHubGroupConfig `protobuf:"bytes,10,opt,name=eventhub_group_config,json=eventhubGroupConfig,proto3,oneof"` +} + func (*Peer_SnowflakeConfig) isPeer_Config() {} func (*Peer_BigqueryConfig) isPeer_Config() {} @@ -850,6 +930,8 @@ func (*Peer_S3Config) isPeer_Config() {} func (*Peer_SqlserverConfig) isPeer_Config() {} +func (*Peer_EventhubGroupConfig) isPeer_Config() {} + var File_peers_proto protoreflect.FileDescriptor var file_peers_proto_rawDesc = []byte{ @@ -922,7 +1004,7 @@ var file_peers_proto_rawDesc = []byte{ 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x22, - 0xb0, 0x01, 0x0a, 0x0e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, 0x62, 0x43, 0x6f, 0x6e, 0x66, + 0xd9, 0x01, 0x0a, 0x0e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, 0x62, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x67, 0x72, 0x6f, @@ -933,68 +1015,93 @@ var file_peers_proto_rawDesc = []byte{ 0x64, 0x62, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0a, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, - 0x44, 0x62, 0x22, 0x1c, 0x0a, 0x08, 0x53, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x10, - 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, - 0x22, 0x89, 0x01, 0x0a, 0x0f, 0x53, 0x71, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, - 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x70, 0x6f, 0x72, 0x74, - 0x12, 0x12, 0x0a, 0x04, 0x75, 0x73, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, - 0x75, 0x73, 0x65, 0x72, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, - 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x18, 0x05, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x22, 0xb8, 0x04, 0x0a, - 0x04, 0x50, 0x65, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x28, 0x0a, 0x04, 0x74, 0x79, 0x70, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, - 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x44, 0x42, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, - 0x79, 0x70, 0x65, 0x12, 0x4a, 0x0a, 0x10, 0x73, 0x6e, 0x6f, 0x77, 0x66, 0x6c, 0x61, 0x6b, 0x65, - 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, - 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x6e, 0x6f, - 0x77, 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0f, - 0x73, 0x6e, 0x6f, 0x77, 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, - 0x47, 0x0a, 0x0f, 0x62, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x63, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, - 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x42, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, - 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x62, 0x69, 0x67, 0x71, 0x75, 0x65, - 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3e, 0x0a, 0x0c, 0x6d, 0x6f, 0x6e, 0x67, - 0x6f, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, - 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x4d, 0x6f, - 0x6e, 0x67, 0x6f, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0b, 0x6d, 0x6f, 0x6e, - 0x67, 0x6f, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, 0x0f, 0x70, 0x6f, 0x73, 0x74, - 0x67, 0x72, 0x65, 0x73, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x06, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, - 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, - 0x00, 0x52, 0x0e, 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x12, 0x47, 0x0a, 0x0f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x5f, 0x63, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, - 0x75, 0x62, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x65, 0x76, 0x65, 0x6e, - 0x74, 0x68, 0x75, 0x62, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x35, 0x0a, 0x09, 0x73, 0x33, - 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, - 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x33, 0x43, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x08, 0x73, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x12, 0x4a, 0x0a, 0x10, 0x73, 0x71, 0x6c, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x63, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, - 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x71, 0x6c, 0x53, 0x65, - 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0f, 0x73, 0x71, - 0x6c, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x42, 0x08, 0x0a, - 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2a, 0x63, 0x0a, 0x06, 0x44, 0x42, 0x54, 0x79, 0x70, - 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x42, 0x49, 0x47, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x00, 0x12, - 0x0d, 0x0a, 0x09, 0x53, 0x4e, 0x4f, 0x57, 0x46, 0x4c, 0x41, 0x4b, 0x45, 0x10, 0x01, 0x12, 0x09, - 0x0a, 0x05, 0x4d, 0x4f, 0x4e, 0x47, 0x4f, 0x10, 0x02, 0x12, 0x0c, 0x0a, 0x08, 0x50, 0x4f, 0x53, - 0x54, 0x47, 0x52, 0x45, 0x53, 0x10, 0x03, 0x12, 0x0c, 0x0a, 0x08, 0x45, 0x56, 0x45, 0x4e, 0x54, - 0x48, 0x55, 0x42, 0x10, 0x04, 0x12, 0x06, 0x0a, 0x02, 0x53, 0x33, 0x10, 0x05, 0x12, 0x0d, 0x0a, - 0x09, 0x53, 0x51, 0x4c, 0x53, 0x45, 0x52, 0x56, 0x45, 0x52, 0x10, 0x06, 0x42, 0x7c, 0x0a, 0x10, - 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, - 0x42, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, - 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, - 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, - 0x65, 0x65, 0x72, 0x73, 0xca, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, - 0x72, 0x73, 0xe2, 0x02, 0x17, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, - 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x50, - 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x44, 0x62, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, + 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x73, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0x80, 0x02, 0x0a, 0x13, + 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, 0x62, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x12, 0x4e, 0x0a, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x73, + 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x30, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, + 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, 0x62, 0x47, 0x72, + 0x6f, 0x75, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x68, + 0x75, 0x62, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, + 0x75, 0x62, 0x73, 0x12, 0x3d, 0x0a, 0x0b, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x5f, + 0x64, 0x62, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, + 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0a, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x44, 0x62, 0x1a, 0x5a, 0x0a, 0x0e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x73, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x32, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, + 0x65, 0x65, 0x72, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, 0x62, 0x43, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x1c, + 0x0a, 0x08, 0x53, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, + 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x22, 0x89, 0x01, 0x0a, + 0x0f, 0x53, 0x71, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x12, 0x16, 0x0a, 0x06, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x06, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x12, 0x0a, 0x04, + 0x75, 0x73, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x75, 0x73, 0x65, 0x72, + 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x12, 0x1a, 0x0a, 0x08, + 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, + 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x22, 0x91, 0x05, 0x0a, 0x04, 0x50, 0x65, 0x65, + 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x28, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, + 0x72, 0x73, 0x2e, 0x44, 0x42, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, + 0x4a, 0x0a, 0x10, 0x73, 0x6e, 0x6f, 0x77, 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x5f, 0x63, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x6e, 0x6f, 0x77, 0x66, 0x6c, 0x61, + 0x6b, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0f, 0x73, 0x6e, 0x6f, 0x77, + 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, 0x0f, 0x62, + 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, + 0x65, 0x72, 0x73, 0x2e, 0x42, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x62, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3e, 0x0a, 0x0c, 0x6d, 0x6f, 0x6e, 0x67, 0x6f, 0x5f, 0x63, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x70, 0x65, 0x65, + 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x4d, 0x6f, 0x6e, 0x67, 0x6f, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0b, 0x6d, 0x6f, 0x6e, 0x67, 0x6f, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, 0x0f, 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, + 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, + 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, 0x6f, 0x73, + 0x74, 0x67, 0x72, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x70, + 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, + 0x0f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, + 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, 0x62, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, + 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x35, 0x0a, 0x09, 0x73, 0x33, 0x5f, 0x63, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x70, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x48, 0x00, 0x52, 0x08, 0x73, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x4a, 0x0a, + 0x10, 0x73, 0x71, 0x6c, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, + 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x71, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, + 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0f, 0x73, 0x71, 0x6c, 0x73, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x57, 0x0a, 0x15, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x68, 0x75, 0x62, 0x5f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x63, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, 0x62, + 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x13, 0x65, + 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x42, 0x08, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2a, 0x77, 0x0a, 0x06, + 0x44, 0x42, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x42, 0x49, 0x47, 0x51, 0x55, 0x45, + 0x52, 0x59, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x4e, 0x4f, 0x57, 0x46, 0x4c, 0x41, 0x4b, + 0x45, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x4d, 0x4f, 0x4e, 0x47, 0x4f, 0x10, 0x02, 0x12, 0x0c, + 0x0a, 0x08, 0x50, 0x4f, 0x53, 0x54, 0x47, 0x52, 0x45, 0x53, 0x10, 0x03, 0x12, 0x0c, 0x0a, 0x08, + 0x45, 0x56, 0x45, 0x4e, 0x54, 0x48, 0x55, 0x42, 0x10, 0x04, 0x12, 0x06, 0x0a, 0x02, 0x53, 0x33, + 0x10, 0x05, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x51, 0x4c, 0x53, 0x45, 0x52, 0x56, 0x45, 0x52, 0x10, + 0x06, 0x12, 0x12, 0x0a, 0x0e, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x48, 0x55, 0x42, 0x5f, 0x47, 0x52, + 0x4f, 0x55, 0x50, 0x10, 0x07, 0x42, 0x7c, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, + 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x42, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x73, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, + 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, + 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0xca, 0x02, 0x0b, + 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0xe2, 0x02, 0x17, 0x50, 0x65, + 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, + 0x65, 0x72, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1010,33 +1117,39 @@ func file_peers_proto_rawDescGZIP() []byte { } var file_peers_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_peers_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_peers_proto_msgTypes = make([]protoimpl.MessageInfo, 10) var file_peers_proto_goTypes = []interface{}{ - (DBType)(0), // 0: peerdb_peers.DBType - (*SnowflakeConfig)(nil), // 1: peerdb_peers.SnowflakeConfig - (*BigqueryConfig)(nil), // 2: peerdb_peers.BigqueryConfig - (*MongoConfig)(nil), // 3: peerdb_peers.MongoConfig - (*PostgresConfig)(nil), // 4: peerdb_peers.PostgresConfig - (*EventHubConfig)(nil), // 5: peerdb_peers.EventHubConfig - (*S3Config)(nil), // 6: peerdb_peers.S3Config - (*SqlServerConfig)(nil), // 7: peerdb_peers.SqlServerConfig - (*Peer)(nil), // 8: peerdb_peers.Peer + (DBType)(0), // 0: peerdb_peers.DBType + (*SnowflakeConfig)(nil), // 1: peerdb_peers.SnowflakeConfig + (*BigqueryConfig)(nil), // 2: peerdb_peers.BigqueryConfig + (*MongoConfig)(nil), // 3: peerdb_peers.MongoConfig + (*PostgresConfig)(nil), // 4: peerdb_peers.PostgresConfig + (*EventHubConfig)(nil), // 5: peerdb_peers.EventHubConfig + (*EventHubGroupConfig)(nil), // 6: peerdb_peers.EventHubGroupConfig + (*S3Config)(nil), // 7: peerdb_peers.S3Config + (*SqlServerConfig)(nil), // 8: peerdb_peers.SqlServerConfig + (*Peer)(nil), // 9: peerdb_peers.Peer + nil, // 10: peerdb_peers.EventHubGroupConfig.EventhubsEntry } var file_peers_proto_depIdxs = []int32{ - 4, // 0: peerdb_peers.EventHubConfig.metadata_db:type_name -> peerdb_peers.PostgresConfig - 0, // 1: peerdb_peers.Peer.type:type_name -> peerdb_peers.DBType - 1, // 2: peerdb_peers.Peer.snowflake_config:type_name -> peerdb_peers.SnowflakeConfig - 2, // 3: peerdb_peers.Peer.bigquery_config:type_name -> peerdb_peers.BigqueryConfig - 3, // 4: peerdb_peers.Peer.mongo_config:type_name -> peerdb_peers.MongoConfig - 4, // 5: peerdb_peers.Peer.postgres_config:type_name -> peerdb_peers.PostgresConfig - 5, // 6: peerdb_peers.Peer.eventhub_config:type_name -> peerdb_peers.EventHubConfig - 6, // 7: peerdb_peers.Peer.s3_config:type_name -> peerdb_peers.S3Config - 7, // 8: peerdb_peers.Peer.sqlserver_config:type_name -> peerdb_peers.SqlServerConfig - 9, // [9:9] is the sub-list for method output_type - 9, // [9:9] is the sub-list for method input_type - 9, // [9:9] is the sub-list for extension type_name - 9, // [9:9] is the sub-list for extension extendee - 0, // [0:9] is the sub-list for field type_name + 4, // 0: peerdb_peers.EventHubConfig.metadata_db:type_name -> peerdb_peers.PostgresConfig + 10, // 1: peerdb_peers.EventHubGroupConfig.eventhubs:type_name -> peerdb_peers.EventHubGroupConfig.EventhubsEntry + 4, // 2: peerdb_peers.EventHubGroupConfig.metadata_db:type_name -> peerdb_peers.PostgresConfig + 0, // 3: peerdb_peers.Peer.type:type_name -> peerdb_peers.DBType + 1, // 4: peerdb_peers.Peer.snowflake_config:type_name -> peerdb_peers.SnowflakeConfig + 2, // 5: peerdb_peers.Peer.bigquery_config:type_name -> peerdb_peers.BigqueryConfig + 3, // 6: peerdb_peers.Peer.mongo_config:type_name -> peerdb_peers.MongoConfig + 4, // 7: peerdb_peers.Peer.postgres_config:type_name -> peerdb_peers.PostgresConfig + 5, // 8: peerdb_peers.Peer.eventhub_config:type_name -> peerdb_peers.EventHubConfig + 7, // 9: peerdb_peers.Peer.s3_config:type_name -> peerdb_peers.S3Config + 8, // 10: peerdb_peers.Peer.sqlserver_config:type_name -> peerdb_peers.SqlServerConfig + 6, // 11: peerdb_peers.Peer.eventhub_group_config:type_name -> peerdb_peers.EventHubGroupConfig + 5, // 12: peerdb_peers.EventHubGroupConfig.EventhubsEntry.value:type_name -> peerdb_peers.EventHubConfig + 13, // [13:13] is the sub-list for method output_type + 13, // [13:13] is the sub-list for method input_type + 13, // [13:13] is the sub-list for extension type_name + 13, // [13:13] is the sub-list for extension extendee + 0, // [0:13] is the sub-list for field type_name } func init() { file_peers_proto_init() } @@ -1106,7 +1219,7 @@ func file_peers_proto_init() { } } file_peers_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*S3Config); i { + switch v := v.(*EventHubGroupConfig); i { case 0: return &v.state case 1: @@ -1118,7 +1231,7 @@ func file_peers_proto_init() { } } file_peers_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SqlServerConfig); i { + switch v := v.(*S3Config); i { case 0: return &v.state case 1: @@ -1130,6 +1243,18 @@ func file_peers_proto_init() { } } file_peers_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SqlServerConfig); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_peers_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Peer); i { case 0: return &v.state @@ -1143,7 +1268,7 @@ func file_peers_proto_init() { } } file_peers_proto_msgTypes[0].OneofWrappers = []interface{}{} - file_peers_proto_msgTypes[7].OneofWrappers = []interface{}{ + file_peers_proto_msgTypes[8].OneofWrappers = []interface{}{ (*Peer_SnowflakeConfig)(nil), (*Peer_BigqueryConfig)(nil), (*Peer_MongoConfig)(nil), @@ -1151,6 +1276,7 @@ func file_peers_proto_init() { (*Peer_EventhubConfig)(nil), (*Peer_S3Config)(nil), (*Peer_SqlserverConfig)(nil), + (*Peer_EventhubGroupConfig)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -1158,7 +1284,7 @@ func file_peers_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_peers_proto_rawDesc, NumEnums: 1, - NumMessages: 8, + NumMessages: 10, NumExtensions: 0, NumServices: 0, }, diff --git a/flow/go.mod b/flow/go.mod index de82d7c8e0..86a9303cc5 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -10,6 +10,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.0.1 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.1.1 github.com/aws/aws-sdk-go v1.45.15 + github.com/cenkalti/backoff/v4 v4.2.1 github.com/google/uuid v1.3.1 github.com/hashicorp/go-multierror v1.1.1 github.com/jackc/pglogrepl v0.0.0-20230826184802-9ed16cb201f6 @@ -36,7 +37,6 @@ require ( ) require ( - github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/golang-jwt/jwt/v5 v5.0.0 // indirect github.com/grafana/pyroscope-go/godeltaprof v0.1.4 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect diff --git a/flow/go.sum b/flow/go.sum index f9bf5d4853..c734343a62 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -951,7 +951,6 @@ github.com/go-latex/latex v0.0.0-20210823091927-c0d11ff05a81/go.mod h1:SX0U8uGpx github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= -github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-pdf/fpdf v0.5.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= github.com/go-pdf/fpdf v0.6.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= @@ -972,7 +971,6 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/gogo/status v1.1.0/go.mod h1:BFv9nrluPLmrS0EmGVvLaPNmRosr9KapBYd5/hpY1WM= github.com/gogo/status v1.1.1 h1:DuHXlSFHNKqTQ+/ACf5Vs6r4X/dH2EgIzR9Vr+H65kg= github.com/gogo/status v1.1.1/go.mod h1:jpG3dM5QPcqu19Hg8lkUhBFBa3TcLs1DG7+2Jqci7oU= -github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE= github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= @@ -1152,7 +1150,6 @@ github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= @@ -1212,7 +1209,6 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index e1b3969981..6dd39e0ba2 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -88,8 +88,15 @@ impl<'a> StatementAnalyzer for PeerExistanceAnalyzer<'a> { /// PeerDDLAnalyzer is a statement analyzer that checks if the given /// statement is a PeerDB DDL statement. If it is, it returns the type of /// DDL statement. -#[derive(Default)] -pub struct PeerDDLAnalyzer; +pub struct PeerDDLAnalyzer<'a> { + peers: &'a HashMap, +} + +impl<'a> PeerDDLAnalyzer<'a> { + pub fn new(peers: &'a HashMap) -> Self { + Self { peers } + } +} #[derive(Debug, Clone)] pub enum PeerDDL { @@ -114,7 +121,7 @@ pub enum PeerDDL { }, } -impl StatementAnalyzer for PeerDDLAnalyzer { +impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> { type Output = Option; fn analyze(&self, statement: &Statement) -> anyhow::Result { @@ -126,7 +133,7 @@ impl StatementAnalyzer for PeerDDLAnalyzer { with_options, } => { let db_type = DbType::from(peer_type.clone()); - let config = parse_db_options(db_type, with_options.clone())?; + let config = parse_db_options(&self.peers, db_type, with_options.clone())?; let peer = Peer { name: peer_name.to_string().to_lowercase(), r#type: db_type as i32, @@ -395,6 +402,7 @@ impl StatementAnalyzer for PeerCursorAnalyzer { } fn parse_db_options( + peers: &HashMap, db_type: DbType, with_options: Vec, ) -> anyhow::Result> { @@ -545,33 +553,15 @@ fn parse_db_options( Some(config) } DbType::Eventhub => { - let mut metadata_db = PostgresConfig::default(); let conn_str = opts .get("metadata_db") .context("no metadata db specified")?; - let param_pairs: Vec<&str> = conn_str.split_whitespace().collect(); - match param_pairs.len() { - 5 => Ok(true), - _ => Err(anyhow::Error::msg("Invalid connection string. Check formatting and if the required parameters have been specified.")), - }?; - for pair in param_pairs { - let key_value: Vec<&str> = pair.trim().split('=').collect(); - match key_value.len() { - 2 => Ok(true), - _ => Err(anyhow::Error::msg( - "Invalid config setting for PG. Check the formatting", - )), - }?; - let value = key_value[1].to_string(); - match key_value[0] { - "host" => metadata_db.host = value, - "port" => metadata_db.port = value.parse().context("Invalid PG Port")?, - "database" => metadata_db.database = value, - "user" => metadata_db.user = value, - "password" => metadata_db.password = value, - _ => (), - }; - } + let metadata_db = parse_metadata_db_info(conn_str)?; + let subscription_id = opts + .get("subscription_id") + .map(|s| s.to_string()) + .unwrap_or_default(); + let eventhub_config = EventHubConfig { namespace: opts .get("namespace") @@ -586,6 +576,7 @@ fn parse_db_options( .context("location not specified")? .to_string(), metadata_db: Some(metadata_db), + subscription_id, }; let config = Config::EventhubConfig(eventhub_config); Some(config) @@ -622,7 +613,70 @@ fn parse_db_options( let config = Config::SqlserverConfig(sqlserver_config); Some(config) } + DbType::EventhubGroup => { + let conn_str = opts + .get("metadata_db") + .context("no metadata db specified")?; + let metadata_db = parse_metadata_db_info(conn_str)?; + + let mut eventhubs: HashMap = HashMap::new(); + for (key, _) in opts { + if key == "metadata_db" { + continue; + } + + // check if peers contains key and if it does + // then add it to the eventhubs hashmap, if not error + if let Some(peer) = peers.get(&key) { + let eventhub_config = peer.config.clone().unwrap(); + if let Config::EventhubConfig(eventhub_config) = eventhub_config { + eventhubs.insert(key.to_string(), eventhub_config); + } else { + anyhow::bail!("Peer '{}' is not an eventhub", key); + } + } else { + anyhow::bail!("Peer '{}' does not exist", key); + } + } + + let eventhub_group_config = pt::peerdb_peers::EventHubGroupConfig { + eventhubs, + metadata_db: Some(metadata_db), + }; + let config = Config::EventhubGroupConfig(eventhub_group_config); + Some(config) + } }; Ok(config) } + +fn parse_metadata_db_info(conn_str: &str) -> anyhow::Result { + let mut metadata_db = PostgresConfig::default(); + let param_pairs: Vec<&str> = conn_str.split_whitespace().collect(); + match param_pairs.len() { + 5 => Ok(true), + _ => Err(anyhow::Error::msg("Invalid connection string. Check formatting and if the required parameters have been specified.")), + }?; + + for pair in param_pairs { + let key_value: Vec<&str> = pair.trim().split('=').collect(); + match key_value.len() { + 2 => Ok(true), + _ => Err(anyhow::Error::msg( + "Invalid config setting for PG. Check the formatting", + )), + }?; + let value = key_value[1].to_string(); + match key_value[0] { + "host" => metadata_db.host = value, + "port" => metadata_db.port = value.parse().context("Invalid PG Port")?, + "database" => metadata_db.database = value, + "user" => metadata_db.user = value, + "password" => metadata_db.password = value, + _ => (), + }; + } + + Ok(metadata_db) +} diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index bab49b945c..dbce51dfaf 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -143,6 +143,11 @@ impl Catalog { buf.reserve(config_len); sqlserver_config.encode(&mut buf)?; } + Config::EventhubGroupConfig(eventhub_group_config) => { + let config_len = eventhub_group_config.encoded_len(); + buf.reserve(config_len); + eventhub_group_config.encode(&mut buf)?; + } }; buf @@ -331,6 +336,16 @@ impl Catalog { pt::peerdb_peers::SqlServerConfig::decode(options.as_slice()).context(err)?; Ok(Some(Config::SqlserverConfig(sqlserver_config))) } + Some(DbType::EventhubGroup) => { + let err = format!( + "unable to decode {} options for peer {}", + "eventhub_group", name + ); + let eventhub_group_config = + pt::peerdb_peers::EventHubGroupConfig::decode(options.as_slice()) + .context(err)?; + Ok(Some(Config::EventhubGroupConfig(eventhub_group_config))) + } None => Ok(None), } } diff --git a/nexus/parser/src/lib.rs b/nexus/parser/src/lib.rs index 1233ab8bfc..18716b66e9 100644 --- a/nexus/parser/src/lib.rs +++ b/nexus/parser/src/lib.rs @@ -42,7 +42,7 @@ impl NexusStatement { stmt: &Statement, ) -> PgWireResult { let ddl = { - let pdl: PeerDDLAnalyzer = Default::default(); + let pdl: PeerDDLAnalyzer = PeerDDLAnalyzer::new(&peers); pdl.analyze(stmt).map_err(|e| { PgWireError::UserError(Box::new(ErrorInfo::new( "ERROR".to_owned(), diff --git a/nexus/pt/src/lib.rs b/nexus/pt/src/lib.rs index 87ca1d699d..eea59cfe72 100644 --- a/nexus/pt/src/lib.rs +++ b/nexus/pt/src/lib.rs @@ -16,6 +16,7 @@ impl From for DbType { PeerType::EventHub => DbType::Eventhub, PeerType::S3 => DbType::S3, PeerType::SQLServer => DbType::Sqlserver, + PeerType::EventHubGroup => DbType::EventhubGroup, PeerType::Kafka => todo!("Add Kafka support"), } } diff --git a/nexus/pt/src/peerdb_peers.rs b/nexus/pt/src/peerdb_peers.rs index 8b96963bb2..085b484790 100644 --- a/nexus/pt/src/peerdb_peers.rs +++ b/nexus/pt/src/peerdb_peers.rs @@ -89,6 +89,18 @@ pub struct EventHubConfig { pub location: ::prost::alloc::string::String, #[prost(message, optional, tag="4")] pub metadata_db: ::core::option::Option, + /// if this is empty PeerDB uses `AZURE_SUBSCRIPTION_ID` environment variable. + #[prost(string, tag="5")] + pub subscription_id: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct EventHubGroupConfig { + /// event hub peer name to event hub config + #[prost(map="string, message", tag="1")] + pub eventhubs: ::std::collections::HashMap<::prost::alloc::string::String, EventHubConfig>, + #[prost(message, optional, tag="2")] + pub metadata_db: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -117,7 +129,7 @@ pub struct Peer { pub name: ::prost::alloc::string::String, #[prost(enumeration="DbType", tag="2")] pub r#type: i32, - #[prost(oneof="peer::Config", tags="3, 4, 5, 6, 7, 8, 9")] + #[prost(oneof="peer::Config", tags="3, 4, 5, 6, 7, 8, 9, 10")] pub config: ::core::option::Option, } /// Nested message and enum types in `Peer`. @@ -139,6 +151,8 @@ pub mod peer { S3Config(super::S3Config), #[prost(message, tag="9")] SqlserverConfig(super::SqlServerConfig), + #[prost(message, tag="10")] + EventhubGroupConfig(super::EventHubGroupConfig), } } #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] @@ -151,6 +165,7 @@ pub enum DbType { Eventhub = 4, S3 = 5, Sqlserver = 6, + EventhubGroup = 7, } impl DbType { /// String value of the enum field names used in the ProtoBuf definition. @@ -166,6 +181,7 @@ impl DbType { DbType::Eventhub => "EVENTHUB", DbType::S3 => "S3", DbType::Sqlserver => "SQLSERVER", + DbType::EventhubGroup => "EVENTHUB_GROUP", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -178,6 +194,7 @@ impl DbType { "EVENTHUB" => Some(Self::Eventhub), "S3" => Some(Self::S3), "SQLSERVER" => Some(Self::Sqlserver), + "EVENTHUB_GROUP" => Some(Self::EventhubGroup), _ => None, } } diff --git a/nexus/pt/src/peerdb_peers.serde.rs b/nexus/pt/src/peerdb_peers.serde.rs index 652e417dbe..e9db2d25f7 100644 --- a/nexus/pt/src/peerdb_peers.serde.rs +++ b/nexus/pt/src/peerdb_peers.serde.rs @@ -289,6 +289,7 @@ impl serde::Serialize for DbType { Self::Eventhub => "EVENTHUB", Self::S3 => "S3", Self::Sqlserver => "SQLSERVER", + Self::EventhubGroup => "EVENTHUB_GROUP", }; serializer.serialize_str(variant) } @@ -307,6 +308,7 @@ impl<'de> serde::Deserialize<'de> for DbType { "EVENTHUB", "S3", "SQLSERVER", + "EVENTHUB_GROUP", ]; struct GeneratedVisitor; @@ -356,6 +358,7 @@ impl<'de> serde::Deserialize<'de> for DbType { "EVENTHUB" => Ok(DbType::Eventhub), "S3" => Ok(DbType::S3), "SQLSERVER" => Ok(DbType::Sqlserver), + "EVENTHUB_GROUP" => Ok(DbType::EventhubGroup), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } @@ -383,6 +386,9 @@ impl serde::Serialize for EventHubConfig { if self.metadata_db.is_some() { len += 1; } + if !self.subscription_id.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("peerdb_peers.EventHubConfig", len)?; if !self.namespace.is_empty() { struct_ser.serialize_field("namespace", &self.namespace)?; @@ -396,6 +402,9 @@ impl serde::Serialize for EventHubConfig { if let Some(v) = self.metadata_db.as_ref() { struct_ser.serialize_field("metadataDb", v)?; } + if !self.subscription_id.is_empty() { + struct_ser.serialize_field("subscriptionId", &self.subscription_id)?; + } struct_ser.end() } } @@ -412,6 +421,8 @@ impl<'de> serde::Deserialize<'de> for EventHubConfig { "location", "metadata_db", "metadataDb", + "subscription_id", + "subscriptionId", ]; #[allow(clippy::enum_variant_names)] @@ -420,6 +431,7 @@ impl<'de> serde::Deserialize<'de> for EventHubConfig { ResourceGroup, Location, MetadataDb, + SubscriptionId, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -446,6 +458,7 @@ impl<'de> serde::Deserialize<'de> for EventHubConfig { "resourceGroup" | "resource_group" => Ok(GeneratedField::ResourceGroup), "location" => Ok(GeneratedField::Location), "metadataDb" | "metadata_db" => Ok(GeneratedField::MetadataDb), + "subscriptionId" | "subscription_id" => Ok(GeneratedField::SubscriptionId), _ => Ok(GeneratedField::__SkipField__), } } @@ -469,6 +482,7 @@ impl<'de> serde::Deserialize<'de> for EventHubConfig { let mut resource_group__ = None; let mut location__ = None; let mut metadata_db__ = None; + let mut subscription_id__ = None; while let Some(k) = map.next_key()? { match k { GeneratedField::Namespace => { @@ -495,6 +509,12 @@ impl<'de> serde::Deserialize<'de> for EventHubConfig { } metadata_db__ = map.next_value()?; } + GeneratedField::SubscriptionId => { + if subscription_id__.is_some() { + return Err(serde::de::Error::duplicate_field("subscriptionId")); + } + subscription_id__ = Some(map.next_value()?); + } GeneratedField::__SkipField__ => { let _ = map.next_value::()?; } @@ -505,12 +525,128 @@ impl<'de> serde::Deserialize<'de> for EventHubConfig { resource_group: resource_group__.unwrap_or_default(), location: location__.unwrap_or_default(), metadata_db: metadata_db__, + subscription_id: subscription_id__.unwrap_or_default(), }) } } deserializer.deserialize_struct("peerdb_peers.EventHubConfig", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for EventHubGroupConfig { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.eventhubs.is_empty() { + len += 1; + } + if self.metadata_db.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("peerdb_peers.EventHubGroupConfig", len)?; + if !self.eventhubs.is_empty() { + struct_ser.serialize_field("eventhubs", &self.eventhubs)?; + } + if let Some(v) = self.metadata_db.as_ref() { + struct_ser.serialize_field("metadataDb", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for EventHubGroupConfig { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "eventhubs", + "metadata_db", + "metadataDb", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Eventhubs, + MetadataDb, + __SkipField__, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "eventhubs" => Ok(GeneratedField::Eventhubs), + "metadataDb" | "metadata_db" => Ok(GeneratedField::MetadataDb), + _ => Ok(GeneratedField::__SkipField__), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = EventHubGroupConfig; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct peerdb_peers.EventHubGroupConfig") + } + + fn visit_map(self, mut map: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut eventhubs__ = None; + let mut metadata_db__ = None; + while let Some(k) = map.next_key()? { + match k { + GeneratedField::Eventhubs => { + if eventhubs__.is_some() { + return Err(serde::de::Error::duplicate_field("eventhubs")); + } + eventhubs__ = Some( + map.next_value::>()? + ); + } + GeneratedField::MetadataDb => { + if metadata_db__.is_some() { + return Err(serde::de::Error::duplicate_field("metadataDb")); + } + metadata_db__ = map.next_value()?; + } + GeneratedField::__SkipField__ => { + let _ = map.next_value::()?; + } + } + } + Ok(EventHubGroupConfig { + eventhubs: eventhubs__.unwrap_or_default(), + metadata_db: metadata_db__, + }) + } + } + deserializer.deserialize_struct("peerdb_peers.EventHubGroupConfig", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for MongoConfig { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -725,6 +861,9 @@ impl serde::Serialize for Peer { peer::Config::SqlserverConfig(v) => { struct_ser.serialize_field("sqlserverConfig", v)?; } + peer::Config::EventhubGroupConfig(v) => { + struct_ser.serialize_field("eventhubGroupConfig", v)?; + } } } struct_ser.end() @@ -753,6 +892,8 @@ impl<'de> serde::Deserialize<'de> for Peer { "s3Config", "sqlserver_config", "sqlserverConfig", + "eventhub_group_config", + "eventhubGroupConfig", ]; #[allow(clippy::enum_variant_names)] @@ -766,6 +907,7 @@ impl<'de> serde::Deserialize<'de> for Peer { EventhubConfig, S3Config, SqlserverConfig, + EventhubGroupConfig, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -797,6 +939,7 @@ impl<'de> serde::Deserialize<'de> for Peer { "eventhubConfig" | "eventhub_config" => Ok(GeneratedField::EventhubConfig), "s3Config" | "s3_config" => Ok(GeneratedField::S3Config), "sqlserverConfig" | "sqlserver_config" => Ok(GeneratedField::SqlserverConfig), + "eventhubGroupConfig" | "eventhub_group_config" => Ok(GeneratedField::EventhubGroupConfig), _ => Ok(GeneratedField::__SkipField__), } } @@ -880,6 +1023,13 @@ impl<'de> serde::Deserialize<'de> for Peer { return Err(serde::de::Error::duplicate_field("sqlserverConfig")); } config__ = map.next_value::<::std::option::Option<_>>()?.map(peer::Config::SqlserverConfig) +; + } + GeneratedField::EventhubGroupConfig => { + if config__.is_some() { + return Err(serde::de::Error::duplicate_field("eventhubGroupConfig")); + } + config__ = map.next_value::<::std::option::Option<_>>()?.map(peer::Config::EventhubGroupConfig) ; } GeneratedField::__SkipField__ => { diff --git a/nexus/sqlparser-rs b/nexus/sqlparser-rs index 92bc0e62d8..49b806a49c 160000 --- a/nexus/sqlparser-rs +++ b/nexus/sqlparser-rs @@ -1 +1 @@ -Subproject commit 92bc0e62d83a957911a3b22a869208fa822a840b +Subproject commit 49b806a49c325cff7203f9b71d0c68cf9e237f30 diff --git a/protos/peers.proto b/protos/peers.proto index 1f58a3ea7b..3bfdb68642 100644 --- a/protos/peers.proto +++ b/protos/peers.proto @@ -52,6 +52,14 @@ message EventHubConfig { string resource_group = 2; string location = 3; PostgresConfig metadata_db = 4; + // if this is empty PeerDB uses `AZURE_SUBSCRIPTION_ID` environment variable. + string subscription_id = 5; +} + +message EventHubGroupConfig { + // event hub peer name to event hub config + map eventhubs = 1; + PostgresConfig metadata_db = 2; } message S3Config { @@ -74,6 +82,7 @@ enum DBType { EVENTHUB = 4; S3 = 5; SQLSERVER = 6; + EVENTHUB_GROUP = 7; } message Peer { @@ -87,5 +96,6 @@ message Peer { EventHubConfig eventhub_config = 7; S3Config s3_config = 8; SqlServerConfig sqlserver_config = 9; + EventHubGroupConfig eventhub_group_config = 10; } } diff --git a/ui/grpc_generated/peers.ts b/ui/grpc_generated/peers.ts index 46db253f28..bd0d8a31d0 100644 --- a/ui/grpc_generated/peers.ts +++ b/ui/grpc_generated/peers.ts @@ -12,6 +12,7 @@ export enum DBType { EVENTHUB = 4, S3 = 5, SQLSERVER = 6, + EVENTHUB_GROUP = 7, UNRECOGNIZED = -1, } @@ -38,6 +39,9 @@ export function dBTypeFromJSON(object: any): DBType { case 6: case "SQLSERVER": return DBType.SQLSERVER; + case 7: + case "EVENTHUB_GROUP": + return DBType.EVENTHUB_GROUP; case -1: case "UNRECOGNIZED": default: @@ -61,6 +65,8 @@ export function dBTypeToJSON(object: DBType): string { return "S3"; case DBType.SQLSERVER: return "SQLSERVER"; + case DBType.EVENTHUB_GROUP: + return "EVENTHUB_GROUP"; case DBType.UNRECOGNIZED: default: return "UNRECOGNIZED"; @@ -115,9 +121,24 @@ export interface EventHubConfig { namespace: string; resourceGroup: string; location: string; + metadataDb: + | PostgresConfig + | undefined; + /** if this is empty PeerDB uses `AZURE_SUBSCRIPTION_ID` environment variable. */ + subscriptionId: string; +} + +export interface EventHubGroupConfig { + /** event hub peer name to event hub config */ + eventhubs: { [key: string]: EventHubConfig }; metadataDb: PostgresConfig | undefined; } +export interface EventHubGroupConfig_EventhubsEntry { + key: string; + value: EventHubConfig | undefined; +} + export interface S3Config { url: string; } @@ -140,6 +161,7 @@ export interface Peer { eventhubConfig?: EventHubConfig | undefined; s3Config?: S3Config | undefined; sqlserverConfig?: SqlServerConfig | undefined; + eventhubGroupConfig?: EventHubGroupConfig | undefined; } function createBaseSnowflakeConfig(): SnowflakeConfig { @@ -806,7 +828,7 @@ export const PostgresConfig = { }; function createBaseEventHubConfig(): EventHubConfig { - return { namespace: "", resourceGroup: "", location: "", metadataDb: undefined }; + return { namespace: "", resourceGroup: "", location: "", metadataDb: undefined, subscriptionId: "" }; } export const EventHubConfig = { @@ -823,6 +845,9 @@ export const EventHubConfig = { if (message.metadataDb !== undefined) { PostgresConfig.encode(message.metadataDb, writer.uint32(34).fork()).ldelim(); } + if (message.subscriptionId !== "") { + writer.uint32(42).string(message.subscriptionId); + } return writer; }, @@ -861,6 +886,13 @@ export const EventHubConfig = { message.metadataDb = PostgresConfig.decode(reader, reader.uint32()); continue; + case 5: + if (tag !== 42) { + break; + } + + message.subscriptionId = reader.string(); + continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -876,6 +908,7 @@ export const EventHubConfig = { resourceGroup: isSet(object.resourceGroup) ? String(object.resourceGroup) : "", location: isSet(object.location) ? String(object.location) : "", metadataDb: isSet(object.metadataDb) ? PostgresConfig.fromJSON(object.metadataDb) : undefined, + subscriptionId: isSet(object.subscriptionId) ? String(object.subscriptionId) : "", }; }, @@ -893,6 +926,9 @@ export const EventHubConfig = { if (message.metadataDb !== undefined) { obj.metadataDb = PostgresConfig.toJSON(message.metadataDb); } + if (message.subscriptionId !== "") { + obj.subscriptionId = message.subscriptionId; + } return obj; }, @@ -907,6 +943,185 @@ export const EventHubConfig = { message.metadataDb = (object.metadataDb !== undefined && object.metadataDb !== null) ? PostgresConfig.fromPartial(object.metadataDb) : undefined; + message.subscriptionId = object.subscriptionId ?? ""; + return message; + }, +}; + +function createBaseEventHubGroupConfig(): EventHubGroupConfig { + return { eventhubs: {}, metadataDb: undefined }; +} + +export const EventHubGroupConfig = { + encode(message: EventHubGroupConfig, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + Object.entries(message.eventhubs).forEach(([key, value]) => { + EventHubGroupConfig_EventhubsEntry.encode({ key: key as any, value }, writer.uint32(10).fork()).ldelim(); + }); + if (message.metadataDb !== undefined) { + PostgresConfig.encode(message.metadataDb, writer.uint32(18).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): EventHubGroupConfig { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseEventHubGroupConfig(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + const entry1 = EventHubGroupConfig_EventhubsEntry.decode(reader, reader.uint32()); + if (entry1.value !== undefined) { + message.eventhubs[entry1.key] = entry1.value; + } + continue; + case 2: + if (tag !== 18) { + break; + } + + message.metadataDb = PostgresConfig.decode(reader, reader.uint32()); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): EventHubGroupConfig { + return { + eventhubs: isObject(object.eventhubs) + ? Object.entries(object.eventhubs).reduce<{ [key: string]: EventHubConfig }>((acc, [key, value]) => { + acc[key] = EventHubConfig.fromJSON(value); + return acc; + }, {}) + : {}, + metadataDb: isSet(object.metadataDb) ? PostgresConfig.fromJSON(object.metadataDb) : undefined, + }; + }, + + toJSON(message: EventHubGroupConfig): unknown { + const obj: any = {}; + if (message.eventhubs) { + const entries = Object.entries(message.eventhubs); + if (entries.length > 0) { + obj.eventhubs = {}; + entries.forEach(([k, v]) => { + obj.eventhubs[k] = EventHubConfig.toJSON(v); + }); + } + } + if (message.metadataDb !== undefined) { + obj.metadataDb = PostgresConfig.toJSON(message.metadataDb); + } + return obj; + }, + + create, I>>(base?: I): EventHubGroupConfig { + return EventHubGroupConfig.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>(object: I): EventHubGroupConfig { + const message = createBaseEventHubGroupConfig(); + message.eventhubs = Object.entries(object.eventhubs ?? {}).reduce<{ [key: string]: EventHubConfig }>( + (acc, [key, value]) => { + if (value !== undefined) { + acc[key] = EventHubConfig.fromPartial(value); + } + return acc; + }, + {}, + ); + message.metadataDb = (object.metadataDb !== undefined && object.metadataDb !== null) + ? PostgresConfig.fromPartial(object.metadataDb) + : undefined; + return message; + }, +}; + +function createBaseEventHubGroupConfig_EventhubsEntry(): EventHubGroupConfig_EventhubsEntry { + return { key: "", value: undefined }; +} + +export const EventHubGroupConfig_EventhubsEntry = { + encode(message: EventHubGroupConfig_EventhubsEntry, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.key !== "") { + writer.uint32(10).string(message.key); + } + if (message.value !== undefined) { + EventHubConfig.encode(message.value, writer.uint32(18).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): EventHubGroupConfig_EventhubsEntry { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseEventHubGroupConfig_EventhubsEntry(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.key = reader.string(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.value = EventHubConfig.decode(reader, reader.uint32()); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): EventHubGroupConfig_EventhubsEntry { + return { + key: isSet(object.key) ? String(object.key) : "", + value: isSet(object.value) ? EventHubConfig.fromJSON(object.value) : undefined, + }; + }, + + toJSON(message: EventHubGroupConfig_EventhubsEntry): unknown { + const obj: any = {}; + if (message.key !== "") { + obj.key = message.key; + } + if (message.value !== undefined) { + obj.value = EventHubConfig.toJSON(message.value); + } + return obj; + }, + + create, I>>( + base?: I, + ): EventHubGroupConfig_EventhubsEntry { + return EventHubGroupConfig_EventhubsEntry.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): EventHubGroupConfig_EventhubsEntry { + const message = createBaseEventHubGroupConfig_EventhubsEntry(); + message.key = object.key ?? ""; + message.value = (object.value !== undefined && object.value !== null) + ? EventHubConfig.fromPartial(object.value) + : undefined; return message; }, }; @@ -1098,6 +1313,7 @@ function createBasePeer(): Peer { eventhubConfig: undefined, s3Config: undefined, sqlserverConfig: undefined, + eventhubGroupConfig: undefined, }; } @@ -1130,6 +1346,9 @@ export const Peer = { if (message.sqlserverConfig !== undefined) { SqlServerConfig.encode(message.sqlserverConfig, writer.uint32(74).fork()).ldelim(); } + if (message.eventhubGroupConfig !== undefined) { + EventHubGroupConfig.encode(message.eventhubGroupConfig, writer.uint32(82).fork()).ldelim(); + } return writer; }, @@ -1203,6 +1422,13 @@ export const Peer = { message.sqlserverConfig = SqlServerConfig.decode(reader, reader.uint32()); continue; + case 10: + if (tag !== 82) { + break; + } + + message.eventhubGroupConfig = EventHubGroupConfig.decode(reader, reader.uint32()); + continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -1223,6 +1449,9 @@ export const Peer = { eventhubConfig: isSet(object.eventhubConfig) ? EventHubConfig.fromJSON(object.eventhubConfig) : undefined, s3Config: isSet(object.s3Config) ? S3Config.fromJSON(object.s3Config) : undefined, sqlserverConfig: isSet(object.sqlserverConfig) ? SqlServerConfig.fromJSON(object.sqlserverConfig) : undefined, + eventhubGroupConfig: isSet(object.eventhubGroupConfig) + ? EventHubGroupConfig.fromJSON(object.eventhubGroupConfig) + : undefined, }; }, @@ -1255,6 +1484,9 @@ export const Peer = { if (message.sqlserverConfig !== undefined) { obj.sqlserverConfig = SqlServerConfig.toJSON(message.sqlserverConfig); } + if (message.eventhubGroupConfig !== undefined) { + obj.eventhubGroupConfig = EventHubGroupConfig.toJSON(message.eventhubGroupConfig); + } return obj; }, @@ -1286,6 +1518,9 @@ export const Peer = { message.sqlserverConfig = (object.sqlserverConfig !== undefined && object.sqlserverConfig !== null) ? SqlServerConfig.fromPartial(object.sqlserverConfig) : undefined; + message.eventhubGroupConfig = (object.eventhubGroupConfig !== undefined && object.eventhubGroupConfig !== null) + ? EventHubGroupConfig.fromPartial(object.eventhubGroupConfig) + : undefined; return message; }, }; @@ -1332,6 +1567,10 @@ if (_m0.util.Long !== Long) { _m0.configure(); } +function isObject(value: any): boolean { + return typeof value === "object" && value !== null; +} + function isSet(value: any): boolean { return value !== null && value !== undefined; }