Skip to content

Commit

Permalink
feat: add sink based kube namespace selection (#86)
Browse files Browse the repository at this point in the history
* feat: add sink based ns selection

* feat: add namespace to labels

* fix: should check exisitng stopped flag

* fix: error handling

* fix: make private error
  • Loading branch information
ishanarya0 authored Nov 2, 2023
1 parent 4d0e906 commit 2a0d67d
Show file tree
Hide file tree
Showing 8 changed files with 308 additions and 28 deletions.
8 changes: 8 additions & 0 deletions modules/firehose/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,5 +122,13 @@ func readConfig(r resource.Resource, confJSON json.RawMessage, dc driverConf) (*
cfg.Limits = rl.Limits.merge(cfg.Limits)
cfg.Requests = rl.Requests.merge(cfg.Requests)

if cfg.Namespace == "" {
ns := dc.Namespace[defaultKey]
if override, ok := dc.Namespace[cfg.EnvVariables[confSinkType]]; ok {
ns = override
}
cfg.Namespace = ns
}

return &cfg, nil
}
14 changes: 9 additions & 5 deletions modules/firehose/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,17 @@ const (
labelOrchestrator = "orchestrator"
labelURN = "urn"
labelName = "name"
labelNamespace = "namespace"

orchestratorLabelValue = "entropy"
)

const defaultKey = "default"

var defaultDriverConf = driverConf{
Namespace: "firehose",
Namespace: map[string]string{
defaultKey: "firehose",
},
ChartValues: ChartValues{
ImageTag: "latest",
ChartVersion: "0.1.3",
Expand Down Expand Up @@ -87,7 +90,7 @@ type driverConf struct {
Telegraf *Telegraf `json:"telegraf"`

// Namespace is the kubernetes namespace where firehoses will be deployed.
Namespace string `json:"namespace" validate:"required"`
Namespace map[string]string `json:"namespace" validate:"required"`

// ChartValues is the chart and image version information.
ChartValues ChartValues `json:"chart_values" validate:"required"`
Expand Down Expand Up @@ -183,11 +186,12 @@ func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config,
}

otherLabels := map[string]string{
labelURN: res.URN,
labelName: res.Name,
labelURN: res.URN,
labelName: res.Name,
labelNamespace: conf.Namespace,
}

deploymentLabels, err := renderTpl(fd.conf.Labels, modules.CloneAndMergeMaps(res.Labels, entropyLabels))
deploymentLabels, err := renderTpl(fd.conf.Labels, modules.CloneAndMergeMaps(res.Labels, modules.CloneAndMergeMaps(entropyLabels, otherLabels)))
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions modules/firehose/driver_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (fd *firehoseDriver) refreshOutput(ctx context.Context, r resource.Resource
return nil, errors.ErrInternal.WithCausef(err.Error())
}
output.Pods = pods
output.Namespace = conf.Namespace

return modules.MustJSON(output), nil
}
44 changes: 42 additions & 2 deletions modules/firehose/driver_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestFirehoseDriver_Output(t *testing.T) {
exr: sampleResourceWithState(resource.State{
Status: resource.StatusCompleted,
Output: modules.MustJSON(Output{}),
}),
}, "LOG", "firehose"),
kubeGetPod: func(t *testing.T) kubeGetPodFn {
t.Helper()
return func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) {
Expand All @@ -95,7 +95,7 @@ func TestFirehoseDriver_Output(t *testing.T) {
Namespace: "firehose",
ReleaseName: "foo-bar",
}),
}),
}, "LOG", "firehose"),
kubeGetPod: func(t *testing.T) kubeGetPodFn {
t.Helper()
return func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) {
Expand All @@ -120,6 +120,40 @@ func TestFirehoseDriver_Output(t *testing.T) {
ReleaseName: "foo-bar",
}),
},
{
title: "Update_Namespace",
exr: sampleResourceWithState(resource.State{
Status: resource.StatusCompleted,
Output: modules.MustJSON(Output{
Pods: nil,
Namespace: "firehose",
ReleaseName: "foo-bar",
}),
}, "BIGQUERY", "bigquery-firehose"),
kubeGetPod: func(t *testing.T) kubeGetPodFn {
t.Helper()
return func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) {
assert.Equal(t, ns, "bigquery-firehose")
assert.Equal(t, labels["app"], "firehose-foo-fh1")
return []kube.Pod{
{
Name: "foo-1",
Containers: []string{"firehose"},
},
}, nil
}
},
want: modules.MustJSON(Output{
Pods: []kube.Pod{
{
Name: "foo-1",
Containers: []string{"firehose"},
},
},
Namespace: "bigquery-firehose",
ReleaseName: "foo-bar",
}),
},
}

for _, tt := range table {
Expand All @@ -128,6 +162,12 @@ func TestFirehoseDriver_Output(t *testing.T) {
conf: defaultDriverConf,
timeNow: func() time.Time { return frozenTime },
}

fd.conf.Namespace = map[string]string{
defaultKey: "firehose",
"BIGQUERY": "bigquery-firehose",
}

if tt.kubeGetPod != nil {
fd.kubeGetPod = tt.kubeGetPod(t)
}
Expand Down
15 changes: 13 additions & 2 deletions modules/firehose/driver_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ var (
suffixRegex = regexp.MustCompile(`^([A-Za-z0-9-]+)-([0-9]+)$`)
)

var errCauseInvalidNamespaceUpdate = "cannot update kube namespace of a running firehose"

func (fd *firehoseDriver) Plan(_ context.Context, exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
switch act.Name {
case module.CreateAction:
Expand Down Expand Up @@ -57,10 +59,20 @@ func (fd *firehoseDriver) planChange(exr module.ExpandedResource, act module.Act
// restore configs that are not user-controlled.
newConf.DeploymentID = curConf.DeploymentID
newConf.ChartValues = chartVals
newConf.Namespace = curConf.Namespace
newConf.Telegraf = fd.conf.Telegraf
newConf.InitContainer = fd.conf.InitContainer

ns := fd.conf.Namespace[defaultKey]
if override, ok := fd.conf.Namespace[newConf.EnvVariables[confSinkType]]; ok {
ns = override
}
if curConf.Namespace != ns {
if !curConf.Stopped {
return nil, errors.ErrInvalid.WithCausef(errCauseInvalidNamespaceUpdate)
}
newConf.Namespace = ns
}

curConf = newConf

case ScaleAction:
Expand Down Expand Up @@ -120,7 +132,6 @@ func (fd *firehoseDriver) planCreate(exr module.ExpandedResource, act module.Act

// set project defaults.
conf.Telegraf = fd.conf.Telegraf
conf.Namespace = fd.conf.Namespace
conf.ChartValues = chartVals

immediately := fd.timeNow()
Expand Down
Loading

0 comments on commit 2a0d67d

Please sign in to comment.