diff --git a/cmd/kaf/topic.go b/cmd/kaf/topic.go index 2857916..5227ee6 100644 --- a/cmd/kaf/topic.go +++ b/cmd/kaf/topic.go @@ -55,7 +55,7 @@ var topicsCmd = &cobra.Command{ var topicSetConfig = &cobra.Command{ Use: "set-config", - Short: "set topic config", + Short: "set topic config. requires Kafka >=2.3.0 on broker side and kaf cluster config.", Example: "kaf topic set-config topic.name \"cleanup.policy=delete\"", Args: cobra.ExactArgs(2), Run: func(cmd *cobra.Command, args []string) { @@ -64,7 +64,7 @@ var topicSetConfig = &cobra.Command{ topic := args[0] splt := strings.Split(args[1], ",") - configs := make(map[string]*string) + configs := make(map[string]sarama.IncrementalAlterConfigsEntry) for _, kv := range splt { s := strings.Split(kv, "=") @@ -75,14 +75,17 @@ var topicSetConfig = &cobra.Command{ key := s[0] value := s[1] - configs[key] = &value + configs[key] = sarama.IncrementalAlterConfigsEntry{ + Operation: sarama.IncrementalAlterConfigsOperationSet, + Value: &value, + } } if len(configs) < 1 { errorExit("No valid configs found") } - err := admin.AlterConfig(sarama.TopicResource, topic, configs, false) + err := admin.IncrementalAlterConfig(sarama.TopicResource, topic, configs, false) if err != nil { errorExit("Unable to alter topic config: %v\n", err) }