Skip to content

Commit

Permalink
Allow creating topic using topic description from file
Browse files Browse the repository at this point in the history
  • Loading branch information
gotha committed Aug 10, 2024
1 parent c1cf2ca commit 72dd004
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 0 deletions.
1 change: 1 addition & 0 deletions cmd/create/create-topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func newCreateTopicCmd() *cobra.Command {
cmdCreateTopic.Flags().Int32VarP(&flags.Partitions, "partitions", "p", 1, "number of partitions")
cmdCreateTopic.Flags().Int16VarP(&flags.ReplicationFactor, "replication-factor", "r", -1, "replication factor")
cmdCreateTopic.Flags().BoolVarP(&flags.ValidateOnly, "validate-only", "v", false, "validate only")
cmdCreateTopic.Flags().StringVarP(&flags.File, "file", "f", "", "file with topic description")
cmdCreateTopic.Flags().StringArrayVarP(&flags.Configs, "config", "c", flags.Configs, "configs in format `key=value`")

return cmdCreateTopic
Expand Down
48 changes: 48 additions & 0 deletions internal/topic/topic-operation.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package topic

import (
"encoding/json"
"fmt"
"os"
"path"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -53,9 +56,26 @@ type CreateTopicFlags struct {
Partitions int32
ReplicationFactor int16
ValidateOnly bool
File string
Configs []string
}

type CreateTopicConfig struct {
Name string `json:"Name"`
Partitions []struct {
ID int `json:"ID"`
OldestOffset int `json:"oldestOffset"`
NewestOffset int `json:"newestOffset"`
Leader string `json:"Leader"`
Replicas []int `json:"Replicas"`
InSyncReplicas []int `json:"inSyncReplicas"`
} `json:"Partitions"`
Configs []struct {
Name string `json:"Name"`
Value string `json:"Value"`
} `json:"Configs"`
}

type AlterTopicFlags struct {
Partitions int32
ReplicationFactor int16
Expand Down Expand Up @@ -111,6 +131,34 @@ func (operation *Operation) CreateTopics(topics []string, flags CreateTopicFlags
topicDetails.ConfigEntries[configParts[0]] = &configParts[1]
}

if flags.File != "" {
fileContent, err := os.ReadFile(flags.File)
if err != nil {
return errors.Wrap(err, "could not read topic description file")
}

createTopicConfig := CreateTopicConfig{}
ext := path.Ext(flags.File)
var unmarshalErr error
switch ext {
case ".yml", ".yaml":
unmarshalErr = yaml.Unmarshal(fileContent, &createTopicConfig)
case ".json":
unmarshalErr = json.Unmarshal(fileContent, &createTopicConfig)
default:
return errors.Wrapf(err, "unsupported file format '%s'", ext)
}
if unmarshalErr != nil {
return errors.Wrap(err, "could not umarshal config file")
}

topicDetails.NumPartitions = int32(len(createTopicConfig.Partitions))
topicDetails.ReplicationFactor = int16(len(createTopicConfig.Partitions[0].Replicas))
for _, v := range createTopicConfig.Configs {
topicDetails.ConfigEntries[v.Name] = &v.Value
}
}

for _, topic := range topics {
if err = admin.CreateTopic(topic, &topicDetails, flags.ValidateOnly); err != nil {
return errors.Wrap(err, "failed to create topic")
Expand Down

0 comments on commit 72dd004

Please sign in to comment.