Skip to content

Commit

Permalink
Merge pull request #233 from practo/use-redshift-metrics
Browse files Browse the repository at this point in the history
Reduce unneeded loads
  • Loading branch information
alok87 authored May 25, 2021
2 parents 30f67ec + ac3be9d commit d9a0d10
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 23 deletions.
1 change: 1 addition & 0 deletions cmd/redshiftloader/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Config struct {
PrometheusURL string `yaml:"prometheusURL"`
Redshift redshift.RedshiftConfig `yaml:"redshift"`
RedshiftGroup *string `yaml:"redshiftGroup,omitempty"`
RedshiftMetrics bool `yaml:"redshiftMetrics"`
Rsk string `yaml:"rsk,omitempty"`
SinkGroup string `yaml:"sinkGroup,omitempty"`
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/redshiftloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ func run(cmd *cobra.Command, args []string) {
config.Loader,
groupConfig.Sarama,
redshifter,
config.Redshift.Schema,
config.RedshiftGroup,
config.RedshiftMetrics,
),
)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/redshiftsink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func main() {
DefaultRedshiftMaxIdleConns: redshiftMaxIdleConns,
AllowedResources: allowedResources,
PrometheusClient: prometheusClient,
RedshiftMetrics: collectRedshiftMetrics,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "RedshiftSink")
os.Exit(1)
Expand Down
8 changes: 5 additions & 3 deletions controllers/loader_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func NewLoader(
defaultMaxOpenConns int,
defaultMaxIdleConns int,
prometheusURL string,
redshiftMetrics bool,
) (
Deployment,
error,
Expand Down Expand Up @@ -313,9 +314,10 @@ func NewLoader(
MaxOpenConns: maxOpenConns,
MaxIdleConns: maxIdleConns,
},
RedshiftGroup: rsk.Spec.Loader.RedshiftGroup,
Rsk: rsk.Name,
SinkGroup: sinkGroup,
RedshiftGroup: rsk.Spec.Loader.RedshiftGroup,
RedshiftMetrics: redshiftMetrics,
Rsk: rsk.Name,
SinkGroup: sinkGroup,
}
confBytes, err := yaml.Marshal(conf)
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions controllers/redshiftsink_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type RedshiftSinkReconciler struct {
AllowedResources []string

PrometheusClient prometheus.Client
RedshiftMetrics bool
}

const (
Expand Down Expand Up @@ -353,7 +354,7 @@ func (r *RedshiftSinkReconciler) reconcile(
setTopics(kafkaTopics).
setMaskVersion("").
buildBatchers(secret, r.DefaultBatcherImage, r.DefaultKafkaVersion, tlsConfig).
buildLoaders(secret, r.DefaultLoaderImage, "", r.DefaultKafkaVersion, tlsConfig, r.DefaultRedshiftMaxOpenConns, r.DefaultRedshiftMaxIdleConns, prometheusURL).
buildLoaders(secret, r.DefaultLoaderImage, "", r.DefaultKafkaVersion, tlsConfig, r.DefaultRedshiftMaxOpenConns, r.DefaultRedshiftMaxIdleConns, prometheusURL, r.RedshiftMetrics).
build()
result, maskLessSinkGroupEvent, err := maskLessSinkGroup.reconcile(ctx)
if len(maskLessSinkGroupEvent) > 0 {
Expand Down Expand Up @@ -486,7 +487,7 @@ func (r *RedshiftSinkReconciler) reconcile(
setTopicGroups().
setRealtimeCalculator(calc).
buildBatchers(secret, r.DefaultBatcherImage, r.DefaultKafkaVersion, tlsConfig).
buildLoaders(secret, r.DefaultLoaderImage, ReloadTableSuffix, r.DefaultKafkaVersion, tlsConfig, r.DefaultRedshiftMaxOpenConns, r.DefaultRedshiftMaxIdleConns, prometheusURL).
buildLoaders(secret, r.DefaultLoaderImage, ReloadTableSuffix, r.DefaultKafkaVersion, tlsConfig, r.DefaultRedshiftMaxOpenConns, r.DefaultRedshiftMaxIdleConns, prometheusURL, r.RedshiftMetrics).
build()
status.updateBatcherReloadingTopics(reload.batcherDeploymentTopics(), calc.batchersRealtime)
status.updateLoaderReloadingTopics(reload.loaderDeploymentTopics(), calc.loadersRealtime)
Expand All @@ -499,7 +500,7 @@ func (r *RedshiftSinkReconciler) reconcile(
setTopicGroups().
setRealtimeCalculator(nil).
buildBatchers(secret, r.DefaultBatcherImage, r.DefaultKafkaVersion, tlsConfig).
buildLoaders(secret, r.DefaultLoaderImage, "", r.DefaultKafkaVersion, tlsConfig, r.DefaultRedshiftMaxOpenConns, r.DefaultRedshiftMaxIdleConns, prometheusURL).
buildLoaders(secret, r.DefaultLoaderImage, "", r.DefaultKafkaVersion, tlsConfig, r.DefaultRedshiftMaxOpenConns, r.DefaultRedshiftMaxIdleConns, prometheusURL, r.RedshiftMetrics).
build()

main = sgBuilder.
Expand All @@ -510,7 +511,7 @@ func (r *RedshiftSinkReconciler) reconcile(
setTopicGroups().
setRealtimeCalculator(nil).
buildBatchers(secret, r.DefaultBatcherImage, r.DefaultKafkaVersion, tlsConfig).
buildLoaders(secret, r.DefaultLoaderImage, "", r.DefaultKafkaVersion, tlsConfig, r.DefaultRedshiftMaxOpenConns, r.DefaultRedshiftMaxIdleConns, prometheusURL).
buildLoaders(secret, r.DefaultLoaderImage, "", r.DefaultKafkaVersion, tlsConfig, r.DefaultRedshiftMaxOpenConns, r.DefaultRedshiftMaxIdleConns, prometheusURL, r.RedshiftMetrics).
build()

sinkGroups := []*sinkGroup{reloadDupe, reload, main}
Expand Down
5 changes: 4 additions & 1 deletion controllers/sinkgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type sinkGroupBuilder interface {
setRealtimeCalculator(calc *realtimeCalculator) sinkGroupBuilder

buildBatchers(secret map[string]string, defaultImage, defaultKafkaVersion string, tlsConfig *kafka.TLSConfig) sinkGroupBuilder
buildLoaders(secret map[string]string, defaultImage, tableSuffix string, defaultKafkaVersion string, tlsConfig *kafka.TLSConfig, defaultMaxOpenConns int, defaultMaxIdleConns int, prometheusURL string) sinkGroupBuilder
buildLoaders(secret map[string]string, defaultImage, tableSuffix string, defaultKafkaVersion string, tlsConfig *kafka.TLSConfig, defaultMaxOpenConns int, defaultMaxIdleConns int, prometheusURL string, redshiftMetrics bool) sinkGroupBuilder

build() *sinkGroup
}
Expand Down Expand Up @@ -240,6 +240,7 @@ func (sb *buildSinkGroup) buildLoaders(
defaultMaxOpenConns int,
defaultMaxIdleConns int,
prometheusURL string,
redshiftMetrics bool,
) sinkGroupBuilder {
loaders := []Deployment{}
if sb.rsk.Spec.Loader.SinkGroup != nil {
Expand Down Expand Up @@ -303,6 +304,7 @@ func (sb *buildSinkGroup) buildLoaders(
defaultMaxOpenConns,
defaultMaxIdleConns,
prometheusURL,
redshiftMetrics,
)
if err != nil {
klog.Fatalf("Error making loader: %v", err)
Expand All @@ -329,6 +331,7 @@ func (sb *buildSinkGroup) buildLoaders(
defaultMaxOpenConns,
defaultMaxIdleConns,
prometheusURL,
redshiftMetrics,
)
if err != nil {
klog.Fatalf("Error making loader: %v", err)
Expand Down
90 changes: 75 additions & 15 deletions pkg/redshiftloader/loader_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,13 @@ type loaderHandler struct {
maxWaitSeconds *int
maxBytesPerBatch *int64

saramaConfig kafka.SaramaConfig
redshifter *redshift.Redshift
redshiftGroup *string
serializer serializer.Serializer
saramaConfig kafka.SaramaConfig
serializer serializer.Serializer

redshifter *redshift.Redshift
redshiftSchema string
redshiftGroup *string
redshiftMetrics bool

// prometheusClient is used to query the running loaders
// so that concurrency of load can be maintained below a threshold
Expand All @@ -75,7 +78,9 @@ func NewHandler(
loaderConfig LoaderConfig,
saramaConfig kafka.SaramaConfig,
redshifter *redshift.Redshift,
redshiftSchema string,
redshiftGroup *string,
redshiftMetrics bool,
) *loaderHandler {
// apply defaults
if loaderConfig.MaxWaitSeconds == nil {
Expand Down Expand Up @@ -103,10 +108,14 @@ func NewHandler(
maxWaitSeconds: loaderConfig.MaxWaitSeconds,
maxBytesPerBatch: loaderConfig.MaxBytesPerBatch,

saramaConfig: saramaConfig,
redshifter: redshifter,
redshiftGroup: redshiftGroup,
serializer: serializer.NewSerializer(viper.GetString("schemaRegistryURL")),
saramaConfig: saramaConfig,
serializer: serializer.NewSerializer(viper.GetString("schemaRegistryURL")),

redshifter: redshifter,
redshiftSchema: redshiftSchema,
redshiftGroup: redshiftGroup,
redshiftMetrics: redshiftMetrics,

prometheusClient: prometheusClient,
loadRunning: new(sync.Map),
}
Expand All @@ -133,11 +142,53 @@ func (h *loaderHandler) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

// throttleBudget describes how long a load may be held back by throttling
// before it is allowed to proceed anyway.
type throttleBudget struct {
	// max is the number of throttle rounds allowed; once the round counter
	// reaches it the budget is considered exhausted.
	max int
	// interval is the time slept in each throttle round, in seconds.
	interval int // seconds
}

// throttleBudget selects the throttling budget for a topic: the number of
// throttle rounds allowed (max) and the seconds waited per round (interval).
//
// With redshiftMetrics disabled every topic is throttled the same way and the
// budget depends only on firstLoad. With it enabled, the budget is derived
// from the value Prometheus returns for the 1d increase of
// redshift_scan_query_total for (h.redshiftSchema, topic).
//
// If the Prometheus query fails, its error is returned unchanged together
// with a zero-value budget.
func (h *loaderHandler) throttleBudget(topic string, firstLoad bool) (throttleBudget, error) {
	// Metrics off: a uniform budget, regardless of how the table is used
	// in Redshift.
	if !h.redshiftMetrics {
		klog.V(2).Infof("%s: redshiftMetrics disabled, recommended to enable", topic)
		rounds := 10 // 2.5 mins max
		if firstLoad {
			rounds = 120 // 30 mins max
		}
		return throttleBudget{max: rounds, interval: 15}, nil
	}

	// Metrics on: ask Prometheus for the scan-query increase over the last day.
	promQL := fmt.Sprintf(
		"increase(redshift_scan_query_total{schema='%s', tablename='%s'}[1d])",
		h.redshiftSchema,
		topic,
	)
	queries, err := h.prometheusClient.Query(promQL)
	if err != nil {
		return throttleBudget{}, err
	}

	switch {
	case queries > 0 && firstLoad:
		// queried table, first run
		return throttleBudget{max: 120, interval: 15}, nil // 30 mins max
	case queries > 0:
		// queried table, later runs: short wait, just to spread out the load
		return throttleBudget{max: 3, interval: 10}, nil // 30 seconds max
	case queries == 0 && firstLoad:
		// table not queried in the last 1d, first run
		return throttleBudget{max: 8, interval: 900}, nil // 2 hrs max
	default:
		// table not queried in the last 1d, not the first run
		return throttleBudget{max: 4, interval: 900}, nil // 1 hr max
	}
}

func (h *loaderHandler) throttle(topic string, metric metricSetter) error {
// never throttle if prometheus client is not set
// this makes throttling using prometheus an addon feature
if h.prometheusClient == nil {
klog.V(2).Infof("%s: promtheus is not enabled, throttling feature disabled", topic)
klog.V(2).Infof("%s: prometheus disabled, throttle disabled", topic)
return nil
}

Expand All @@ -151,13 +202,22 @@ func (h *loaderHandler) throttle(topic string, metric metricSetter) error {
})
klog.V(4).Infof("%s: running loaders(local): %v", topic, localLoadRunning)

throttleBudget := FirstThrottlingBudget
firstLoad := true
_, ok := h.loadRunning.Load(topic)
if ok {
throttleBudget = ThrottlingBudget
firstLoad = false
}

for i := 0; i < throttleBudget; i++ {
var budget throttleBudget
for cnt := 0; ; cnt++ {
budget, err := h.throttleBudget(topic, firstLoad)
if err != nil {
return err
}
if cnt >= budget.max {
break
}

runningLoaders, err := h.prometheusClient.Query("sum(rsk_loader_running > 0)")
if err != nil {
return err
Expand All @@ -168,12 +228,12 @@ func (h *loaderHandler) throttle(topic string, metric metricSetter) error {
return nil
}

klog.V(2).Infof("%s: throttled for 15s", topic)
klog.V(2).Infof("%s: throttled for %+v seconds", topic, budget.interval)
metric.incThrottled()
time.Sleep(15 * time.Second)
time.Sleep(time.Duration(budget.interval) * time.Second)
}

klog.V(2).Infof("%s: throttle budget exhausted, go load!", topic)
klog.V(2).Infof("%s: exhausted throttle budget: %+v, go load!", topic, budget)

return nil
}
Expand Down

0 comments on commit d9a0d10

Please sign in to comment.