Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: savepoint #113

Draft
wants to merge 19 commits into
base: flink-module
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
feat: create savepoint
  • Loading branch information
Ishan Arya committed Sep 25, 2024
commit 9cbd08127485af632d1ec9f286f4757ffd0eb8ea
38 changes: 19 additions & 19 deletions modules/dagger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,24 +96,25 @@ type Resources struct {
}

type Config struct {
Resources Resources `json:"resources,omitempty"`
Source []Source `json:"source,omitempty"`
Sink Sink `json:"sink,omitempty"`
EnvVariables map[string]string `json:"env_variables,omitempty"`
Replicas int `json:"replicas"`
SinkType string `json:"sink_type"`
Team string `json:"team"`
FlinkName string `json:"flink_name,omitempty"`
DeploymentID string `json:"deployment_id,omitempty"`
Savepoint any `json:"savepoint,omitempty"`
ChartValues *ChartValues `json:"chart_values,omitempty"`
Deleted bool `json:"deleted,omitempty"`
Namespace string `json:"namespace,omitempty"`
PrometheusURL string `json:"prometheus_url,omitempty"`
JarURI string `json:"jar_uri,omitempty"`
State string `json:"state"`
JobState string `json:"job_state"`
ResetOffset string `json:"reset_offset"`
Resources Resources `json:"resources,omitempty"`
Source []Source `json:"source,omitempty"`
Sink Sink `json:"sink,omitempty"`
EnvVariables map[string]string `json:"env_variables,omitempty"`
Replicas int `json:"replicas" default:"1"`
SinkType string `json:"sink_type"`
Team string `json:"team"`
FlinkName string `json:"flink_name,omitempty"`
DeploymentID string `json:"deployment_id,omitempty"`
Savepoint any `json:"savepoint,omitempty"`
ChartValues *ChartValues `json:"chart_values,omitempty"`
Deleted bool `json:"deleted,omitempty"`
Namespace string `json:"namespace,omitempty"`
PrometheusURL string `json:"prometheus_url,omitempty"`
JarURI string `json:"jar_uri,omitempty"`
State string `json:"state"`
JobState string `json:"job_state"`
ResetOffset string `json:"reset_offset"`
SavepointTriggerNonce int `json:"savepoint_trigger_nonce,omitempty"`
}

type ChartValues struct {
Expand Down Expand Up @@ -209,7 +210,6 @@ func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverCo
}
source[i].SourceKafkaConsumerConfigAutoCommitEnable = dc.EnvVariables[SourceKafkaConsumerConfigAutoCommitEnable]
source[i].SourceKafkaConsumerConfigAutoOffsetReset = dc.EnvVariables[SourceKafkaConsumerConfigAutoOffsetReset]
source[i].SourceKafkaConsumerConfigBootstrapServers = dc.EnvVariables[SourceKafkaConsumerConfigBootstrapServers]
}
}

Expand Down
18 changes: 10 additions & 8 deletions modules/dagger/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
)

const (
stepReleaseCreate = "release_create"
stepReleaseUpdate = "release_update"
stepReleaseStop = "release_stop"
stepKafkaReset = "kafka_reset"
stepReleaseCreate = "release_create"
stepReleaseUpdate = "release_update"
stepReleaseStop = "release_stop"
stepKafkaReset = "kafka_reset"
stepSavepointCreate = "savepoint_create"
)

const (
Expand Down Expand Up @@ -210,10 +211,11 @@ func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config,
"memory": conf.Resources.JobManager.Memory,
},
},
"jarURI": conf.JarURI,
"programArgs": append([]string{"--encodedArgs"}, encodedProgramArgs),
"state": conf.JobState,
"namespace": conf.Namespace,
"jarURI": conf.JarURI,
"programArgs": append([]string{"--encodedArgs"}, encodedProgramArgs),
"state": conf.JobState,
"namespace": conf.Namespace,
"savepointTriggerNonce": conf.SavepointTriggerNonce,
}

return rc, nil
Expand Down
33 changes: 33 additions & 0 deletions modules/dagger/driver_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ func (dd *daggerDriver) Plan(_ context.Context, exr module.ExpandedResource, act
case ResetAction:
return dd.planReset(exr, act)

case TriggerSavepointAction:
return dd.planTriggerSavepoint(exr)

default:
return dd.planChange(exr, act)
}
Expand All @@ -52,6 +55,7 @@ func (dd *daggerDriver) planCreate(exr module.ExpandedResource, act module.Actio
conf.JarURI = dd.conf.JarURI
conf.State = StateDeployed
conf.JobState = JobStateRunning
conf.SavepointTriggerNonce = 1

exr.Resource.Spec.Configs = modules.MustJSON(conf)

Expand Down Expand Up @@ -168,6 +172,35 @@ func (dd *daggerDriver) planReset(exr module.ExpandedResource, act module.Action
return &exr.Resource, nil
}

func (dd *daggerDriver) planTriggerSavepoint(exr module.ExpandedResource) (*resource.Resource, error) {
curConf, err := readConfig(exr, exr.Resource.Spec.Configs, dd.conf)
if err != nil {
return nil, err
}

curConf.SavepointTriggerNonce += 1

immediately := dd.timeNow()

exr.Resource.Spec.Configs = modules.MustJSON(curConf)

err = dd.validateHelmReleaseConfigs(exr, *curConf)
if err != nil {
return nil, err
}

exr.Resource.State = resource.State{
Status: resource.StatusPending,
Output: exr.Resource.State.Output,
ModuleData: modules.MustJSON(transientData{
PendingSteps: []string{stepSavepointCreate},
}),
NextSyncAt: &immediately,
}

return &exr.Resource, nil
}

func (dd *daggerDriver) validateHelmReleaseConfigs(expandedResource module.ExpandedResource, config Config) error {
var flinkOut flink.Output
if err := json.Unmarshal(expandedResource.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion modules/dagger/driver_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (dd *daggerDriver) Sync(ctx context.Context, exr module.ExpandedResource) (
modData.PendingSteps = modData.PendingSteps[1:]

switch pendingStep {
case stepReleaseCreate, stepReleaseUpdate, stepReleaseStop, stepKafkaReset:
case stepReleaseCreate, stepReleaseUpdate, stepReleaseStop, stepKafkaReset, stepSavepointCreate:
isCreate := pendingStep == stepReleaseCreate
if err := dd.releaseSync(ctx, exr.Resource, isCreate, *conf, flinkOut.KubeCluster); err != nil {
return nil, err
Expand Down
17 changes: 13 additions & 4 deletions modules/dagger/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ import (
)

const (
keyFlinkDependency = "flink"
StopAction = "stop"
StartAction = "start"
ResetAction = "reset"
keyFlinkDependency = "flink"
StopAction = "stop"
StartAction = "start"
ResetAction = "reset"
TriggerSavepointAction = "savepoint"
)

type FlinkCRDStatus struct {
Expand Down Expand Up @@ -55,6 +56,14 @@ var Module = module.Descriptor{
Name: ResetAction,
Description: "Resets the offset of a dagger",
},
{
Name: ResetAction,
Description: "Resets the offset of a dagger",
},
{
Name: TriggerSavepointAction,
Description: "Trigger a savepoint for a dagger",
},
},
DriverFactory: func(confJSON json.RawMessage) (module.Driver, error) {
conf := defaultDriverConf // clone the default value
Expand Down
Loading