From 9cbd08127485af632d1ec9f286f4757ffd0eb8ea Mon Sep 17 00:00:00 2001 From: Ishan Arya Date: Wed, 25 Sep 2024 17:07:00 +0530 Subject: [PATCH] feat: create savepoint --- modules/dagger/config.go | 38 +++++++++++++++++------------------ modules/dagger/driver.go | 18 +++++++++-------- modules/dagger/driver_plan.go | 33 ++++++++++++++++++++++++++++++ modules/dagger/driver_sync.go | 2 +- modules/dagger/module.go | 17 ++++++++++++---- 5 files changed, 76 insertions(+), 32 deletions(-) diff --git a/modules/dagger/config.go b/modules/dagger/config.go index 69eeddf..86881b6 100644 --- a/modules/dagger/config.go +++ b/modules/dagger/config.go @@ -96,24 +96,25 @@ type Resources struct { } type Config struct { - Resources Resources `json:"resources,omitempty"` - Source []Source `json:"source,omitempty"` - Sink Sink `json:"sink,omitempty"` - EnvVariables map[string]string `json:"env_variables,omitempty"` - Replicas int `json:"replicas"` - SinkType string `json:"sink_type"` - Team string `json:"team"` - FlinkName string `json:"flink_name,omitempty"` - DeploymentID string `json:"deployment_id,omitempty"` - Savepoint any `json:"savepoint,omitempty"` - ChartValues *ChartValues `json:"chart_values,omitempty"` - Deleted bool `json:"deleted,omitempty"` - Namespace string `json:"namespace,omitempty"` - PrometheusURL string `json:"prometheus_url,omitempty"` - JarURI string `json:"jar_uri,omitempty"` - State string `json:"state"` - JobState string `json:"job_state"` - ResetOffset string `json:"reset_offset"` + Resources Resources `json:"resources,omitempty"` + Source []Source `json:"source,omitempty"` + Sink Sink `json:"sink,omitempty"` + EnvVariables map[string]string `json:"env_variables,omitempty"` + Replicas int `json:"replicas" default:"1"` + SinkType string `json:"sink_type"` + Team string `json:"team"` + FlinkName string `json:"flink_name,omitempty"` + DeploymentID string `json:"deployment_id,omitempty"` + Savepoint any `json:"savepoint,omitempty"` + ChartValues *ChartValues `json:"chart_values,omitempty"` + Deleted bool `json:"deleted,omitempty"` + Namespace string `json:"namespace,omitempty"` + PrometheusURL string `json:"prometheus_url,omitempty"` + JarURI string `json:"jar_uri,omitempty"` + State string `json:"state"` + JobState string `json:"job_state"` + ResetOffset string `json:"reset_offset"` + SavepointTriggerNonce int `json:"savepoint_trigger_nonce,omitempty"` } type ChartValues struct { @@ -209,7 +210,6 @@ func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverCo } source[i].SourceKafkaConsumerConfigAutoCommitEnable = dc.EnvVariables[SourceKafkaConsumerConfigAutoCommitEnable] source[i].SourceKafkaConsumerConfigAutoOffsetReset = dc.EnvVariables[SourceKafkaConsumerConfigAutoOffsetReset] - source[i].SourceKafkaConsumerConfigBootstrapServers = dc.EnvVariables[SourceKafkaConsumerConfigBootstrapServers] } } diff --git a/modules/dagger/driver.go b/modules/dagger/driver.go index 2efb06d..abc8f21 100644 --- a/modules/dagger/driver.go +++ b/modules/dagger/driver.go @@ -20,10 +20,11 @@ import ( ) const ( - stepReleaseCreate = "release_create" - stepReleaseUpdate = "release_update" - stepReleaseStop = "release_stop" - stepKafkaReset = "kafka_reset" + stepReleaseCreate = "release_create" + stepReleaseUpdate = "release_update" + stepReleaseStop = "release_stop" + stepKafkaReset = "kafka_reset" + stepSavepointCreate = "savepoint_create" ) const ( @@ -210,10 +211,11 @@ func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config, "memory": conf.Resources.JobManager.Memory, }, }, - "jarURI": conf.JarURI, - "programArgs": append([]string{"--encodedArgs"}, encodedProgramArgs), - "state": conf.JobState, - "namespace": conf.Namespace, + "jarURI": conf.JarURI, + "programArgs": append([]string{"--encodedArgs"}, encodedProgramArgs), + "state": conf.JobState, + "namespace": conf.Namespace, + "savepointTriggerNonce": conf.SavepointTriggerNonce, } return rc, nil diff --git a/modules/dagger/driver_plan.go b/modules/dagger/driver_plan.go index 9fd5a96..ba7f190 100644 --- a/modules/dagger/driver_plan.go +++ b/modules/dagger/driver_plan.go @@ -28,6 +28,9 @@ func (dd *daggerDriver) Plan(_ context.Context, exr module.ExpandedResource, act case ResetAction: return dd.planReset(exr, act) + case TriggerSavepointAction: + return dd.planTriggerSavepoint(exr) + default: return dd.planChange(exr, act) } @@ -52,6 +55,7 @@ func (dd *daggerDriver) planCreate(exr module.ExpandedResource, act module.Actio conf.JarURI = dd.conf.JarURI conf.State = StateDeployed conf.JobState = JobStateRunning + conf.SavepointTriggerNonce = 1 exr.Resource.Spec.Configs = modules.MustJSON(conf) @@ -168,6 +172,35 @@ func (dd *daggerDriver) planReset(exr module.ExpandedResource, act module.Action return &exr.Resource, nil } +func (dd *daggerDriver) planTriggerSavepoint(exr module.ExpandedResource) (*resource.Resource, error) { + curConf, err := readConfig(exr, exr.Resource.Spec.Configs, dd.conf) + if err != nil { + return nil, err + } + + curConf.SavepointTriggerNonce += 1 + + immediately := dd.timeNow() + + exr.Resource.Spec.Configs = modules.MustJSON(curConf) + + err = dd.validateHelmReleaseConfigs(exr, *curConf) + if err != nil { + return nil, err + } + + exr.Resource.State = resource.State{ + Status: resource.StatusPending, + Output: exr.Resource.State.Output, + ModuleData: modules.MustJSON(transientData{ + PendingSteps: []string{stepSavepointCreate}, + }), + NextSyncAt: &immediately, + } + + return &exr.Resource, nil +} + func (dd *daggerDriver) validateHelmReleaseConfigs(expandedResource module.ExpandedResource, config Config) error { var flinkOut flink.Output if err := json.Unmarshal(expandedResource.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil { diff --git a/modules/dagger/driver_sync.go b/modules/dagger/driver_sync.go index f5ebdcd..f317c82 100644 --- a/modules/dagger/driver_sync.go +++ b/modules/dagger/driver_sync.go @@ -43,7 +43,7 @@ func (dd *daggerDriver) Sync(ctx context.Context, exr module.ExpandedResource) ( modData.PendingSteps = modData.PendingSteps[1:] switch pendingStep { - case stepReleaseCreate, stepReleaseUpdate, stepReleaseStop, stepKafkaReset: + case stepReleaseCreate, stepReleaseUpdate, stepReleaseStop, stepKafkaReset, stepSavepointCreate: isCreate := pendingStep == stepReleaseCreate if err := dd.releaseSync(ctx, exr.Resource, isCreate, *conf, flinkOut.KubeCluster); err != nil { return nil, err diff --git a/modules/dagger/module.go b/modules/dagger/module.go index 47da1f8..1404be2 100644 --- a/modules/dagger/module.go +++ b/modules/dagger/module.go @@ -17,10 +17,11 @@ import ( ) const ( - keyFlinkDependency = "flink" - StopAction = "stop" - StartAction = "start" - ResetAction = "reset" + keyFlinkDependency = "flink" + StopAction = "stop" + StartAction = "start" + ResetAction = "reset" + TriggerSavepointAction = "savepoint" ) type FlinkCRDStatus struct { @@ -55,6 +56,14 @@ var Module = module.Descriptor{ Name: ResetAction, Description: "Resets the offset of a dagger", }, + { + Name: ResetAction, + Description: "Resets the offset of a dagger", + }, + { + Name: TriggerSavepointAction, + Description: "Trigger a savepoint for a dagger", + }, }, DriverFactory: func(confJSON json.RawMessage) (module.Driver, error) { conf := defaultDriverConf // clone the default value