diff --git a/modules/dagger/config.go b/modules/dagger/config.go
index d389cda..69eeddf 100644
--- a/modules/dagger/config.go
+++ b/modules/dagger/config.go
@@ -111,6 +111,9 @@ type Config struct {
 	Namespace     string `json:"namespace,omitempty"`
 	PrometheusURL string `json:"prometheus_url,omitempty"`
 	JarURI        string `json:"jar_uri,omitempty"`
+	State         string `json:"state"`
+	JobState      string `json:"job_state"`
+	ResetOffset   string `json:"reset_offset"`
 }
 
 type ChartValues struct {
@@ -198,19 +201,16 @@ func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverCo
 
 	//transformation #1
 	source := cfg.Source
-	for i := range source {
-		if source[i].SourceParquet.SourceParquetFilePaths != nil && len(source[i].SourceParquet.SourceParquetFilePaths) > 0 {
-			//source is parquete
-			//do nothing
-			continue
+	if !(source[0].SourceParquet.SourceParquetFilePaths != nil && len(source[0].SourceParquet.SourceParquetFilePaths) > 0) {
+		for i := range source {
+			//TODO: check how to handle increment group id on update
+			if source[i].SourceKafkaConsumerConfigGroupID == "" {
+				source[i].SourceKafkaConsumerConfigGroupID = incrementGroupId(r.Name+"-0001", i)
+			}
+			source[i].SourceKafkaConsumerConfigAutoCommitEnable = dc.EnvVariables[SourceKafkaConsumerConfigAutoCommitEnable]
+			source[i].SourceKafkaConsumerConfigAutoOffsetReset = dc.EnvVariables[SourceKafkaConsumerConfigAutoOffsetReset]
+			source[i].SourceKafkaConsumerConfigBootstrapServers = dc.EnvVariables[SourceKafkaConsumerConfigBootstrapServers]
+		}
-		}
-		//TODO: check how to handle increment group id on update
-		if source[i].SourceKafkaConsumerConfigGroupID == "" {
-			source[i].SourceKafkaConsumerConfigGroupID = incrementGroupId(r.Name+"-0001", i)
-		}
-		source[i].SourceKafkaConsumerConfigAutoCommitEnable = dc.EnvVariables[SourceKafkaConsumerConfigAutoCommitEnable]
-		source[i].SourceKafkaConsumerConfigAutoOffsetReset = dc.EnvVariables[SourceKafkaConsumerConfigAutoOffsetReset]
-		source[i].SourceKafkaConsumerConfigBootstrapServers = dc.EnvVariables[SourceKafkaConsumerConfigBootstrapServers]
 	}
 
 	cfg.Source = source
diff --git a/modules/dagger/driver.go b/modules/dagger/driver.go
index 1621dba..2efb06d 100644
--- a/modules/dagger/driver.go
+++ b/modules/dagger/driver.go
@@ -22,6 +22,8 @@ import (
 const (
 	stepReleaseCreate = "release_create"
 	stepReleaseUpdate = "release_update"
+	stepReleaseStop   = "release_stop"
+	stepKafkaReset    = "kafka_reset"
 )
 
 const (
@@ -54,17 +56,19 @@ var defaultDriverConf = driverConf{
 }
 
 type daggerDriver struct {
-	timeNow    func() time.Time
-	conf       driverConf
-	kubeDeploy kubeDeployFn
-	kubeGetPod kubeGetPodFn
-	kubeGetCRD kubeGetCRDFn
+	timeNow       func() time.Time
+	conf          driverConf
+	kubeDeploy    kubeDeployFn
+	kubeGetPod    kubeGetPodFn
+	kubeGetCRD    kubeGetCRDFn
+	consumerReset consumerResetFn
 }
 
 type (
-	kubeDeployFn func(ctx context.Context, isCreate bool, conf kube.Config, hc helm.ReleaseConfig) error
-	kubeGetPodFn func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error)
-	kubeGetCRDFn func(ctx context.Context, conf kube.Config, ns string, name string) (kube.FlinkDeploymentStatus, error)
+	kubeDeployFn    func(ctx context.Context, isCreate bool, conf kube.Config, hc helm.ReleaseConfig) error
+	kubeGetPodFn    func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error)
+	kubeGetCRDFn    func(ctx context.Context, conf kube.Config, ns string, name string) (kube.FlinkDeploymentStatus, error)
+	consumerResetFn func(ctx context.Context, conf Config, resetTo string) []Source
 )
 
 type driverConf struct {
@@ -88,7 +92,6 @@ type driverConf struct {
 }
 
 type Output struct {
-	State          string `json:"state,omitempty"`
 	JMDeployStatus string `json:"jm_deploy_status,omitempty"`
 	JobStatus      string `json:"job_status,omitempty"`
 	Reconcilation  string `json:"reconcilation,omitempty"`
@@ -98,7 +101,8 @@ type Output struct {
 }
 
 type transientData struct {
-	PendingSteps []string `json:"pending_steps"`
+	PendingSteps  []string `json:"pending_steps"`
+	ResetOffsetTo string   `json:"reset_offset_to,omitempty"`
 }
 
 func mergeChartValues(cur, newVal *ChartValues) (*ChartValues, error) {
@@ -208,7 +212,7 @@ func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config,
 		},
 		"jarURI":      conf.JarURI,
 		"programArgs": append([]string{"--encodedArgs"}, encodedProgramArgs),
-		"state":       "running",
+		"state":       conf.JobState,
 		"namespace":   conf.Namespace,
 	}
 
diff --git a/modules/dagger/driver_plan.go b/modules/dagger/driver_plan.go
index f18d245..9fd5a96 100644
--- a/modules/dagger/driver_plan.go
+++ b/modules/dagger/driver_plan.go
@@ -9,15 +9,25 @@ import (
 	"github.com/goto/entropy/modules"
 	"github.com/goto/entropy/modules/flink"
 	"github.com/goto/entropy/pkg/errors"
+	"github.com/goto/entropy/pkg/kafka"
 )
 
 const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"
+const (
+	JobStateRunning   = "running"
+	JobStateSuspended = "suspended"
+	StateDeployed     = "DEPLOYED"
+	StateUserStopped  = "USER_STOPPED"
+)
 
 func (dd *daggerDriver) Plan(_ context.Context, exr module.ExpandedResource,
 	act module.ActionRequest) (*resource.Resource, error) {
 	switch act.Name {
 	case module.CreateAction:
 		return dd.planCreate(exr, act)
+	case ResetAction:
+		return dd.planReset(exr, act)
+
 	default:
 		return dd.planChange(exr, act)
 	}
@@ -40,6 +50,8 @@ func (dd *daggerDriver) planCreate(exr module.ExpandedResource, act module.Actio
 	immediately := dd.timeNow()
 
 	conf.JarURI = dd.conf.JarURI
+	conf.State = StateDeployed
+	conf.JobState = JobStateRunning
 
 	exr.Resource.Spec.Configs = modules.MustJSON(conf)
 
@@ -86,12 +98,22 @@ func (dd *daggerDriver) planChange(exr module.ExpandedResource, act module.Actio
 		newConf.DeploymentID = curConf.DeploymentID
 		newConf.ChartValues = chartVals
 		newConf.JarURI = curConf.JarURI
+		newConf.State = StateDeployed
+		newConf.JobState = JobStateRunning
 
 		newConf.Resources = mergeResources(curConf.Resources, newConf.Resources)
 
 		curConf = newConf
-	}
+
+	case StopAction:
+		curConf.State = StateUserStopped
+		curConf.JobState = JobStateSuspended
+
+	case StartAction:
+		curConf.State = StateDeployed
+		curConf.JobState = JobStateRunning
+
+	}
 	immediately := dd.timeNow()
 
 	exr.Resource.Spec.Configs = modules.MustJSON(curConf)
 
@@ -113,6 +135,39 @@
 	return &exr.Resource, nil
 }
 
+func (dd *daggerDriver) planReset(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
+	resetValue, err := kafka.ParseResetV2Params(act.Params)
+	if err != nil {
+		return nil, err
+	}
+
+	immediately := dd.timeNow()
+
+	curConf, err := readConfig(exr, exr.Resource.Spec.Configs, dd.conf)
+	if err != nil {
+		return nil, err
+	}
+
+	curConf.ResetOffset = resetValue
+
+	curConf.Source = dd.consumerReset(context.Background(), *curConf, resetValue)
+	curConf.EnvVariables[keyStreams] = string(mustMarshalJSON(curConf.Source))
+
+	exr.Resource.Spec.Configs = modules.MustJSON(curConf)
+	exr.Resource.State = resource.State{
+		Status:     resource.StatusPending,
+		Output:     exr.Resource.State.Output,
+		NextSyncAt: &immediately,
+		ModuleData: modules.MustJSON(transientData{
+			ResetOffsetTo: resetValue,
+			PendingSteps: []string{
+				stepKafkaReset,
+			},
+		}),
+	}
+	return &exr.Resource, nil
+}
+
 func (dd *daggerDriver) validateHelmReleaseConfigs(expandedResource module.ExpandedResource, config Config) error {
 	var flinkOut flink.Output
 	if err := json.Unmarshal(expandedResource.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil {
diff --git a/modules/dagger/driver_sync.go b/modules/dagger/driver_sync.go
index 34885d1..f5ebdcd 100644
--- a/modules/dagger/driver_sync.go
+++ b/modules/dagger/driver_sync.go
@@ -43,7 +43,7 @@ func (dd *daggerDriver) Sync(ctx context.Context, exr module.ExpandedResource) (
 		modData.PendingSteps = modData.PendingSteps[1:]
 
 		switch pendingStep {
-		case stepReleaseCreate, stepReleaseUpdate:
+		case stepReleaseCreate, stepReleaseUpdate, stepReleaseStop, stepKafkaReset:
 			isCreate := pendingStep == stepReleaseCreate
 			if err := dd.releaseSync(ctx, exr.Resource, isCreate, *conf, flinkOut.KubeCluster); err != nil {
 				return nil, err
diff --git a/modules/dagger/module.go b/modules/dagger/module.go
index bfc4149..47da1f8 100644
--- a/modules/dagger/module.go
+++ b/modules/dagger/module.go
@@ -18,6 +18,9 @@ import (
 
 const (
 	keyFlinkDependency = "flink"
+	StopAction         = "stop"
+	StartAction        = "start"
+	ResetAction        = "reset"
 )
 
 type FlinkCRDStatus struct {
@@ -40,6 +43,18 @@ var Module = module.Descriptor{
 			Name:        module.UpdateAction,
 			Description: "Updates an existing dagger",
 		},
+		{
+			Name:        StopAction,
+			Description: "Suspends a running dagger",
+		},
+		{
+			Name:        StartAction,
+			Description: "Starts a suspended dagger",
+		},
+		{
+			Name:        ResetAction,
+			Description: "Resets the offset of a dagger",
+		},
 	},
 	DriverFactory: func(confJSON json.RawMessage) (module.Driver, error) {
 		conf := defaultDriverConf // clone the default value
@@ -93,7 +108,9 @@
 				return kube.FlinkDeploymentStatus{}, err
 			}
 			return parseFlinkCRDStatus(crd.Object)
-		}}, nil
+		},
+		consumerReset: consumerReset,
+	}, nil
 	},
 }
 
@@ -125,3 +142,15 @@
 	}
 	return status, nil
 }
+
+func consumerReset(ctx context.Context, conf Config, resetTo string) []Source {
+	baseGroup := conf.Source[0].SourceKafkaConsumerConfigGroupID
+	groupId := incrementGroupId(baseGroup, len(conf.Source))
+
+	for i := range conf.Source {
+		conf.Source[i].SourceKafkaConsumerConfigGroupID = incrementGroupId(groupId, i)
+		conf.Source[i].SourceKafkaConsumerConfigAutoOffsetReset = resetTo
+	}
+
+	return conf.Source
+}