Skip to content

Commit 126f15e

Browse files
author
Jerome Revillard
committed
Schema Registry management only if registry url provided
1 parent 99f0df3 commit 126f15e

File tree

6 files changed

+25
-35
lines changed

6 files changed

+25
-35
lines changed

src/main/java/com/devshawn/kafka/gitops/StateManager.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ public DesiredStateFile getAndValidateStateFile() {
7878
DesiredStateFile desiredStateFile = parserService.parseStateFile();
7979
validateTopics(desiredStateFile);
8080
validateCustomAcls(desiredStateFile);
81-
validateSchemas(desiredStateFile);
81+
if (schemaRegistryService.isEnabled()) {
82+
validateSchemas(desiredStateFile);
83+
}
8284
this.describeAclEnabled = StateUtil.isDescribeTopicAclEnabled(desiredStateFile);
8385
return desiredStateFile;
8486
}
@@ -97,7 +99,9 @@ private DesiredPlan generatePlan() {
9799
planManager.planAcls(desiredState, desiredPlan);
98100
}
99101
planManager.planTopics(desiredState, desiredPlan);
100-
planManager.planSchemas(desiredState, desiredPlan);
102+
if (schemaRegistryService.isEnabled()) {
103+
planManager.planSchemas(desiredState, desiredPlan);
104+
}
101105
return desiredPlan.build();
102106
}
103107

@@ -113,7 +117,9 @@ public DesiredPlan apply() {
113117
if (!managerConfig.isSkipAclsDisabled()) {
114118
applyManager.applyAcls(desiredPlan);
115119
}
116-
applyManager.applySchemas(desiredPlan);
120+
if (schemaRegistryService.isEnabled()) {
121+
applyManager.applySchemas(desiredPlan);
122+
}
117123

118124
return desiredPlan;
119125
}

src/main/java/com/devshawn/kafka/gitops/config/SchemaRegistryConfigLoader.java

+1-8
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package com.devshawn.kafka.gitops.config;
22

3-
import org.slf4j.LoggerFactory;
4-
53
import java.util.HashMap;
64
import java.util.Map;
7-
import java.util.concurrent.atomic.AtomicReference;
5+
import org.slf4j.LoggerFactory;
86

97
public class SchemaRegistryConfigLoader {
108

@@ -45,12 +43,7 @@ private static void setConfig(SchemaRegistryConfig.Builder builder) {
4543
}
4644

4745
private static void handleDefaultConfig(Map<String, Object> config) {
48-
final String DEFAULT_URL = "http://localhost:8081";
4946
final String CURRENT_WORKING_DIR = System.getProperty("user.dir");
50-
if (!config.containsKey(SCHEMA_REGISTRY_URL_KEY)) {
51-
log.info("{} not set. Using default value of {}", SCHEMA_REGISTRY_URL_KEY, DEFAULT_URL);
52-
config.put(SCHEMA_REGISTRY_URL_KEY, DEFAULT_URL);
53-
}
5447
if (!config.containsKey(SCHEMA_DIRECTORY_KEY)) {
5548
log.info("{} not set. Defaulting to current working directory: {}", SCHEMA_DIRECTORY_KEY, CURRENT_WORKING_DIR);
5649
config.put(SCHEMA_DIRECTORY_KEY, CURRENT_WORKING_DIR);

src/main/java/com/devshawn/kafka/gitops/service/SchemaRegistryService.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import java.util.ArrayList;
55
import java.util.Collections;
66
import java.util.List;
7-
import java.util.Map;
87
import java.util.Optional;
98
import com.devshawn.kafka.gitops.config.SchemaRegistryConfig;
109
import com.devshawn.kafka.gitops.config.SchemaRegistryConfigLoader;
@@ -34,8 +33,8 @@ public SchemaRegistryService(SchemaRegistryConfig config) {
3433
this.config = config;
3534
}
3635

37-
public Map<String, Object> getConfig() {
38-
return Collections.unmodifiableMap(config.getConfig());
36+
public boolean isEnabled() {
37+
return this.config.getConfig().containsKey(SchemaRegistryConfigLoader.SCHEMA_REGISTRY_URL_KEY);
3938
}
4039

4140
public List<String> getAllSubjects() {

src/test/groovy/com/devshawn/kafka/gitops/ApplyCommandIntegrationSpec.groovy

+1-2
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,7 @@ class ApplyCommandIntegrationSpec extends Specification {
6464
"custom-group-id-connect",
6565
"custom-application-id-streams",
6666
"custom-storage-topic",
67-
"custom-storage-topics",
68-
"schema-registry-simple"
67+
"custom-storage-topics"
6968
]
7069
}
7170

src/test/groovy/com/devshawn/kafka/gitops/PlanCommandIntegrationSpec.groovy

+3
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ class PlanCommandIntegrationSpec extends Specification {
105105

106106
void 'test various valid plans with seed - #planName'() {
107107
setup:
108+
TestUtils.cleanUpKafkaCluster()
108109
TestUtils.seedKafkaCluster()
109110
String planOutputFile = "/tmp/plan.json"
110111
String file = TestUtils.getResourceFilePath("plans/${planName}.yaml")
@@ -144,6 +145,7 @@ class PlanCommandIntegrationSpec extends Specification {
144145

145146
void 'test include unchanged flag - #planNam #includeUnchanged'() {
146147
setup:
148+
TestUtils.cleanUpKafkaCluster()
147149
TestUtils.seedKafkaCluster()
148150
String planOutputFile = "/tmp/plan.json"
149151
String file = TestUtils.getResourceFilePath("plans/${planName}.yaml")
@@ -260,6 +262,7 @@ class PlanCommandIntegrationSpec extends Specification {
260262

261263
void 'test plan that has no changes - #includeUnchanged'() {
262264
setup:
265+
TestUtils.cleanUpKafkaCluster()
263266
TestUtils.seedKafkaCluster()
264267
ByteArrayOutputStream out = new ByteArrayOutputStream()
265268
PrintStream oldOut = System.out

src/test/groovy/com/devshawn/kafka/gitops/TestUtils.groovy

+9-19
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@ import java.nio.file.Paths
44
import org.apache.kafka.clients.CommonClientConfigs
55
import org.apache.kafka.clients.admin.AdminClient
66
import org.apache.kafka.clients.admin.NewTopic
7-
import org.apache.kafka.common.acl.*
7+
import org.apache.kafka.common.acl.AccessControlEntry;
8+
import org.apache.kafka.common.acl.AccessControlEntryFilter;
9+
import org.apache.kafka.common.acl.AclBinding;
10+
import org.apache.kafka.common.acl.AclBindingFilter;
11+
import org.apache.kafka.common.acl.AclOperation;
12+
import org.apache.kafka.common.acl.AclPermissionType;
813
import org.apache.kafka.common.config.SaslConfigs
914
import org.apache.kafka.common.resource.PatternType
1015
import org.apache.kafka.common.resource.ResourcePattern
@@ -13,19 +18,12 @@ import org.apache.kafka.common.resource.ResourceType
1318
import com.devshawn.kafka.gitops.config.SchemaRegistryConfigLoader
1419
import com.devshawn.kafka.gitops.enums.SchemaCompatibility
1520
import com.devshawn.kafka.gitops.enums.SchemaType
16-
import com.devshawn.kafka.gitops.exception.ValidationException
1721
import com.devshawn.kafka.gitops.service.SchemaRegistryService
18-
import groovy.swing.factory.CollectionFactory
1922
import io.confluent.kafka.schemaregistry.AbstractSchemaProvider
2023
import io.confluent.kafka.schemaregistry.ParsedSchema
21-
import io.confluent.kafka.schemaregistry.SchemaProvider
22-
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider
2324
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
2425
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
2526
import io.confluent.kafka.schemaregistry.client.rest.RestService
26-
import io.confluent.kafka.schemaregistry.client.security.basicauth.SaslBasicAuthCredentialProvider
27-
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider
28-
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider
2927
import spock.util.concurrent.PollingConditions
3028

3129
class TestUtils {
@@ -196,16 +194,8 @@ class TestUtils {
196194
}
197195

198196
static CachedSchemaRegistryClient getSchemaRegistryClient() {
199-
Map<String, Object> config = SchemaRegistryConfigLoader.load().getConfig();
200-
RestService restService = new RestService(config.get(SchemaRegistryConfigLoader.SCHEMA_REGISTRY_URL_KEY).toString())
201-
if(config.get(SchemaRegistryConfigLoader.SCHEMA_REGISTRY_SASL_CONFIG_KEY) != null) {
202-
SaslBasicAuthCredentialProvider saslBasicAuthCredentialProvider = new SaslBasicAuthCredentialProvider()
203-
Map<String, Object> clientConfig = new HashMap<>()
204-
clientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, config
205-
.get(SchemaRegistryConfigLoader.SCHEMA_REGISTRY_SASL_CONFIG_KEY).toString())
206-
saslBasicAuthCredentialProvider.configure(clientConfig)
207-
restService.setBasicAuthCredentialProvider(saslBasicAuthCredentialProvider)
208-
}
209-
return new CachedSchemaRegistryClient(restService, 10);
197+
Map<String, Object> config = SchemaRegistryConfigLoader.load().getConfig();
198+
RestService restService = new RestService(config.get(SchemaRegistryConfigLoader.SCHEMA_REGISTRY_URL_KEY).toString())
199+
return new CachedSchemaRegistryClient(restService, 10, config);
210200
}
211201
}

0 commit comments

Comments
 (0)