Skip to content

Commit

Permalink
feat: create savepoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Ishan Arya committed Sep 25, 2024
1 parent cc07914 commit 9cbd081
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 32 deletions.
38 changes: 19 additions & 19 deletions modules/dagger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,24 +96,25 @@ type Resources struct {
}

type Config struct {
Resources Resources `json:"resources,omitempty"`
Source []Source `json:"source,omitempty"`
Sink Sink `json:"sink,omitempty"`
EnvVariables map[string]string `json:"env_variables,omitempty"`
Replicas int `json:"replicas"`
SinkType string `json:"sink_type"`
Team string `json:"team"`
FlinkName string `json:"flink_name,omitempty"`
DeploymentID string `json:"deployment_id,omitempty"`
Savepoint any `json:"savepoint,omitempty"`
ChartValues *ChartValues `json:"chart_values,omitempty"`
Deleted bool `json:"deleted,omitempty"`
Namespace string `json:"namespace,omitempty"`
PrometheusURL string `json:"prometheus_url,omitempty"`
JarURI string `json:"jar_uri,omitempty"`
State string `json:"state"`
JobState string `json:"job_state"`
ResetOffset string `json:"reset_offset"`
Resources Resources `json:"resources,omitempty"`
Source []Source `json:"source,omitempty"`
Sink Sink `json:"sink,omitempty"`
EnvVariables map[string]string `json:"env_variables,omitempty"`
Replicas int `json:"replicas" default:"1"`
SinkType string `json:"sink_type"`
Team string `json:"team"`
FlinkName string `json:"flink_name,omitempty"`
DeploymentID string `json:"deployment_id,omitempty"`
Savepoint any `json:"savepoint,omitempty"`
ChartValues *ChartValues `json:"chart_values,omitempty"`
Deleted bool `json:"deleted,omitempty"`
Namespace string `json:"namespace,omitempty"`
PrometheusURL string `json:"prometheus_url,omitempty"`
JarURI string `json:"jar_uri,omitempty"`
State string `json:"state"`
JobState string `json:"job_state"`
ResetOffset string `json:"reset_offset"`
SavepointTriggerNonce int `json:"savepoint_trigger_nonce,omitempty"`
}

type ChartValues struct {
Expand Down Expand Up @@ -209,7 +210,6 @@ func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverCo
}
source[i].SourceKafkaConsumerConfigAutoCommitEnable = dc.EnvVariables[SourceKafkaConsumerConfigAutoCommitEnable]
source[i].SourceKafkaConsumerConfigAutoOffsetReset = dc.EnvVariables[SourceKafkaConsumerConfigAutoOffsetReset]
source[i].SourceKafkaConsumerConfigBootstrapServers = dc.EnvVariables[SourceKafkaConsumerConfigBootstrapServers]
}
}

Expand Down
18 changes: 10 additions & 8 deletions modules/dagger/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
)

const (
stepReleaseCreate = "release_create"
stepReleaseUpdate = "release_update"
stepReleaseStop = "release_stop"
stepKafkaReset = "kafka_reset"
stepReleaseCreate = "release_create"
stepReleaseUpdate = "release_update"
stepReleaseStop = "release_stop"
stepKafkaReset = "kafka_reset"
stepSavepointCreate = "savepoint_create"
)

const (
Expand Down Expand Up @@ -210,10 +211,11 @@ func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config,
"memory": conf.Resources.JobManager.Memory,
},
},
"jarURI": conf.JarURI,
"programArgs": append([]string{"--encodedArgs"}, encodedProgramArgs),
"state": conf.JobState,
"namespace": conf.Namespace,
"jarURI": conf.JarURI,
"programArgs": append([]string{"--encodedArgs"}, encodedProgramArgs),
"state": conf.JobState,
"namespace": conf.Namespace,
"savepointTriggerNonce": conf.SavepointTriggerNonce,
}

return rc, nil
Expand Down
33 changes: 33 additions & 0 deletions modules/dagger/driver_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ func (dd *daggerDriver) Plan(_ context.Context, exr module.ExpandedResource, act
case ResetAction:
return dd.planReset(exr, act)

case TriggerSavepointAction:
return dd.planTriggerSavepoint(exr)

default:
return dd.planChange(exr, act)
}
Expand All @@ -52,6 +55,7 @@ func (dd *daggerDriver) planCreate(exr module.ExpandedResource, act module.Actio
conf.JarURI = dd.conf.JarURI
conf.State = StateDeployed
conf.JobState = JobStateRunning
conf.SavepointTriggerNonce = 1

exr.Resource.Spec.Configs = modules.MustJSON(conf)

Expand Down Expand Up @@ -168,6 +172,35 @@ func (dd *daggerDriver) planReset(exr module.ExpandedResource, act module.Action
return &exr.Resource, nil
}

func (dd *daggerDriver) planTriggerSavepoint(exr module.ExpandedResource) (*resource.Resource, error) {
curConf, err := readConfig(exr, exr.Resource.Spec.Configs, dd.conf)
if err != nil {
return nil, err
}

curConf.SavepointTriggerNonce += 1

immediately := dd.timeNow()

exr.Resource.Spec.Configs = modules.MustJSON(curConf)

err = dd.validateHelmReleaseConfigs(exr, *curConf)
if err != nil {
return nil, err
}

exr.Resource.State = resource.State{
Status: resource.StatusPending,
Output: exr.Resource.State.Output,
ModuleData: modules.MustJSON(transientData{
PendingSteps: []string{stepSavepointCreate},
}),
NextSyncAt: &immediately,
}

return &exr.Resource, nil
}

func (dd *daggerDriver) validateHelmReleaseConfigs(expandedResource module.ExpandedResource, config Config) error {
var flinkOut flink.Output
if err := json.Unmarshal(expandedResource.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion modules/dagger/driver_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (dd *daggerDriver) Sync(ctx context.Context, exr module.ExpandedResource) (
modData.PendingSteps = modData.PendingSteps[1:]

switch pendingStep {
case stepReleaseCreate, stepReleaseUpdate, stepReleaseStop, stepKafkaReset:
case stepReleaseCreate, stepReleaseUpdate, stepReleaseStop, stepKafkaReset, stepSavepointCreate:
isCreate := pendingStep == stepReleaseCreate
if err := dd.releaseSync(ctx, exr.Resource, isCreate, *conf, flinkOut.KubeCluster); err != nil {
return nil, err
Expand Down
17 changes: 13 additions & 4 deletions modules/dagger/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ import (
)

const (
keyFlinkDependency = "flink"
StopAction = "stop"
StartAction = "start"
ResetAction = "reset"
keyFlinkDependency = "flink"
StopAction = "stop"
StartAction = "start"
ResetAction = "reset"
TriggerSavepointAction = "savepoint"
)

type FlinkCRDStatus struct {
Expand Down Expand Up @@ -55,6 +56,14 @@ var Module = module.Descriptor{
Name: ResetAction,
Description: "Resets the offset of a dagger",
},
{
Name: ResetAction,
Description: "Resets the offset of a dagger",
},
{
Name: TriggerSavepointAction,
Description: "Trigger a savepoint for a dagger",
},
},
DriverFactory: func(confJSON json.RawMessage) (module.Driver, error) {
conf := defaultDriverConf // clone the default value
Expand Down

0 comments on commit 9cbd081

Please sign in to comment.