Skip to content

Commit 7061fd2

Browse files
authored
DENA-671: Validate cleanup policy (#18)
* Validate cleanup policy * Added cleanup policy to docs * Do not export func
1 parent 18ccb31 commit 7061fd2

File tree

3 files changed

+139
-4
lines changed

3 files changed

+139
-4
lines changed

rules/msk_topic_config.go

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package rules
22

33
import (
44
"fmt"
5+
"slices"
6+
"strings"
57

68
"github.com/hashicorp/hcl/v2"
79
"github.com/hashicorp/hcl/v2/gohcl"
@@ -93,6 +95,10 @@ func (r *MSKTopicConfigRule) validateTopicConfig(runner tflint.Runner, topic *hc
9395
if err := r.validateCompressionType(runner, configAttr, configKeyToPairMap); err != nil {
9496
return err
9597
}
98+
99+
if err := r.validateCleanupPolicy(runner, configAttr, configKeyToPairMap); err != nil {
100+
return err
101+
}
96102
return nil
97103
}
98104

@@ -202,7 +208,7 @@ func (r *MSKTopicConfigRule) validateCompressionType(
202208
},
203209
)
204210
if err != nil {
205-
return fmt.Errorf("emitting issue with fix: no replication factor: %w", err)
211+
return fmt.Errorf("emitting issue with fix: no compression type: %w", err)
206212
}
207213
return nil
208214
}
@@ -229,3 +235,58 @@ func (r *MSKTopicConfigRule) validateCompressionType(
229235
}
230236
return nil
231237
}
238+
239+
const (
240+
cleanupPolicyKey = "cleanup.policy"
241+
cleanupPolicyDefault = "delete"
242+
)
243+
244+
var (
245+
cleanupPolicyDefaultFix = fmt.Sprintf(`"%s" = "%s"`, cleanupPolicyKey, cleanupPolicyDefault)
246+
cleanupPolicyValidValues = []string{"delete", "compact"}
247+
)
248+
249+
func (r *MSKTopicConfigRule) validateCleanupPolicy(
250+
runner tflint.Runner,
251+
config *hclext.Attribute,
252+
configKeyToPairMap map[string]hcl.KeyValuePair,
253+
) error {
254+
cpPair, hasCp := configKeyToPairMap[cleanupPolicyKey]
255+
if !hasCp {
256+
err := runner.EmitIssueWithFix(
257+
r,
258+
fmt.Sprintf("missing %s: using default '%s'", cleanupPolicyKey, cleanupPolicyDefault),
259+
config.Range,
260+
func(f tflint.Fixer) error {
261+
return f.InsertTextAfter(config.Expr.StartRange(), "\n"+cleanupPolicyDefaultFix)
262+
},
263+
)
264+
if err != nil {
265+
return fmt.Errorf("emitting issue with fix: no cleanup policy: %w", err)
266+
}
267+
return nil
268+
}
269+
270+
var cpVal string
271+
diags := gohcl.DecodeExpression(cpPair.Value, nil, &cpVal)
272+
if diags.HasErrors() {
273+
return diags
274+
}
275+
if !slices.Contains(cleanupPolicyValidValues, cpVal) {
276+
err := runner.EmitIssue(
277+
r,
278+
fmt.Sprintf(
279+
"invalid %s: it must be one of [%s], but currently is '%s'",
280+
cleanupPolicyKey,
281+
strings.Join(cleanupPolicyValidValues, ", "),
282+
cpVal,
283+
),
284+
cpPair.Value.Range(),
285+
)
286+
if err != nil {
287+
return fmt.Errorf("emitting issue: invalid cleanup policy: %w", err)
288+
}
289+
return nil
290+
}
291+
return nil
292+
}

rules/msk_topic_config.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
An MSK topic configuration must comply with the following rules:
66
- the replication factor must be equal to 3, because we are deploying across 3 availability zones and this is the minimum we can run, since min-in-sync replicas is set to 2.
77
- the 'compression.type' must always be set to `zstd`. This is a very good compression algorithm, and it is set by default for the producer in our [kafka lib](https://github.com/utilitywarehouse/uwos-go/tree/main/pubsub/kafka)
8+
- the 'cleanup.policy' must be specified and must be one of 'delete' or 'compact'. If not specified, it is set automatically on 'delete'. See [kafka spec](https://kafka.apache.org/30/generated/topic_config.html#topicconfigs_cleanup.policy)
89

910
## Example
1011

@@ -16,6 +17,7 @@ resource "kafka_topic" "good_topic" {
1617
replication_factor = 3
1718
config = {
1819
"compression.type" = "zstd"
20+
"cleanup.policy" = "delete"
1921
}
2022
}
2123
@@ -36,6 +38,14 @@ resource "kafka_topic" "topic_with_wrong_compression_type" {
3638
"compression.type" = "gzip"
3739
}
3840
}
41+
42+
# topic with cleanup policy
43+
resource "kafka_topic" "topic_with_wrong_cleanup_policy" {
44+
name = "wrong-topic"
45+
config = {
46+
"cleanup.policy" = "invalid-value"
47+
}
48+
}
3949
```
4050

4151
## How To Fix

rules/msk_topic_config_test.go

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ func Test_MSKTopicConfigRule(t *testing.T) {
2525
resource "kafka_topic" "topic_without_repl_factor_and_name" {
2626
config = {
2727
"compression.type" = "zstd"
28+
"cleanup.policy" = "delete"
2829
}
2930
}`,
3031
expected: []*helper.Issue{
@@ -47,6 +48,7 @@ resource "kafka_topic" "topic_without_repl_factor" {
4748
name = "topic_without_repl_factor"
4849
config = {
4950
"compression.type" = "zstd"
51+
"cleanup.policy" = "delete"
5052
}
5153
}`,
5254
fixed: `
@@ -55,6 +57,7 @@ resource "kafka_topic" "topic_without_repl_factor" {
5557
replication_factor = 3
5658
config = {
5759
"compression.type" = "zstd"
60+
"cleanup.policy" = "delete"
5861
}
5962
}`,
6063
expected: []*helper.Issue{
@@ -77,6 +80,7 @@ resource "kafka_topic" "topic_with_incorrect_repl_factor" {
7780
replication_factor = 10
7881
config = {
7982
"compression.type" = "zstd"
83+
"cleanup.policy" = "delete"
8084
}
8185
}`,
8286
fixed: `
@@ -85,6 +89,7 @@ resource "kafka_topic" "topic_with_incorrect_repl_factor" {
8589
replication_factor = 3
8690
config = {
8791
"compression.type" = "zstd"
92+
"cleanup.policy" = "delete"
8893
}
8994
}`,
9095
expected: []*helper.Issue{
@@ -125,6 +130,7 @@ resource "kafka_topic" "topic_without_compression_type" {
125130
name = "topic_without_compression_type"
126131
replication_factor = 3
127132
config = {
133+
"cleanup.policy" = "delete"
128134
}
129135
}`,
130136
fixed: `
@@ -133,6 +139,7 @@ resource "kafka_topic" "topic_without_compression_type" {
133139
replication_factor = 3
134140
config = {
135141
"compression.type" = "zstd"
142+
"cleanup.policy" = "delete"
136143
}
137144
}`,
138145
expected: []*helper.Issue{
@@ -142,7 +149,7 @@ resource "kafka_topic" "topic_without_compression_type" {
142149
Range: hcl.Range{
143150
Filename: fileName,
144151
Start: hcl.Pos{Line: 5, Column: 3},
145-
End: hcl.Pos{Line: 6, Column: 4},
152+
End: hcl.Pos{Line: 7, Column: 4},
146153
},
147154
},
148155
},
@@ -154,6 +161,7 @@ resource "kafka_topic" "topic_with_wrong_compression_type" {
154161
name = "topic_with_wrong_compression_type"
155162
replication_factor = 3
156163
config = {
164+
"cleanup.policy" = "delete"
157165
"compression.type" = "gzip"
158166
}
159167
}`,
@@ -162,6 +170,7 @@ resource "kafka_topic" "topic_with_wrong_compression_type" {
162170
name = "topic_with_wrong_compression_type"
163171
replication_factor = 3
164172
config = {
173+
"cleanup.policy" = "delete"
165174
"compression.type" = "zstd"
166175
}
167176
}`,
@@ -171,8 +180,62 @@ resource "kafka_topic" "topic_with_wrong_compression_type" {
171180
Message: "the compression.type value must be equal to 'zstd'",
172181
Range: hcl.Range{
173182
Filename: fileName,
174-
Start: hcl.Pos{Line: 6, Column: 5},
175-
End: hcl.Pos{Line: 6, Column: 23},
183+
Start: hcl.Pos{Line: 7, Column: 5},
184+
End: hcl.Pos{Line: 7, Column: 23},
185+
},
186+
},
187+
},
188+
},
189+
{
190+
name: "missing cleanup policy",
191+
input: `
192+
resource "kafka_topic" "topic_without_cleanup_policy" {
193+
name = "topic_without_cleanup_policy"
194+
replication_factor = 3
195+
config = {
196+
"compression.type" = "zstd"
197+
}
198+
}`,
199+
fixed: `
200+
resource "kafka_topic" "topic_without_cleanup_policy" {
201+
name = "topic_without_cleanup_policy"
202+
replication_factor = 3
203+
config = {
204+
"cleanup.policy" = "delete"
205+
"compression.type" = "zstd"
206+
}
207+
}`,
208+
expected: []*helper.Issue{
209+
{
210+
Rule: rule,
211+
Message: "missing cleanup.policy: using default 'delete'",
212+
Range: hcl.Range{
213+
Filename: fileName,
214+
Start: hcl.Pos{Line: 5, Column: 3},
215+
End: hcl.Pos{Line: 7, Column: 4},
216+
},
217+
},
218+
},
219+
},
220+
{
221+
name: "invalid cleanup policy value",
222+
input: `
223+
resource "kafka_topic" "topic_with_invalid_cleanup_policy" {
224+
name = "topic_with_invalid_cleanup_policy"
225+
replication_factor = 3
226+
config = {
227+
"cleanup.policy" = "invalid-value"
228+
"compression.type" = "zstd"
229+
}
230+
}`,
231+
expected: []*helper.Issue{
232+
{
233+
Rule: rule,
234+
Message: "invalid cleanup.policy: it must be one of [delete, compact], but currently is 'invalid-value'",
235+
Range: hcl.Range{
236+
Filename: fileName,
237+
Start: hcl.Pos{Line: 6, Column: 26},
238+
End: hcl.Pos{Line: 6, Column: 41},
176239
},
177240
},
178241
},
@@ -184,6 +247,7 @@ resource "kafka_topic" "good topic" {
184247
name = "good_topic"
185248
replication_factor = 3
186249
config = {
250+
"cleanup.policy" = "delete"
187251
"compression.type" = "zstd"
188252
}
189253
}`,

0 commit comments

Comments
 (0)