Skip to content

Commit

Permalink
feat: dagger actions (#114)
Browse files Browse the repository at this point in the history
* feat: add dagger module

* feat: add flink dep

* feat: add transformations

* fix: read stream from config root

* feat: add Plan implementation

* fix: chart values

* fix: resolve TODOs and refactor

* fix: source sink base handling

* feat: Output to have CR details

* feat: handle status

* refactor: separate constants by type

* refactor: kubeGetCRD function

* feat: add dagger update action

* fix: add Update action

* chore: change var name to sink_kafka_stream

* feat: merge consumer group ID if sink is same

* feat: add start, stop and reset actions (#112)

* feat: add start & update action

* feat: add reset action

---------

Co-authored-by: Ishan Arya <[email protected]>

---------

Co-authored-by: Ishan Arya <[email protected]>
  • Loading branch information
ishanarya0 and Ishan Arya authored Sep 30, 2024
1 parent 0a925bf commit 2eb0f56
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 26 deletions.
24 changes: 12 additions & 12 deletions modules/dagger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ type Config struct {
Namespace string `json:"namespace,omitempty"`
PrometheusURL string `json:"prometheus_url,omitempty"`
JarURI string `json:"jar_uri,omitempty"`
State string `json:"state"`
JobState string `json:"job_state"`
ResetOffset string `json:"reset_offset"`
}

type ChartValues struct {
Expand Down Expand Up @@ -198,19 +201,16 @@ func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverCo
//transformation #1
source := cfg.Source

for i := range source {
if source[i].SourceParquet.SourceParquetFilePaths != nil && len(source[i].SourceParquet.SourceParquetFilePaths) > 0 {
//source is parquete
//do nothing
continue
if !(source[0].SourceParquet.SourceParquetFilePaths != nil && len(source[0].SourceParquet.SourceParquetFilePaths) > 0) {
for i := range source {
//TODO: check how to handle increment group id on update
if source[i].SourceKafkaConsumerConfigGroupID == "" {
source[i].SourceKafkaConsumerConfigGroupID = incrementGroupId(r.Name+"-0001", i)
}
source[i].SourceKafkaConsumerConfigAutoCommitEnable = dc.EnvVariables[SourceKafkaConsumerConfigAutoCommitEnable]
source[i].SourceKafkaConsumerConfigAutoOffsetReset = dc.EnvVariables[SourceKafkaConsumerConfigAutoOffsetReset]
source[i].SourceKafkaConsumerConfigBootstrapServers = dc.EnvVariables[SourceKafkaConsumerConfigBootstrapServers]
}
//TODO: check how to handle increment group id on update
if source[i].SourceKafkaConsumerConfigGroupID == "" {
source[i].SourceKafkaConsumerConfigGroupID = incrementGroupId(r.Name+"-0001", i)
}
source[i].SourceKafkaConsumerConfigAutoCommitEnable = dc.EnvVariables[SourceKafkaConsumerConfigAutoCommitEnable]
source[i].SourceKafkaConsumerConfigAutoOffsetReset = dc.EnvVariables[SourceKafkaConsumerConfigAutoOffsetReset]
source[i].SourceKafkaConsumerConfigBootstrapServers = dc.EnvVariables[SourceKafkaConsumerConfigBootstrapServers]
}

cfg.Source = source
Expand Down
26 changes: 15 additions & 11 deletions modules/dagger/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
// Names of the pending sync steps queued in transientData.PendingSteps.
// The Sync loop pops one step per pass and dispatches on it (see the
// switch in driver_sync.go, which handles all four via releaseSync).
const (
stepReleaseCreate = "release_create"
stepReleaseUpdate = "release_update"
stepReleaseStop = "release_stop"
stepKafkaReset = "kafka_reset"
)

const (
Expand Down Expand Up @@ -54,17 +56,19 @@ var defaultDriverConf = driverConf{
}

type daggerDriver struct {
timeNow func() time.Time
conf driverConf
kubeDeploy kubeDeployFn
kubeGetPod kubeGetPodFn
kubeGetCRD kubeGetCRDFn
timeNow func() time.Time
conf driverConf
kubeDeploy kubeDeployFn
kubeGetPod kubeGetPodFn
kubeGetCRD kubeGetCRDFn
consumerReset consumerResetFn
}

type (
kubeDeployFn func(ctx context.Context, isCreate bool, conf kube.Config, hc helm.ReleaseConfig) error
kubeGetPodFn func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error)
kubeGetCRDFn func(ctx context.Context, conf kube.Config, ns string, name string) (kube.FlinkDeploymentStatus, error)
kubeDeployFn func(ctx context.Context, isCreate bool, conf kube.Config, hc helm.ReleaseConfig) error
kubeGetPodFn func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error)
kubeGetCRDFn func(ctx context.Context, conf kube.Config, ns string, name string) (kube.FlinkDeploymentStatus, error)
consumerResetFn func(ctx context.Context, conf Config, resetTo string) []Source
)

type driverConf struct {
Expand All @@ -88,7 +92,6 @@ type driverConf struct {
}

type Output struct {
State string `json:"state,omitempty"`
JMDeployStatus string `json:"jm_deploy_status,omitempty"`
JobStatus string `json:"job_status,omitempty"`
Reconcilation string `json:"reconcilation,omitempty"`
Expand All @@ -98,7 +101,8 @@ type Output struct {
}

type transientData struct {
PendingSteps []string `json:"pending_steps"`
PendingSteps []string `json:"pending_steps"`
ResetOffsetTo string `json:"reset_offset_to,omitempty"`
}

func mergeChartValues(cur, newVal *ChartValues) (*ChartValues, error) {
Expand Down Expand Up @@ -208,7 +212,7 @@ func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config,
},
"jarURI": conf.JarURI,
"programArgs": append([]string{"--encodedArgs"}, encodedProgramArgs),
"state": "running",
"state": conf.JobState,
"namespace": conf.Namespace,
}

Expand Down
57 changes: 56 additions & 1 deletion modules/dagger/driver_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,25 @@ import (
"github.com/goto/entropy/modules"
"github.com/goto/entropy/modules/flink"
"github.com/goto/entropy/pkg/errors"
"github.com/goto/entropy/pkg/kafka"
)

// SourceKafkaConsumerAutoOffsetReset is the environment-variable key for the
// Kafka consumer auto-offset-reset setting.
const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"
// Values written into Config.JobState and Config.State by the plan
// functions: planCreate/planChange set running/DEPLOYED, the stop action
// sets suspended/USER_STOPPED.
const (
JobStateRunning = "running" // Flink job should be running
JobStateSuspended = "suspended" // Flink job suspended (stop action)
StateDeployed = "DEPLOYED" // dagger deployed by the user
StateUserStopped = "USER_STOPPED" // dagger explicitly stopped by the user
)

func (dd *daggerDriver) Plan(_ context.Context, exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
switch act.Name {
case module.CreateAction:
return dd.planCreate(exr, act)

case ResetAction:
return dd.planReset(exr, act)

default:
return dd.planChange(exr, act)
}
Expand All @@ -40,6 +50,8 @@ func (dd *daggerDriver) planCreate(exr module.ExpandedResource, act module.Actio

immediately := dd.timeNow()
conf.JarURI = dd.conf.JarURI
conf.State = StateDeployed
conf.JobState = JobStateRunning

exr.Resource.Spec.Configs = modules.MustJSON(conf)

Expand Down Expand Up @@ -86,12 +98,22 @@ func (dd *daggerDriver) planChange(exr module.ExpandedResource, act module.Actio
newConf.DeploymentID = curConf.DeploymentID
newConf.ChartValues = chartVals
newConf.JarURI = curConf.JarURI
newConf.State = StateDeployed
newConf.JobState = JobStateRunning

newConf.Resources = mergeResources(curConf.Resources, newConf.Resources)

curConf = newConf
}

case StopAction:
curConf.State = StateUserStopped
curConf.JobState = JobStateSuspended

case StartAction:
curConf.State = StateDeployed
curConf.JobState = JobStateRunning

}
immediately := dd.timeNow()

exr.Resource.Spec.Configs = modules.MustJSON(curConf)
Expand All @@ -113,6 +135,39 @@ func (dd *daggerDriver) planChange(exr module.ExpandedResource, act module.Actio
return &exr.Resource, nil
}

// planReset plans a consumer-offset reset for the dagger. It validates the
// requested reset value, rewrites the Kafka source consumer groups via
// dd.consumerReset, records the new value in the config, and schedules an
// immediate sync whose only pending step is the kafka-reset step.
func (dd *daggerDriver) planReset(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
	offsetReset, err := kafka.ParseResetV2Params(act.Params)
	if err != nil {
		return nil, err
	}

	now := dd.timeNow()

	conf, err := readConfig(exr, exr.Resource.Spec.Configs, dd.conf)
	if err != nil {
		return nil, err
	}

	conf.ResetOffset = offsetReset

	// NOTE(review): Plan discards its incoming context, so Background() is
	// used here — confirm consumerReset never needs cancellation.
	conf.Source = dd.consumerReset(context.Background(), *conf, offsetReset)
	conf.EnvVariables[keyStreams] = string(mustMarshalJSON(conf.Source))

	pending := transientData{
		ResetOffsetTo: offsetReset,
		PendingSteps:  []string{stepKafkaReset},
	}

	exr.Resource.Spec.Configs = modules.MustJSON(conf)
	exr.Resource.State = resource.State{
		Status:     resource.StatusPending,
		Output:     exr.Resource.State.Output,
		NextSyncAt: &now,
		ModuleData: modules.MustJSON(pending),
	}
	return &exr.Resource, nil
}

func (dd *daggerDriver) validateHelmReleaseConfigs(expandedResource module.ExpandedResource, config Config) error {
var flinkOut flink.Output
if err := json.Unmarshal(expandedResource.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion modules/dagger/driver_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (dd *daggerDriver) Sync(ctx context.Context, exr module.ExpandedResource) (
modData.PendingSteps = modData.PendingSteps[1:]

switch pendingStep {
case stepReleaseCreate, stepReleaseUpdate:
case stepReleaseCreate, stepReleaseUpdate, stepReleaseStop, stepKafkaReset:
isCreate := pendingStep == stepReleaseCreate
if err := dd.releaseSync(ctx, exr.Resource, isCreate, *conf, flinkOut.KubeCluster); err != nil {
return nil, err
Expand Down
31 changes: 30 additions & 1 deletion modules/dagger/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (

const (
keyFlinkDependency = "flink"
StopAction = "stop"
StartAction = "start"
ResetAction = "reset"
)

type FlinkCRDStatus struct {
Expand All @@ -40,6 +43,18 @@ var Module = module.Descriptor{
Name: module.UpdateAction,
Description: "Updates an existing dagger",
},
{
Name: StopAction,
Description: "Suspends a running dagger",
},
{
Name: StartAction,
Description: "Starts a suspended dagger",
},
{
Name: ResetAction,
Description: "Resets the offset of a dagger",
},
},
DriverFactory: func(confJSON json.RawMessage) (module.Driver, error) {
conf := defaultDriverConf // clone the default value
Expand Down Expand Up @@ -93,7 +108,9 @@ var Module = module.Descriptor{
return kube.FlinkDeploymentStatus{}, err
}
return parseFlinkCRDStatus(crd.Object)
}}, nil
},
consumerReset: consumerReset,
}, nil
},
}

Expand Down Expand Up @@ -125,3 +142,15 @@ func parseFlinkCRDStatus(flinkDeployment map[string]interface{}) (kube.FlinkDepl
}
return status, nil
}

// consumerReset rewrites every source's Kafka consumer group ID and applies
// the requested offset-reset policy (resetTo) to each source.
//
// New group IDs are derived from the first source's current group ID,
// advanced past the range already occupied by the existing sources via
// incrementGroupId — presumably so the reset job does not inherit committed
// offsets from the old groups (TODO confirm incrementGroupId semantics).
// The ctx parameter is unused but kept to satisfy consumerResetFn.
func consumerReset(ctx context.Context, conf Config, resetTo string) []Source {
	// Guard against an empty source list: the original indexed
	// conf.Source[0] unconditionally, which panics when no sources exist.
	if len(conf.Source) == 0 {
		return conf.Source
	}

	baseGroup := conf.Source[0].SourceKafkaConsumerConfigGroupID
	groupId := incrementGroupId(baseGroup, len(conf.Source))

	for i := range conf.Source {
		conf.Source[i].SourceKafkaConsumerConfigGroupID = incrementGroupId(groupId, i)
		conf.Source[i].SourceKafkaConsumerConfigAutoOffsetReset = resetTo
	}

	return conf.Source
}

0 comments on commit 2eb0f56

Please sign in to comment.