diff --git a/cmd/redshiftloader/config/config.go b/cmd/redshiftloader/config/config.go index 0ff5920df..5bfc183f9 100644 --- a/cmd/redshiftloader/config/config.go +++ b/cmd/redshiftloader/config/config.go @@ -18,6 +18,7 @@ type Config struct { S3Sink s3sink.Config `yaml:"s3sink"` SchemaRegistryURL string `yaml:"schemaRegistryURL"` Redshift redshift.RedshiftConfig `yaml:"redshift"` + RedshiftGroup *string `yaml:"redshiftGroup,omitempty"` } func LoadConfig(cmd *cobra.Command) (Config, error) { diff --git a/cmd/redshiftloader/main.go b/cmd/redshiftloader/main.go index f15a305cb..3894b3f2f 100644 --- a/cmd/redshiftloader/main.go +++ b/cmd/redshiftloader/main.go @@ -110,6 +110,7 @@ func run(cmd *cobra.Command, args []string) { config.Loader, groupConfig.Sarama, redshifter, + config.RedshiftGroup, ), ) if err != nil { diff --git a/controllers/loader_deployment.go b/controllers/loader_deployment.go index dc9445432..6013922c2 100644 --- a/controllers/loader_deployment.go +++ b/controllers/loader_deployment.go @@ -311,6 +311,7 @@ func NewLoader( MaxOpenConns: maxOpenConns, MaxIdleConns: maxIdleConns, }, + RedshiftGroup: rsk.Spec.Loader.RedshiftGroup, } confBytes, err := yaml.Marshal(conf) if err != nil { diff --git a/pkg/redshiftloader/load_processor.go b/pkg/redshiftloader/load_processor.go index 3b882c47f..49ad9d579 100644 --- a/pkg/redshiftloader/load_processor.go +++ b/pkg/redshiftloader/load_processor.go @@ -67,6 +67,9 @@ type loadProcessor struct { // redshiftSchema schema to operate on redshiftSchema string + // redshiftGroup to gives access to + redshiftGroup *string + // redshiftStats stats show the db stats info in logs when enabled redshiftStats bool @@ -91,6 +94,7 @@ func newLoadProcessor( partition int32, saramaConfig kafka.SaramaConfig, redshifter *redshift.Redshift, + redshiftGroup *string, ) (serializer.MessageBatchSyncProcessor, error) { sink, err := s3sink.NewS3Sink( viper.GetString("s3sink.accessKeyId"), @@ -115,6 +119,7 @@ func newLoadProcessor( viper.GetString("schemaRegistryURL")), redshifter: redshifter, redshiftSchema: viper.GetString("redshift.schema"), + redshiftGroup: redshiftGroup, stagingTable: nil, targetTable: nil, tableSuffix: viper.GetString("redshift.tableSuffix"), @@ -535,6 +540,15 @@ func (b *loadProcessor) migrateTable( return fmt.Errorf("Error migrating table, err:%v\n", err) } + if b.redshiftGroup != nil { + klog.V(2).Infof("%s, granting schema access for table: %v to group: %v", b.topic, inputTable.Name, *b.redshiftGroup) + err = b.redshifter.GrantSchemaAccess(ctx, tx, inputTable.Meta.Schema, inputTable.Name, *b.redshiftGroup) + if err != nil { + return err + } + return err + } + err = tx.Commit() if err != nil { return fmt.Errorf("Error committing tx, err:%v\n", err) diff --git a/pkg/redshiftloader/loader_handler.go b/pkg/redshiftloader/loader_handler.go index 77b5859c0..e4fc5306f 100644 --- a/pkg/redshiftloader/loader_handler.go +++ b/pkg/redshiftloader/loader_handler.go @@ -48,9 +48,10 @@ type loaderHandler struct { maxWaitSeconds *int maxBytesPerBatch *int64 - saramaConfig kafka.SaramaConfig - redshifter *redshift.Redshift - serializer serializer.Serializer + saramaConfig kafka.SaramaConfig + redshifter *redshift.Redshift + redshiftGroup *string + serializer serializer.Serializer } func NewHandler( @@ -60,6 +61,7 @@ func NewHandler( loaderConfig LoaderConfig, saramaConfig kafka.SaramaConfig, redshifter *redshift.Redshift, + redshiftGroup *string, ) *loaderHandler { // apply defaults if loaderConfig.MaxWaitSeconds == nil { @@ -77,9 +79,10 @@ func NewHandler( maxWaitSeconds: loaderConfig.MaxWaitSeconds, maxBytesPerBatch: loaderConfig.MaxBytesPerBatch, - saramaConfig: saramaConfig, - redshifter: redshifter, - serializer: serializer.NewSerializer(viper.GetString("schemaRegistryURL")), + saramaConfig: saramaConfig, + redshifter: redshifter, + redshiftGroup: redshiftGroup, + serializer: serializer.NewSerializer(viper.GetString("schemaRegistryURL")), } } @@ -123,6 +126,7 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim.Partition(), h.saramaConfig, h.redshifter, + h.redshiftGroup, ) if err != nil { return fmt.Errorf(