Skip to content

Commit

Permalink
redact fields in proto using custom options (#1965)
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Jul 19, 2024
1 parent 4d0420e commit 91263cf
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 83 deletions.
3 changes: 2 additions & 1 deletion flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context,
activity.RecordHeartbeat(ctx, "wait another second for source connector")
attempt += 1
if attempt > 2 {
logger.Info("waiting on source connector setup", slog.Int("attempt", attempt))
logger.Info("waiting on source connector setup",
slog.Int("attempt", attempt), slog.String("sessionID", sessionID))
}
if err := ctx.Err(); err != nil {
return none, err
Expand Down
81 changes: 21 additions & 60 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package cmd
import (
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"

"github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
Expand All @@ -19,6 +20,20 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
)

func redactProto(message proto.Message) {
message.ProtoReflect().Range(func(fd protoreflect.FieldDescriptor, v protoreflect.Value) bool {
if fd.Kind() == protoreflect.MessageKind {
redactProto(v.Message().Interface())
} else if fd.Kind() == protoreflect.StringKind {
redacted := proto.GetExtension(fd.Options().(*descriptorpb.FieldOptions), protos.E_PeerdbRedacted).(bool)
if redacted {
message.ProtoReflect().Set(fd, protoreflect.ValueOfString("********"))
}
}
return true
})
}

func (h *FlowRequestHandler) getPGPeerConfig(ctx context.Context, peerName string) (*protos.PostgresConfig, error) {
var encPeerOptions []byte
var encKeyID string
Expand Down Expand Up @@ -71,66 +86,12 @@ func (h *FlowRequestHandler) GetPeerInfo(
return nil, err
}

// omit sensitive keys
redacted := "********"
switch inner := peer.Config.(type) {
case *protos.Peer_PostgresConfig:
config := inner.PostgresConfig
config.Password = redacted
if ssh := config.SshConfig; ssh != nil {
ssh.Password = redacted
ssh.PrivateKey = redacted
ssh.HostKey = redacted
peer.ProtoReflect().Range(func(fd protoreflect.FieldDescriptor, v protoreflect.Value) bool {
if fd.Kind() == protoreflect.MessageKind {
redactProto(v.Message().Interface())
}
case *protos.Peer_BigqueryConfig:
config := inner.BigqueryConfig
config.PrivateKey = redacted
config.PrivateKeyId = redacted
case *protos.Peer_MongoConfig:
config := inner.MongoConfig
config.Password = redacted
case *protos.Peer_S3Config:
config := inner.S3Config
config.SecretAccessKey = &redacted
case *protos.Peer_SnowflakeConfig:
config := inner.SnowflakeConfig
config.PrivateKey = redacted
config.Password = &redacted
case *protos.Peer_EventhubGroupConfig:
config := inner.EventhubGroupConfig
for _, ev := range config.Eventhubs {
ev.SubscriptionId = redacted
}
case *protos.Peer_ClickhouseConfig:
config := inner.ClickhouseConfig
config.Password = redacted
config.AccessKeyId = redacted
config.SecretAccessKey = redacted
case *protos.Peer_KafkaConfig:
config := inner.KafkaConfig
config.Password = redacted
case *protos.Peer_PubsubConfig:
config := inner.PubsubConfig
config.ServiceAccount.PrivateKey = redacted
config.ServiceAccount.PrivateKeyId = redacted
case *protos.Peer_ElasticsearchConfig:
config := inner.ElasticsearchConfig
if config.AuthType == protos.ElasticsearchAuthType_BASIC {
config.Username = &redacted
config.Password = &redacted
config.ApiKey = nil
} else if config.AuthType == protos.ElasticsearchAuthType_APIKEY {
config.Username = nil
config.Password = nil
config.ApiKey = &redacted
}
case *protos.Peer_MysqlConfig:
config := inner.MysqlConfig
config.Password = redacted
default:
// don't risk sending new peer types unredacted
return nil, errors.ErrUnsupported
}
return true
})
return peer, nil
}

Expand Down
50 changes: 28 additions & 22 deletions protos/peers.proto
Original file line number Diff line number Diff line change
@@ -1,35 +1,41 @@
syntax = "proto3";

import "google/protobuf/descriptor.proto";

extend google.protobuf.FieldOptions {
optional bool peerdb_redacted = 16551842;
}

package peerdb_peers;

message SSHConfig {
string host = 1;
uint32 port = 2;
string user = 3;
string password = 4;
string private_key = 5;
string host_key = 6;
string password = 4 [(peerdb_redacted) = true];
string private_key = 5 [(peerdb_redacted) = true];
string host_key = 6 [(peerdb_redacted) = true];
}

message SnowflakeConfig {
string account_id = 1;
string username = 2;
string private_key = 3;
string private_key = 3 [(peerdb_redacted) = true];
string database = 4;
string warehouse = 6;
string role = 7;
uint64 query_timeout = 8;
string s3_integration = 9;
optional string password = 10;
optional string password = 10 [(peerdb_redacted) = true];
// defaults to _PEERDB_INTERNAL
optional string metadata_schema = 11;
}

message GcpServiceAccount {
string auth_type = 1;
string project_id = 2;
string private_key_id = 3;
string private_key = 4;
string private_key_id = 3 [(peerdb_redacted) = true];
string private_key = 4 [(peerdb_redacted) = true];
string client_email = 5;
string client_id = 6;
string auth_uri = 7;
Expand All @@ -41,8 +47,8 @@ message GcpServiceAccount {
message BigqueryConfig {
string auth_type = 1;
string project_id = 2;
string private_key_id = 3;
string private_key = 4;
string private_key_id = 3 [(peerdb_redacted) = true];
string private_key = 4 [(peerdb_redacted) = true];
string client_email = 5;
string client_id = 6;
string auth_uri = 7;
Expand All @@ -58,7 +64,7 @@ message PubSubConfig {

message MongoConfig {
string username = 1;
string password = 2;
string password = 2 [(peerdb_redacted) = true];
string clusterurl = 3;
int32 clusterport = 4;
string database = 5;
Expand All @@ -68,7 +74,7 @@ message PostgresConfig {
string host = 1;
uint32 port = 2;
string user = 3;
string password = 4;
string password = 4 [(peerdb_redacted) = true];
string database = 5;
// defaults to _peerdb_internal
optional string metadata_schema = 7;
Expand All @@ -80,7 +86,7 @@ message EventHubConfig {
string resource_group = 2;
string location = 3;
// if this is empty PeerDB uses `AZURE_SUBSCRIPTION_ID` environment variable.
string subscription_id = 5;
string subscription_id = 5 [(peerdb_redacted) = true];
// defaults to 3
uint32 partition_count = 6;
// defaults to 7
Expand All @@ -95,8 +101,8 @@ message EventHubGroupConfig {

message S3Config {
string url = 1;
optional string access_key_id = 2;
optional string secret_access_key = 3;
optional string access_key_id = 2 [(peerdb_redacted) = true];
optional string secret_access_key = 3 [(peerdb_redacted) = true];
optional string role_arn = 4;
optional string region = 5;
optional string endpoint = 6;
Expand All @@ -106,11 +112,11 @@ message ClickhouseConfig{
string host = 1;
uint32 port = 2;
string user = 3;
string password = 4;
string password = 4 [(peerdb_redacted) = true];
string database = 5;
string s3_path = 6; // path to S3 bucket which will store avro files
string access_key_id = 7;
string secret_access_key = 8;
string access_key_id = 7 [(peerdb_redacted) = true];
string secret_access_key = 8 [(peerdb_redacted) = true];
string region = 9;
bool disable_tls = 10;
optional string endpoint = 11;
Expand All @@ -120,15 +126,15 @@ message SqlServerConfig {
string server = 1;
uint32 port = 2;
string user = 3;
string password = 4;
string password = 4 [(peerdb_redacted) = true];
string database = 5;
}

message MySqlConfig {
string host = 1;
uint32 port = 2;
string user = 3;
string password = 4;
string password = 4 [(peerdb_redacted) = true];
string database = 5;
repeated string setup = 6;
uint32 compression = 7;
Expand All @@ -138,7 +144,7 @@ message MySqlConfig {
message KafkaConfig {
repeated string servers = 1;
string username = 2;
string password = 3;
string password = 3 [(peerdb_redacted) = true];
string sasl = 4;
bool disable_tls = 5;
string partitioner = 6;
Expand All @@ -156,8 +162,8 @@ message ElasticsearchConfig {
repeated string addresses = 1;
ElasticsearchAuthType auth_type = 2;
optional string username = 3;
optional string password = 4;
optional string api_key = 5;
optional string password = 4 [(peerdb_redacted) = true];
optional string api_key = 5 [(peerdb_redacted) = true];
}

enum DBType {
Expand Down

0 comments on commit 91263cf

Please sign in to comment.