From a26e26b9566f51f4d42a3030b25a2de1f7b1c787 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 13 Dec 2023 17:33:31 +0530 Subject: [PATCH] kafka code --- flow/connectors/core.go | 3 + flow/connectors/kafka/kafka.go | 379 +++++++++++++++++++++++++++++ flow/connectors/kafka/qrep.go | 38 +++ flow/generated/protos/peers.pb.go | 273 +++++++++++++++------ flow/go.mod | 1 + flow/go.sum | 2 + nexus/analyzer/src/lib.rs | 30 ++- nexus/catalog/src/lib.rs | 11 + nexus/pt/src/peerdb_peers.rs | 21 +- nexus/pt/src/peerdb_peers.serde.rs | 182 ++++++++++++++ nexus/server/src/main.rs | 1 + protos/peers.proto | 10 + ui/grpc_generated/peers.ts | 152 ++++++++++++ 13 files changed, 1028 insertions(+), 75 deletions(-) create mode 100644 flow/connectors/kafka/kafka.go create mode 100644 flow/connectors/kafka/qrep.go diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 6c482f852a..a1910df26d 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -8,6 +8,7 @@ import ( connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery" conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub" + connkafka "github.com/PeerDB-io/peer-flow/connectors/kafka" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" conns3 "github.com/PeerDB-io/peer-flow/connectors/s3" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" @@ -154,6 +155,8 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne return conneventhub.NewEventHubConnector(ctx, config.GetEventhubGroupConfig()) case *protos.Peer_S3Config: return conns3.NewS3Connector(ctx, config.GetS3Config()) + case *protos.Peer_KafkaConfig: + return connkafka.NewKafkaConnector(ctx, config.GetKafkaConfig()) default: return nil, ErrUnsupportedFunctionality } diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go new file mode 100644 index 0000000000..34f53e58b9 --- /dev/null +++ b/flow/connectors/kafka/kafka.go @@ -0,0 +1,379 @@ +package connkafka + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "strings" + "time" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/shared" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +type KafkaRecord struct { + Before model.RecordItems `json:"before"` + After model.RecordItems `json:"after"` +} + +type KafkaConnector struct { + ctx context.Context + client *kafka.AdminClient + consumer *kafka.Consumer + producer *kafka.Producer +} + +func NewKafkaConnector(ctx context.Context, + kafkaProtoConfig *protos.KafkaConfig) (*KafkaConnector, error) { + brokers := kafkaProtoConfig.Servers + connectorConfig := kafka.ConfigMap{ + "bootstrap.servers": brokers, + "allow.auto.create.topics": true, + } + securityProtocol := kafkaProtoConfig.SecurityProtocol + + if securityProtocol == "SASL_SSL" { + rootCertToVerifyBroker := kafkaProtoConfig.SslCertificate + sslConfig := kafka.ConfigMap{ + "security.protocol": "SASL_SSL", + "ssl.ca.location": rootCertToVerifyBroker, + "sasl.mechanisms": "PLAIN", + "sasl.username": kafkaProtoConfig.Username, + "sasl.password": kafkaProtoConfig.Password, + } + + for key, value := range sslConfig { + (connectorConfig)[key] = value + } + } + + producerConfig := &kafka.ConfigMap{ + "transactional.id": "peerdb", + } + consumerConfig := &kafka.ConfigMap{ + "group.id": "unused but needed", + "auto.offset.reset": "latest", + } + // Maintaining separate configs for consumer and producer. 
+	// Otherwise, we get warnings in the logs.
+	for key, value := range connectorConfig {
+		(*consumerConfig)[key] = value
+		(*producerConfig)[key] = value
+	}
+	producer, err := kafka.NewProducer(producerConfig)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create producer: %w", err)
+	}
+
+	consumer, err := kafka.NewConsumer(consumerConfig)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create consumer: %w", err)
+	}
+
+	client, err := kafka.NewAdminClient(&connectorConfig)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create kafka client: %w", err)
+	}
+
+	return &KafkaConnector{
+		ctx:      ctx,
+		client:   client,
+		consumer: consumer,
+		producer: producer,
+	}, nil
+}
+
+func (c *KafkaConnector) Close() error {
+	if c == nil {
+		return nil
+	}
+	// Close everything NewKafkaConnector created, not just the admin client,
+	// so the producer and consumer do not leak broker connections.
+	if c.producer != nil {
+		c.producer.Close()
+	}
+	if c.consumer != nil {
+		if err := c.consumer.Close(); err != nil {
+			return fmt.Errorf("failed to close consumer: %w", err)
+		}
+	}
+	if c.client != nil {
+		c.client.Close()
+	}
+	return nil
+}
+
+func (c *KafkaConnector) ConnectionActive() error {
+	if c == nil || c.client == nil {
+		return fmt.Errorf("kafka client is nil")
+	}
+	_, err := c.client.GetMetadata(nil, true, 5000)
+	return err
+}
+
+func (c *KafkaConnector) NeedsSetupMetadataTables() bool {
+	jobName, ok := c.ctx.Value(shared.FlowNameKey).(string)
+	if !ok {
+		return false
+	}
+
+	// GetMetadata can succeed even when the topic is missing: the per-topic
+	// error in the returned metadata tells us whether it actually exists.
+	metadataTopicName := "peerdb_" + jobName
+	metadata, err := c.client.GetMetadata(&metadataTopicName, false, 5000)
+	if err != nil {
+		return true
+	}
+	topicMetadata, found := metadata.Topics[metadataTopicName]
+	return !found || topicMetadata.Error.Code() != kafka.ErrNoError
+}
+
+func (c *KafkaConnector) SetupMetadataTables() error {
+	jobName, ok := c.ctx.Value(shared.FlowNameKey).(string)
+	if !ok {
+		return fmt.Errorf("failed to get job name from context")
+	}
+
+	metadataTopicName := "peerdb_" + jobName
+	topicResults, createErr := c.client.CreateTopics(c.ctx, []kafka.TopicSpecification{
+		{
+			Topic:             metadataTopicName,
+			NumPartitions:     1,
+			ReplicationFactor: 1,
+		},
+	})
+	if createErr != nil {
+		return fmt.Errorf("failed client's topics creation: %w", createErr)
+	}
+	topicErr := topicResults[0].Error.Code().String()
+	if topicErr != "ErrNoError" {
+		if topicExists(topicErr) {
+			return nil
+		}
+		return fmt.Errorf("failed to create metadata topic: %s", topicErr)
+	}
+	return nil
+}
+
+func (c *KafkaConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
+	metadataTopicName := "peerdb_" + jobName
+
+	subscribeErr := c.consumer.SubscribeTopics([]string{metadataTopicName}, nil)
+	if subscribeErr != nil {
+		return nil, fmt.Errorf("failed to subscribe offset reader to metadata topic: %w", subscribeErr)
+	}
+
+	assignErr := c.consumer.Assign([]kafka.TopicPartition{{
+		Topic:     &metadataTopicName,
+		Partition: 0,
+		Offset:    kafka.OffsetTail(2), // not 1 because ReadMessage reads the next message, not current
+	}})
+	if assignErr != nil {
+		return nil, fmt.Errorf("failed to assign partition for offset reader: %w", assignErr)
+	}
+
+	lastMessage, readErr := c.consumer.ReadMessage(60 * time.Second)
+	if readErr != nil {
+		// A timeout means no checkpoint has been written yet: treat the
+		// mirror as fresh rather than failing the sync.
+		if kafkaErr, ok := readErr.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrTimedOut {
+			return nil, nil
+		}
+		return nil, fmt.Errorf("unable to read latest offset: %w", readErr)
+	}
+
+	lastCheckpoint, integerParseErr := strconv.ParseInt(string(lastMessage.Value), 10, 64)
+	if integerParseErr != nil {
+		return nil, fmt.Errorf("error converting checkpoint string to int64: %w", integerParseErr)
+	}
+
+	// Release the assignment so the shared consumer stays usable for the
+	// next sync; closing it here would break every later GetLastOffset call.
+	if err := c.consumer.Unassign(); err != nil {
+		return nil, fmt.Errorf("failed to release offset reader assignment: %w", err)
+	}
+
+	return &protos.LastSyncState{
+		Checkpoint: lastCheckpoint,
+	}, nil
+}
+
+func (c *KafkaConnector) GetLastSyncBatchID(jobName string) (int64, error) {
+	return -1, nil
+}
+
+func topicExists(err string) bool {
+	return strings.Contains(err, "Topic already exists")
+}
+
+func (c *KafkaConnector) SetupNormalizedTables(
+	req *protos.SetupNormalizedTableBatchInput) (*protos.SetupNormalizedTableBatchOutput, error) {
+	tableExistsMapping := make(map[string]bool)
+	for tableIdentifier := range req.TableNameSchemaMapping {
+		topicResults, createErr := c.client.CreateTopics(c.ctx, []kafka.TopicSpecification{
+			{
+				Topic:             tableIdentifier,
+				NumPartitions:     1,
+				ReplicationFactor: 1,
+			},
+		})
+		if createErr != nil {
+			if topicExists(createErr.Error()) {
+				tableExistsMapping[tableIdentifier] = true
+				continue
+			}
+			return nil, fmt.Errorf("failed client's topics creation: %w", createErr)
+		}
+
+		topicErr := topicResults[0].Error.Code().String()
+		if topicErr != "ErrNoError" {
+			if topicExists(topicErr) {
+				tableExistsMapping[tableIdentifier] = true
+				continue
+			}
+			return nil, fmt.Errorf("failed to create destination topic: %s", topicErr)
+		}
+		tableExistsMapping[tableIdentifier] = false
+	}
+
+	return &protos.SetupNormalizedTableBatchOutput{
+		TableExistsMapping: tableExistsMapping,
+	}, nil
+}
+
+func (c *KafkaConnector) InitializeTableSchema(req map[string]*protos.TableSchema) error {
+	return nil
+}
+
+func (c *KafkaConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
+	var destinationMessage kafka.Message
+	first := true
+	var firstCP int64 = 0
+	lastCP, err := req.Records.GetLastCheckpoint()
+	if err != nil {
+		return nil, err
+	}
+
+	records := make([]kafka.Message, 0)
+	for record := range req.Records.GetRecords() {
+		switch typedRecord := record.(type) {
+		case *model.InsertRecord:
+			insertData := KafkaRecord{
+				Before: *model.NewRecordItems(),
+				After:  *typedRecord.Items,
+			}
+
+			insertJSON, err := json.Marshal(insertData)
+			if err != nil {
+				return nil, fmt.Errorf("failed to serialize insert data to JSON: %w", err)
+			}
+
+			destinationMessage = kafka.Message{
+				Value: insertJSON,
+			}
+
+		case *model.UpdateRecord:
+			updateData := KafkaRecord{
+				Before: *typedRecord.OldItems,
+				After:  *typedRecord.NewItems,
+			}
+			updateJSON, err := json.Marshal(updateData)
+			if err != nil {
+				return nil, fmt.Errorf("failed to serialize update data to JSON: %w", err)
+			}
+
+			destinationMessage = kafka.Message{
+				Value: updateJSON,
+			}
+
+		case *model.DeleteRecord:
+			deleteData := KafkaRecord{
+				Before: *typedRecord.Items,
+				After:  *model.NewRecordItems(),
+			}
+
+			deleteJSON, err := json.Marshal(deleteData)
+			if err != nil {
+				return nil, fmt.Errorf("failed to serialize delete data to JSON: %w", err)
+			}
+
+			destinationMessage = kafka.Message{
+				Value: deleteJSON,
+			}
+		default:
+			return nil, fmt.Errorf("record type %T not supported in Kafka flow connector", typedRecord)
+		}
+		destinationTopicName := "peerdb_" + record.GetTableName()
+		destinationTopic := kafka.TopicPartition{
+			Topic:     &destinationTopicName,
+			Partition: kafka.PartitionAny,
+		}
+		destinationMessage.TopicPartition = destinationTopic
+		destinationMessage.Key = []byte("CDC")
+		records = append(records, destinationMessage)
+
+		if first {
+			firstCP = record.GetCheckPointID()
+			first = false
+		}
+	}
+	if len(records) == 0 {
+		return &model.SyncResponse{
+			FirstSyncedCheckPointID: 0,
+			LastSyncedCheckPointID:  0,
+			NumRecordsSynced:        0,
+		}, nil
+	}
+	metadataTopicName := "peerdb_" + req.FlowJobName
+	metadataTopic := kafka.TopicPartition{
+		Topic:     &metadataTopicName,
+		Partition: kafka.PartitionAny,
+	}
+	checkpointBytes := []byte(strconv.FormatInt(lastCP, 10))
+
+	initErr := c.producer.InitTransactions(c.ctx)
+	if initErr != nil {
+		return nil, fmt.Errorf("failed to initialise transaction: %w", initErr)
+	}
+
+	beginErr := c.producer.BeginTransaction()
+	if beginErr != nil {
+		return nil, fmt.Errorf("failed to begin transaction: %w", beginErr)
+	}
+
+	for _, record := range records {
+		pushedRecord := record
+		writeErr := c.producer.Produce(&pushedRecord, nil)
+		if writeErr != nil {
+			abortErr := c.producer.AbortTransaction(c.ctx)
+			if abortErr != nil {
+				return nil, fmt.Errorf("destination write failed, but could not abort transaction: %w", abortErr)
+			}
+			// The transaction was aborted: surface the write failure instead
+			// of continuing to produce into an aborted transaction.
+			return nil, fmt.Errorf("failed to produce record to destination topic: %w", writeErr)
+		}
+	}
+
+	updateErr := c.producer.Produce(&kafka.Message{
+		TopicPartition: metadataTopic,
+		Key:            []byte("checkpoint"),
+		Value:          checkpointBytes,
+	}, nil)
+	if updateErr != nil {
+		abortErr := c.producer.AbortTransaction(c.ctx)
+		if abortErr != nil {
+			return nil, fmt.Errorf("checkpoint update failed, but could not abort transaction: %w", abortErr)
+		}
+		// Aborted cleanly: report the checkpoint write failure so the batch
+		// is retried rather than committed without its checkpoint.
+		return nil, fmt.Errorf("failed to write checkpoint to metadata topic: %w", updateErr)
+	}
+
+	commitErr := c.producer.CommitTransaction(c.ctx)
+	if commitErr != nil {
+		return nil, fmt.Errorf("could not commit transaction: %w", commitErr)
+	}
+
+	return &model.SyncResponse{
+		FirstSyncedCheckPointID: firstCP,
+		LastSyncedCheckPointID:  lastCP,
+		NumRecordsSynced:        int64(len(records)),
+	}, nil
+}
+
+func (c *KafkaConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
+	return &model.NormalizeResponse{
+		Done:         true,
+		StartBatchID: 0,
+		EndBatchID:   1,
+	}, nil
+}
+
+func (c *KafkaConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
+	return nil, nil
+}
+
+func (c *KafkaConnector) SyncFlowCleanup(jobName string) error {
+	return nil
+}
diff --git a/flow/connectors/kafka/qrep.go b/flow/connectors/kafka/qrep.go
new file mode 100644
index 0000000000..2c7eb47e4c
--- /dev/null
+++ b/flow/connectors/kafka/qrep.go
@@ -0,0 +1,38 @@
+package connkafka
+
+import (
+	"github.com/PeerDB-io/peer-flow/generated/protos"
+	"github.com/PeerDB-io/peer-flow/model"
+)
+
+func (c *KafkaConnector) GetQRepPartitions(config *protos.QRepConfig,
+	last *protos.QRepPartition,
+) ([]*protos.QRepPartition, error) {
+	panic("kafka does not yet support query replication")
+}
+
+func (c *KafkaConnector) PullQRepRecords(config *protos.QRepConfig,
+	partition *protos.QRepPartition,
+) (*model.QRecordBatch, error) {
+	panic("kafka does not yet support query replication")
+}
+
+func (c *KafkaConnector) SyncQRepRecords(
+	config *protos.QRepConfig,
+	partition *protos.QRepPartition,
+	records *model.QRecordBatch,
+) (int, error) {
+	panic("kafka does not yet support query replication")
+}
+
+func (c *KafkaConnector) ConsolidateQRepPartitions(config *protos.QRepConfig) error {
+	panic("kafka does not yet support query replication")
+}
+
+func (c *KafkaConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error {
+	panic("kafka does not yet support query replication")
+}
+
+func (c *KafkaConnector) CleanupQRepFlow(config *protos.QRepConfig) error {
+	panic("kafka does not yet support query replication")
+}
diff --git a/flow/generated/protos/peers.pb.go b/flow/generated/protos/peers.pb.go
index aa03dcfd82..87a0f88d19 100644
--- a/flow/generated/protos/peers.pb.go
+++ b/flow/generated/protos/peers.pb.go
@@ -31,6 +31,7 @@ const (
 	DBType_S3             DBType = 5
 	DBType_SQLSERVER      DBType = 6
 	DBType_EVENTHUB_GROUP DBType = 7
+	DBType_KAFKA          DBType = 8
 )
 
 // Enum value maps for DBType.
@@ -44,6 +45,7 @@ var ( 5: "S3", 6: "SQLSERVER", 7: "EVENTHUB_GROUP", + 8: "KAFKA", } DBType_value = map[string]int32{ "BIGQUERY": 0, @@ -54,6 +56,7 @@ var ( "S3": 5, "SQLSERVER": 6, "EVENTHUB_GROUP": 7, + "KAFKA": 8, } ) @@ -930,6 +933,85 @@ func (x *SqlServerConfig) GetDatabase() string { return "" } +type KafkaConfig struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Servers string `protobuf:"bytes,1,opt,name=servers,proto3" json:"servers,omitempty"` + SecurityProtocol string `protobuf:"bytes,2,opt,name=security_protocol,json=securityProtocol,proto3" json:"security_protocol,omitempty"` + SslCertificate string `protobuf:"bytes,3,opt,name=ssl_certificate,json=sslCertificate,proto3" json:"ssl_certificate,omitempty"` + Username string `protobuf:"bytes,4,opt,name=username,proto3" json:"username,omitempty"` + Password string `protobuf:"bytes,5,opt,name=password,proto3" json:"password,omitempty"` +} + +func (x *KafkaConfig) Reset() { + *x = KafkaConfig{} + if protoimpl.UnsafeEnabled { + mi := &file_peers_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *KafkaConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*KafkaConfig) ProtoMessage() {} + +func (x *KafkaConfig) ProtoReflect() protoreflect.Message { + mi := &file_peers_proto_msgTypes[9] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use KafkaConfig.ProtoReflect.Descriptor instead. +func (*KafkaConfig) Descriptor() ([]byte, []int) { + return file_peers_proto_rawDescGZIP(), []int{9} +} + +func (x *KafkaConfig) GetServers() string { + if x != nil { + return x.Servers + } + return "" +} + +func (x *KafkaConfig) GetSecurityProtocol() string { + if x != nil { + return x.SecurityProtocol + } + return "" +} + +func (x *KafkaConfig) GetSslCertificate() string { + if x != nil { + return x.SslCertificate + } + return "" +} + +func (x *KafkaConfig) GetUsername() string { + if x != nil { + return x.Username + } + return "" +} + +func (x *KafkaConfig) GetPassword() string { + if x != nil { + return x.Password + } + return "" +} + type Peer struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -947,13 +1029,14 @@ type Peer struct { // *Peer_S3Config // *Peer_SqlserverConfig // *Peer_EventhubGroupConfig + // *Peer_KafkaConfig Config isPeer_Config `protobuf_oneof:"config"` } func (x *Peer) Reset() { *x = Peer{} if protoimpl.UnsafeEnabled { - mi := &file_peers_proto_msgTypes[9] + mi := &file_peers_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -966,7 +1049,7 @@ func (x *Peer) String() string { func (*Peer) ProtoMessage() {} func (x *Peer) ProtoReflect() protoreflect.Message { - mi := &file_peers_proto_msgTypes[9] + mi := &file_peers_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -979,7 +1062,7 @@ func (x *Peer) ProtoReflect() protoreflect.Message { // Deprecated: Use Peer.ProtoReflect.Descriptor instead. 
func (*Peer) Descriptor() ([]byte, []int) { - return file_peers_proto_rawDescGZIP(), []int{9} + return file_peers_proto_rawDescGZIP(), []int{10} } func (x *Peer) GetName() string { @@ -1059,6 +1142,13 @@ func (x *Peer) GetEventhubGroupConfig() *EventHubGroupConfig { return nil } +func (x *Peer) GetKafkaConfig() *KafkaConfig { + if x, ok := x.GetConfig().(*Peer_KafkaConfig); ok { + return x.KafkaConfig + } + return nil +} + type isPeer_Config interface { isPeer_Config() } @@ -1095,6 +1185,10 @@ type Peer_EventhubGroupConfig struct { EventhubGroupConfig *EventHubGroupConfig `protobuf:"bytes,10,opt,name=eventhub_group_config,json=eventhubGroupConfig,proto3,oneof"` } +type Peer_KafkaConfig struct { + KafkaConfig *KafkaConfig `protobuf:"bytes,11,opt,name=kafka_config,json=kafkaConfig,proto3,oneof"` +} + func (*Peer_SnowflakeConfig) isPeer_Config() {} func (*Peer_BigqueryConfig) isPeer_Config() {} @@ -1111,6 +1205,8 @@ func (*Peer_SqlserverConfig) isPeer_Config() {} func (*Peer_EventhubGroupConfig) isPeer_Config() {} +func (*Peer_KafkaConfig) isPeer_Config() {} + var File_peers_proto protoreflect.FileDescriptor var file_peers_proto_rawDesc = []byte{ @@ -1274,64 +1370,80 @@ var file_peers_proto_rawDesc = []byte{ 0x6f, 0x72, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x22, - 0x91, 0x05, 0x0a, 0x04, 0x50, 0x65, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x28, 0x0a, 0x04, - 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x70, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x44, 0x42, 0x54, 0x79, 0x70, 0x65, - 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x4a, 0x0a, 0x10, 0x73, 0x6e, 0x6f, 0x77, 0x66, 0x6c, - 0x61, 0x6b, 0x65, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, - 0x53, 0x6e, 0x6f, 0x77, 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, - 0x00, 0x52, 0x0f, 0x73, 0x6e, 0x6f, 0x77, 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x43, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x12, 0x47, 0x0a, 0x0f, 0x62, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x63, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, - 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x42, 0x69, 0x67, 0x71, 0x75, - 0x65, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x62, 0x69, 0x67, - 0x71, 0x75, 0x65, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3e, 0x0a, 0x0c, 0x6d, - 0x6f, 0x6e, 0x67, 0x6f, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x05, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x19, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, - 0x2e, 0x4d, 0x6f, 0x6e, 0x67, 0x6f, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0b, - 0x6d, 0x6f, 0x6e, 0x67, 0x6f, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, 0x0f, 0x70, - 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x06, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, - 0x65, 0x72, 0x73, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x43, 
0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x43, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, 0x0f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, - 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, - 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x45, 0x76, 0x65, - 0x6e, 0x74, 0x48, 0x75, 0x62, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x65, - 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x35, 0x0a, - 0x09, 0x73, 0x33, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x16, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, - 0x53, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x08, 0x73, 0x33, 0x43, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x12, 0x4a, 0x0a, 0x10, 0x73, 0x71, 0x6c, 0x73, 0x65, 0x72, 0x76, 0x65, - 0x72, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, - 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x71, - 0x6c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, - 0x0f, 0x73, 0x71, 0x6c, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x12, 0x57, 0x0a, 0x15, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x5f, 0x67, 0x72, 0x6f, - 0x75, 0x70, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x45, - 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, 0x62, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x48, 0x00, 0x52, 0x13, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x47, 0x72, - 0x6f, 0x75, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x42, 0x08, 0x0a, 0x06, 0x63, 0x6f, 0x6e, - 0x66, 0x69, 0x67, 0x2a, 0x77, 0x0a, 0x06, 0x44, 0x42, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0c, 0x0a, - 0x08, 0x42, 0x49, 0x47, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x53, - 0x4e, 0x4f, 0x57, 0x46, 0x4c, 0x41, 0x4b, 0x45, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x4d, 0x4f, - 0x4e, 0x47, 0x4f, 0x10, 0x02, 0x12, 0x0c, 0x0a, 0x08, 0x50, 0x4f, 0x53, 0x54, 0x47, 0x52, 0x45, - 0x53, 0x10, 0x03, 0x12, 0x0c, 0x0a, 0x08, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x48, 0x55, 0x42, 0x10, - 0x04, 0x12, 0x06, 0x0a, 0x02, 0x53, 0x33, 0x10, 0x05, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x51, 0x4c, - 0x53, 0x45, 0x52, 0x56, 0x45, 0x52, 0x10, 0x06, 0x12, 0x12, 0x0a, 0x0e, 0x45, 0x56, 0x45, 0x4e, - 0x54, 0x48, 0x55, 0x42, 0x5f, 0x47, 0x52, 0x4f, 0x55, 0x50, 0x10, 0x07, 0x42, 0x7c, 0x0a, 0x10, - 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, - 0x42, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, - 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, - 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, - 0x65, 0x65, 0x72, 0x73, 0xca, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, - 0x72, 0x73, 0xe2, 0x02, 0x17, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, - 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x50, - 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 
0x33, + 0xb5, 0x01, 0x0a, 0x0b, 0x4b, 0x61, 0x66, 0x6b, 0x61, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, + 0x18, 0x0a, 0x07, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x12, 0x2b, 0x0a, 0x11, 0x73, 0x65, 0x63, + 0x75, 0x72, 0x69, 0x74, 0x79, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x73, 0x65, 0x63, 0x75, 0x72, 0x69, 0x74, 0x79, 0x50, 0x72, + 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x73, 0x6c, 0x5f, 0x63, 0x65, + 0x72, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0e, 0x73, 0x73, 0x6c, 0x43, 0x65, 0x72, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x65, 0x12, + 0x1a, 0x0a, 0x08, 0x75, 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x08, 0x75, 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x70, + 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, + 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x22, 0xd1, 0x05, 0x0a, 0x04, 0x50, 0x65, 0x65, 0x72, + 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x28, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, + 0x73, 0x2e, 0x44, 0x42, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x4a, + 0x0a, 0x10, 0x73, 0x6e, 0x6f, 0x77, 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x5f, 0x63, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x6e, 0x6f, 0x77, 0x66, 0x6c, 0x61, 0x6b, + 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0f, 0x73, 0x6e, 0x6f, 0x77, 0x66, + 0x6c, 0x61, 0x6b, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, 0x0f, 0x62, 0x69, + 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, + 0x72, 0x73, 0x2e, 0x42, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x48, 0x00, 0x52, 0x0e, 0x62, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x43, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x12, 0x3e, 0x0a, 0x0c, 0x6d, 0x6f, 0x6e, 0x67, 0x6f, 0x5f, 0x63, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x70, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x4d, 0x6f, 0x6e, 0x67, 0x6f, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0b, 0x6d, 0x6f, 0x6e, 0x67, 0x6f, 0x43, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, 0x0f, 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x5f, + 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, + 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, 0x6f, 0x73, 0x74, + 0x67, 0x72, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x70, 0x6f, + 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, 0x0f, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, + 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, + 0x65, 0x65, 0x72, 
0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, 0x62, 0x43, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x35, 0x0a, 0x09, 0x73, 0x33, 0x5f, 0x63, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x48, 0x00, 0x52, 0x08, 0x73, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x4a, 0x0a, 0x10, + 0x73, 0x71, 0x6c, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, + 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x71, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0f, 0x73, 0x71, 0x6c, 0x73, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x57, 0x0a, 0x15, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x68, 0x75, 0x62, 0x5f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, + 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, 0x62, 0x47, + 0x72, 0x6f, 0x75, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x13, 0x65, 0x76, + 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x12, 0x3e, 0x0a, 0x0c, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, + 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x4b, 0x61, 0x66, 0x6b, 0x61, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x48, 0x00, 0x52, 0x0b, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x42, 0x08, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2a, 0x82, 0x01, 0x0a, 0x06, + 0x44, 0x42, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x42, 0x49, 0x47, 0x51, 0x55, 0x45, + 0x52, 0x59, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x4e, 0x4f, 0x57, 0x46, 0x4c, 0x41, 0x4b, + 0x45, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x4d, 0x4f, 0x4e, 0x47, 0x4f, 0x10, 0x02, 0x12, 0x0c, + 0x0a, 0x08, 0x50, 0x4f, 0x53, 0x54, 0x47, 0x52, 0x45, 0x53, 0x10, 0x03, 0x12, 0x0c, 0x0a, 0x08, + 0x45, 0x56, 0x45, 0x4e, 0x54, 0x48, 0x55, 0x42, 0x10, 0x04, 0x12, 0x06, 0x0a, 0x02, 0x53, 0x33, + 0x10, 0x05, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x51, 0x4c, 0x53, 0x45, 0x52, 0x56, 0x45, 0x52, 0x10, + 0x06, 0x12, 0x12, 0x0a, 0x0e, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x48, 0x55, 0x42, 0x5f, 0x47, 0x52, + 0x4f, 0x55, 0x50, 0x10, 0x07, 0x12, 0x09, 0x0a, 0x05, 0x4b, 0x41, 0x46, 0x4b, 0x41, 0x10, 0x08, + 0x42, 0x7c, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, + 0x65, 0x65, 0x72, 0x73, 0x42, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, + 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0b, 0x50, 0x65, 0x65, + 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0xca, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0xe2, 0x02, 0x17, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, + 0x65, 0x65, 0x72, 0x73, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0xea, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 
0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1347,7 +1459,7 @@ func file_peers_proto_rawDescGZIP() []byte { } var file_peers_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_peers_proto_msgTypes = make([]protoimpl.MessageInfo, 11) +var file_peers_proto_msgTypes = make([]protoimpl.MessageInfo, 12) var file_peers_proto_goTypes = []interface{}{ (DBType)(0), // 0: peerdb_peers.DBType (*SSHConfig)(nil), // 1: peerdb_peers.SSHConfig @@ -1359,13 +1471,14 @@ var file_peers_proto_goTypes = []interface{}{ (*EventHubGroupConfig)(nil), // 7: peerdb_peers.EventHubGroupConfig (*S3Config)(nil), // 8: peerdb_peers.S3Config (*SqlServerConfig)(nil), // 9: peerdb_peers.SqlServerConfig - (*Peer)(nil), // 10: peerdb_peers.Peer - nil, // 11: peerdb_peers.EventHubGroupConfig.EventhubsEntry + (*KafkaConfig)(nil), // 10: peerdb_peers.KafkaConfig + (*Peer)(nil), // 11: peerdb_peers.Peer + nil, // 12: peerdb_peers.EventHubGroupConfig.EventhubsEntry } var file_peers_proto_depIdxs = []int32{ 1, // 0: peerdb_peers.PostgresConfig.ssh_config:type_name -> peerdb_peers.SSHConfig 5, // 1: peerdb_peers.EventHubConfig.metadata_db:type_name -> peerdb_peers.PostgresConfig - 11, // 2: peerdb_peers.EventHubGroupConfig.eventhubs:type_name -> peerdb_peers.EventHubGroupConfig.EventhubsEntry + 12, // 2: peerdb_peers.EventHubGroupConfig.eventhubs:type_name -> peerdb_peers.EventHubGroupConfig.EventhubsEntry 5, // 3: peerdb_peers.EventHubGroupConfig.metadata_db:type_name -> peerdb_peers.PostgresConfig 5, // 4: peerdb_peers.S3Config.metadata_db:type_name -> peerdb_peers.PostgresConfig 0, // 5: peerdb_peers.Peer.type:type_name -> peerdb_peers.DBType @@ -1377,12 +1490,13 @@ var file_peers_proto_depIdxs = []int32{ 8, // 11: peerdb_peers.Peer.s3_config:type_name -> peerdb_peers.S3Config 9, // 12: peerdb_peers.Peer.sqlserver_config:type_name -> peerdb_peers.SqlServerConfig 7, // 13: peerdb_peers.Peer.eventhub_group_config:type_name -> peerdb_peers.EventHubGroupConfig - 6, // 14: peerdb_peers.EventHubGroupConfig.EventhubsEntry.value:type_name -> peerdb_peers.EventHubConfig - 15, // [15:15] is the sub-list for method output_type - 15, // [15:15] is the sub-list for method input_type - 15, // [15:15] is the sub-list for extension type_name - 15, // [15:15] is the sub-list for extension extendee - 0, // [0:15] is the sub-list for field type_name + 10, // 14: peerdb_peers.Peer.kafka_config:type_name -> peerdb_peers.KafkaConfig + 6, // 15: peerdb_peers.EventHubGroupConfig.EventhubsEntry.value:type_name -> peerdb_peers.EventHubConfig + 16, // [16:16] is the sub-list for method output_type + 16, // [16:16] is the sub-list for method input_type + 16, // [16:16] is the sub-list for extension type_name + 16, // [16:16] is the sub-list for extension extendee + 0, // [0:16] is the sub-list for field type_name } func init() { file_peers_proto_init() } @@ -1500,6 +1614,18 @@ func file_peers_proto_init() { } } file_peers_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*KafkaConfig); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_peers_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Peer); i { case 0: return &v.state @@ -1515,7 +1641,7 @@ func file_peers_proto_init() { file_peers_proto_msgTypes[1].OneofWrappers = []interface{}{} file_peers_proto_msgTypes[4].OneofWrappers = []interface{}{} 
file_peers_proto_msgTypes[7].OneofWrappers = []interface{}{} - file_peers_proto_msgTypes[9].OneofWrappers = []interface{}{ + file_peers_proto_msgTypes[10].OneofWrappers = []interface{}{ (*Peer_SnowflakeConfig)(nil), (*Peer_BigqueryConfig)(nil), (*Peer_MongoConfig)(nil), @@ -1524,6 +1650,7 @@ func file_peers_proto_init() { (*Peer_S3Config)(nil), (*Peer_SqlserverConfig)(nil), (*Peer_EventhubGroupConfig)(nil), + (*Peer_KafkaConfig)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -1531,7 +1658,7 @@ func file_peers_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_peers_proto_rawDesc, NumEnums: 1, - NumMessages: 11, + NumMessages: 12, NumExtensions: 0, NumServices: 0, }, diff --git a/flow/go.mod b/flow/go.mod index f0bb33c999..4eb246b8de 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -50,6 +50,7 @@ require ( github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect + github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 // indirect github.com/getsentry/sentry-go v0.25.0 // indirect github.com/go-logr/logr v1.3.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/flow/go.sum b/flow/go.sum index 977cb09638..fdfa8c1f45 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -126,6 +126,8 @@ github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwP github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= +github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts= +github.com/confluentinc/confluent-kafka-go/v2 v2.3.0/go.mod h1:/VTy8iEpe6mD9pkCH5BhijlUl8ulUXymKv1Qig5Rgb8= github.com/cpuguy83/go-md2man/v2 v2.0.3 h1:qMCsGGgs+MAzDFyp9LpAe1Lqy/fY/qCovCm0qnXZOBM= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index f65fe4b2d0..6d39324e79 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -11,7 +11,7 @@ use pt::{ flow_model::{FlowJob, FlowJobTableMapping, QRepFlowJob}, peerdb_peers::{ peer::Config, BigqueryConfig, DbType, EventHubConfig, MongoConfig, Peer, PostgresConfig, - S3Config, SnowflakeConfig, SqlServerConfig, + S3Config, SnowflakeConfig, SqlServerConfig, KafkaConfig, }, }; use qrep::process_options; @@ -793,6 +793,34 @@ fn parse_db_options( let config = Config::EventhubGroupConfig(eventhub_group_config); Some(config) } + DbType::Kafka => { + let security_protocol = opts + .get("security_protocol") + .context("no security protocol specified")? + .to_string(); + + let kafka_config = KafkaConfig { + servers: opts + .get("servers") + .context("no kafka server hosts specified")? 
+                    .to_string(),
+                security_protocol,
+                ssl_certificate: match opts.get("ssl_certificate") {
+                    Some(certificate) => certificate.to_string(),
+                    None => String::new(),
+                },
+                username: match opts.get("username") {
+                    Some(username) => username.to_string(),
+                    None => String::new(),
+                },
+                password: match opts.get("password") {
+                    Some(password) => password.to_string(),
+                    None => String::new(),
+                },
+            };
+            let config = Config::KafkaConfig(kafka_config);
+            Some(config)
+        }
     };

     Ok(config)
diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs
index 94119ff6c1..6a63dfcfb3 100644
--- a/nexus/catalog/src/lib.rs
+++ b/nexus/catalog/src/lib.rs
@@ -150,6 +150,11 @@ impl Catalog {
                 buf.reserve(config_len);
                 eventhub_group_config.encode(&mut buf)?;
             }
+            Config::KafkaConfig(kafka_config) => {
+                let config_len = kafka_config.encoded_len();
+                buf.reserve(config_len);
+                kafka_config.encode(&mut buf)?;
+            }
         };

         buf
@@ -345,6 +350,12 @@ impl Catalog {
                     pt::peerdb_peers::EventHubGroupConfig::decode(options).context(err)?;
                 Ok(Some(Config::EventhubGroupConfig(eventhub_group_config)))
             }
+            Some(DbType::Kafka) => {
+                let err = format!("unable to decode {} options for peer {}", "kafka", name);
+                let kafka_config =
+                    pt::peerdb_peers::KafkaConfig::decode(options).context(err)?;
+                Ok(Some(Config::KafkaConfig(kafka_config)))
+            }
             None => Ok(None),
         }
     }
diff --git a/nexus/pt/src/peerdb_peers.rs b/nexus/pt/src/peerdb_peers.rs
index 8266eea158..271732c8a2 100644
--- a/nexus/pt/src/peerdb_peers.rs
+++ b/nexus/pt/src/peerdb_peers.rs
@@ -166,12 +166,26 @@ pub struct SqlServerConfig {
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct KafkaConfig {
+    #[prost(string, tag="1")]
+    pub servers: ::prost::alloc::string::String,
+    #[prost(string, tag="2")]
+    pub security_protocol: ::prost::alloc::string::String,
+    #[prost(string, tag="3")]
+    pub ssl_certificate: ::prost::alloc::string::String,
+    #[prost(string, tag="4")]
+    pub username: ::prost::alloc::string::String,
+    #[prost(string, tag="5")]
+    pub password: ::prost::alloc::string::String,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct Peer {
     #[prost(string, tag="1")]
     pub name: ::prost::alloc::string::String,
     #[prost(enumeration="DbType", tag="2")]
     pub r#type: i32,
-    #[prost(oneof="peer::Config", tags="3, 4, 5, 6, 7, 8, 9, 10")]
+    #[prost(oneof="peer::Config", tags="3, 4, 5, 6, 7, 8, 9, 10, 11")]
     pub config: ::core::option::Option<peer::Config>,
 }
 /// Nested message and enum types in `Peer`.
@@ -195,6 +209,8 @@ pub mod peer {
         SqlserverConfig(super::SqlServerConfig),
         #[prost(message, tag="10")]
         EventhubGroupConfig(super::EventHubGroupConfig),
+        #[prost(message, tag="11")]
+        KafkaConfig(super::KafkaConfig),
     }
 }
 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
@@ -208,6 +224,7 @@ pub enum DbType {
     S3 = 5,
     Sqlserver = 6,
     EventhubGroup = 7,
+    Kafka = 8,
 }
 impl DbType {
     /// String value of the enum field names used in the ProtoBuf definition.
@@ -224,6 +241,7 @@ impl DbType {
             DbType::S3 => "S3",
             DbType::Sqlserver => "SQLSERVER",
             DbType::EventhubGroup => "EVENTHUB_GROUP",
+            DbType::Kafka => "KAFKA",
         }
     }
     /// Creates an enum from field names used in the ProtoBuf definition.
@@ -237,6 +255,7 @@ impl DbType {
             "S3" => Some(Self::S3),
             "SQLSERVER" => Some(Self::Sqlserver),
             "EVENTHUB_GROUP" => Some(Self::EventhubGroup),
+            "KAFKA" => Some(Self::Kafka),
             _ => None,
         }
     }
diff --git a/nexus/pt/src/peerdb_peers.serde.rs b/nexus/pt/src/peerdb_peers.serde.rs
index 18c206865e..377cc31795 100644
--- a/nexus/pt/src/peerdb_peers.serde.rs
+++ b/nexus/pt/src/peerdb_peers.serde.rs
@@ -290,6 +290,7 @@ impl serde::Serialize for DbType {
             Self::S3 => "S3",
             Self::Sqlserver => "SQLSERVER",
             Self::EventhubGroup => "EVENTHUB_GROUP",
+            Self::Kafka => "KAFKA",
         };
         serializer.serialize_str(variant)
     }
@@ -309,6 +310,7 @@ impl<'de> serde::Deserialize<'de> for DbType {
             "S3",
             "SQLSERVER",
             "EVENTHUB_GROUP",
+            "KAFKA",
         ];

         struct GeneratedVisitor;
@@ -359,6 +361,7 @@ impl<'de> serde::Deserialize<'de> for DbType {
             "S3" => Ok(DbType::S3),
             "SQLSERVER" => Ok(DbType::Sqlserver),
             "EVENTHUB_GROUP" => Ok(DbType::EventhubGroup),
+            "KAFKA" => Ok(DbType::Kafka),
             _ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
         }
     }
@@ -705,6 +708,171 @@ impl<'de> serde::Deserialize<'de> for EventHubGroupConfig {
         deserializer.deserialize_struct("peerdb_peers.EventHubGroupConfig", FIELDS, GeneratedVisitor)
     }
 }
+impl serde::Serialize for KafkaConfig {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if !self.servers.is_empty() {
+            len += 1;
+        }
+        if !self.security_protocol.is_empty() {
+            len += 1;
+        }
+        if !self.ssl_certificate.is_empty() {
+            len += 1;
+        }
+        if !self.username.is_empty() {
+            len += 1;
+        }
+        if !self.password.is_empty() {
+            len += 1;
+        }
+        let mut struct_ser = serializer.serialize_struct("peerdb_peers.KafkaConfig", len)?;
+        if !self.servers.is_empty() {
+            struct_ser.serialize_field("servers", &self.servers)?;
+        }
+        if !self.security_protocol.is_empty() {
+            struct_ser.serialize_field("securityProtocol", &self.security_protocol)?;
+        }
+        if !self.ssl_certificate.is_empty() {
+            struct_ser.serialize_field("sslCertificate", &self.ssl_certificate)?;
+        }
+        if !self.username.is_empty() {
+            struct_ser.serialize_field("username", &self.username)?;
+        }
+        if !self.password.is_empty() {
+            struct_ser.serialize_field("password", &self.password)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for KafkaConfig {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "servers",
+            "security_protocol",
+            "securityProtocol",
+            "ssl_certificate",
+            "sslCertificate",
+            "username",
+            "password",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Servers,
+            SecurityProtocol,
+            SslCertificate,
+            Username,
+            Password,
+            __SkipField__,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "servers" => Ok(GeneratedField::Servers),
+                            "securityProtocol" | "security_protocol" => Ok(GeneratedField::SecurityProtocol),
+                            "sslCertificate" | "ssl_certificate" => Ok(GeneratedField::SslCertificate),
+                            "username" => Ok(GeneratedField::Username),
+                            "password" => Ok(GeneratedField::Password),
+                            _ => Ok(GeneratedField::__SkipField__),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = KafkaConfig;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                formatter.write_str("struct peerdb_peers.KafkaConfig")
+            }
+
+            fn visit_map<V>(self, mut map: V) -> std::result::Result<KafkaConfig, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut servers__ = None;
+                let mut security_protocol__ = None;
+                let mut ssl_certificate__ = None;
+                let mut username__ = None;
+                let mut password__ = None;
+                while let Some(k) = map.next_key()? {
+                    match k {
+                        GeneratedField::Servers => {
+                            if servers__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("servers"));
+                            }
+                            servers__ = Some(map.next_value()?);
+                        }
+                        GeneratedField::SecurityProtocol => {
+                            if security_protocol__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("securityProtocol"));
+                            }
+                            security_protocol__ = Some(map.next_value()?);
+                        }
+                        GeneratedField::SslCertificate => {
+                            if ssl_certificate__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("sslCertificate"));
+                            }
+                            ssl_certificate__ = Some(map.next_value()?);
+                        }
+                        GeneratedField::Username => {
+                            if username__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("username"));
+                            }
+                            username__ = Some(map.next_value()?);
+                        }
+                        GeneratedField::Password => {
+                            if password__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("password"));
+                            }
+                            password__ = Some(map.next_value()?);
+                        }
+                        GeneratedField::__SkipField__ => {
+                            let _ = map.next_value::<serde::de::IgnoredAny>()?;
+                        }
+                    }
+                }
+                Ok(KafkaConfig {
+                    servers: servers__.unwrap_or_default(),
+                    security_protocol: security_protocol__.unwrap_or_default(),
+                    ssl_certificate: ssl_certificate__.unwrap_or_default(),
+                    username: username__.unwrap_or_default(),
+                    password: password__.unwrap_or_default(),
+                })
+            }
+        }
+        deserializer.deserialize_struct("peerdb_peers.KafkaConfig", FIELDS, GeneratedVisitor)
+    }
+}
 impl serde::Serialize for MongoConfig {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
@@ -922,6 +1090,9 @@ impl serde::Serialize for Peer {
             peer::Config::EventhubGroupConfig(v) => {
                 struct_ser.serialize_field("eventhubGroupConfig", v)?;
             }
+            peer::Config::KafkaConfig(v) => {
+                struct_ser.serialize_field("kafkaConfig", v)?;
+            }
         }
     }
     struct_ser.end()
@@ -952,6 +1123,8 @@ impl<'de> serde::Deserialize<'de> for Peer {
             "sqlserverConfig",
             "eventhub_group_config",
             "eventhubGroupConfig",
+            "kafka_config",
+            "kafkaConfig",
         ];

         #[allow(clippy::enum_variant_names)]
@@ -966,6 +1139,7 @@ impl<'de> serde::Deserialize<'de> for Peer {
             S3Config,
             SqlserverConfig,
             EventhubGroupConfig,
+            KafkaConfig,
             __SkipField__,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
@@ -998,6 +1172,7 @@ impl<'de> serde::Deserialize<'de> for Peer {
                             "s3Config" | "s3_config" => Ok(GeneratedField::S3Config),
                             "sqlserverConfig" | "sqlserver_config" => Ok(GeneratedField::SqlserverConfig),
                             "eventhubGroupConfig" | "eventhub_group_config" => Ok(GeneratedField::EventhubGroupConfig),
+                            "kafkaConfig" | "kafka_config" => Ok(GeneratedField::KafkaConfig),
                             _ => Ok(GeneratedField::__SkipField__),
                         }
                     }
@@ -1088,6 +1263,13 @@ impl<'de> serde::Deserialize<'de> for Peer {
                                 return Err(serde::de::Error::duplicate_field("eventhubGroupConfig"));
                             }
                             config__ = 
map.next_value::<::std::option::Option<_>>()?.map(peer::Config::EventhubGroupConfig) +; + } + GeneratedField::KafkaConfig => { + if config__.is_some() { + return Err(serde::de::Error::duplicate_field("kafkaConfig")); + } + config__ = map.next_value::<::std::option::Option<_>>()?.map(peer::Config::KafkaConfig) ; } GeneratedField::__SkipField__ => { diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index a6d9ef4f06..70f06cf4ad 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -158,6 +158,7 @@ impl NexusBackend { let unsupported_peer_types = [ 4, // EVENTHUB 7, // EVENTHUB_GROUP + 8, // KAFKA ]; !unsupported_peer_types.contains(&peer_type) } diff --git a/protos/peers.proto b/protos/peers.proto index 18a4577aeb..1acfb411e0 100644 --- a/protos/peers.proto +++ b/protos/peers.proto @@ -97,6 +97,14 @@ message SqlServerConfig { string database = 5; } +message KafkaConfig { + string servers = 1; + string security_protocol = 2; + string ssl_certificate = 3; + string username = 4; + string password = 5; +} + enum DBType { BIGQUERY = 0; SNOWFLAKE = 1; @@ -106,6 +114,7 @@ enum DBType { S3 = 5; SQLSERVER = 6; EVENTHUB_GROUP = 7; + KAFKA = 8; } message Peer { @@ -120,5 +129,6 @@ message Peer { S3Config s3_config = 8; SqlServerConfig sqlserver_config = 9; EventHubGroupConfig eventhub_group_config = 10; + KafkaConfig kafka_config = 11; } } diff --git a/ui/grpc_generated/peers.ts b/ui/grpc_generated/peers.ts index 7cdd2ca50b..e6d769c354 100644 --- a/ui/grpc_generated/peers.ts +++ b/ui/grpc_generated/peers.ts @@ -13,6 +13,7 @@ export enum DBType { S3 = 5, SQLSERVER = 6, EVENTHUB_GROUP = 7, + KAFKA = 8, UNRECOGNIZED = -1, } @@ -42,6 +43,9 @@ export function dBTypeFromJSON(object: any): DBType { case 7: case "EVENTHUB_GROUP": return DBType.EVENTHUB_GROUP; + case 8: + case "KAFKA": + return DBType.KAFKA; case -1: case "UNRECOGNIZED": default: @@ -67,6 +71,8 @@ export function dBTypeToJSON(object: DBType): string { return "SQLSERVER"; case DBType.EVENTHUB_GROUP: return "EVENTHUB_GROUP"; + case DBType.KAFKA: + return "KAFKA"; case DBType.UNRECOGNIZED: default: return "UNRECOGNIZED"; @@ -177,6 +183,14 @@ export interface SqlServerConfig { database: string; } +export interface KafkaConfig { + servers: string; + securityProtocol: string; + sslCertificate: string; + username: string; + password: string; +} + export interface Peer { name: string; type: DBType; @@ -188,6 +202,7 @@ export interface Peer { s3Config?: S3Config | undefined; sqlserverConfig?: SqlServerConfig | undefined; eventhubGroupConfig?: EventHubGroupConfig | undefined; + kafkaConfig?: KafkaConfig | undefined; } function createBaseSSHConfig(): SSHConfig { @@ -1659,6 +1674,125 @@ export const SqlServerConfig = { }, }; +function createBaseKafkaConfig(): KafkaConfig { + return { servers: "", securityProtocol: "", sslCertificate: "", username: "", password: "" }; +} + +export const KafkaConfig = { + encode(message: KafkaConfig, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.servers !== "") { + writer.uint32(10).string(message.servers); + } + if (message.securityProtocol !== "") { + writer.uint32(18).string(message.securityProtocol); + } + if (message.sslCertificate !== "") { + writer.uint32(26).string(message.sslCertificate); + } + if (message.username !== "") { + writer.uint32(34).string(message.username); + } + if (message.password !== "") { + writer.uint32(42).string(message.password); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): KafkaConfig { + const 
reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
+    let end = length === undefined ? reader.len : reader.pos + length;
+    const message = createBaseKafkaConfig();
+    while (reader.pos < end) {
+      const tag = reader.uint32();
+      switch (tag >>> 3) {
+        case 1:
+          if (tag !== 10) {
+            break;
+          }
+
+          message.servers = reader.string();
+          continue;
+        case 2:
+          if (tag !== 18) {
+            break;
+          }
+
+          message.securityProtocol = reader.string();
+          continue;
+        case 3:
+          if (tag !== 26) {
+            break;
+          }
+
+          message.sslCertificate = reader.string();
+          continue;
+        case 4:
+          if (tag !== 34) {
+            break;
+          }
+
+          message.username = reader.string();
+          continue;
+        case 5:
+          if (tag !== 42) {
+            break;
+          }
+
+          message.password = reader.string();
+          continue;
+      }
+      if ((tag & 7) === 4 || tag === 0) {
+        break;
+      }
+      reader.skipType(tag & 7);
+    }
+    return message;
+  },
+
+  fromJSON(object: any): KafkaConfig {
+    return {
+      servers: isSet(object.servers) ? String(object.servers) : "",
+      securityProtocol: isSet(object.securityProtocol) ? String(object.securityProtocol) : "",
+      sslCertificate: isSet(object.sslCertificate) ? String(object.sslCertificate) : "",
+      username: isSet(object.username) ? String(object.username) : "",
+      password: isSet(object.password) ? String(object.password) : "",
+    };
+  },
+
+  toJSON(message: KafkaConfig): unknown {
+    const obj: any = {};
+    if (message.servers !== "") {
+      obj.servers = message.servers;
+    }
+    if (message.securityProtocol !== "") {
+      obj.securityProtocol = message.securityProtocol;
+    }
+    if (message.sslCertificate !== "") {
+      obj.sslCertificate = message.sslCertificate;
+    }
+    if (message.username !== "") {
+      obj.username = message.username;
+    }
+    if (message.password !== "") {
+      obj.password = message.password;
+    }
+    return obj;
+  },
+
+  create<I extends Exact<DeepPartial<KafkaConfig>, I>>(base?: I): KafkaConfig {
+    return KafkaConfig.fromPartial(base ?? ({} as any));
+  },
+  fromPartial<I extends Exact<DeepPartial<KafkaConfig>, I>>(object: I): KafkaConfig {
+    const message = createBaseKafkaConfig();
+    message.servers = object.servers ?? "";
+    message.securityProtocol = object.securityProtocol ?? "";
+    message.sslCertificate = object.sslCertificate ?? "";
+    message.username = object.username ?? "";
+    message.password = object.password ?? "";
+    return message;
+  },
+};
+
 function createBasePeer(): Peer {
   return {
     name: "",
     type: 0,
     snowflakeConfig: undefined,
     bigqueryConfig: undefined,
     mongoConfig: undefined,
     postgresConfig: undefined,
     eventhubConfig: undefined,
     s3Config: undefined,
     sqlserverConfig: undefined,
     eventhubGroupConfig: undefined,
+    kafkaConfig: undefined,
   };
 }
@@ -1706,6 +1841,9 @@ export const Peer = {
     if (message.eventhubGroupConfig !== undefined) {
       EventHubGroupConfig.encode(message.eventhubGroupConfig, writer.uint32(82).fork()).ldelim();
     }
+    if (message.kafkaConfig !== undefined) {
+      KafkaConfig.encode(message.kafkaConfig, writer.uint32(90).fork()).ldelim();
+    }
     return writer;
   },
@@ -1786,6 +1924,13 @@ export const Peer = {
         message.eventhubGroupConfig = EventHubGroupConfig.decode(reader, reader.uint32());
         continue;
+      case 11:
+        if (tag !== 90) {
+          break;
+        }
+
+        message.kafkaConfig = KafkaConfig.decode(reader, reader.uint32());
+        continue;
       }
       if ((tag & 7) === 4 || tag === 0) {
         break;
@@ -1809,6 +1954,7 @@ export const Peer = {
       eventhubGroupConfig: isSet(object.eventhubGroupConfig)
         ? EventHubGroupConfig.fromJSON(object.eventhubGroupConfig)
         : undefined,
+      kafkaConfig: isSet(object.kafkaConfig) ? 
KafkaConfig.fromJSON(object.kafkaConfig) : undefined, }; }, @@ -1844,6 +1990,9 @@ export const Peer = { if (message.eventhubGroupConfig !== undefined) { obj.eventhubGroupConfig = EventHubGroupConfig.toJSON(message.eventhubGroupConfig); } + if (message.kafkaConfig !== undefined) { + obj.kafkaConfig = KafkaConfig.toJSON(message.kafkaConfig); + } return obj; }, @@ -1878,6 +2027,9 @@ export const Peer = { message.eventhubGroupConfig = (object.eventhubGroupConfig !== undefined && object.eventhubGroupConfig !== null) ? EventHubGroupConfig.fromPartial(object.eventhubGroupConfig) : undefined; + message.kafkaConfig = (object.kafkaConfig !== undefined && object.kafkaConfig !== null) + ? KafkaConfig.fromPartial(object.kafkaConfig) + : undefined; return message; }, };
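
A note on the wire contract this connector establishes: SyncRecords writes each row change to a "peerdb_" + <destination table> topic as a transactionally produced message with key "CDC" and a JSON value of the shape {"before": ..., "after": ...} ("before" is empty for inserts, "after" is empty for deletes), and it writes the latest checkpoint as a decimal int64 string under the key "checkpoint" to the per-flow "peerdb_" + <flow job name> topic. (The peer itself is created through nexus with the options read in parse_db_options above: servers, security_protocol, and optionally ssl_certificate, username, and password.) Below is a minimal, hypothetical sketch of a downstream consumer of that contract, not part of this patch — the broker address, group id, and topic name are placeholders, and the row images are kept as raw JSON since their exact shape follows model.RecordItems' JSON marshalling:

package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// cdcRecord mirrors the KafkaRecord envelope written by SyncRecords:
// "before" is empty for inserts, "after" is empty for deletes.
type cdcRecord struct {
	Before json.RawMessage `json:"before"`
	After  json.RawMessage `json:"after"`
}

func main() {
	// "localhost:9092" and "peerdb_public.users" are placeholders for a real
	// broker and the "peerdb_" + <destination table> topic named above.
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "peerdb-cdc-reader-example",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	if err := consumer.SubscribeTopics([]string{"peerdb_public.users"}, nil); err != nil {
		panic(err)
	}

	for {
		msg, err := consumer.ReadMessage(10 * time.Second)
		if err != nil {
			// A timeout only means no new changes arrived in this window.
			if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrTimedOut {
				continue
			}
			panic(err)
		}

		var record cdcRecord
		if err := json.Unmarshal(msg.Value, &record); err != nil {
			panic(err)
		}
		fmt.Printf("key=%s before=%s after=%s\n", msg.Key, record.Before, record.After)
	}
}

Because SyncRecords produces the data and its checkpoint inside one transaction, a consumer on librdkafka's default read_committed isolation level should never observe records from an aborted batch, and the checkpoint message becomes visible atomically with the rows it covers.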