Skip to content

Commit

Permalink
feat(firehose): enable resource to use same defaults as when created …
Browse files Browse the repository at this point in the history
…and provide upgrade action (#107)

* feat(firehose): extract default configs into module initialization

* feat(firehose): use defaults from firehose output

* feat(firehose): add upgrade action
  • Loading branch information
rohilsurana authored Jan 27, 2023
1 parent 43e888f commit dad50ff
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 68 deletions.
3 changes: 1 addition & 2 deletions cli/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cli
import (
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"

Expand All @@ -22,7 +21,7 @@ const (
type RunEFunc func(cmd *cobra.Command, args []string) error

func parseFile(filePath string, v protoreflect.ProtoMessage) error {
b, err := ioutil.ReadFile(filePath)
b, err := os.ReadFile(filePath)
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion docs/modules/firehose.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ The configuration struct for Firehose module looks like:
```
type moduleConfig struct {
State string `json:"state"`
ChartVersion string `json:"chart_version"`
Firehose struct {
Replicas int `json:"replicas"`
KafkaBrokerAddress string `json:"kafka_broker_address"`
Expand Down
49 changes: 21 additions & 28 deletions modules/firehose/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,6 @@ import (
"github.com/odpf/entropy/pkg/helm"
)

const (
defaultNamespace = "firehose"
defaultChartString = "firehose"
defaultVersionString = "0.1.1"
defaultRepositoryString = "https://odpf.github.io/charts/"
defaultImagePullPolicy = "IfNotPresent"
defaultImageRepository = "odpf/firehose"
defaultImageTag = "latest"
)

var (
//go:embed schema/config.json
completeConfigSchema string
Expand All @@ -33,11 +23,10 @@ var (
)

type moduleConfig struct {
State string `json:"state"`
ChartVersion string `json:"chart_version"`
StopTime *time.Time `json:"stop_time"`
Telegraf map[string]interface{} `json:"telegraf"`
Firehose struct {
State string `json:"state"`
StopTime *time.Time `json:"stop_time"`
Telegraf map[string]interface{} `json:"telegraf"`
Firehose struct {
Replicas int `json:"replicas"`
KafkaBrokerAddress string `json:"kafka_broker_address"`
KafkaTopic string `json:"kafka_topic"`
Expand All @@ -46,25 +35,29 @@ type moduleConfig struct {
} `json:"firehose"`
}

func (mc *moduleConfig) sanitiseAndValidate() error {
func (mc *moduleConfig) validate() error {
if mc.StopTime != nil && mc.StopTime.Before(time.Now()) {
return errors.ErrInvalid.
WithMsgf("value for stop_time must be greater than current time")
}
if mc.ChartVersion == "" {
mc.ChartVersion = defaultVersionString
}
return nil
}

func (mc moduleConfig) GetHelmReleaseConfig(r resource.Resource) *helm.ReleaseConfig {
func (mc moduleConfig) GetHelmReleaseConfig(r resource.Resource) (*helm.ReleaseConfig, error) {
var output Output
err := json.Unmarshal(r.State.Output, &output)
if err != nil {
return nil, errors.ErrInvalid.WithMsgf("invalid output json: %v", err)
}
defaults := output.Defaults

rc := helm.DefaultReleaseConfig()
rc.Name = fmt.Sprintf("%s-%s-firehose", r.Project, r.Name)
rc.Repository = defaultRepositoryString
rc.Chart = defaultChartString
rc.Namespace = defaultNamespace
rc.Repository = defaults.ChartRepository
rc.Chart = defaults.ChartName
rc.Namespace = defaults.Namespace
rc.ForceUpdate = true
rc.Version = mc.ChartVersion
rc.Version = defaults.ChartVersion

fc := mc.Firehose
fc.EnvVariables["SOURCE_KAFKA_BROKERS"] = fc.KafkaBrokerAddress
Expand All @@ -75,9 +68,9 @@ func (mc moduleConfig) GetHelmReleaseConfig(r resource.Resource) *helm.ReleaseCo
"replicaCount": mc.Firehose.Replicas,
"firehose": map[string]interface{}{
"image": map[string]interface{}{
"repository": defaultImageRepository,
"pullPolicy": defaultImagePullPolicy,
"tag": defaultImageTag,
"repository": defaults.ImageRepository,
"pullPolicy": defaults.ImagePullPolicy,
"tag": defaults.ImageTag,
},
"config": fc.EnvVariables,
},
Expand All @@ -87,7 +80,7 @@ func (mc moduleConfig) GetHelmReleaseConfig(r resource.Resource) *helm.ReleaseCo
}
rc.Values = hv

return rc
return rc, nil
}

func (mc moduleConfig) JSON() []byte {
Expand Down
10 changes: 8 additions & 2 deletions modules/firehose/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,16 @@ func (*firehoseModule) Log(ctx context.Context, res module.ExpandedResource, fil
if filter == nil {
filter = make(map[string]string)
}
filter["app"] = conf.GetHelmReleaseConfig(r).Name

hc, err := conf.GetHelmReleaseConfig(r)
if err != nil {
return nil, err
}

filter["app"] = hc.Name

kubeCl := kube.NewClient(kubeOut.Configs)
logs, err := kubeCl.StreamLogs(ctx, defaultNamespace, filter)
logs, err := kubeCl.StreamLogs(ctx, hc.Namespace, filter)
if err != nil {
return nil, err
}
Expand Down
50 changes: 44 additions & 6 deletions modules/firehose/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (
)

const (
StopAction = "stop"
StartAction = "start"
ScaleAction = "scale"
ResetAction = "reset"
StopAction = "stop"
StartAction = "start"
ScaleAction = "scale"
ResetAction = "reset"
UpgradeAction = "upgrade"
)

const (
Expand Down Expand Up @@ -68,10 +69,47 @@ var Module = module.Descriptor{
Description: "Reset firehose kafka consumer group to given timestamp",
ParamSchema: resetActionSchema,
},
{
Name: UpgradeAction,
Description: "Upgrade firehose to current stable version",
},
},
DriverFactory: func(conf json.RawMessage) (module.Driver, error) {
return &firehoseModule{}, nil
fm := firehoseModuleWithDefaultConfigs()
err := json.Unmarshal(conf, fm)
if err != nil {
return nil, err
}
return fm, nil
},
}

type firehoseModule struct{}
type firehoseModule struct {
Config config `json:"config"`
}

type config struct {
ChartRepository string `json:"chart_repository,omitempty"`
ChartName string `json:"chart_name,omitempty"`
ChartVersion string `json:"chart_version,omitempty"`
ImageRepository string `json:"image_repository,omitempty"`
ImageName string `json:"image_name,omitempty"`
ImageTag string `json:"image_tag,omitempty"`
Namespace string `json:"namespace,omitempty"`
ImagePullPolicy string `json:"image_pull_policy,omitempty"`
}

func firehoseModuleWithDefaultConfigs() *firehoseModule {
return &firehoseModule{
config{
ChartRepository: "https://odpf.github.io/charts/",
ChartName: "firehose",
ChartVersion: "0.1.3",
ImageRepository: "odpf/firehose",
ImageName: "firehose",
ImageTag: "latest",
Namespace: "firehose",
ImagePullPolicy: "IfNotPresent",
},
}
}
29 changes: 23 additions & 6 deletions modules/firehose/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import (
)

type Output struct {
Namespace string `json:"namespace"`
ReleaseName string `json:"release_name"`
Pods []kube.Pod `json:"pods"`
Namespace string `json:"namespace,omitempty"`
ReleaseName string `json:"release_name,omitempty"`
Pods []kube.Pod `json:"pods,omitempty"`
Defaults config `json:"defaults,omitempty"`
}

func (out Output) JSON() []byte {
Expand All @@ -30,15 +31,26 @@ func (m *firehoseModule) Output(ctx context.Context, res module.ExpandedResource
return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err)
}

var output Output
if err := json.Unmarshal(res.Resource.State.Output, &output); err != nil {
return nil, errors.ErrInvalid.WithMsgf("invalid output json: %v", err)
}

pods, err := m.podDetails(ctx, res)
if err != nil {
return nil, err
}

hc, err := conf.GetHelmReleaseConfig(res.Resource)
if err != nil {
return nil, err
}

return Output{
Namespace: conf.GetHelmReleaseConfig(res.Resource).Namespace,
ReleaseName: conf.GetHelmReleaseConfig(res.Resource).Name,
Namespace: hc.Namespace,
ReleaseName: hc.Name,
Pods: pods,
Defaults: output.Defaults,
}.JSON(), nil
}

Expand All @@ -55,6 +67,11 @@ func (*firehoseModule) podDetails(ctx context.Context, res module.ExpandedResour
return nil, err
}

hc, err := conf.GetHelmReleaseConfig(r)
if err != nil {
return nil, err
}

kubeCl := kube.NewClient(kubeOut.Configs)
return kubeCl.GetPodDetails(ctx, defaultNamespace, map[string]string{"app": conf.GetHelmReleaseConfig(r).Name})
return kubeCl.GetPodDetails(ctx, hc.Namespace, map[string]string{"app": hc.Name})
}
25 changes: 21 additions & 4 deletions modules/firehose/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,29 @@ func (m *firehoseModule) Plan(_ context.Context, res module.ExpandedResource, ac
}
}

func (*firehoseModule) planCreate(res module.ExpandedResource, act module.ActionRequest) (*module.Plan, error) {
func (m *firehoseModule) planCreate(res module.ExpandedResource, act module.ActionRequest) (*module.Plan, error) {
var plan module.Plan
r := res.Resource

var reqConf moduleConfig
if err := json.Unmarshal(act.Params, &reqConf); err != nil {
return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err)
}
if err := reqConf.sanitiseAndValidate(); err != nil {
if err := reqConf.validate(); err != nil {
return nil, err
}

output := Output{
Defaults: m.Config,
}.JSON()

r.Spec.Configs = reqConf.JSON()
r.State = resource.State{
Status: resource.StatusPending,
ModuleData: moduleData{
PendingSteps: []string{releaseCreate},
}.JSON(),
Output: output,
}

plan.Resource = r
Expand All @@ -48,7 +53,7 @@ func (*firehoseModule) planCreate(res module.ExpandedResource, act module.Action
return &plan, nil
}

func (*firehoseModule) planChange(res module.ExpandedResource, act module.ActionRequest) (*module.Plan, error) {
func (m *firehoseModule) planChange(res module.ExpandedResource, act module.ActionRequest) (*module.Plan, error) {
var plan module.Plan
r := res.Resource

Expand All @@ -63,7 +68,7 @@ func (*firehoseModule) planChange(res module.ExpandedResource, act module.Action
if err := json.Unmarshal(act.Params, &reqConf); err != nil {
return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err)
}
if err := reqConf.sanitiseAndValidate(); err != nil {
if err := reqConf.validate(); err != nil {
return nil, err
}
conf = reqConf
Expand All @@ -90,6 +95,18 @@ func (*firehoseModule) planChange(res module.ExpandedResource, act module.Action
case StopAction:
conf.State = stateStopped
plan.Reason = "firehose stopped"

case UpgradeAction:
var output Output
err := json.Unmarshal(res.State.Output, &output)
if err != nil {
return nil, errors.ErrInvalid.WithMsgf("invalid output json: %v", err)
}

output.Defaults = m.Config
res.State.Output = output.JSON()

plan.Reason = "firehose upgraded"
}

r.Spec.Configs = conf.JSON()
Expand Down
12 changes: 7 additions & 5 deletions modules/firehose/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestFirehoseModule_Plan(t *testing.T) {
Name: "test",
Project: "demo",
Spec: resource.Spec{
Configs: []byte(`{"state":"RUNNING","chart_version":"0.1.1","firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`),
Configs: []byte(`{"state":"RUNNING","firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`),
},
State: resource.State{},
}
Expand Down Expand Up @@ -57,11 +57,12 @@ func TestFirehoseModule_Plan(t *testing.T) {
Name: "test",
Project: "demo",
Spec: resource.Spec{
Configs: []byte(`{"state":"RUNNING","chart_version":"0.1.1","stop_time":null,"telegraf":null,"firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`),
Configs: []byte(`{"state":"RUNNING","stop_time":null,"telegraf":null,"firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`),
},
State: resource.State{
Status: resource.StatusPending,
ModuleData: []byte(`{"pending_steps":["release_create"]}`),
Output: []byte(`{"defaults":{}}`),
},
},
Reason: "firehose created",
Expand Down Expand Up @@ -90,7 +91,7 @@ func TestFirehoseModule_Plan(t *testing.T) {
Name: "test",
Project: "demo",
Spec: resource.Spec{
Configs: []byte(`{"state":"RUNNING","chart_version":"0.1.1","stop_time":null,"telegraf":null,"firehose":{"replicas":5,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`),
Configs: []byte(`{"state":"RUNNING","stop_time":null,"telegraf":null,"firehose":{"replicas":5,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`),
},
State: resource.State{
Status: resource.StatusPending,
Expand All @@ -114,7 +115,7 @@ func TestFirehoseModule_Plan(t *testing.T) {
Name: "test",
Project: "demo",
Spec: resource.Spec{
Configs: []byte(`{"state":"RUNNING","chart_version":"0.1.1","stop_time":null,"telegraf":null,"firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`),
Configs: []byte(`{"state":"RUNNING","stop_time":null,"telegraf":null,"firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`),
},
State: resource.State{
Status: resource.StatusPending,
Expand All @@ -138,11 +139,12 @@ func TestFirehoseModule_Plan(t *testing.T) {
Name: "test",
Project: "demo",
Spec: resource.Spec{
Configs: []byte(`{"state":"RUNNING","chart_version":"0.1.1","stop_time":"3022-07-13T00:40:14.028016Z","telegraf":null,"firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`),
Configs: []byte(`{"state":"RUNNING","stop_time":"3022-07-13T00:40:14.028016Z","telegraf":null,"firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`),
},
State: resource.State{
Status: resource.StatusPending,
ModuleData: []byte(`{"pending_steps":["release_create"]}`),
Output: []byte(`{"defaults":{}}`),
},
},
ScheduleRunAt: parseTime("3022-07-13T00:40:14.028016Z"),
Expand Down
3 changes: 0 additions & 3 deletions modules/firehose/schema/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
],
"default": "RUNNING"
},
"chart_version": {
"type": "string"
},
"stop_time": {
"type": "string",
"format": "date-time"
Expand Down
Loading

0 comments on commit dad50ff

Please sign in to comment.