Skip to content

Commit 7749e13

Browse files
author
Jerome Revillard
committed
Fix Schema Registry usage with references
1 parent a98d7c6 commit 7749e13

File tree

46 files changed

+539
-244
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

46 files changed

+539
-244
lines changed

Dockerfile

+1-1
Original file line number | Diff line number | Diff line change
@@ -8,4 +8,4 @@ RUN apt-get update && apt-get --yes upgrade && \
88
apt-get install -y python3 python3-pip curl && \
99
rm -rf /var/lib/apt/lists/*
1010

11-
COPY --from=build /home/gradle/src/build/output/kafka-gitops /usr/local/bin/kafka-gitops
11+
COPY --from=build /home/gradle/src/build/output/kafka-gitops /usr/local/bin/kafka-gitops

build.gradle

+22-22
Original file line number | Diff line number | Diff line change
@@ -22,33 +22,33 @@ repositories {
2222
maven {
2323
url "https://packages.confluent.io/maven/"
2424
}
25+
maven {
26+
url "https://jitpack.io"
27+
}
2528
}
2629

2730
dependencies {
28-
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.4.0'
29-
compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.10.1'
30-
compile "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.9.8"
31-
compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.10.2"
32-
compile 'info.picocli:picocli:4.1.4'
33-
34-
implementation ('io.confluent:kafka-schema-registry-client:6.1.1')
35-
<<<<<<< HEAD
36-
implementation('com.flipkart.zjsonpatch:zjsonpatch:0.4.11')
37-
=======
38-
implementation ('io.confluent:kafka-json-schema-provider:6.1.1')
39-
implementation ('io.confluent:kafka-protobuf-serializer:6.1.1')
40-
>>>>>>> a208542 (Tests)
41-
42-
compile 'org.slf4j:slf4j-api:1.7.30'
43-
compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'
44-
compile group: 'ch.qos.logback', name: 'logback-core', version: '1.2.3'
31+
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.8.1'
32+
compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.13.2.2'
33+
compile "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.2"
34+
compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.13.2"
35+
compile 'info.picocli:picocli:4.6.3'
36+
37+
implementation ('io.confluent:kafka-schema-registry-client:7.1.1')
38+
implementation ('io.confluent:kafka-json-schema-provider:7.1.1')
39+
implementation ('io.confluent:kafka-protobuf-serializer:7.1.1')
40+
implementation ('io.github.java-diff-utils:java-diff-utils:4.11')
41+
42+
compile 'org.slf4j:slf4j-api:1.7.36'
43+
compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.11'
44+
compile group: 'ch.qos.logback', name: 'logback-core', version: '1.2.11'
4545

4646
processor 'org.inferred:freebuilder:2.7.0'
4747

48-
testCompile group: 'junit', name: 'junit', version: '4.12'
49-
testCompile group: 'org.codehaus.groovy', name: 'groovy-all', version: '2.5.14'
50-
testCompile group: 'org.spockframework', name: 'spock-core', version: '1.2-groovy-2.5'
51-
testCompile group: 'cglib', name: 'cglib-nodep', version: '2.2'
48+
testCompile group: 'junit', name: 'junit', version: '4.13.2'
49+
testCompile group: 'org.codehaus.groovy', name: 'groovy-all', version: '2.5.16'
50+
testCompile group: 'org.spockframework', name: 'spock-core', version: '1.3-groovy-2.5'
51+
testCompile group: 'cglib', name: 'cglib-nodep', version: '2.2.2'
5252
testCompile group: 'com.github.stefanbirkner', name: 'system-rules', version: '1.19.0'
5353
testCompile group: 'org.skyscreamer', name: 'jsonassert', version: '1.5.0'
5454
}
@@ -88,4 +88,4 @@ task buildRelease(type: Zip, group: "build") {
8888
}
8989

9090
buildRelease.dependsOn buildExecutableJar
91-
buildExecutableJar.dependsOn shadowJar
91+
buildExecutableJar.dependsOn shadowJar

docker/config/registry_jaas.conf

+5
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,5 @@
1+
KafkaClient {
2+
org.apache.kafka.common.security.plain.PlainLoginModule required
3+
username="test"
4+
password="test-secret";
5+
};

docker/docker-compose.yml

+36-18
Original file line number | Diff line number | Diff line change
@@ -1,24 +1,33 @@
1-
version: '2.1'
1+
version: '3.3'
22

33
services:
44
zoo1:
5-
image: zookeeper:3.4.9
5+
image: zookeeper:3.8.0
66
hostname: zoo1
77
ports:
88
- "2181:2181"
9+
healthcheck:
10+
test: echo stat | nc localhost 2181
11+
interval: 10s
12+
timeout: 10s
13+
retries: 3
914
environment:
10-
ZOO_MY_ID: 1
11-
ZOO_PORT: 2181
12-
ZOO_SERVERS: server.1=zoo1:2888:3888
13-
volumes:
14-
- ./data/zoo1/data:/data
15-
- ./data/zoo1/datalog:/datalog
15+
- ZOOKEEPER_SERVER_ID=1
16+
- ZOOKEEPER_CLIENT_PORT=2181
17+
- ZOOKEEPER_TICK_TIME=2000
18+
- ZOOKEEPER_INIT_LIMIT=5
19+
- ZOOKEEPER_SYNC_LIMIT=2
20+
- ZOOKEEPER_SERVERS=zoo1:2888:3888
1621

1722
kafka1:
18-
image: confluentinc/cp-kafka:5.5.3
23+
image: confluentinc/cp-kafka:7.1.1
24+
user: "0:0"
1925
hostname: kafka1
2026
ports:
2127
- "9092:9092"
28+
restart: on-failure:3
29+
healthcheck:
30+
test: ps augwwx | egrep [S]upportedKafka
2231
environment:
2332
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
2433
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:SASL_PLAINTEXT,LISTENER_DOCKER_EXTERNAL:SASL_PLAINTEXT
@@ -27,24 +36,28 @@ services:
2736
KAFKA_BROKER_ID: 1
2837
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
2938
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
39+
KAFKA_DELETE_TOPIC_ENABLE: "true"
3040
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
3141
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
3242
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
3343
ZOOKEEPER_SASL_ENABLED: "false"
34-
KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.auth.SimpleAclAuthorizer"
44+
KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.authorizer.AclAuthorizer"
3545
KAFKA_SUPER_USERS: "User:test;User:kafka"
3646
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
3747
volumes:
38-
- ./data/kafka1/data:/var/lib/kafka/data
3948
- ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
4049
depends_on:
4150
- zoo1
4251

4352
kafka2:
44-
image: confluentinc/cp-kafka:5.5.3
53+
image: confluentinc/cp-kafka:7.1.1
54+
user: "0:0"
4555
hostname: kafka2
4656
ports:
4757
- "9093:9093"
58+
restart: on-failure:3
59+
healthcheck:
60+
test: ps augwwx | egrep [S]upportedKafka
4861
environment:
4962
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
5063
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:SASL_PLAINTEXT,LISTENER_DOCKER_EXTERNAL:SASL_PLAINTEXT
@@ -53,24 +66,28 @@ services:
5366
KAFKA_BROKER_ID: 2
5467
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
5568
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
69+
KAFKA_DELETE_TOPIC_ENABLE: "true"
5670
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
5771
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
5872
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
5973
ZOOKEEPER_SASL_ENABLED: "false"
60-
KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.auth.SimpleAclAuthorizer"
74+
KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.authorizer.AclAuthorizer"
6175
KAFKA_SUPER_USERS: "User:test;User:kafka"
6276
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
6377
volumes:
64-
- ./data/kafka2/data:/var/lib/kafka/data
6578
- ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
6679
depends_on:
6780
- zoo1
6881

6982
kafka3:
70-
image: confluentinc/cp-kafka:5.5.3
83+
image: confluentinc/cp-kafka:7.1.1
84+
user: "0:0"
7185
hostname: kafka3
7286
ports:
7387
- "9094:9094"
88+
restart: on-failure:3
89+
healthcheck:
90+
test: ps augwwx | egrep [S]upportedKafka
7491
environment:
7592
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094
7693
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:SASL_PLAINTEXT,LISTENER_DOCKER_EXTERNAL:SASL_PLAINTEXT
@@ -79,24 +96,25 @@ services:
7996
KAFKA_BROKER_ID: 3
8097
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
8198
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
99+
KAFKA_DELETE_TOPIC_ENABLE: "true"
82100
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
83101
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
84102
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
85103
ZOOKEEPER_SASL_ENABLED: "false"
86-
KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.auth.SimpleAclAuthorizer"
104+
KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.authorizer.AclAuthorizer"
87105
KAFKA_SUPER_USERS: "User:test;User:kafka"
88106
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
89107
volumes:
90-
- ./data/kafka3/data:/var/lib/kafka/data
91108
- ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
92109
depends_on:
93110
- zoo1
94111

95112
schema-registry:
96-
image: confluentinc/cp-schema-registry:6.1.1
113+
image: confluentinc/cp-schema-registry:7.1.1
97114
hostname: schema-registry
98115
ports:
99116
- "8082:8082"
117+
restart: on-failure:5
100118
environment:
101119
SCHEMA_REGISTRY_HOST_NAME: schema-registry
102120
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka1:19092,kafka2:19092,kafka3:19092"

src/main/java/com/devshawn/kafka/gitops/MainCommand.java

+1-1
Original file line number | Diff line number | Diff line change
@@ -84,4 +84,4 @@ public static void main(String[] args) {
8484
int exitCode = new CommandLine(new MainCommand()).execute(args);
8585
System.exit(exitCode);
8686
}
87-
}
87+
}

src/main/java/com/devshawn/kafka/gitops/StateManager.java

+15-11
Original file line number | Diff line number | Diff line change
@@ -78,7 +78,9 @@ public DesiredStateFile getAndValidateStateFile() {
7878
DesiredStateFile desiredStateFile = parserService.parseStateFile();
7979
validateTopics(desiredStateFile);
8080
validateCustomAcls(desiredStateFile);
81-
validateSchemas(desiredStateFile);
81+
if (schemaRegistryService.isEnabled()) {
82+
validateSchemas(desiredStateFile);
83+
}
8284
this.describeAclEnabled = StateUtil.isDescribeTopicAclEnabled(desiredStateFile);
8385
return desiredStateFile;
8486
}
@@ -97,7 +99,9 @@ private DesiredPlan generatePlan() {
9799
planManager.planAcls(desiredState, desiredPlan);
98100
}
99101
planManager.planTopics(desiredState, desiredPlan);
100-
planManager.planSchemas(desiredState, desiredPlan);
102+
if (schemaRegistryService.isEnabled()) {
103+
planManager.planSchemas(desiredState, desiredPlan);
104+
}
101105
return desiredPlan.build();
102106
}
103107

@@ -113,7 +117,9 @@ public DesiredPlan apply() {
113117
if (!managerConfig.isSkipAclsDisabled()) {
114118
applyManager.applyAcls(desiredPlan);
115119
}
116-
applyManager.applySchemas(desiredPlan);
120+
if (schemaRegistryService.isEnabled()) {
121+
applyManager.applySchemas(desiredPlan);
122+
}
117123

118124
return desiredPlan;
119125
}
@@ -345,14 +351,12 @@ private void validateTopics(DesiredStateFile desiredStateFile) {
345351

346352
private void validateSchemas(DesiredStateFile desiredStateFile) {
347353
Optional<SchemaCompatibility> defaultSchemaCompatibility = StateUtil.fetchDefaultSchemasCompatibility(desiredStateFile);
348-
if (!defaultSchemaCompatibility.isPresent()) {
349-
desiredStateFile.getSchemas().forEach((subject, details) -> {
350-
if (!details.getCompatibility().isPresent()) {
351-
throw new ValidationException(String.format("Not set: [compatibility] in state file definition: schema -> %s", subject));
352-
}
353-
schemaRegistryService.validateSchema(subject, details);
354-
});
355-
}
354+
desiredStateFile.getSchemas().forEach((subject, details) -> {
355+
if (!defaultSchemaCompatibility.isPresent() && !details.getCompatibility().isPresent()) {
356+
throw new ValidationException(String.format("Not set: [compatibility] in state file definition: schema -> %s", subject));
357+
}
358+
//Schema parsing is deferred in the PlanManager in order not to do it more than one time.
359+
});
356360
}
357361

358362
private boolean isConfluentCloudEnabled(DesiredStateFile desiredStateFile) {

src/main/java/com/devshawn/kafka/gitops/cli/ApplyCommand.java

+2
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,8 @@ public Integer call() {
4848
LogUtil.printKafkaExecutionError(ex, true);
4949
} catch (SchemaRegistryExecutionException ex) {
5050
LogUtil.printSchemaRegistryExecutionError(ex, true);
51+
} catch (Exception ex) {
52+
LogUtil.printGenericError(ex, true);
5153
}
5254
return 2;
5355
}

src/main/java/com/devshawn/kafka/gitops/cli/PlanCommand.java

+2
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,8 @@ public Integer call() {
5252
LogUtil.printPlanOutputError(ex);
5353
} catch (SchemaRegistryExecutionException ex) {
5454
LogUtil.printSchemaRegistryExecutionError(ex);
55+
} catch (Exception ex) {
56+
LogUtil.printGenericError(ex, false);
5557
}
5658
return 2;
5759
}
Original file line number | Diff line number | Diff line change
@@ -1,89 +1,62 @@
11
package com.devshawn.kafka.gitops.config;
22

3-
import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
4-
import com.devshawn.kafka.gitops.exception.MissingMultipleConfigurationException;
5-
import org.slf4j.LoggerFactory;
6-
73
import java.util.HashMap;
84
import java.util.Map;
95
import java.util.concurrent.atomic.AtomicReference;
106

7+
import org.slf4j.LoggerFactory;
8+
119
public class SchemaRegistryConfigLoader {
1210

1311
private static org.slf4j.Logger log = LoggerFactory.getLogger(SchemaRegistryConfigLoader.class);
1412

1513
public static final String SCHEMA_REGISTRY_URL_KEY = "SCHEMA_REGISTRY_URL";
1614
public static final String SCHEMA_DIRECTORY_KEY = "SCHEMA_DIRECTORY";
17-
public static final String SCHEMA_REGISTRY_SASL_JAAS_USERNAME_KEY = "SCHEMA_REGISTRY_SASL_JAAS_USERNAME";
18-
public static final String SCHEMA_REGISTRY_SASL_JAAS_PASSWORD_KEY = "SCHEMA_REGISTRY_SASL_JAAS_PASSWORD";
19-
public static final String SCHEMA_REGISTRY_SASL_CONFIG_KEY = "SCHEMA_REGISTRY_SASL_CONFIG";
15+
public static final AtomicReference<SchemaRegistryConfig> instance = new AtomicReference<>();
2016

21-
private SchemaRegistryConfigLoader() {}
17+
private SchemaRegistryConfigLoader() {
18+
}
2219

2320
public static SchemaRegistryConfig load() {
24-
SchemaRegistryConfig.Builder builder = new SchemaRegistryConfig.Builder();
25-
setConfig(builder);
26-
return builder.build();
21+
return instance.updateAndGet(v -> {
22+
if (v != null) {
23+
return v;
24+
}
25+
SchemaRegistryConfig.Builder builder = new SchemaRegistryConfig.Builder();
26+
setConfig(builder);
27+
return builder.build();
28+
});
2729
}
2830

2931
private static void setConfig(SchemaRegistryConfig.Builder builder) {
3032
Map<String, Object> config = new HashMap<>();
31-
AtomicReference<String> username = new AtomicReference<>();
32-
AtomicReference<String> password = new AtomicReference<>();
3333

3434
Map<String, String> environment = System.getenv();
3535

3636
environment.forEach((key, value) -> {
37-
if (key.equals(SCHEMA_REGISTRY_SASL_JAAS_USERNAME_KEY)) {
38-
username.set(value);
39-
} else if (key.equals(SCHEMA_REGISTRY_SASL_JAAS_PASSWORD_KEY)) {
40-
password.set(value);
41-
} else if (key.equals(SCHEMA_REGISTRY_URL_KEY)) {
37+
if (key.equals(SCHEMA_REGISTRY_URL_KEY)) {
4238
config.put(SCHEMA_REGISTRY_URL_KEY, value);
4339
} else if (key.equals(SCHEMA_DIRECTORY_KEY)) {
4440
config.put(SCHEMA_DIRECTORY_KEY, value);
41+
} else if (key.startsWith("SCHEMA_REGISTRY_")) {
42+
String newKey = key.substring("SCHEMA_REGISTRY_".length()).replace("_", ".").toLowerCase();
43+
config.put(newKey, value);
4544
}
4645
});
4746

4847
handleDefaultConfig(config);
49-
handleAuthentication(username, password, config);
5048

5149
log.info("Schema Registry Config: {}", config);
5250

5351
builder.putAllConfig(config);
5452
}
5553

5654
private static void handleDefaultConfig(Map<String, Object> config) {
57-
final String DEFAULT_URL = "http://localhost:8081";
5855
final String CURRENT_WORKING_DIR = System.getProperty("user.dir");
59-
if (!config.containsKey(SCHEMA_REGISTRY_URL_KEY)) {
60-
log.info("{} not set. Using default value of {}", SCHEMA_REGISTRY_URL_KEY, DEFAULT_URL);
61-
config.put(SCHEMA_REGISTRY_URL_KEY, DEFAULT_URL);
62-
}
6356
if (!config.containsKey(SCHEMA_DIRECTORY_KEY)) {
64-
log.info("{} not set. Defaulting to current working directory: {}", SCHEMA_DIRECTORY_KEY, CURRENT_WORKING_DIR);
57+
log.info("{} not set. Defaulting to current working directory: {}", SCHEMA_DIRECTORY_KEY,
58+
CURRENT_WORKING_DIR);
6559
config.put(SCHEMA_DIRECTORY_KEY, CURRENT_WORKING_DIR);
6660
}
6761
}
68-
69-
private static void handleAuthentication(AtomicReference<String> username, AtomicReference<String> password, Map<String, Object> config) {
70-
if (username.get() != null && password.get() != null) {
71-
String loginModule = "org.apache.kafka.common.security.plain.PlainLoginModule";
72-
String value = String.format("%s required username=\"%s\" password=\"%s\";",
73-
loginModule, escape(username.get()), escape(password.get()));
74-
config.put(SCHEMA_REGISTRY_SASL_CONFIG_KEY, value);
75-
} else {
76-
if(config.get(SCHEMA_REGISTRY_SASL_CONFIG_KEY) == null) {
77-
log.info("{} or {} not set. No authentication configured for the Schema Registry",
78-
SCHEMA_REGISTRY_SASL_JAAS_USERNAME_KEY, SCHEMA_REGISTRY_SASL_JAAS_PASSWORD_KEY);
79-
}
80-
}
81-
}
82-
83-
private static String escape(String value) {
84-
if (value != null) {
85-
return value.replace("\"", "\\\"");
86-
}
87-
return null;
88-
}
8962
}

0 commit comments

Comments (0)