Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pod override fields affinity and tolerations #203

Merged
merged 5 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
- [#203](https://github.com/deviceinsight/kafkactl/pull/203) Add pod override fields affinity and tolerations

## 5.2.0 - 2024-08-08

Expand Down
21 changes: 20 additions & 1 deletion README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,25 @@ contexts:
# optional: nodeSelector to add to the pod
nodeSelector:
key: value

# optional: affinity to add to the pod
affinity:
# note: other types of affinity also supported
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: "<key>"
operator: "<operator>"
values: [ "<value>" ]

# optional: tolerations to add to the pod
tolerations:
- key: "<key>"
operator: "<operator>"
value: "<value>"
effect: "<effect>"

# optional: clientID config (defaults to kafkactl-{username})
clientID: my-client-id

Expand Down Expand Up @@ -624,7 +643,7 @@ Producing protobuf message converted from JSON:
kafkactl produce my-topic --key='{"keyField":123}' --key-proto-type MyKeyMessage --value='{"valueField":"value"}' --value-proto-type MyValueMessage --proto-file kafkamsg.proto
----

A more complex protobuf message converted from a multi-line JSON string can be produced using a file input with custom separators.
A more complex protobuf message converted from a multi-line JSON string can be produced using a file input with custom separators.

For example, if you have the following protobuf definition (`complex.proto`):

Expand Down
14 changes: 14 additions & 0 deletions internal/common-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ type TLSConfig struct {
Insecure bool
}

type K8sToleration struct {
Key string `json:"key" yaml:"key"`
Operator string `json:"operator" yaml:"operator"`
Value string `json:"value" yaml:"value"`
Effect string `json:"effect" yaml:"effect"`
}

type K8sConfig struct {
Enabled bool
Binary string
Expand All @@ -74,6 +81,8 @@ type K8sConfig struct {
Labels map[string]string
Annotations map[string]string
NodeSelector map[string]string
Affinity map[string]any
Tolerations []K8sToleration
}

type ConsumerConfig struct {
Expand Down Expand Up @@ -174,6 +183,11 @@ func CreateClientContext() (ClientContext, error) {
context.Kubernetes.Labels = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.labels")
context.Kubernetes.Annotations = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.annotations")
context.Kubernetes.NodeSelector = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.nodeSelector")
context.Kubernetes.Affinity = viper.GetStringMap("contexts." + context.Name + ".kubernetes.affinity")

if err := viper.UnmarshalKey("contexts."+context.Name+".kubernetes.tolerations", &context.Kubernetes.Tolerations); err != nil {
return context, err
}

return context, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type ConsumerGroupOffsetOperation struct {

func (operation *ConsumerGroupOffsetOperation) ResetConsumerGroupOffset(flags ResetConsumerGroupOffsetFlags, groupName string) error {

if (flags.Topic == nil || len(flags.Topic) == 0) && (!flags.AllTopics) {
if (len(flags.Topic) == 0) && (!flags.AllTopics) {
return errors.New("no topic specified")
}

Expand Down
4 changes: 4 additions & 0 deletions internal/k8s/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type executor struct {
labels map[string]string
annotations map[string]string
nodeSelector map[string]string
affinity map[string]any
tolerations []internal.K8sToleration
}

const letterBytes = "abcdefghijklmnpqrstuvwxyz123456789"
Expand Down Expand Up @@ -111,6 +113,8 @@ func newExecutor(context internal.ClientContext, runner Runner) *executor {
labels: context.Kubernetes.Labels,
annotations: context.Kubernetes.Annotations,
nodeSelector: context.Kubernetes.NodeSelector,
affinity: context.Kubernetes.Affinity,
tolerations: context.Kubernetes.Tolerations,
runner: runner,
}
}
Expand Down
20 changes: 16 additions & 4 deletions internal/k8s/pod_overrides.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package k8s

import "github.com/deviceinsight/kafkactl/v5/internal"

type imagePullSecretType struct {
Name string `json:"name"`
}
Expand All @@ -10,9 +12,11 @@ type metadataType struct {
}

type specType struct {
ImagePullSecrets []imagePullSecretType `json:"imagePullSecrets,omitempty"`
ServiceAccountName *string `json:"serviceAccountName,omitempty"`
NodeSelector *map[string]string `json:"nodeSelector,omitempty"`
ImagePullSecrets []imagePullSecretType `json:"imagePullSecrets,omitempty"`
ServiceAccountName *string `json:"serviceAccountName,omitempty"`
NodeSelector *map[string]string `json:"nodeSelector,omitempty"`
Affinity *map[string]any `json:"affinity,omitempty"`
Tolerations *[]internal.K8sToleration `json:"tolerations,omitempty"`
}

type PodOverrideType struct {
Expand All @@ -29,7 +33,7 @@ func (kubectl *executor) createPodOverride() PodOverrideType {
var override PodOverrideType
override.APIVersion = "v1"

if kubectl.serviceAccount != "" || kubectl.imagePullSecret != "" || len(kubectl.nodeSelector) > 0 {
if kubectl.serviceAccount != "" || kubectl.imagePullSecret != "" || len(kubectl.nodeSelector) > 0 || len(kubectl.affinity) > 0 || len(kubectl.tolerations) > 0 {
override.Spec = &specType{}

if kubectl.serviceAccount != "" {
Expand All @@ -44,6 +48,14 @@ func (kubectl *executor) createPodOverride() PodOverrideType {
if len(kubectl.nodeSelector) > 0 {
override.Spec.NodeSelector = &kubectl.nodeSelector
}

if len(kubectl.affinity) > 0 {
override.Spec.Affinity = &kubectl.affinity
}

if len(kubectl.tolerations) > 0 {
override.Spec.Tolerations = &kubectl.tolerations
}
}

if len(kubectl.labels) > 0 || len(kubectl.annotations) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion internal/producer/producer-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (operation *Operation) Produce(topic string, flags Flags) error {
}

if inputMessage, err = inputParser.ParseLine(line); err != nil {
return failWithMessageCount(messageCount, err.Error())
return failWithMessageCount(messageCount, err.Error()) //nolint:govet
}

messageCount++
Expand Down
Loading