Skip to content

Commit

Permalink
Merge pull request #172 from practo/api-sinkgroups-maxBytes
Browse files Browse the repository at this point in the history
SinkGroupSpec, DeploymentUnit, MaxBytesPerBatch
  • Loading branch information
alok87 authored Apr 1, 2021
2 parents ec29410 + febba1c commit 5952d7a
Show file tree
Hide file tree
Showing 28 changed files with 3,413 additions and 669 deletions.
50 changes: 27 additions & 23 deletions redshiftsink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,28 +50,35 @@ spec:
maxLoaderLag: 10
batcher:
suspend: false
maxSize: 10
maxWaitSeconds: 30
maxConcurrency: 10
mask: true
maskFile: "github.com/practo/tipoca-stream/redshiftsink/pkg/transformer/masker/database.yaml"
podTemplate:
resources:
requests:
cpu: 100m
memory: 200Mi
sinkGroup:
all:
maxSizePerBatch: 10Mi
maxWaitSeconds: 30
maxConcurrency: 10
deploymentUnit:
podTemplate:
resources:
requests:
cpu: 100m
memory: 200Mi
loader:
suspend: false
maxSize: 10
maxWaitSeconds: 30
maxProcessingTime: 60000
redshiftSchema: "inventory"
redshiftGroup: "sales"
podTemplate:
resources:
requests:
cpu: 100m
memory: 200Mi
sinkGroup:
all:
maxSizePerBatch: 1Gi
maxWaitSeconds: 30
maxProcessingTime: 60000
deploymentUnit:
podTemplate:
resources:
requests:
cpu: 100m
memory: 200Mi

```

```bash
Expand All @@ -82,11 +89,6 @@ This will start syncing all the Kafka topics matching regex `"^db.inventory*"` f

### Configuration

### Redshiftsink Spec Documentation (TODO):
| Spec | Description | Mandatory |
| :------------ | :----------- |:------------|


## RedshiftSink Managed Pods
Redshiftsink performs the sink by creating two pods. Creating a RedshiftSink CRD installs the batcher and loader pods. Batcher and loader pods details are below:

Expand All @@ -113,7 +115,8 @@ Flags:

#### Metrics
```
rsk_batcher_messages_processed_per_second
rsk_batcher_bytes_processed
rsk_batcher_messages_processed
```

### Configuration
Expand Down Expand Up @@ -144,7 +147,8 @@ Flags:

#### Metrics
```
rsk_loader_messages_processed_per_second
rsk_loader_bytes_loaded
rsk_loader_messages_loaded
```

### Configuration
Expand Down
166 changes: 132 additions & 34 deletions redshiftsink/api/v1/redshiftsink_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1

import (
corev1 "k8s.io/api/core/v1"
resource "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -42,31 +43,109 @@ type RedshiftPodTemplateSpec struct {
Tolerations *[]corev1.Toleration `json:"tolerations,omitempty"`
}

// RedshiftBatcherSpec defines the desired state of RedshiftBatcher
type RedshiftBatcherSpec struct {
// Suspend when turned on makes sure no batcher pods
// are running for this CRD object. Default: false
Suspend bool `json:"suspend,omitempty"`
// DeploymentUnit describes one unit of deployment (a pod) for the batcher
// or the loader. At present it only carries the pod template, which is
// where the resources needed by one unit are specified.
type DeploymentUnit struct {
// PodTemplate describes the pod specification for the unit.
// It is a pointer so that nil means "not specified".
// +optional
PodTemplate *RedshiftPodTemplateSpec `json:"podTemplate,omitempty"`
}

// Max configurations for the batcher to batch
MaxSize int `json:"maxSize"`
MaxWaitSeconds int `json:"maxWaitSeconds"`
// SinkGroupSpec defines the specification for one of the three sinkgroups:
// 1. MainSinkGroup 2. ReloadSinkGroup 3. ReloadDupeSinkGroup
// All fields are optional pointers; nil means the field was not specified.
type SinkGroupSpec struct {
// MaxSizePerBatch is the maximum size of a batch, given either in plain
// bytes or with a Ki, Mi or Gi suffix.
// Example values: 1000, 1Ki, 100Mi, 1Gi
// 1000 is 1000 bytes, 1Ki is 1 kibibyte (1024 bytes),
// 100Mi is 100 mebibytes, 1Gi is 1 gibibyte.
// +optional
MaxSizePerBatch *resource.Quantity `json:"maxSizePerBatch,omitempty"`
// MaxWaitSeconds is the maximum time to wait before making a batch:
// a batch is made if MaxSizePerBatch is not hit within MaxWaitSeconds.
// +optional
MaxWaitSeconds *int `json:"maxWaitSeconds,omitempty"`
// MaxConcurrency is the maximum number of batch processors to run concurrently.
// This spec is useful when the sink group pod operates in asynchronous mode.
// Loader pods do not need this as they are synchronous.
// +optional
MaxConcurrency *int `json:"maxConcurrency,omitempty"`

// MaxProcessingTime is the max time in ms required to consume one message
// (the sarama configuration MaxProcessingTime).
// The default for the batcher is 180000ms and for the loader is 600000ms.
// +optional
MaxProcessingTime *int32 `json:"maxProcessingTime,omitempty"`

// MaxReloadingUnits is the maximum number of units (pods) that can be launched
// based on the DeploymentUnit specification. Only valid for the Reloading SinkGroup.
// At present this value is configurable only for the batcher.
// +optional
MaxReloadingUnits *int32 `json:"maxReloadingUnits,omitempty"`
// DeploymentUnit (pod) is the unit of deployment for the batcher or the loader.
// With it the user can specify the amount of resources
// needed to run them as one unit. The operator calculates the total units
// based on the total number of topics and this unit spec. This largely
// solves the scaling issues described in #167.
// +optional
DeploymentUnit *DeploymentUnit `json:"deploymentUnit,omitempty"`
}

// SinkGroup is the group of batcher and loader pods based on the
// mask version, target table and the topic release status. This type
// allows a different SinkGroupSpec for each type of SinkGroup.
// The configuration required for the full sink and the realtime sink can be
// different, so a separate SinkGroupSpec can be provided for each of them.
// The precedence is as follows:
// a) If All is specified and none of the others are specified, All is used for all SinkGroups.
// b) If All and Main are both specified then Main is used for the MainSinkGroup.
// c) If All and Reload are both specified then Reload is used for the ReloadSinkGroup.
// d) If All and ReloadDupe are both specified then ReloadDupe is used for the ReloadDupeSinkGroup.
// e) If none is specified then defaults are used for all of them.
type SinkGroup struct {
// All specifies a common specification for all SinkGroups.
// +optional
All *SinkGroupSpec `json:"all,omitempty"`
// Main specifies the MainSinkGroup specification; it overrides All.
// +optional
Main *SinkGroupSpec `json:"main,omitempty"`
// Reload specifies the ReloadSinkGroup specification; it overrides All.
// +optional
Reload *SinkGroupSpec `json:"reload,omitempty"`
// ReloadDupe specifies the ReloadDupeSinkGroup specification; it overrides All.
// +optional
ReloadDupe *SinkGroupSpec `json:"reloadDupe,omitempty"`
}

// RedshiftBatcherSpec defines the desired state of RedshiftBatcher
type RedshiftBatcherSpec struct {
// Suspend is used to suspend batcher pods. Defaults to false.
Suspend bool `json:"suspend,omitempty"`

// Mask when turned on enables masking of the data. Defaults to false
// +optional
Mask bool `json:"mask"`
// MaskFile to use to apply mask configurations
// +optional
MaskFile string `json:"maskFile"`
MaskFile string `json:"maskFile,omitempty"`
// +optional

// SinkGroup contains the specification for main, reload and reloadDupe
// sinkgroups. Operator uses 3 groups to perform Redshiftsink. The topics
// which have never been released are part of the Reload SinkGroup; the topics
// which get released move to the Main SinkGroup. The ReloadDupe SinkGroup
// is used to give realtime updates to the topics which are reloading.
// Defaults are used for all sinkGroups if none is specified.
// +optional
SinkGroup *SinkGroup `json:"sinkGroup,omitempty"`

// Template describes the pods that will be created.
// Deprecated all of the below spec in favour of SinkGroup #167
MaxSize int `json:"maxSize,omitempty"`
MaxWaitSeconds int `json:"maxWaitSeconds,omitempty"`
MaxConcurrency *int `json:"maxConcurrency,omitempty"`
// MaxProcessingTime is the sarama configuration MaxProcessingTime
// It is the max time in milliseconds required to consume one message.
// Defaults to 1000ms
MaxProcessingTime *int32 `json:"maxProcessingTime,omitempty"`
// PodTemplate describes the pods that will be created.
// if this is not specified, a default pod template is created
// +optional
PodTemplate *RedshiftPodTemplateSpec `json:"podTemplate,omitempty"`
Expand All @@ -78,25 +157,38 @@ type RedshiftLoaderSpec struct {
// are running for this CRD object. Default: false
Suspend bool `json:"suspend,omitempty"`

// Max configurations for the loader to batch the load
MaxSize int `json:"maxSize"`
MaxWaitSeconds int `json:"maxWaitSeconds"`

// MaxProcessingTime is the sarama configuration MaxProcessingTime
// It is the max time in milliseconds required to consume one message.
// Defaults to 600000ms (10mins)
MaxProcessingTime *int32 `json:"maxProcessingTime,omitempty"`
// SinkGroup contains the specification for main, reload and reloadDupe
// sinkgroups. Operator uses 3 groups to perform Redshiftsink. The topics
// which have never been released are part of the Reload SinkGroup; the topics
// which get released move to the Main SinkGroup. The ReloadDupe SinkGroup
// is used to give realtime updates to the topics which are reloading.
// Defaults are used for all sinkGroups if none is specified.
// +optional
SinkGroup *SinkGroup `json:"sinkGroup,omitempty"`

// RedshiftSchema to sink the data in
RedshiftSchema string `json:"redshiftSchema"`
// RedshiftMaxOpenConns is the maximum open connections allowed
// +optional
RedshiftMaxOpenConns *int `json:"redshiftMaxOpenConns,omitempty"`
// RedshiftMaxIdleConns is the maximum idle connections allowed
// +optional
RedshiftMaxIdleConns *int `json:"redshiftMaxIdleConns,omitempty"`
// RedshiftGroup is the group to give access to when new topics get released
RedshiftGroup *string `json:"redshiftGroup"`

// Template describes the pods that will be created.
// Deprecated all of the below spec in favour of SinkGroup #167
// Max configurations for the loader to batch the load
// +optional
MaxSize int `json:"maxSize,omitempty"`
// +optional
MaxWaitSeconds int `json:"maxWaitSeconds,omitempty"`
// MaxProcessingTime is the sarama configuration MaxProcessingTime
// It is the max time in milliseconds required to consume one message.
// Defaults to 600000ms (10mins)
// +optional
MaxProcessingTime *int32 `json:"maxProcessingTime,omitempty"`
// PodTemplate describes the pods that will be created.
// if this is not specified, a default pod template is created
// +optional
PodTemplate *RedshiftPodTemplateSpec `json:"podTemplate,omitempty"`
Expand All @@ -122,7 +214,7 @@ type RedshiftSinkSpec struct {
KafkaVersion string `json:"kafkaVersion"`
KafkaTopicRegexes string `json:"kafkaTopicRegexes"`
// +optional
KafkaLoaderTopicPrefix string `json:"kafkaLoaderTopicPrefix"`
KafkaLoaderTopicPrefix string `json:"kafkaLoaderTopicPrefix,omitempty"`

Batcher RedshiftBatcherSpec `json:"batcher"`
Loader RedshiftLoaderSpec `json:"loader"`
Expand All @@ -132,25 +224,25 @@ type RedshiftSinkSpec struct {
// This is relevant only if masking is turned on in mask configuration.
// It is used for live mask reloading.
// +optional
ReleaseCondition *ReleaseCondition `json:"releaseCondition"`
ReleaseCondition *ReleaseCondition `json:"releaseCondition,omitempty"`

// TopicReleaseCondition is considered instead of ReleaseCondition
// if it is defined for a topic. This is used for topics which
// do not work well with the central ReleaseCondition for all topics
// +optional
TopicReleaseCondition map[string]ReleaseCondition `json:"topicReleaseCondition"`
TopicReleaseCondition map[string]ReleaseCondition `json:"topicReleaseCondition,omitempty"`
}

type ReleaseCondition struct {
// MaxBatcherLag is the maximum lag the batcher consumer group
// should have to be considered to be operating in realtime and
// to be considered for release.
MaxBatcherLag *int64 `json:"maxBatcherLag"`
MaxBatcherLag *int64 `json:"maxBatcherLag,omitempty"`

// MaxLoaderLag is the maximum lag the loader consumer group
// should have to be considered to be operating in realtime and
// to be considered for release.
MaxLoaderLag *int64 `json:"maxLoaderLag"`
MaxLoaderLag *int64 `json:"maxLoaderLag,omitempty"`
}

// MaskPhase is a label for the condition of a masking at the current time.
Expand Down Expand Up @@ -208,7 +300,7 @@ type MaskStatus struct {

type Group struct {
// LoaderTopicPrefix stores the name of the loader topic prefix
LoaderTopicPrefix string `json:"loaderTopicPrefix"`
LoaderTopicPrefix string `json:"loaderTopicPrefix,omitempty"`

// LoaderCurrentOffset stores the last read current offset of the consumer group
// This is required to determine if the consumer group has performed any
Expand All @@ -218,7 +310,7 @@ type Group struct {
// throughput consumer groups not getting moved to realtime from reloading.
// TODO: This is not dead field once a group moves to released and
// should be cleaned after that(status needs to be updated)
LoaderCurrentOffset *int64 `json:"currentOffset"`
LoaderCurrentOffset *int64 `json:"currentOffset,omitempty"`

// ID stores the name of the consumer group for the topic
// based on this batcher and loader consumer groups are made
Expand All @@ -232,11 +324,17 @@ type RedshiftSinkStatus struct {

// MaskStatus stores the status of masking for topics if masking is enabled
// +optional
MaskStatus *MaskStatus `json:"maskStatus"`
MaskStatus *MaskStatus `json:"maskStatus,omitempty"`

// TopicGroup stores the group info for the topic
// +optional
TopicGroup map[string]Group `json:"topicGroups"`
TopicGroup map[string]Group `json:"topicGroups,omitempty"`

// BatcherReloadingTopics stores the list of topics which are currently reloading
// for the batcher deployments in the reload sink group.
// There is a limit to maximum topics that can be reloaded. (MaxReloadingUnits)
// +optional
BatcherReloadingTopics []string `json:"batcherReloadingTopics,omitempty"`
}

// +kubebuilder:resource:path=redshiftsinks,shortName=rsk;rsks
Expand Down
Loading

0 comments on commit 5952d7a

Please sign in to comment.