Skip to content

Commit f2cbb8e

Browse files
DENA-671: validate replication factor (#12)
* Implemented the validation for the replication_factor attribute. * Added check for name * Moved the topic name attr check in the topic name rule * Started the definition for the topic config rule. * Using the constant Co-authored-by: Matt Hughes <[email protected]> * Added link to the used value * Updated scenario name * Fixed test * Updated comment * Extracted method. --------- Co-authored-by: Matt Hughes <[email protected]>
1 parent 8afe5e9 commit f2cbb8e

File tree

7 files changed

+317
-1
lines changed

7 files changed

+317
-1
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ plugin "uw-kafka-config" {
2222
| [`msk_module_backend`](rules/msk_module_backend.md) | Requires an S3 backend to be defined, with a key that has as suffix the name of the team (taken from the current directory name) |
2323
| [`msk_app_topics`](rules/msk_app_topics.md) | Requires apps consume from and produce to only topics define in their module. |
2424
| [`msk_topic_name`](rules/msk_topic_name.md) | Requires defined topics in a module to belong to that team. |
25+
| [`msk_topic_config`](rules/msk_topic_config.md) | Checks the configuration for MSK topics |
2526

2627

2728
## Building the plugin

main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ func main() {
1919
rules.NewMskModuleBackendRule(),
2020
&rules.MskAppTopics{},
2121
&rules.MskTopicNameRule{},
22+
&rules.MskTopicConfigRule{},
2223
},
2324
},
2425
})

rules/msk_topic_config.go

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package rules
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/hashicorp/hcl/v2/gohcl"
7+
"github.com/terraform-linters/tflint-plugin-sdk/hclext"
8+
"github.com/terraform-linters/tflint-plugin-sdk/logger"
9+
"github.com/terraform-linters/tflint-plugin-sdk/tflint"
10+
)
11+
12+
// MskTopicConfigRule checks the configuration for an MSK topic.
13+
type MskTopicConfigRule struct {
14+
tflint.DefaultRule
15+
}
16+
17+
func (r *MskTopicConfigRule) Name() string {
18+
return "msk_topic_config"
19+
}
20+
21+
func (r *MskTopicConfigRule) Enabled() bool {
22+
return true
23+
}
24+
25+
func (r *MskTopicConfigRule) Link() string {
26+
return ReferenceLink(r.Name())
27+
}
28+
29+
func (r *MskTopicConfigRule) Severity() tflint.Severity {
30+
return tflint.ERROR
31+
}
32+
33+
func (r *MskTopicConfigRule) Check(runner tflint.Runner) error {
34+
path, err := runner.GetModulePath()
35+
if err != nil {
36+
return fmt.Errorf("getting module path: %w", err)
37+
}
38+
if !path.IsRoot() {
39+
logger.Debug("skipping child module")
40+
return nil
41+
}
42+
43+
resourceContents, err := runner.GetResourceContent(
44+
"kafka_topic",
45+
&hclext.BodySchema{
46+
Attributes: []hclext.AttributeSchema{
47+
{Name: "name"},
48+
{Name: "replication_factor"},
49+
},
50+
Blocks: []hclext.BlockSchema{
51+
{
52+
Type: "config",
53+
Body: &hclext.BodySchema{
54+
Attributes: []hclext.AttributeSchema{
55+
{Name: "retention.ms"},
56+
{Name: "compression.type"},
57+
{Name: "cleanup.policy"},
58+
},
59+
},
60+
},
61+
},
62+
},
63+
nil,
64+
)
65+
if err != nil {
66+
return fmt.Errorf("getting kafka_topic contents: %w", err)
67+
}
68+
69+
for _, topicResource := range resourceContents.Blocks {
70+
if err := r.validateTopicConfig(runner, topicResource); err != nil {
71+
return err
72+
}
73+
}
74+
75+
return nil
76+
}
77+
78+
func (r *MskTopicConfigRule) validateTopicConfig(runner tflint.Runner, topic *hclext.Block) error {
79+
if err := r.validateReplicationFactor(runner, topic); err != nil {
80+
return err
81+
}
82+
83+
return nil
84+
}
85+
86+
const (
87+
replFactorAttrName = "replication_factor"
88+
// See [https://github.com/utilitywarehouse/tflint-ruleset-kafka-config/blob/main/rules/msk_topic_config.md#requirements] for explanation.
89+
replicationFactorVal = 3
90+
)
91+
92+
var replFactorFix = fmt.Sprintf("%s = %d", replFactorAttrName, replicationFactorVal)
93+
94+
func (r *MskTopicConfigRule) validateReplicationFactor(runner tflint.Runner, topic *hclext.Block) error {
95+
replFactorAttr, hasReplFactor := topic.Body.Attributes[replFactorAttrName]
96+
if !hasReplFactor {
97+
return r.reportMissingReplicationFactor(runner, topic)
98+
}
99+
100+
var replFactor int
101+
diags := gohcl.DecodeExpression(replFactorAttr.Expr, nil, &replFactor)
102+
if diags.HasErrors() {
103+
return diags
104+
}
105+
106+
if replFactor != replicationFactorVal {
107+
err := runner.EmitIssueWithFix(
108+
r,
109+
fmt.Sprintf("the replication_factor must be equal to '%d'", replicationFactorVal),
110+
replFactorAttr.Range,
111+
func(f tflint.Fixer) error {
112+
return f.ReplaceText(replFactorAttr.Range, replFactorFix)
113+
},
114+
)
115+
if err != nil {
116+
return fmt.Errorf("emitting issue: incorrect replication factor: %w", err)
117+
}
118+
}
119+
return nil
120+
}
121+
122+
func (r *MskTopicConfigRule) reportMissingReplicationFactor(runner tflint.Runner, topic *hclext.Block) error {
123+
nameAttr, hasName := topic.Body.Attributes["name"]
124+
if !hasName {
125+
/* when no name attribute, we can not issue a fix, as we insert the replication factor after the name */
126+
err := runner.EmitIssue(
127+
r,
128+
fmt.Sprintf("missing replication_factor: it must be equal to '%d'", replicationFactorVal),
129+
topic.DefRange,
130+
)
131+
if err != nil {
132+
return fmt.Errorf("emitting issue without fix: no replication factor: %w", err)
133+
}
134+
return nil
135+
}
136+
137+
err := runner.EmitIssueWithFix(
138+
r,
139+
fmt.Sprintf("missing replication_factor: it must be equal to '%d'", replicationFactorVal),
140+
topic.DefRange,
141+
func(f tflint.Fixer) error {
142+
return f.InsertTextAfter(nameAttr.Range, "\n"+replFactorFix)
143+
},
144+
)
145+
if err != nil {
146+
return fmt.Errorf("emitting issue with fix: no replication factor: %w", err)
147+
}
148+
return nil
149+
}

rules/msk_topic_config.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# msk_topic_config
2+
3+
## Requirements
4+
5+
An MSK topic configuration must comply with the following rules:
6+
- the replication factor must be equal to 3, because we are deploying across 3 availability zones and this is the minimum we can run, since min-in-sync replicas is set to 2.
7+
8+
## Example
9+
10+
### Good example
11+
12+
```hcl
13+
resource "kafka_topic" "good_topic" {
14+
name = "pubsub.good-topic"
15+
replication_factor = 3
16+
}
17+
18+
```
19+
20+
### Bad examples
21+
```hcl
22+
# topic with wrong replication factor value
23+
resource "kafka_topic" "topic_with_wrong_replication_factor" {
24+
name = "wrong-topic-1"
25+
replication_factor = 6
26+
}
27+
```
28+
29+
## How To Fix
30+
31+
Define the topic satisfying the [requirements](#requirements).
32+
33+
See [good example](#good-example)

rules/msk_topic_config_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package rules
2+
3+
import (
4+
"testing"
5+
6+
"github.com/hashicorp/hcl/v2"
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
"github.com/terraform-linters/tflint-plugin-sdk/helper"
10+
)
11+
12+
func Test_MskTopicConfigRule(t *testing.T) {
13+
rule := &MskTopicConfigRule{}
14+
15+
const fileName = "topics.tf"
16+
for _, tc := range []struct {
17+
name string
18+
input string
19+
fixed string
20+
expected helper.Issues
21+
}{
22+
{
23+
name: "missing replication factor and topic name not defined",
24+
input: `
25+
resource "kafka_topic" "topic_without_repl_factor_and_name" {
26+
}`,
27+
expected: []*helper.Issue{
28+
{
29+
Rule: rule,
30+
Message: "missing replication_factor: it must be equal to '3'",
31+
Range: hcl.Range{
32+
Filename: fileName,
33+
Start: hcl.Pos{Line: 2, Column: 1},
34+
End: hcl.Pos{Line: 2, Column: 60},
35+
},
36+
},
37+
},
38+
},
39+
40+
{
41+
name: "missing replication factor",
42+
input: `
43+
resource "kafka_topic" "topic_without_repl_factor" {
44+
name = "topic_without_repl_factor"
45+
}`,
46+
fixed: `
47+
resource "kafka_topic" "topic_without_repl_factor" {
48+
name = "topic_without_repl_factor"
49+
replication_factor = 3
50+
}`,
51+
expected: []*helper.Issue{
52+
{
53+
Rule: rule,
54+
Message: "missing replication_factor: it must be equal to '3'",
55+
Range: hcl.Range{
56+
Filename: fileName,
57+
Start: hcl.Pos{Line: 2, Column: 1},
58+
End: hcl.Pos{Line: 2, Column: 51},
59+
},
60+
},
61+
},
62+
},
63+
{
64+
name: "incorrect replication factor",
65+
input: `
66+
resource "kafka_topic" "topic_with_incorrect_repl_factor" {
67+
name = "topic_with_incorrect_repl_factor"
68+
replication_factor = 10
69+
}`,
70+
fixed: `
71+
resource "kafka_topic" "topic_with_incorrect_repl_factor" {
72+
name = "topic_with_incorrect_repl_factor"
73+
replication_factor = 3
74+
}`,
75+
expected: []*helper.Issue{
76+
{
77+
Rule: rule,
78+
Message: "the replication_factor must be equal to '3'",
79+
Range: hcl.Range{
80+
Filename: fileName,
81+
Start: hcl.Pos{Line: 4, Column: 3},
82+
End: hcl.Pos{Line: 4, Column: 26},
83+
},
84+
},
85+
},
86+
},
87+
} {
88+
t.Run(tc.name, func(t *testing.T) {
89+
runner := helper.TestRunner(t, map[string]string{fileName: tc.input})
90+
require.NoError(t, rule.Check(runner))
91+
helper.AssertIssues(t, tc.expected, runner.Issues)
92+
93+
if tc.fixed != "" {
94+
helper.AssertChanges(t, map[string]string{fileName: tc.fixed}, runner.Changes())
95+
} else {
96+
assert.Empty(t, runner.Changes())
97+
}
98+
})
99+
}
100+
}

rules/msk_topic_name.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,18 @@ func (r *MskTopicNameRule) validateTopicName(
8787
aliases map[string][]string,
8888
) error {
8989
resourceName := topic.Labels[1]
90-
nameAttr := topic.Body.Attributes["name"]
90+
nameAttr, hasName := topic.Body.Attributes["name"]
91+
if !hasName {
92+
err := runner.EmitIssue(
93+
r,
94+
fmt.Sprintf("topic resource '%s' must have the name defined", resourceName),
95+
topic.DefRange,
96+
)
97+
if err != nil {
98+
return fmt.Errorf("emitting issue: no name: %w", err)
99+
}
100+
return nil
101+
}
91102

92103
var topicName string
93104
diags := gohcl.DecodeExpression(nameAttr.Expr, nil, &topicName)

rules/msk_topic_name_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,27 @@ func Test_MskTopics(t *testing.T) {
1818
workDir string
1919
expected helper.Issues
2020
}{
21+
{
22+
name: "topic doesn't have a name",
23+
workDir: filepath.Join("kafka-cluster-config", "dev-aws", "kafka-shared-msk", "pubsub"),
24+
files: map[string]string{
25+
"topics.tf": `
26+
resource "kafka_topic" "topic_without_name" {
27+
}
28+
`,
29+
},
30+
expected: []*helper.Issue{
31+
{
32+
Rule: rule,
33+
Message: "topic resource 'topic_without_name' must have the name defined",
34+
Range: hcl.Range{
35+
Filename: "topics.tf",
36+
Start: hcl.Pos{Line: 2, Column: 1},
37+
End: hcl.Pos{Line: 2, Column: 44},
38+
},
39+
},
40+
},
41+
},
2142
{
2243
name: "topic doesn't contain the team prefix",
2344
workDir: filepath.Join("kafka-cluster-config", "dev-aws", "kafka-shared-msk", "pubsub"),

0 commit comments

Comments
 (0)