diff --git a/cli/serve.go b/cli/serve.go
index ca260bf0..095f33bd 100644
--- a/cli/serve.go
+++ b/cli/serve.go
@@ -14,7 +14,9 @@ import (
entropyserver "github.com/goto/entropy/internal/server"
"github.com/goto/entropy/internal/store/postgres"
"github.com/goto/entropy/modules"
+ "github.com/goto/entropy/modules/dagger"
"github.com/goto/entropy/modules/firehose"
+ "github.com/goto/entropy/modules/flink"
"github.com/goto/entropy/modules/job"
"github.com/goto/entropy/modules/kafka"
"github.com/goto/entropy/modules/kubernetes"
@@ -92,6 +94,8 @@ func setupRegistry() module.Registry {
firehose.Module,
job.Module,
kafka.Module,
+ flink.Module,
+ dagger.Module,
}
registry := &modules.Registry{}
diff --git a/modules/dagger/config.go b/modules/dagger/config.go
new file mode 100644
index 00000000..69eeddf9
--- /dev/null
+++ b/modules/dagger/config.go
@@ -0,0 +1,349 @@
+package dagger
+
+import (
+ _ "embed"
+ "encoding/json"
+ "fmt"
+ "strconv"
+ "strings"
+
+ "github.com/goto/entropy/core/module"
+ "github.com/goto/entropy/modules"
+ "github.com/goto/entropy/modules/flink"
+ "github.com/goto/entropy/pkg/errors"
+ "github.com/goto/entropy/pkg/validator"
+)
+
+const (
+ helmReleaseNameMaxLength = 53
+)
+
+// Stream-related constants
+const (
+ keyStreams = "STREAMS"
+ keySinkType = "SINK_TYPE"
+)
+
+// Flink-related constants
+const (
+ keyFlinkJobID = "FLINK_JOB_ID"
+)
+
+// Influx-related constants
+const (
+ keySinkInfluxURL = "SINK_INFLUX_URL"
+ keySinkInfluxPassword = "SINK_INFLUX_PASSWORD"
+ keySinkInfluxDBName = "SINK_INFLUX_DB_NAME"
+ keySinkInfluxUsername = "SINK_INFLUX_USERNAME"
+ keySinkInfluxMeasurementName = "SINK_INFLUX_MEASUREMENT_NAME"
+)
+
+// Kafka-related constants
+const (
+ SourceKafkaConsumerConfigAutoCommitEnable = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE"
+ SourceKafkaConsumerConfigAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"
+ SourceKafkaConsumerConfigBootstrapServers = "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS"
+ keySinkKafkaBrokers = "SINK_KAFKA_BROKERS"
+ keySinkKafkaStream = "SINK_KAFKA_STREAM"
+ keySinkKafkaProtoMsg = "SINK_KAFKA_PROTO_MESSAGE"
+ keySinkKafkaTopic = "SINK_KAFKA_TOPIC"
+ keySinkKafkaKey = "SINK_KAFKA_PROTO_KEY"
+ keySinkKafkaLingerMs = "SINK_KAFKA_LINGER_MS"
+)
+
+// Sink types
+const (
+ SinkTypeInflux = "INFLUX"
+ SinkTypeKafka = "KAFKA"
+ SinkTypeBigquery = "BIGQUERY"
+)
+
+// BigQuery-related constants
+const (
+ keySinkBigqueryGoogleCloudProjectID = "SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID"
+ keySinkBigqueryDatasetName = "SINK_BIGQUERY_DATASET_NAME"
+ keySinkBigqueryTableName = "SINK_BIGQUERY_TABLE_NAME"
+ keySinkBigqueryDatasetLabels = "SINK_BIGQUERY_DATASET_LABELS"
+ keySinkBigqueryTableLabels = "SINK_BIGQUERY_TABLE_LABELS"
+ keySinkBigqueryTablePartitioningEnable = "SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE"
+ keySinkBigqueryTableClusteringEnable = "SINK_BIGQUERY_TABLE_CLUSTERING_ENABLE"
+ keySinkBigqueryBatchSize = "SINK_BIGQUERY_BATCH_SIZE"
+ keySinkBigqueryTablePartitionKey = "SINK_BIGQUERY_TABLE_PARTITION_KEY"
+ keySinkBigqueryRowInsertIDEnable = "SINK_BIGQUERY_ROW_INSERT_ID_ENABLE"
+ keySinkBigqueryClientReadTimeoutMs = "SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS"
+ keySinkBigqueryClientConnectTimeoutMs = "SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS"
+ keySinkBigqueryTablePartitionExpiryMs = "SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS"
+ keySinkBigqueryDatasetLocation = "SINK_BIGQUERY_DATASET_LOCATION"
+ keySinkErrorTypesForFailure = "SINK_ERROR_TYPES_FOR_FAILURE"
+ keySinkBigqueryTableClusteringKeys = "SINK_BIGQUERY_TABLE_CLUSTERING_KEYS"
+)
+
+var (
+ //go:embed schema/config.json
+ configSchemaRaw []byte
+
+ validateConfig = validator.FromJSONSchema(configSchemaRaw)
+)
+
+type UsageSpec struct {
+ CPU string `json:"cpu,omitempty" validate:"required"`
+ Memory string `json:"memory,omitempty" validate:"required"`
+}
+
+type Resources struct {
+ TaskManager UsageSpec `json:"taskmanager,omitempty"`
+ JobManager UsageSpec `json:"jobmanager,omitempty"`
+}
+
+type Config struct {
+ Resources Resources `json:"resources,omitempty"`
+ Source []Source `json:"source,omitempty"`
+ Sink Sink `json:"sink,omitempty"`
+ EnvVariables map[string]string `json:"env_variables,omitempty"`
+ Replicas int `json:"replicas"`
+ SinkType string `json:"sink_type"`
+ Team string `json:"team"`
+ FlinkName string `json:"flink_name,omitempty"`
+ DeploymentID string `json:"deployment_id,omitempty"`
+ Savepoint any `json:"savepoint,omitempty"`
+ ChartValues *ChartValues `json:"chart_values,omitempty"`
+ Deleted bool `json:"deleted,omitempty"`
+ Namespace string `json:"namespace,omitempty"`
+ PrometheusURL string `json:"prometheus_url,omitempty"`
+ JarURI string `json:"jar_uri,omitempty"`
+ State string `json:"state"`
+ JobState string `json:"job_state"`
+ ResetOffset string `json:"reset_offset"`
+}
+
+type ChartValues struct {
+ ImageRepository string `json:"image_repository" validate:"required"`
+ ImageTag string `json:"image_tag" validate:"required"`
+ ChartVersion string `json:"chart_version" validate:"required"`
+ ImagePullPolicy string `json:"image_pull_policy"`
+}
+
+type SourceDetail struct {
+ SourceName string `json:"SOURCE_NAME"`
+ SourceType string `json:"SOURCE_TYPE"`
+}
+
+type SourceKafka struct {
+ SourceKafkaConsumerConfigAutoCommitEnable string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE"`
+ SourceKafkaConsumerConfigAutoOffsetReset string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"`
+ SourceKafkaTopicNames string `json:"SOURCE_KAFKA_TOPIC_NAMES"`
+ SourceKafkaName string `json:"SOURCE_KAFKA_NAME"`
+ SourceKafkaConsumerConfigGroupID string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID"`
+ SourceKafkaConsumerConfigBootstrapServers string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS"`
+}
+
+type SourceParquet struct {
+ SourceParquetFileDateRange interface{} `json:"SOURCE_PARQUET_FILE_DATE_RANGE"`
+ SourceParquetFilePaths []string `json:"SOURCE_PARQUET_FILE_PATHS"`
+}
+
+type Source struct {
+ InputSchemaProtoClass string `json:"INPUT_SCHEMA_PROTO_CLASS"`
+ InputSchemaEventTimestampFieldIndex string `json:"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX"`
+ SourceDetails []SourceDetail `json:"SOURCE_DETAILS"`
+ InputSchemaTable string `json:"INPUT_SCHEMA_TABLE"`
+ SourceKafka
+ SourceParquet
+}
+
+type SinkKafka struct {
+ SinkKafkaBrokers string `json:"SINK_KAFKA_BROKERS"`
+ SinkKafkaStream string `json:"SINK_KAFKA_STREAM"`
+ SinkKafkaTopic string `json:"SINK_KAFKA_TOPIC"`
+ SinkKafkaProtoMsg string `json:"SINK_KAFKA_PROTO_MESSAGE"`
+ SinkKafkaLingerMs string `json:"SINK_KAFKA_LINGER_MS"`
+ SinkKafkaProtoKey string `json:"SINK_KAFKA_PROTO_KEY"`
+}
+
+type SinkInflux struct {
+ SinkInfluxDBName string `json:"SINK_INFLUX_DB_NAME"`
+ SinkInfluxMeasurementName string `json:"SINK_INFLUX_MEASUREMENT_NAME"`
+}
+
+type SinkBigquery struct {
+ SinkBigqueryGoogleCloudProjectID string `json:"SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID"`
+ SinkBigqueryTableName string `json:"SINK_BIGQUERY_TABLE_NAME"`
+ SinkBigqueryDatasetLabels string `json:"SINK_BIGQUERY_DATASET_LABELS"`
+ SinkBigqueryTableLabels string `json:"SINK_BIGQUERY_TABLE_LABELS"`
+ SinkBigqueryDatasetName string `json:"SINK_BIGQUERY_DATASET_NAME"`
+ SinkBigqueryTablePartitioningEnable string `json:"SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE"`
+ SinkBigqueryTablePartitionKey string `json:"SINK_BIGQUERY_TABLE_PARTITION_KEY"`
+ SinkBigqueryRowInsertIDEnable string `json:"SINK_BIGQUERY_ROW_INSERT_ID_ENABLE"`
+ SinkBigqueryClientReadTimeoutMs string `json:"SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS"`
+ SinkBigqueryClientConnectTimeoutMs string `json:"SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS"`
+ SinkBigqueryTablePartitionExpiryMs string `json:"SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS"`
+ SinkBigqueryDatasetLocation string `json:"SINK_BIGQUERY_DATASET_LOCATION"`
+ SinkBigqueryBatchSize string `json:"SINK_BIGQUERY_BATCH_SIZE"`
+ SinkBigqueryTableClusteringEnable string `json:"SINK_BIGQUERY_TABLE_CLUSTERING_ENABLE"`
+ SinkBigqueryTableClusteringKeys string `json:"SINK_BIGQUERY_TABLE_CLUSTERING_KEYS"`
+ SinkErrorTypesForFailure string `json:"SINK_ERROR_TYPES_FOR_FAILURE"`
+}
+
+type Sink struct {
+ SinkKafka
+ SinkInflux
+ SinkBigquery
+}
+
+func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverConf) (*Config, error) {
+ var cfg Config
+ err := json.Unmarshal(confJSON, &cfg)
+ if err != nil {
+ return nil, errors.ErrInvalid.WithMsgf("invalid config json").WithCausef(err.Error())
+ }
+
+ //transformation #9 and #11
+ //transformation #1
+ source := cfg.Source
+
+ if len(source) > 0 && len(source[0].SourceParquet.SourceParquetFilePaths) == 0 {
+ for i := range source {
+ //TODO: decide how the consumer group id increment should be handled on update
+ if source[i].SourceKafkaConsumerConfigGroupID == "" {
+ source[i].SourceKafkaConsumerConfigGroupID = incrementGroupId(r.Name+"-0001", i)
+ }
+ source[i].SourceKafkaConsumerConfigAutoCommitEnable = dc.EnvVariables[SourceKafkaConsumerConfigAutoCommitEnable]
+ source[i].SourceKafkaConsumerConfigAutoOffsetReset = dc.EnvVariables[SourceKafkaConsumerConfigAutoOffsetReset]
+ source[i].SourceKafkaConsumerConfigBootstrapServers = dc.EnvVariables[SourceKafkaConsumerConfigBootstrapServers]
+ }
+ }
+
+ cfg.Source = source
+
+ //transformation #2
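+ //driver-level env variables act as defaults; values from the resource config are merged over them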
+ cfg.EnvVariables = modules.CloneAndMergeMaps(dc.EnvVariables, cfg.EnvVariables)
+
+ //transformation #3
+ var flinkOut flink.Output
+ if err := json.Unmarshal(r.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil {
+ return nil, errors.ErrInternal.WithMsgf("invalid flink state").WithCausef(err.Error())
+ }
+
+ cfg.Namespace = flinkOut.KubeNamespace
+
+ //transformation #4
+ //transform resource name to safe length
+
+ //transformation #5
+ //TODO: build name from title as project-<name>-dagger
+ cfg.EnvVariables[keyFlinkJobID] = r.Name
+
+ //transformation #6
+ // note: enforce the kubernetes deployment name length limit.
+ if len(cfg.DeploymentID) == 0 {
+ cfg.DeploymentID = modules.SafeName(fmt.Sprintf("%s-%s", r.Project, r.Name), "-dagger", helmReleaseNameMaxLength)
+ } else if len(cfg.DeploymentID) > helmReleaseNameMaxLength {
+ return nil, errors.ErrInvalid.WithMsgf("deployment_id must not have more than 53 chars")
+ }
+
+ //transformation #7
+ cfg.EnvVariables[keySinkInfluxURL] = flinkOut.Influx.URL
+ cfg.EnvVariables[keySinkInfluxPassword] = flinkOut.Influx.Password
+ cfg.EnvVariables[keySinkInfluxUsername] = flinkOut.Influx.Username
+ //SINK_INFLUX_DB_NAME is added by client
+ //SINK_INFLUX_MEASUREMENT_NAME is added by client
+ //REDIS_SERVER is skipped
+
+ //transformation #8
+ //Longbow configs would be in base configs
+
+ //transformation #10
+ //this should verify that the project referenced in conf.EnvVariables[STREAMS] matches the project of the corresponding flink resource
+ //it is unclear whether this check is actually needed
+
+ //transformation #13
+ cfg.EnvVariables[keySinkType] = cfg.SinkType
+ if cfg.SinkType == SinkTypeKafka {
+ cfg.EnvVariables[keySinkKafkaStream] = cfg.Sink.SinkKafka.SinkKafkaStream
+ cfg.EnvVariables[keySinkKafkaBrokers] = cfg.Sink.SinkKafka.SinkKafkaBrokers
+ cfg.EnvVariables[keySinkKafkaProtoMsg] = cfg.Sink.SinkKafka.SinkKafkaProtoMsg
+ cfg.EnvVariables[keySinkKafkaTopic] = cfg.Sink.SinkKafka.SinkKafkaTopic
+ cfg.EnvVariables[keySinkKafkaKey] = cfg.Sink.SinkKafka.SinkKafkaProtoKey
+ cfg.EnvVariables[keySinkKafkaLingerMs] = cfg.Sink.SinkKafka.SinkKafkaLingerMs
+ } else if cfg.SinkType == SinkTypeInflux {
+ cfg.EnvVariables[keySinkInfluxDBName] = cfg.Sink.SinkInflux.SinkInfluxDBName
+ cfg.EnvVariables[keySinkInfluxMeasurementName] = cfg.Sink.SinkInflux.SinkInfluxMeasurementName
+ } else if cfg.SinkType == SinkTypeBigquery {
+ cfg.EnvVariables[keySinkBigqueryGoogleCloudProjectID] = cfg.Sink.SinkBigquery.SinkBigqueryGoogleCloudProjectID
+ cfg.EnvVariables[keySinkBigqueryDatasetName] = cfg.Sink.SinkBigquery.SinkBigqueryDatasetName
+ cfg.EnvVariables[keySinkBigqueryTableName] = cfg.Sink.SinkBigquery.SinkBigqueryTableName
+ cfg.EnvVariables[keySinkBigqueryDatasetLabels] = cfg.Sink.SinkBigquery.SinkBigqueryDatasetLabels
+ cfg.EnvVariables[keySinkBigqueryTableLabels] = cfg.Sink.SinkBigquery.SinkBigqueryTableLabels
+ cfg.EnvVariables[keySinkBigqueryTablePartitioningEnable] = cfg.Sink.SinkBigquery.SinkBigqueryTablePartitioningEnable
+ cfg.EnvVariables[keySinkBigqueryTablePartitionKey] = cfg.Sink.SinkBigquery.SinkBigqueryTablePartitionKey
+ cfg.EnvVariables[keySinkBigqueryRowInsertIDEnable] = cfg.Sink.SinkBigquery.SinkBigqueryRowInsertIDEnable
+ cfg.EnvVariables[keySinkBigqueryClientReadTimeoutMs] = cfg.Sink.SinkBigquery.SinkBigqueryClientReadTimeoutMs
+ cfg.EnvVariables[keySinkBigqueryClientConnectTimeoutMs] = cfg.Sink.SinkBigquery.SinkBigqueryClientConnectTimeoutMs
+ cfg.EnvVariables[keySinkBigqueryTablePartitionExpiryMs] = cfg.Sink.SinkBigquery.SinkBigqueryTablePartitionExpiryMs
+ cfg.EnvVariables[keySinkBigqueryDatasetLocation] = cfg.Sink.SinkBigquery.SinkBigqueryDatasetLocation
+ cfg.EnvVariables[keySinkBigqueryBatchSize] = cfg.Sink.SinkBigquery.SinkBigqueryBatchSize
+ cfg.EnvVariables[keySinkBigqueryTableClusteringEnable] = cfg.Sink.SinkBigquery.SinkBigqueryTableClusteringEnable
+ cfg.EnvVariables[keySinkBigqueryTableClusteringKeys] = cfg.Sink.SinkBigquery.SinkBigqueryTableClusteringKeys
+ cfg.EnvVariables[keySinkErrorTypesForFailure] = cfg.Sink.SinkBigquery.SinkErrorTypesForFailure
+ }
+
+ //transformation #14
+ cfg.Resources = mergeResources(dc.Resources, cfg.Resources)
+
+ cfg.PrometheusURL = flinkOut.PrometheusURL
+ cfg.FlinkName = flinkOut.FlinkName
+
+ if cfg.Replicas <= 0 {
+ cfg.Replicas = 1
+ }
+
+ if err := validateConfig(confJSON); err != nil {
+ return nil, err
+ }
+
+ return &cfg, nil
+}
+
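+ // incrementGroupId adds step to the zero-padded numeric suffix of a consumer group id;
+ // a non-numeric suffix is treated as 0. For example, incrementGroupId("proj-dagger-0001", 2)
+ // returns "proj-dagger-0003".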
+func incrementGroupId(groupId string, step int) string {
+ incrementNumberInString := func(number string) int {
+ num, _ := strconv.Atoi(number)
+ return num + step
+ }
+
+ leftZeroPad := func(number int) string {
+ return fmt.Sprintf("%04d", number)
+ }
+
+ getLastAndRestFromArray := func(arr []string) ([]string, string) {
+ return arr[:len(arr)-1], arr[len(arr)-1]
+ }
+
+ parts := strings.Split(groupId, "-")
+ name, number := getLastAndRestFromArray(parts)
+ updatedNumber := leftZeroPad(incrementNumberInString(number))
+ return strings.Join(append(name, updatedNumber), "-")
+}
+
+func mustMarshalJSON(v interface{}) []byte {
+ data, err := json.Marshal(v)
+ if err != nil {
+ panic(fmt.Sprintf("failed to marshal JSON: %v", err))
+ }
+ return data
+}
+
+func mergeResources(oldResources, newResources Resources) Resources {
+ if newResources.TaskManager.CPU == "" {
+ newResources.TaskManager.CPU = oldResources.TaskManager.CPU
+ }
+ if newResources.TaskManager.Memory == "" {
+ newResources.TaskManager.Memory = oldResources.TaskManager.Memory
+ }
+ if newResources.JobManager.CPU == "" {
+ newResources.JobManager.CPU = oldResources.JobManager.CPU
+ }
+ if newResources.JobManager.Memory == "" {
+ newResources.JobManager.Memory = oldResources.JobManager.Memory
+ }
+ return newResources
+}
diff --git a/modules/dagger/driver.go b/modules/dagger/driver.go
new file mode 100644
index 00000000..2efb06d7
--- /dev/null
+++ b/modules/dagger/driver.go
@@ -0,0 +1,247 @@
+package dagger
+
+import (
+ "bytes"
+ "context"
+ "encoding/base64"
+ "encoding/json"
+ "fmt"
+ "html/template"
+ "strings"
+ "time"
+
+ "github.com/goto/entropy/core/module"
+ "github.com/goto/entropy/core/resource"
+ "github.com/goto/entropy/modules"
+ "github.com/goto/entropy/modules/kubernetes"
+ "github.com/goto/entropy/pkg/errors"
+ "github.com/goto/entropy/pkg/helm"
+ "github.com/goto/entropy/pkg/kube"
+)
+
+const (
+ stepReleaseCreate = "release_create"
+ stepReleaseUpdate = "release_update"
+ stepReleaseStop = "release_stop"
+ stepKafkaReset = "kafka_reset"
+)
+
+const (
+ chartRepo = "https://goto.github.io/charts/"
+ chartName = "dagger-deployment-chart"
+ imageRepo = "gotocompany/dagger"
+)
+
+const (
+ labelsConfKey = "extra_labels"
+
+ labelDeployment = "deployment"
+ labelOrchestrator = "orchestrator"
+ labelURN = "urn"
+ labelName = "name"
+ labelNamespace = "namespace"
+
+ orchestratorLabelValue = "entropy"
+)
+
+const defaultKey = "default"
+
+var defaultDriverConf = driverConf{
+ Namespace: map[string]string{
+ defaultKey: "dagger",
+ },
+ ChartValues: ChartValues{
+ ChartVersion: "0.1.0",
+ },
+}
+
+type daggerDriver struct {
+ timeNow func() time.Time
+ conf driverConf
+ kubeDeploy kubeDeployFn
+ kubeGetPod kubeGetPodFn
+ kubeGetCRD kubeGetCRDFn
+ consumerReset consumerResetFn
+}
+
+type (
+ kubeDeployFn func(ctx context.Context, isCreate bool, conf kube.Config, hc helm.ReleaseConfig) error
+ kubeGetPodFn func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error)
+ kubeGetCRDFn func(ctx context.Context, conf kube.Config, ns string, name string) (kube.FlinkDeploymentStatus, error)
+ consumerResetFn func(ctx context.Context, conf Config, resetTo string) []Source
+)
+
+type driverConf struct {
+ // Labels to be injected to the chart during deployment. Values can be Go templates.
+ Labels map[string]string `json:"labels,omitempty"`
+
+ // Namespace is the kubernetes namespace where daggers will be deployed.
+ Namespace map[string]string `json:"namespace" validate:"required"`
+
+ // ChartValues is the chart and image version information.
+ ChartValues ChartValues `json:"chart_values" validate:"required"`
+
+ EnvVariables map[string]string `json:"env_variables,omitempty"`
+
+ Resources Resources `json:"resources" validate:"required"`
+
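+ // JarURI is the default location of the dagger job jar; it is applied to newly created daggers.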
+ JarURI string `json:"jar_uri" validate:"required"`
+
+ // timeout value for a kube deployment run
+ KubeDeployTimeout int `json:"kube_deploy_timeout_seconds"`
+}
+
+type Output struct {
+ JMDeployStatus string `json:"jm_deploy_status,omitempty"`
+ JobStatus string `json:"job_status,omitempty"`
+ Reconcilation string `json:"reconcilation,omitempty"`
+ Pods []kube.Pod `json:"pods,omitempty"`
+ Namespace string `json:"namespace,omitempty"`
+ JobID string `json:"job_id,omitempty"`
+}
+
+type transientData struct {
+ PendingSteps []string `json:"pending_steps"`
+ ResetOffsetTo string `json:"reset_offset_to,omitempty"`
+}
+
+func mergeChartValues(cur, newVal *ChartValues) (*ChartValues, error) {
+ if newVal == nil {
+ return cur, nil
+ }
+
+ merged := ChartValues{
+ ChartVersion: newVal.ChartVersion,
+ }
+
+ return &merged, nil
+}
+
+func readOutputData(exr module.ExpandedResource) (*Output, error) {
+ var curOut Output
+ if len(exr.Resource.State.Output) == 0 {
+ return &curOut, nil
+ }
+ if err := json.Unmarshal(exr.Resource.State.Output, &curOut); err != nil {
+ return nil, errors.ErrInternal.WithMsgf("corrupted output").WithCausef(err.Error())
+ }
+ return &curOut, nil
+}
+
+func readTransientData(exr module.ExpandedResource) (*transientData, error) {
+ if len(exr.Resource.State.ModuleData) == 0 {
+ return &transientData{}, nil
+ }
+
+ var modData transientData
+ if err := json.Unmarshal(exr.Resource.State.ModuleData, &modData); err != nil {
+ return nil, errors.ErrInternal.WithMsgf("corrupted transient data").WithCausef(err.Error())
+ }
+ return &modData, nil
+}
+
+func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config,
+ kubeOut kubernetes.Output,
+) (*helm.ReleaseConfig, error) {
+
+ entropyLabels := map[string]string{
+ labelDeployment: conf.DeploymentID,
+ labelOrchestrator: orchestratorLabelValue,
+ }
+
+ otherLabels := map[string]string{
+ labelURN: res.URN,
+ labelName: res.Name,
+ labelNamespace: conf.Namespace,
+ }
+
+ deploymentLabels, err := renderTpl(dd.conf.Labels, modules.CloneAndMergeMaps(res.Labels, modules.CloneAndMergeMaps(entropyLabels, otherLabels)))
+ if err != nil {
+ return nil, err
+ }
+
+ rc := helm.DefaultReleaseConfig()
+ rc.Timeout = dd.conf.KubeDeployTimeout
+ rc.Name = conf.DeploymentID
+ rc.Repository = chartRepo
+ rc.Chart = chartName
+ rc.Namespace = conf.Namespace
+ rc.ForceUpdate = true
+ rc.Version = conf.ChartValues.ChartVersion
+
+ imageRepository := dd.conf.ChartValues.ImageRepository
+ if conf.ChartValues.ImageRepository != "" {
+ imageRepository = conf.ChartValues.ImageRepository
+ }
+
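+ // Build the dagger program arguments: every env variable becomes a quoted "--KEY" "value" pair.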
+ var programArgs []string
+ for key, value := range conf.EnvVariables {
+ // Escape double quotes in values that parse as JSON so they survive embedding in the quoted args list.
+ if json.Valid([]byte(value)) {
+ value = strings.ReplaceAll(value, `"`, `\"`)
+ }
+ programArgs = append(programArgs, fmt.Sprintf("\"%s\"", "--"+key), fmt.Sprintf("\"%v\"", value))
+ }
+
+ //fmt.Printf("programArgs: %v\n", programArgs)
+ formatted := fmt.Sprintf("[%s]", strings.Join(programArgs, ","))
+ //fmt.Printf("formatted: %v\n", formatted)
+ encodedProgramArgs := base64.StdEncoding.EncodeToString([]byte(formatted))
+
+ rc.Values = map[string]any{
+ labelsConfKey: modules.CloneAndMergeMaps(deploymentLabels, entropyLabels),
+ "image": imageRepository,
+ "deployment_id": conf.DeploymentID,
+ "configuration": map[string]any{
+ "FLINK_PARALLELISM": conf.Replicas,
+ },
+ "projectID": res.Project,
+ "name": res.Name,
+ "team": conf.Team,
+ "flink_name": conf.FlinkName,
+ "prometheus_url": conf.PrometheusURL,
+ "resources": map[string]any{
+ "jobmanager": map[string]any{
+ "cpu": conf.Resources.JobManager.CPU,
+ "memory": conf.Resources.JobManager.Memory,
+ },
+ "taskmanager": map[string]any{
+ "cpu": conf.Resources.TaskManager.CPU,
+ "memory": conf.Resources.JobManager.Memory,
+ },
+ },
+ "jarURI": conf.JarURI,
+ "programArgs": append([]string{"--encodedArgs"}, encodedProgramArgs),
+ "state": conf.JobState,
+ "namespace": conf.Namespace,
+ }
+
+ return rc, nil
+}
+
+// TODO: move this to pkg
+func renderTpl(labelsTpl map[string]string, labelsValues map[string]string) (map[string]string, error) {
+ const useZeroValueForMissingKey = "missingkey=zero"
+
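+ // Each value in labelsTpl is rendered as a Go template with labelsValues as data;
+ // e.g. the template "{{.name}}-{{.namespace}}" rendered with {name: "x", namespace: "dagger"}
+ // yields "x-dagger". Missing keys render as empty strings.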
+ finalLabels := map[string]string{}
+ for k, v := range labelsTpl {
+ var buf bytes.Buffer
+ t, err := template.New("").Option(useZeroValueForMissingKey).Parse(v)
+ if err != nil {
+ return nil, errors.ErrInvalid.
+ WithMsgf("label template for '%s' is invalid", k).WithCausef(err.Error())
+ } else if err := t.Execute(&buf, labelsValues); err != nil {
+ return nil, errors.ErrInvalid.
+ WithMsgf("failed to render label template").WithCausef(err.Error())
+ }
+
+ // empty rendered label values are allowed and kept as-is.
+ finalLabels[k] = buf.String()
+ }
+ return finalLabels, nil
+}
diff --git a/modules/dagger/driver_output.go b/modules/dagger/driver_output.go
new file mode 100644
index 00000000..6b3f3873
--- /dev/null
+++ b/modules/dagger/driver_output.go
@@ -0,0 +1,60 @@
+package dagger
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/goto/entropy/core/module"
+ "github.com/goto/entropy/core/resource"
+ "github.com/goto/entropy/modules"
+ "github.com/goto/entropy/modules/flink"
+ "github.com/goto/entropy/modules/kubernetes"
+ "github.com/goto/entropy/pkg/errors"
+)
+
+func (dd *daggerDriver) Output(ctx context.Context, exr module.ExpandedResource) (json.RawMessage, error) {
+ output, err := readOutputData(exr)
+ if err != nil {
+ return nil, err
+ }
+
+ conf, err := readConfig(exr, exr.Spec.Configs, dd.conf)
+ if err != nil {
+ return nil, errors.ErrInternal.WithCausef(err.Error())
+ }
+
+ var flinkOut flink.Output
+ if err := json.Unmarshal(exr.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil {
+ return nil, errors.ErrInternal.WithMsgf("invalid kube state").WithCausef(err.Error())
+ }
+
+ return dd.refreshOutput(ctx, exr.Resource, *conf, *output, flinkOut.KubeCluster)
+}
+
+func (dd *daggerDriver) refreshOutput(ctx context.Context, r resource.Resource,
+ conf Config, output Output, kubeOut kubernetes.Output,
+) (json.RawMessage, error) {
+ rc, err := dd.getHelmRelease(r, conf, kubeOut)
+ if err != nil {
+ return nil, err
+ }
+
+ pods, err := dd.kubeGetPod(ctx, kubeOut.Configs, rc.Namespace, map[string]string{"app": conf.DeploymentID})
+ if err != nil {
+ return nil, errors.ErrInternal.WithCausef(err.Error())
+ }
+ output.Pods = pods
+ output.Namespace = conf.Namespace
+ output.JobID = conf.DeploymentID
+
+ crd, err := dd.kubeGetCRD(ctx, kubeOut.Configs, rc.Namespace, rc.Name)
+ if err != nil {
+ return nil, errors.ErrInternal.WithCausef(err.Error())
+ }
+
+ output.JMDeployStatus = crd.JMDeployStatus
+ output.JobStatus = crd.JobStatus
+ output.Reconcilation = crd.Reconciliation
+
+ return modules.MustJSON(output), nil
+}
diff --git a/modules/dagger/driver_plan.go b/modules/dagger/driver_plan.go
new file mode 100644
index 00000000..9fd5a966
--- /dev/null
+++ b/modules/dagger/driver_plan.go
@@ -0,0 +1,199 @@
+package dagger
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/goto/entropy/core/module"
+ "github.com/goto/entropy/core/resource"
+ "github.com/goto/entropy/modules"
+ "github.com/goto/entropy/modules/flink"
+ "github.com/goto/entropy/pkg/errors"
+ "github.com/goto/entropy/pkg/kafka"
+)
+
+const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"
+const (
+ JobStateRunning = "running"
+ JobStateSuspended = "suspended"
+ StateDeployed = "DEPLOYED"
+ StateUserStopped = "USER_STOPPED"
+)
+
+func (dd *daggerDriver) Plan(_ context.Context, exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
+ switch act.Name {
+ case module.CreateAction:
+ return dd.planCreate(exr, act)
+
+ case ResetAction:
+ return dd.planReset(exr, act)
+
+ default:
+ return dd.planChange(exr, act)
+ }
+}
+
+func (dd *daggerDriver) planCreate(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
+ conf, err := readConfig(exr, act.Params, dd.conf)
+ if err != nil {
+ return nil, err
+ }
+
+ //transformation #12
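+ //the sources are serialized to JSON and exposed to the job via the STREAMS env variable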
+ conf.EnvVariables[keyStreams] = string(mustMarshalJSON(conf.Source))
+
+ chartVals, err := mergeChartValues(&dd.conf.ChartValues, conf.ChartValues)
+ if err != nil {
+ return nil, err
+ }
+ conf.ChartValues = chartVals
+
+ immediately := dd.timeNow()
+ conf.JarURI = dd.conf.JarURI
+ conf.State = StateDeployed
+ conf.JobState = JobStateRunning
+
+ exr.Resource.Spec.Configs = modules.MustJSON(conf)
+
+ err = dd.validateHelmReleaseConfigs(exr, *conf)
+ if err != nil {
+ return nil, err
+ }
+
+ exr.Resource.State = resource.State{
+ Status: resource.StatusPending,
+ Output: modules.MustJSON(Output{
+ Namespace: conf.Namespace,
+ }),
+ NextSyncAt: &immediately,
+ ModuleData: modules.MustJSON(transientData{
+ PendingSteps: []string{stepReleaseCreate},
+ }),
+ }
+
+ return &exr.Resource, nil
+}
+
+func (dd *daggerDriver) planChange(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
+ curConf, err := readConfig(exr, exr.Resource.Spec.Configs, dd.conf)
+ if err != nil {
+ return nil, err
+ }
+
+ switch act.Name {
+ case module.UpdateAction:
+ newConf, err := readConfig(exr, act.Params, dd.conf)
+ if err != nil {
+ return nil, err
+ }
+
+ newConf.Source = mergeConsumerGroupId(curConf.Source, newConf.Source)
+
+ chartVals, err := mergeChartValues(curConf.ChartValues, newConf.ChartValues)
+ if err != nil {
+ return nil, err
+ }
+
+ // restore configs that are not user-controlled.
+ newConf.DeploymentID = curConf.DeploymentID
+ newConf.ChartValues = chartVals
+ newConf.JarURI = curConf.JarURI
+ newConf.State = StateDeployed
+ newConf.JobState = JobStateRunning
+
+ newConf.Resources = mergeResources(curConf.Resources, newConf.Resources)
+
+ curConf = newConf
+
+ case StopAction:
+ curConf.State = StateUserStopped
+ curConf.JobState = JobStateSuspended
+
+ case StartAction:
+ curConf.State = StateDeployed
+ curConf.JobState = JobStateRunning
+
+ }
+ immediately := dd.timeNow()
+
+ exr.Resource.Spec.Configs = modules.MustJSON(curConf)
+
+ err = dd.validateHelmReleaseConfigs(exr, *curConf)
+ if err != nil {
+ return nil, err
+ }
+
+ exr.Resource.State = resource.State{
+ Status: resource.StatusPending,
+ Output: exr.Resource.State.Output,
+ ModuleData: modules.MustJSON(transientData{
+ PendingSteps: []string{stepReleaseUpdate},
+ }),
+ NextSyncAt: &immediately,
+ }
+
+ return &exr.Resource, nil
+}
+
+func (dd *daggerDriver) planReset(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
+ resetValue, err := kafka.ParseResetV2Params(act.Params)
+ if err != nil {
+ return nil, err
+ }
+
+ immediately := dd.timeNow()
+
+ curConf, err := readConfig(exr, exr.Resource.Spec.Configs, dd.conf)
+ if err != nil {
+ return nil, err
+ }
+
+ curConf.ResetOffset = resetValue
+
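+ // Re-derive the sources with bumped consumer groups and the requested offset reset, then
+ // publish them to the job via the STREAMS env variable.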
+ curConf.Source = dd.consumerReset(context.Background(), *curConf, resetValue)
+ curConf.EnvVariables[keyStreams] = string(mustMarshalJSON(curConf.Source))
+
+ exr.Resource.Spec.Configs = modules.MustJSON(curConf)
+ exr.Resource.State = resource.State{
+ Status: resource.StatusPending,
+ Output: exr.Resource.State.Output,
+ NextSyncAt: &immediately,
+ ModuleData: modules.MustJSON(transientData{
+ ResetOffsetTo: resetValue,
+ PendingSteps: []string{
+ stepKafkaReset,
+ },
+ }),
+ }
+ return &exr.Resource, nil
+}
+
+func (dd *daggerDriver) validateHelmReleaseConfigs(expandedResource module.ExpandedResource, config Config) error {
+ var flinkOut flink.Output
+ if err := json.Unmarshal(expandedResource.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil {
+ return errors.ErrInternal.WithMsgf("invalid kube state").WithCausef(err.Error())
+ }
+
+ _, err := dd.getHelmRelease(expandedResource.Resource, config, flinkOut.KubeCluster)
+ return err
+}
+
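+ // mergeConsumerGroupId carries the existing consumer group id over to kafka sources whose
+ // SOURCE_KAFKA_NAME is unchanged, so that an update does not silently switch consumer groups.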
+func mergeConsumerGroupId(currStreams, newStreams []Source) []Source {
+ if len(currStreams) != len(newStreams) {
+ return newStreams
+ }
+
+ for i := range currStreams {
+ if len(currStreams[i].SourceParquet.SourceParquetFilePaths) > 0 {
+ // source is parquet; there is no consumer group to carry over
+ continue
+ }
+
+ if currStreams[i].SourceKafka.SourceKafkaName == newStreams[i].SourceKafka.SourceKafkaName {
+ newStreams[i].SourceKafka.SourceKafkaConsumerConfigGroupID = currStreams[i].SourceKafka.SourceKafkaConsumerConfigGroupID
+ }
+ }
+
+ return newStreams
+}
diff --git a/modules/dagger/driver_sync.go b/modules/dagger/driver_sync.go
new file mode 100644
index 00000000..f5ebdcdd
--- /dev/null
+++ b/modules/dagger/driver_sync.go
@@ -0,0 +1,88 @@
+package dagger
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/goto/entropy/core/module"
+ "github.com/goto/entropy/core/resource"
+ "github.com/goto/entropy/modules"
+ "github.com/goto/entropy/modules/flink"
+ "github.com/goto/entropy/modules/kubernetes"
+ "github.com/goto/entropy/pkg/errors"
+)
+
+func (dd *daggerDriver) Sync(ctx context.Context, exr module.ExpandedResource) (*resource.State, error) {
+ modData, err := readTransientData(exr)
+ if err != nil {
+ return nil, err
+ }
+
+ out, err := readOutputData(exr)
+ if err != nil {
+ return nil, errors.ErrInternal.WithCausef(err.Error())
+ }
+
+ conf, err := readConfig(exr, exr.Spec.Configs, dd.conf)
+ if err != nil {
+ return nil, errors.ErrInternal.WithCausef(err.Error())
+ }
+
+ var flinkOut flink.Output
+ if err := json.Unmarshal(exr.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil {
+ return nil, errors.ErrInternal.WithMsgf("invalid flink state").WithCausef(err.Error())
+ }
+
+ finalState := resource.State{
+ Status: resource.StatusPending,
+ Output: exr.Resource.State.Output,
+ }
+
+ if len(modData.PendingSteps) > 0 {
+ pendingStep := modData.PendingSteps[0]
+ modData.PendingSteps = modData.PendingSteps[1:]
+
+ switch pendingStep {
+ case stepReleaseCreate, stepReleaseUpdate, stepReleaseStop, stepKafkaReset:
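+ // every step, including a kafka reset, is applied by re-deploying the helm release
+ // with the already-updated resource config.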
+ isCreate := pendingStep == stepReleaseCreate
+ if err := dd.releaseSync(ctx, exr.Resource, isCreate, *conf, flinkOut.KubeCluster); err != nil {
+ return nil, err
+ }
+ default:
+ return nil, errors.ErrInternal.WithMsgf("unknown step: '%s'", pendingStep)
+ }
+
+ // we have more pending states, so enqueue resource for another sync
+ // as soon as possible.
+ immediately := dd.timeNow()
+ finalState.NextSyncAt = &immediately
+ finalState.ModuleData = modules.MustJSON(modData)
+
+ return &finalState, nil
+ }
+
+ finalOut, err := dd.refreshOutput(ctx, exr.Resource, *conf, *out, flinkOut.KubeCluster)
+ if err != nil {
+ return nil, err
+ }
+ finalState.Output = finalOut
+
+ finalState.Status = resource.StatusCompleted
+ finalState.ModuleData = nil
+ return &finalState, nil
+
+}
+
+func (dd *daggerDriver) releaseSync(ctx context.Context, r resource.Resource,
+ isCreate bool, conf Config, kubeOut kubernetes.Output,
+) error {
+ rc, err := dd.getHelmRelease(r, conf, kubeOut)
+ if err != nil {
+ return err
+ }
+
+ if err := dd.kubeDeploy(ctx, isCreate, kubeOut.Configs, *rc); err != nil {
+ return errors.ErrInternal.WithCausef(err.Error())
+ }
+ return nil
+}
diff --git a/modules/dagger/module.go b/modules/dagger/module.go
new file mode 100644
index 00000000..47da1f86
--- /dev/null
+++ b/modules/dagger/module.go
@@ -0,0 +1,156 @@
+package dagger
+
+import (
+ "context"
+ _ "embed"
+ "encoding/json"
+ "time"
+
+ "github.com/goto/entropy/core/module"
+ "github.com/goto/entropy/modules/flink"
+ "github.com/goto/entropy/pkg/errors"
+ "github.com/goto/entropy/pkg/helm"
+ "github.com/goto/entropy/pkg/kube"
+ "github.com/goto/entropy/pkg/validator"
+ "helm.sh/helm/v3/pkg/release"
+ v1 "k8s.io/api/core/v1"
+)
+
+const (
+ keyFlinkDependency = "flink"
+ StopAction = "stop"
+ StartAction = "start"
+ ResetAction = "reset"
+)
+
+type FlinkCRDStatus struct {
+ JobManagerDeploymentStatus string `json:"jobManagerDeploymentStatus"`
+ JobStatus string `json:"jobStatus"`
+ ReconciliationStatus string `json:"reconciliationStatus"`
+}
+
+var Module = module.Descriptor{
+ Kind: "dagger",
+ Dependencies: map[string]string{
+ keyFlinkDependency: flink.Module.Kind,
+ },
+ Actions: []module.ActionDesc{
+ {
+ Name: module.CreateAction,
+ Description: "Creates a new dagger",
+ },
+ {
+ Name: module.UpdateAction,
+ Description: "Updates an existing dagger",
+ },
+ {
+ Name: StopAction,
+ Description: "Suspends a running dagger",
+ },
+ {
+ Name: StartAction,
+ Description: "Starts a suspended dagger",
+ },
+ {
+ Name: ResetAction,
+ Description: "Resets the offset of a dagger",
+ },
+ },
+ DriverFactory: func(confJSON json.RawMessage) (module.Driver, error) {
+ conf := defaultDriverConf // start from the default config (shallow copy)
+ if err := json.Unmarshal(confJSON, &conf); err != nil {
+ return nil, err
+ } else if err := validator.TaggedStruct(conf); err != nil {
+ return nil, err
+ }
+
+ return &daggerDriver{
+ conf: conf,
+ timeNow: time.Now,
+ kubeDeploy: func(_ context.Context, isCreate bool, kubeConf kube.Config, hc helm.ReleaseConfig) error {
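+ // An existing release may only be updated if it is managed by entropy and belongs to
+ // the same deployment id; anything else is treated as a conflicting release.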
+ canUpdate := func(rel *release.Release) bool {
+ curLabels, ok := rel.Config[labelsConfKey].(map[string]any)
+ if !ok {
+ return false
+ }
+ newLabels, ok := hc.Values[labelsConfKey].(map[string]string)
+ if !ok {
+ return false
+ }
+
+ isManagedByEntropy := curLabels[labelOrchestrator] == orchestratorLabelValue
+ isSameDeployment := curLabels[labelDeployment] == newLabels[labelDeployment]
+
+ return isManagedByEntropy && isSameDeployment
+ }
+
+ helmCl := helm.NewClient(&helm.Config{Kubernetes: kubeConf})
+ _, errHelm := helmCl.Upsert(&hc, canUpdate)
+ return errHelm
+ },
+ kubeGetPod: func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) {
+ kubeCl, err := kube.NewClient(ctx, conf)
+ if err != nil {
+ return nil, errors.ErrInternal.WithMsgf("failed to create new kube client on firehose driver kube get pod").WithCausef(err.Error())
+ }
+ return kubeCl.GetPodDetails(ctx, ns, labels, func(pod v1.Pod) bool {
+ // allow pods that are in running state and are not marked for deletion
+ return pod.Status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil
+ })
+ },
+ kubeGetCRD: func(ctx context.Context, conf kube.Config, ns string, name string) (kube.FlinkDeploymentStatus, error) {
+ kubeCl, err := kube.NewClient(ctx, conf)
+ if err != nil {
+ return kube.FlinkDeploymentStatus{}, errors.ErrInternal.WithMsgf("failed to create new kube client on firehose driver kube get pod").WithCausef(err.Error())
+ }
+ crd, err := kubeCl.GetCRDDetails(ctx, ns, name)
+ if err != nil {
+ return kube.FlinkDeploymentStatus{}, err
+ }
+ return parseFlinkCRDStatus(crd.Object)
+ },
+ consumerReset: consumerReset,
+ }, nil
+ },
+}
+
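+ // parseFlinkCRDStatus extracts jobManagerDeploymentStatus, jobStatus.state and
+ // reconciliationStatus.state from the unstructured FlinkDeployment object.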
+func parseFlinkCRDStatus(flinkDeployment map[string]interface{}) (kube.FlinkDeploymentStatus, error) {
+ var flinkCRDStatus FlinkCRDStatus
+ statusInterface, ok := flinkDeployment["status"].(map[string]interface{})
+ if !ok {
+ return kube.FlinkDeploymentStatus{}, errors.ErrInternal.WithMsgf("failed to convert flink deployment status to map[string]interface{}")
+ }
+
+ if jmStatus, ok := statusInterface["jobManagerDeploymentStatus"].(string); ok {
+ flinkCRDStatus.JobManagerDeploymentStatus = jmStatus
+ }
+ if jobStatus, ok := statusInterface["jobStatus"].(map[string]interface{}); ok {
+ if st, ok := jobStatus["state"].(string); ok {
+ flinkCRDStatus.JobStatus = st
+ }
+ }
+ if reconciliationStatus, ok := statusInterface["reconciliationStatus"].(map[string]interface{}); ok {
+ if st, ok := reconciliationStatus["state"].(string); ok {
+ flinkCRDStatus.ReconciliationStatus = st
+ }
+ }
+
+ status := kube.FlinkDeploymentStatus{
+ JMDeployStatus: flinkCRDStatus.JobManagerDeploymentStatus,
+ JobStatus: flinkCRDStatus.JobStatus,
+ Reconciliation: flinkCRDStatus.ReconciliationStatus,
+ }
+ return status, nil
+}
+
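+ // consumerReset moves every source onto a fresh consumer group id, advancing past the ids
+ // already in use (assuming they were assigned sequentially from the first source), and applies
+ // the requested auto offset reset value.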
+func consumerReset(ctx context.Context, conf Config, resetTo string) []Source {
+ baseGroup := conf.Source[0].SourceKafkaConsumerConfigGroupID
+ groupId := incrementGroupId(baseGroup, len(conf.Source))
+
+ for i := range conf.Source {
+ conf.Source[i].SourceKafkaConsumerConfigGroupID = incrementGroupId(groupId, i)
+ conf.Source[i].SourceKafkaConsumerConfigAutoOffsetReset = resetTo
+ }
+
+ return conf.Source
+}
diff --git a/modules/dagger/schema/config.json b/modules/dagger/schema/config.json
new file mode 100644
index 00000000..3effabd0
--- /dev/null
+++ b/modules/dagger/schema/config.json
@@ -0,0 +1,49 @@
+{
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "$id": "http://json-schema.org/draft-07/schema#",
+ "type": "object",
+ "required": [
+ "replicas",
+ "env_variables",
+ "team",
+ "source",
+ "sink",
+ "sink_type"
+ ],
+ "properties": {
+ "replicas": {
+ "type": "number",
+ "default": 1,
+ "minimum": 1
+ },
+ "deployment_id": {
+ "type": "string"
+ },
+ "sink_type": {
+ "type": "string",
+ "enum": [
+ "INFLUX",
+ "KAFKA",
+ "BIGQUERY"
+ ]
+ },
+ "env_variables": {
+ "type": "object",
+ "additionalProperties": true,
+ "required": [
+ "SINK_TYPE"
+ ],
+ "properties": {
+ "SINK_TYPE": {
+ "type": "string",
+ "enum": [
+ "INFLUX",
+ "KAFKA",
+ "BIGQUERY"
+ ]
+ }
+ }
+ }
+ }
+ }
+
\ No newline at end of file
diff --git a/modules/flink/config.go b/modules/flink/config.go
new file mode 100644
index 00000000..c13a65ad
--- /dev/null
+++ b/modules/flink/config.go
@@ -0,0 +1,63 @@
+package flink
+
+import (
+ _ "embed"
+ "encoding/json"
+
+ "github.com/goto/entropy/core/resource"
+ "github.com/goto/entropy/pkg/errors"
+ "github.com/goto/entropy/pkg/validator"
+)
+
+var (
+ //go:embed schema/config.json
+ configSchemaRaw []byte
+
+ validateConfig = validator.FromJSONSchema(configSchemaRaw)
+)
+
+type Influx struct {
+ URL string `json:"url,omitempty"`
+ Username string `json:"username,omitempty"`
+ Password string `json:"password,omitempty"`
+}
+
+type Config struct {
+ KubeNamespace string `json:"kube_namespace,omitempty"`
+ Influx Influx `json:"influx,omitempty"`
+ SinkKafkaStream string `json:"sink_kafka_stream,omitempty"`
+ PrometheusURL string `json:"prometheus_url,omitempty"`
+ FlinkName string `json:"flink_name,omitempty"`
+ ExtraStreams []string `json:"extra_streams,omitempty"`
+}
+
+func readConfig(_ resource.Resource, confJSON json.RawMessage, dc driverConf) (*Config, error) {
+ var cfg Config
+ if err := json.Unmarshal(confJSON, &cfg); err != nil {
+ return nil, errors.ErrInvalid.WithMsgf("invalid config json").WithCausef(err.Error())
+ }
+
+ if cfg.Influx.URL == "" {
+ cfg.Influx.URL = dc.Influx.URL
+ cfg.Influx.Username = dc.Influx.Username
+ cfg.Influx.Password = dc.Influx.Password
+ }
+
+ if cfg.SinkKafkaStream == "" {
+ cfg.SinkKafkaStream = dc.SinkKafkaStream
+ }
+
+ if cfg.KubeNamespace == "" {
+ cfg.KubeNamespace = dc.KubeNamespace
+ }
+
+ if cfg.PrometheusURL == "" {
+ cfg.PrometheusURL = dc.PrometheusURL
+ }
+
+ if cfg.FlinkName == "" {
+ cfg.FlinkName = dc.FlinkName
+ }
+
+ return &cfg, nil
+}
diff --git a/modules/flink/driver.go b/modules/flink/driver.go
new file mode 100644
index 00000000..9f20a2c6
--- /dev/null
+++ b/modules/flink/driver.go
@@ -0,0 +1,42 @@
+package flink
+
+import (
+ "encoding/json"
+
+ "github.com/goto/entropy/core/module"
+ "github.com/goto/entropy/modules/kubernetes"
+ "github.com/goto/entropy/pkg/errors"
+)
+
+type flinkDriver struct {
+ conf driverConf
+}
+
+type driverConf struct {
+ Influx Influx `json:"influx,omitempty"`
+ SinkKafkaStream string `json:"sink_kafka_stream,omitempty"`
+ KubeNamespace string `json:"kube_namespace,omitempty"`
+ PrometheusURL string `json:"prometheus_url,omitempty"`
+ FlinkName string `json:"flink_name,omitempty"`
+}
+
+type Output struct {
+ KubeCluster kubernetes.Output `json:"kube_cluster,omitempty"`
+ KubeNamespace string `json:"kube_namespace,omitempty"`
+ Influx Influx `json:"influx,omitempty"`
+ SinkKafkaStream string `json:"sink_kafka_stream,omitempty"`
+ PrometheusURL string `json:"prometheus_url,omitempty"`
+ FlinkName string `json:"flink_name,omitempty"`
+ ExtraStreams []string `json:"extra_streams,omitempty"`
+}
+
+func readOutputData(exr module.ExpandedResource) (*Output, error) {
+ var curOut Output
+ if len(exr.Resource.State.Output) == 0 {
+ return &curOut, nil
+ }
+ if err := json.Unmarshal(exr.Resource.State.Output, &curOut); err != nil {
+ return nil, errors.ErrInternal.WithMsgf("corrupted output").WithCausef(err.Error())
+ }
+ return &curOut, nil
+}
diff --git a/modules/flink/driver_output.go b/modules/flink/driver_output.go
new file mode 100644
index 00000000..4c413e22
--- /dev/null
+++ b/modules/flink/driver_output.go
@@ -0,0 +1,41 @@
+package flink
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/goto/entropy/core/module"
+ "github.com/goto/entropy/modules"
+ "github.com/goto/entropy/modules/kubernetes"
+ "github.com/goto/entropy/pkg/errors"
+)
+
+func (fd *flinkDriver) Output(ctx context.Context, exr module.ExpandedResource) (json.RawMessage, error) {
+ output, err := readOutputData(exr)
+ if err != nil {
+ return nil, err
+ }
+
+ conf, err := readConfig(exr.Resource, exr.Resource.Spec.Configs, fd.conf)
+ if err != nil {
+ if errors.Is(err, errors.ErrInvalid) {
+ return nil, err
+ }
+ return nil, errors.ErrInternal.WithCausef(err.Error())
+ }
+
+ var kubeOut kubernetes.Output
+ if err := json.Unmarshal(exr.Dependencies[keyKubeDependency].Output, &kubeOut); err != nil {
+ return nil, errors.ErrInternal.WithMsgf("invalid kube state").WithCausef(err.Error())
+ }
+
+ output.KubeCluster = kubeOut
+ output.Influx = conf.Influx
+ output.KubeNamespace = conf.KubeNamespace
+ output.SinkKafkaStream = conf.SinkKafkaStream
+ output.PrometheusURL = conf.PrometheusURL
+ output.FlinkName = conf.FlinkName
+ output.ExtraStreams = conf.ExtraStreams
+
+ return modules.MustJSON(output), nil
+}
diff --git a/modules/flink/driver_plan.go b/modules/flink/driver_plan.go
new file mode 100644
index 00000000..6bfe4471
--- /dev/null
+++ b/modules/flink/driver_plan.go
@@ -0,0 +1,27 @@
+package flink
+
+import (
+ "context"
+
+ "github.com/goto/entropy/core/module"
+ "github.com/goto/entropy/core/resource"
+)
+
+func (fd *flinkDriver) Plan(ctx context.Context, res module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
+ res.Resource.Spec = resource.Spec{
+ Configs: act.Params,
+ Dependencies: res.Spec.Dependencies,
+ }
+
+ output, err := fd.Output(ctx, res)
+ if err != nil {
+ return nil, err
+ }
+
+ res.Resource.State = resource.State{
+ Status: resource.StatusCompleted,
+ Output: output,
+ }
+
+ return &res.Resource, nil
+}
diff --git a/modules/flink/driver_sync.go b/modules/flink/driver_sync.go
new file mode 100644
index 00000000..f32c891c
--- /dev/null
+++ b/modules/flink/driver_sync.go
@@ -0,0 +1,16 @@
+package flink
+
+import (
+ "context"
+
+ "github.com/goto/entropy/core/module"
+ "github.com/goto/entropy/core/resource"
+)
+
+func (*flinkDriver) Sync(_ context.Context, res module.ExpandedResource) (*resource.State, error) {
+ return &resource.State{
+ Status: resource.StatusCompleted,
+ Output: res.Resource.State.Output,
+ ModuleData: nil,
+ }, nil
+}
diff --git a/modules/flink/module.go b/modules/flink/module.go
new file mode 100644
index 00000000..4a319d22
--- /dev/null
+++ b/modules/flink/module.go
@@ -0,0 +1,33 @@
+package flink
+
+import (
+ _ "embed"
+ "encoding/json"
+
+ "github.com/goto/entropy/core/module"
+ "github.com/goto/entropy/pkg/errors"
+)
+
+const (
+ keyKubeDependency = "kube_cluster"
+)
+
+var Module = module.Descriptor{
+ Kind: "flink",
+ Actions: []module.ActionDesc{
+ {
+ Name: module.CreateAction,
+ },
+ {
+ Name: module.UpdateAction,
+ },
+ },
+ DriverFactory: func(conf json.RawMessage) (module.Driver, error) {
+ fd := &flinkDriver{}
+ err := json.Unmarshal(conf, &fd)
+ if err != nil {
+ return nil, errors.ErrInvalid.WithMsgf("failed to unmarshal module config: %v", err)
+ }
+ return fd, nil
+ },
+}
diff --git a/modules/flink/schema/config.json b/modules/flink/schema/config.json
new file mode 100644
index 00000000..a78df504
--- /dev/null
+++ b/modules/flink/schema/config.json
@@ -0,0 +1,27 @@
+{
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "$id": "http://json-schema.org/draft-07/schema#",
+ "type": "object",
+ "properties": {
+ "kube_namespace": {
+ "type": "string"
+ },
+ "influx": {
+ "type": "object",
+ "properties": {
+ "url": {
+ "type": "string"
+ },
+ "username": {
+ "type": "string"
+ },
+ "password": {
+ "type": "string"
+ }
+ }
+ },
+ "sink_kafka_stream": {
+ "type": "string"
+ }
+ }
+}
\ No newline at end of file
diff --git a/pkg/helm/helm.go b/pkg/helm/helm.go
index c032a1fa..c06b59fe 100644
--- a/pkg/helm/helm.go
+++ b/pkg/helm/helm.go
@@ -70,20 +70,22 @@ func (p *Client) doCreate(actionConfig *action.Configuration, config *ReleaseCon
}
act := action.NewInstall(actionConfig)
- act.Wait = config.Wait
- act.DryRun = false
+ act.Wait = true
+ act.IncludeCRDs = true
+ act.SkipCRDs = false
act.Timeout = time.Duration(config.Timeout) * time.Second
act.Replace = config.Replace
act.OutputDir = ""
act.Namespace = config.Namespace
act.ClientOnly = false
act.Description = config.Description
- act.WaitForJobs = config.WaitForJobs
+ act.WaitForJobs = true
act.ReleaseName = config.Name
act.GenerateName = false
act.NameTemplate = ""
act.CreateNamespace = config.CreateNamespace
act.ChartPathOptions = *chartPathOpts
+ act.DryRun = false
rel, err := act.Run(fetchedChart, config.Values)
if err != nil {
diff --git a/pkg/kube/client.go b/pkg/kube/client.go
index af9f6fa4..b4c15b2e 100644
--- a/pkg/kube/client.go
+++ b/pkg/kube/client.go
@@ -15,9 +15,12 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
+ "k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
typedbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
"k8s.io/client-go/rest"
@@ -55,6 +58,13 @@ type Pod struct {
Status string `json:"status"`
}
+type FlinkDeploymentStatus struct {
+ State string `json:"state"`
+ JMDeployStatus string `json:"jm_deploy_status"`
+ JobStatus string `json:"job_status"`
+ Reconciliation string `json:"reconciliation"`
+}
+
type LogOptions struct {
App string `mapstructure:"app"`
Pod string `mapstructure:"pod"`
@@ -323,6 +333,29 @@ func (c Client) GetPodDetails(ctx context.Context, namespace string, labelSelect
return podDetails, nil
}
+func (c Client) GetCRDDetails(ctx context.Context, namespace string, name string) (*unstructured.Unstructured, error) {
+ // Initialize the dynamic client
+ dynamicClient, err := dynamic.NewForConfig(&c.restConfig)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create dynamic client: %v", err)
+ }
+
+ // Define the GVR (GroupVersionResource) for the FlinkDeployment CRD
+ gvr := schema.GroupVersionResource{
+ Group: "flink.apache.org",
+ Version: "v1beta1",
+ Resource: "flinkdeployments",
+ }
+
+ // Fetch the FlinkDeployment CRD details
+ flinkDeployment, err := dynamicClient.Resource(gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
+ if err != nil {
+ return nil, fmt.Errorf("failed to get FlinkDeployment: %v", err)
+ }
+
+ return flinkDeployment, nil
+}
+
func streamContainerLogs(ctx context.Context, ns, podName string, logCh chan<- LogChunk, clientSet *kubernetes.Clientset,
podLogOpts corev1.PodLogOptions,
) error {
diff --git a/test/e2e_test/firehose_test.go b/test/e2e_test/firehose_test.go
index f63033bf..d411a613 100644
--- a/test/e2e_test/firehose_test.go
+++ b/test/e2e_test/firehose_test.go
@@ -36,7 +36,7 @@ func (s *FirehoseTestSuite) SetupTest() {
modules, err := s.moduleClient.ListModules(s.ctx, &entropyv1beta1.ListModulesRequest{})
s.Require().NoError(err)
- s.Require().Equal(6, len(modules.GetModules()))
+ s.Require().Equal(9, len(modules.GetModules()))
resources, err := s.resourceClient.ListResources(s.ctx, &entropyv1beta1.ListResourcesRequest{
Kind: "kubernetes",
diff --git a/test/e2e_test/flink_test.go b/test/e2e_test/flink_test.go
new file mode 100644
index 00000000..462859ea
--- /dev/null
+++ b/test/e2e_test/flink_test.go
@@ -0,0 +1,138 @@
+package e2e_test
+
+import (
+ "context"
+ "encoding/json"
+ "os"
+ "testing"
+
+ "github.com/goto/entropy/cli"
+ entropyv1beta1 "github.com/goto/entropy/proto/gotocompany/entropy/v1beta1"
+ "github.com/goto/entropy/test/testbench"
+ "github.com/ory/dockertest/v3"
+ "github.com/stretchr/testify/suite"
+ "sigs.k8s.io/kind/pkg/cluster"
+)
+
+type FlinkTestSuite struct {
+ suite.Suite
+ ctx context.Context
+ moduleClient entropyv1beta1.ModuleServiceClient
+ resourceClient entropyv1beta1.ResourceServiceClient
+ cancelModuleClient func()
+ cancelResourceClient func()
+ cancel func()
+ resource *dockertest.Resource
+ pool *dockertest.Pool
+ appConfig *cli.Config
+ kubeProvider *cluster.Provider
+}
+
+func (s *FlinkTestSuite) SetupTest() {
+ s.ctx, s.moduleClient, s.resourceClient, s.appConfig, s.pool, s.resource, s.kubeProvider, s.cancelModuleClient, s.cancelResourceClient, s.cancel = testbench.SetupTests(s.T(), true, true)
+
+ modules, err := s.moduleClient.ListModules(s.ctx, &entropyv1beta1.ListModulesRequest{})
+ s.Require().NoError(err)
+ s.Require().Equal(9, len(modules.GetModules()))
+
+ resources, err := s.resourceClient.ListResources(s.ctx, &entropyv1beta1.ListResourcesRequest{
+ Kind: "kubernetes",
+ })
+ s.Require().NoError(err)
+ s.Require().Equal(3, len(resources.GetResources()))
+}
+
+func (s *FlinkTestSuite) TestFlink() {
+ s.Run("create flink module return success", func() {
+ moduleData, err := os.ReadFile(testbench.TestDataPath + "module/flink_module.json")
+ if err != nil {
+ s.T().Fatal(err)
+ }
+
+ var moduleConfig *entropyv1beta1.Module
+ err = json.Unmarshal(moduleData, &moduleConfig)
+ if err != nil {
+ s.T().Fatal(err)
+ }
+ _, err = s.moduleClient.CreateModule(s.ctx, &entropyv1beta1.CreateModuleRequest{
+ Module: moduleConfig,
+ })
+ s.Require().NoError(err)
+ })
+ /*
+ s.Run("create flink with invalid config will return invalid error", func() {
+ _, err := s.resourceClient.CreateResource(s.ctx, &entropyv1beta1.CreateResourceRequest{
+ Resource: &entropyv1beta1.Resource{
+ Name: "test-flink",
+ Project: "test-project",
+ Kind: "flink",
+ Spec: &entropyv1beta1.ResourceSpec{
+ Configs: structpb.NewStringValue("{}"),
+ Dependencies: []*entropyv1beta1.ResourceDependency{},
+ },
+ },
+ })
+ s.Assert().Equal(codes.InvalidArgument, status.Convert(err).Code())
+ })
+ */
+ s.Run("create flink with right config will return success", func() {
+ resourceData, err := os.ReadFile(testbench.TestDataPath + "/resource/flink_resource.json")
+ if err != nil {
+ s.T().Fatal(err)
+ }
+
+ var resourceConfig *entropyv1beta1.Resource
+ err = json.Unmarshal(resourceData, &resourceConfig)
+ if err != nil {
+ s.T().Fatal(err)
+ }
+
+ _, err = s.resourceClient.CreateResource(s.ctx, &entropyv1beta1.CreateResourceRequest{
+ Resource: resourceConfig,
+ })
+ s.Require().NoError(err)
+ })
+
+ resources, err := s.resourceClient.ListResources(s.ctx, &entropyv1beta1.ListResourcesRequest{
+ Kind: "flink",
+ })
+ s.Require().NoError(err)
+ s.Require().Equal(1, len(resources.GetResources()))
+
+ s.Run("update flink with right config will return success", func() {
+ resourceData, err := os.ReadFile(testbench.TestDataPath + "/resource/flink_resource.json")
+ if err != nil {
+ s.T().Fatal(err)
+ }
+
+ var resourceConfig *entropyv1beta1.Resource
+ err = json.Unmarshal(resourceData, &resourceConfig)
+ if err != nil {
+ s.T().Fatal(err)
+ }
+
+ resourceConfig.Spec.Dependencies = nil
+
+ _, err = s.resourceClient.UpdateResource(s.ctx, &entropyv1beta1.UpdateResourceRequest{
+ Urn: resources.GetResources()[0].Urn,
+ NewSpec: resourceConfig.Spec,
+ })
+ s.Require().NoError(err)
+ })
+}
+
+func (s *FlinkTestSuite) TearDownTest() {
+ if err := s.pool.Purge(s.resource); err != nil {
+ s.T().Fatal(err)
+ }
+
+ if err := s.kubeProvider.Delete(testbench.TestClusterName, ""); err != nil {
+ s.T().Fatal(err)
+ }
+
+ s.cancel()
+}
+
+func TestFlinkTestSuite(t *testing.T) {
+ suite.Run(t, new(FlinkTestSuite))
+}
diff --git a/test/e2e_test/worker_test.go b/test/e2e_test/worker_test.go
index b22951ba..96b37bc3 100644
--- a/test/e2e_test/worker_test.go
+++ b/test/e2e_test/worker_test.go
@@ -34,7 +34,7 @@ func (s *WorkerTestSuite) SetupTest() {
modules, err := s.moduleClient.ListModules(s.ctx, &entropyv1beta1.ListModulesRequest{})
s.Require().NoError(err)
- s.Require().Equal(6, len(modules.GetModules()))
+ s.Require().Equal(9, len(modules.GetModules()))
resources, err := s.resourceClient.ListResources(s.ctx, &entropyv1beta1.ListResourcesRequest{
Kind: "kubernetes",
diff --git a/test/testbench/bootstrap.go b/test/testbench/bootstrap.go
index d67e44ed..054fdc7b 100644
--- a/test/testbench/bootstrap.go
+++ b/test/testbench/bootstrap.go
@@ -60,6 +60,31 @@ func BootstrapFirehoseModule(ctx context.Context, client entropyv1beta1.ModuleSe
return nil
}
+func BootstrapFlinkModule(ctx context.Context, client entropyv1beta1.ModuleServiceClient, testDataPath string) error {
+ moduleData, err := os.ReadFile(testDataPath + "/module/flink_module.json")
+ if err != nil {
+ return err
+ }
+
+ var moduleConfig *entropyv1beta1.Module
+ if err = json.Unmarshal(moduleData, &moduleConfig); err != nil {
+ return err
+ }
+
+ project := moduleConfig.Project
+ for i := 0; i < 3; i++ {
+ moduleConfig.Project = fmt.Sprintf("%s-%d", project, i)
+
+ if _, err := client.CreateModule(ctx, &entropyv1beta1.CreateModuleRequest{
+ Module: moduleConfig,
+ }); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
func BootstrapKubernetesResource(ctx context.Context, client entropyv1beta1.ResourceServiceClient, kubeProvider *cluster.Provider, testDataPath string) error {
resourceData, err := os.ReadFile(testDataPath + "/resource/kubernetes_resource.json")
if err != nil {
diff --git a/test/testbench/test_data/module/flink_module.json b/test/testbench/test_data/module/flink_module.json
new file mode 100644
index 00000000..7e887305
--- /dev/null
+++ b/test/testbench/test_data/module/flink_module.json
@@ -0,0 +1,5 @@
+{
+ "name": "flink",
+ "project": "test-project",
+ "configs": {}
+}
\ No newline at end of file
diff --git a/test/testbench/test_data/resource/flink_resource.json b/test/testbench/test_data/resource/flink_resource.json
new file mode 100644
index 00000000..d68f0a56
--- /dev/null
+++ b/test/testbench/test_data/resource/flink_resource.json
@@ -0,0 +1,25 @@
+{
+ "kind": "flink",
+ "name": "test-flink",
+ "project": "test-project-0",
+ "labels": {
+ "description": "test flink resource"
+ },
+ "spec": {
+ "configs": {
+ "influx": {
+ "password": "influx-password",
+ "url": "localhost:1234",
+ "username": "influx-user"
+ },
+ "kube_namespace": "flink-ns",
+ "sink_kafka_stream": "flinkstream"
+ },
+ "dependencies": [
+ {
+ "key": "kube_cluster",
+ "value": "orn:entropy:kubernetes:test-project-0:test-kube"
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git a/test/testbench/testbench.go b/test/testbench/testbench.go
index 7792860e..e71ea50a 100644
--- a/test/testbench/testbench.go
+++ b/test/testbench/testbench.go
@@ -121,6 +121,11 @@ func SetupTests(t *testing.T, spawnWorkers bool, setupKube bool) (context.Contex
t.Fatal()
}
+ err = BootstrapFlinkModule(ctx, moduleClient, TestDataPath)
+ if err != nil {
+ t.Fatal()
+ }
+
if setupKube {
err = BootstrapKubernetesResource(ctx, resourceClient, provider, TestDataPath)
if err != nil {