Skip to content

Commit

Permalink
Fixes #172 grant schema access from loader
Browse files Browse the repository at this point in the history
  • Loading branch information
alok87 committed Apr 14, 2021
1 parent f8e39d7 commit d650741
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 6 deletions.
1 change: 1 addition & 0 deletions cmd/redshiftloader/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Config struct {
S3Sink s3sink.Config `yaml:"s3sink"`
SchemaRegistryURL string `yaml:"schemaRegistryURL"`
Redshift redshift.RedshiftConfig `yaml:"redshift"`
RedshiftGroup *string `yaml:"redshiftGroup,omitempty"`
}

func LoadConfig(cmd *cobra.Command) (Config, error) {
Expand Down
1 change: 1 addition & 0 deletions cmd/redshiftloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func run(cmd *cobra.Command, args []string) {
config.Loader,
groupConfig.Sarama,
redshifter,
config.RedshiftGroup,
),
)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions controllers/loader_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ func NewLoader(
MaxOpenConns: maxOpenConns,
MaxIdleConns: maxIdleConns,
},
RedshiftGroup: rsk.Spec.Loader.RedshiftGroup,
}
confBytes, err := yaml.Marshal(conf)
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions pkg/redshiftloader/load_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type loadProcessor struct {
// redshiftSchema schema to operate on
redshiftSchema string

// redshiftGroup to gives access to
redshiftGroup *string

// redshiftStats stats show the db stats info in logs when enabled
redshiftStats bool

Expand All @@ -91,6 +94,7 @@ func newLoadProcessor(
partition int32,
saramaConfig kafka.SaramaConfig,
redshifter *redshift.Redshift,
redshiftGroup *string,
) (serializer.MessageBatchSyncProcessor, error) {
sink, err := s3sink.NewS3Sink(
viper.GetString("s3sink.accessKeyId"),
Expand All @@ -115,6 +119,7 @@ func newLoadProcessor(
viper.GetString("schemaRegistryURL")),
redshifter: redshifter,
redshiftSchema: viper.GetString("redshift.schema"),
redshiftGroup: redshiftGroup,
stagingTable: nil,
targetTable: nil,
tableSuffix: viper.GetString("redshift.tableSuffix"),
Expand Down Expand Up @@ -535,6 +540,15 @@ func (b *loadProcessor) migrateTable(
return fmt.Errorf("Error migrating table, err:%v\n", err)
}

if b.redshiftGroup != nil {
klog.V(2).Infof("%s, granting schema access for table: %v to group: %v", b.topic, inputTable.Name, *b.redshiftGroup)
err = b.redshifter.GrantSchemaAccess(ctx, tx, inputTable.Meta.Schema, inputTable.Name, *b.redshiftGroup)
if err != nil {
return err
}
return err
}

err = tx.Commit()
if err != nil {
return fmt.Errorf("Error committing tx, err:%v\n", err)
Expand Down
16 changes: 10 additions & 6 deletions pkg/redshiftloader/loader_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ type loaderHandler struct {
maxWaitSeconds *int
maxBytesPerBatch *int64

saramaConfig kafka.SaramaConfig
redshifter *redshift.Redshift
serializer serializer.Serializer
saramaConfig kafka.SaramaConfig
redshifter *redshift.Redshift
redshiftGroup *string
serializer serializer.Serializer
}

func NewHandler(
Expand All @@ -60,6 +61,7 @@ func NewHandler(
loaderConfig LoaderConfig,
saramaConfig kafka.SaramaConfig,
redshifter *redshift.Redshift,
redshiftGroup *string,
) *loaderHandler {
// apply defaults
if loaderConfig.MaxWaitSeconds == nil {
Expand All @@ -77,9 +79,10 @@ func NewHandler(
maxWaitSeconds: loaderConfig.MaxWaitSeconds,
maxBytesPerBatch: loaderConfig.MaxBytesPerBatch,

saramaConfig: saramaConfig,
redshifter: redshifter,
serializer: serializer.NewSerializer(viper.GetString("schemaRegistryURL")),
saramaConfig: saramaConfig,
redshifter: redshifter,
redshiftGroup: redshiftGroup,
serializer: serializer.NewSerializer(viper.GetString("schemaRegistryURL")),
}
}

Expand Down Expand Up @@ -123,6 +126,7 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
claim.Partition(),
h.saramaConfig,
h.redshifter,
h.redshiftGroup,
)
if err != nil {
return fmt.Errorf(
Expand Down

0 comments on commit d650741

Please sign in to comment.