Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add include defaults when describing topics #184

Merged
merged 3 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- [#184](https://github.com/deviceinsight/kafkactl/pull/184) Added option to show default configs when describing topics

## 3.5.1 - 2023-11-10

## 3.5.0 - 2023-11-10
Expand Down
2 changes: 1 addition & 1 deletion cmd/describe/describe-topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func newDescribeTopicCmd() *cobra.Command {
}

cmdDescribeTopic.Flags().StringVarP(&flags.OutputFormat, "output", "o", flags.OutputFormat, "output format. One of: json|yaml|wide")
cmdDescribeTopic.Flags().BoolVarP(&flags.PrintConfigs, "print-configs", "c", true, "print configs")
cmdDescribeTopic.Flags().StringVarP((*string)(&flags.PrintConfigs), "print-configs", "c", "no_defaults", "print configs. One of none|no_defaults|all")
cmdDescribeTopic.Flags().BoolVarP(&flags.SkipEmptyPartitions, "skip-empty", "s", false, "show only partitions that have a messages")

return cmdDescribeTopic
Expand Down
68 changes: 68 additions & 0 deletions cmd/describe/describe-topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,69 @@ import (
"strings"
"testing"

"github.com/deviceinsight/kafkactl/internal"
"github.com/deviceinsight/kafkactl/internal/topic"

"github.com/deviceinsight/kafkactl/testutil"
)

func TestDescribeTopicConfigsIntegration(t *testing.T) {

testutil.StartIntegrationTest(t)

prefix := "describe-t-configs-"

topicName1 := testutil.CreateTopic(t, prefix, "--config", "retention.ms=3600000")

kafkaCtl := testutil.CreateKafkaCtlCommand()
kafkaCtl.Verbose = false

// default --print-configs=no_defaults
if _, err := kafkaCtl.Execute("describe", "topic", topicName1, "-o", "yaml"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

describedTopic, err := topic.FromYaml(kafkaCtl.GetStdOut())
if err != nil {
t.Fatalf("failed to read yaml: %v", err)
}

configKeys := getConfigKeys(describedTopic.Configs)

testutil.AssertArraysEquals(t, []string{"retention.ms"}, configKeys)
testutil.AssertEquals(t, "3600000", describedTopic.Configs[0].Value)

// explicitly without defaults
if _, err := kafkaCtl.Execute("describe", "topic", topicName1, "-c", "no_defaults", "-o", "yaml"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

describedTopic, err = topic.FromYaml(kafkaCtl.GetStdOut())
if err != nil {
t.Fatalf("failed to read yaml: %v", err)
}

configKeys = getConfigKeys(describedTopic.Configs)

testutil.AssertArraysEquals(t, []string{"retention.ms"}, configKeys)
testutil.AssertEquals(t, "3600000", describedTopic.Configs[0].Value)

// all configs
if _, err := kafkaCtl.Execute("describe", "topic", topicName1, "-c", "all", "-o", "yaml"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

describedTopic, err = topic.FromYaml(kafkaCtl.GetStdOut())
if err != nil {
t.Fatalf("failed to read yaml: %v", err)
}

configKeys = getConfigKeys(describedTopic.Configs)

testutil.AssertContains(t, "retention.ms", configKeys)
testutil.AssertContains(t, "cleanup.policy", configKeys)
}

func TestDescribeTopicAutoCompletionIntegration(t *testing.T) {

testutil.StartIntegrationTest(t)
Expand All @@ -30,3 +90,11 @@ func TestDescribeTopicAutoCompletionIntegration(t *testing.T) {
testutil.AssertContains(t, topicName2, outputLines)
testutil.AssertContains(t, topicName3, outputLines)
}

func getConfigKeys(configs []internal.Config) []string {
keys := make([]string, len(configs))
for i, config := range configs {
keys[i] = config.Name
}
return keys
}
4 changes: 2 additions & 2 deletions internal/broker/broker-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (operation *Operation) GetBrokers(flags GetBrokersFlags) error {
Name: fmt.Sprint(broker.ID()),
}

if configs, err = internal.ListConfigs(&admin, brokerConfig); err != nil {
if configs, err = internal.ListConfigs(&admin, brokerConfig, false); err != nil {
return err
}

Expand Down Expand Up @@ -153,7 +153,7 @@ func (operation *Operation) DescribeBroker(id int32, flags DescribeBrokerFlags)
Name: fmt.Sprint(broker.ID()),
}

if configs, err = internal.ListConfigs(&admin, brokerConfig); err != nil {
if configs, err = internal.ListConfigs(&admin, brokerConfig, false); err != nil {
return err
}

Expand Down
13 changes: 9 additions & 4 deletions internal/common-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,9 @@ func TopicExists(client *sarama.Client, name string) (bool, error) {
return false, nil
}

func ListConfigs(admin *sarama.ClusterAdmin, resource sarama.ConfigResource) ([]Config, error) {
func ListConfigs(admin *sarama.ClusterAdmin, resource sarama.ConfigResource, includeDefaults bool) ([]Config, error) {

var (
configs = make([]Config, 0)
configEntries []sarama.ConfigEntry
err error
)
Expand All @@ -321,15 +320,21 @@ func ListConfigs(admin *sarama.ClusterAdmin, resource sarama.ConfigResource) ([]
return nil, errors.Wrap(err, fmt.Sprintf("failed to describe %v config", getResourceTypeName(resource.Type)))
}

return listConfigsFromEntries(configEntries, includeDefaults), nil
d-rk marked this conversation as resolved.
Show resolved Hide resolved
}

func listConfigsFromEntries(configEntries []sarama.ConfigEntry, includeDefaults bool) []Config {
var configs = make([]Config, 0)

for _, configEntry := range configEntries {

if !configEntry.Default && configEntry.Source != sarama.SourceDefault {
if includeDefaults || (!configEntry.Default && configEntry.Source != sarama.SourceDefault) {
entry := Config{Name: configEntry.Name, Value: configEntry.Value}
configs = append(configs, entry)
}
}

return configs, nil
return configs
}

func getResourceTypeName(resourceType sarama.ConfigResourceType) string {
Expand Down
83 changes: 83 additions & 0 deletions internal/common-operation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package internal

import (
"reflect"
"testing"

"github.com/IBM/sarama"
)

func TestListConfigsFromEntries(t *testing.T) {
testCases := []struct {
name string
entries []sarama.ConfigEntry
includeDefaults bool
configs []Config
}{
{
name: "not include defaults, empty entries",
entries: []sarama.ConfigEntry{},
configs: []Config{},
},
{
name: "not include defaults",
entries: []sarama.ConfigEntry{
{
Name: "non_default",
Value: "ND",
Default: false,
Source: sarama.SourceUnknown,
},
{
Name: "default",
Value: "D",
Default: true,
Source: sarama.SourceDefault,
},
},
configs: []Config{
{Name: "non_default", Value: "ND"},
},
},
{
name: "include defaults, empty entries",
entries: []sarama.ConfigEntry{},
configs: []Config{},
includeDefaults: true,
},
{
name: "include defaults",
entries: []sarama.ConfigEntry{
{
Name: "non_default",
Value: "ND",
Default: false,
Source: sarama.SourceUnknown,
},
{
Name: "default",
Value: "D",
Default: true,
Source: sarama.SourceDefault,
},
},
configs: []Config{
{Name: "non_default", Value: "ND"},
{Name: "default", Value: "D"},
},
includeDefaults: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
configs := listConfigsFromEntries(tc.entries, tc.includeDefaults)

if len(configs) > 0 &&
len(tc.configs) > 0 &&
!reflect.DeepEqual(configs, tc.configs) {
t.Fatalf("expect: %v, got %v", tc.configs, configs)
}
})
}
}
38 changes: 28 additions & 10 deletions internal/topic/topic-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ type requestedTopicFields struct {
partitionLeader bool
partitionReplicas bool
partitionISRs bool
config bool
config PrintConfigsParam
}

var allFields = requestedTopicFields{partitionID: true, partitionOffset: true, partitionLeader: true, partitionReplicas: true, partitionISRs: true, config: true}
var allFields = requestedTopicFields{
partitionID: true, partitionOffset: true, partitionLeader: true,
partitionReplicas: true, partitionISRs: true, config: NonDefaultConfigs,
}

type GetTopicsFlags struct {
OutputFormat string
Expand All @@ -60,8 +63,16 @@ type AlterTopicFlags struct {
Configs []string
}

type PrintConfigsParam string

const (
NoConfigs PrintConfigsParam = "none"
AllConfigs PrintConfigsParam = "all"
NonDefaultConfigs PrintConfigsParam = "no_defaults"
)

type DescribeTopicFlags struct {
PrintConfigs bool
PrintConfigs PrintConfigsParam
SkipEmptyPartitions bool
OutputFormat string
}
Expand Down Expand Up @@ -161,7 +172,10 @@ func (operation *Operation) DescribeTopic(topic string, flags DescribeTopicFlags
return errors.Wrap(err, "failed to create cluster admin")
}

if t, err = readTopic(&client, &admin, topic, allFields); err != nil {
fields := allFields
fields.config = flags.PrintConfigs

if t, err = readTopic(&client, &admin, topic, fields); err != nil {
return errors.Wrap(err, "failed to read topic")
}

Expand All @@ -170,7 +184,7 @@ func (operation *Operation) DescribeTopic(topic string, flags DescribeTopicFlags

func (operation *Operation) printTopic(topic Topic, flags DescribeTopicFlags) error {

if !flags.PrintConfigs {
if flags.PrintConfigs == NoConfigs {
topic.Configs = nil
}

Expand Down Expand Up @@ -401,7 +415,11 @@ func (operation *Operation) AlterTopic(topic string, flags AlterTopicFlags) erro
}

if flags.ValidateOnly {
describeFlags := DescribeTopicFlags{PrintConfigs: len(flags.Configs) > 0}
printConfigs := NoConfigs
if len(flags.Configs) > 0 {
printConfigs = NonDefaultConfigs
}
describeFlags := DescribeTopicFlags{PrintConfigs: printConfigs}
return operation.printTopic(t, describeFlags)
}
return nil
Expand Down Expand Up @@ -472,7 +490,7 @@ func (operation *Operation) CloneTopic(sourceTopic, targetTopic string) error {
requestedFields := requestedTopicFields{
partitionID: true,
partitionReplicas: true,
config: true,
config: NonDefaultConfigs,
}

if t, err = readTopic(&client, &admin, sourceTopic, requestedFields); err != nil {
Expand Down Expand Up @@ -581,7 +599,7 @@ func (operation *Operation) GetTopics(flags GetTopicsFlags) error {
} else if flags.OutputFormat == "compact" {
tableWriter.Initialize()
} else if flags.OutputFormat == "wide" {
requestedFields = requestedTopicFields{partitionID: true, partitionReplicas: true, config: true}
requestedFields = requestedTopicFields{partitionID: true, partitionReplicas: true, config: NonDefaultConfigs}
if err := tableWriter.WriteHeader("TOPIC", "PARTITIONS", "REPLICATION FACTOR", "CONFIGS"); err != nil {
return err
}
Expand Down Expand Up @@ -725,14 +743,14 @@ func readTopic(client *sarama.Client, admin *sarama.ClusterAdmin, name string, r
return top.Partitions[i].ID < top.Partitions[j].ID
})

if requestedFields.config {
if requestedFields.config != NoConfigs {

topicConfig := sarama.ConfigResource{
Type: sarama.TopicResource,
Name: name,
}

if top.Configs, err = internal.ListConfigs(admin, topicConfig); err != nil {
if top.Configs, err = internal.ListConfigs(admin, topicConfig, requestedFields.config == AllConfigs); err != nil {
return top, err
}
}
Expand Down
Loading