Skip to content

Commit

Permalink
Prevent creating topics just after deletion
Browse files Browse the repository at this point in the history
see  #1739
Deletion creates a node in the group /deletion_time, stores the time there
Throws an exception if not enough time has passed (5 min)
all existing unit tests pass (except the docker)

pending:
- updating with latest upstream
- manual testing in UI and running docker test
- adding unit tests for the added functionality
- checking integration tests
- configuration of the time delay
  • Loading branch information
danigiri committed Oct 29, 2024
1 parent 4180225 commit d19411b
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
public enum ErrorCode {
TIMEOUT(REQUEST_TIMEOUT),
TOPIC_ALREADY_EXISTS(BAD_REQUEST),
TOPIC_CREATED_RECENTLY(BAD_REQUEST),
TOPIC_NOT_EXISTS(NOT_FOUND),
GROUP_NOT_EXISTS(NOT_FOUND),
GROUP_NAME_IS_INVALID(BAD_REQUEST),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public interface TopicRepository {

void createTopic(Topic topic);

void ensureTopicCanBeCreated(TopicName topicName);

void removeTopic(TopicName topicName);

void updateTopic(Topic topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.data.Stat;
Expand Down Expand Up @@ -177,14 +178,30 @@ protected void deleteInTransaction(List<String> paths) throws Exception {
throw new InternalProcessingException("Attempting to remove empty set of paths from ZK");
}
ensureConnected();
CuratorTransactionFinal transaction =
zookeeper.inTransaction().delete().forPath(paths.get(0)).and();
addPathsToDelete(zookeeper.inTransaction(), paths).commit();
}

for (int i = 1; i < paths.size(); i++) {
transaction = transaction.delete().forPath(paths.get(i)).and();
}
protected void deleteAndCreateInTransaction(List<String> paths, String path, Object value)
throws Exception {
ensureConnected();
CuratorTransactionFinal transaction = addPathsToDelete(zookeeper.inTransaction(), paths);
transaction.create().forPath(path, mapper.writeValueAsBytes(value)).and().commit();
}

transaction.commit();
protected void deleteAndCreateInTransaction(
List<String> pathsToDelete, List<String> paths, String path, Object value) throws Exception {
ensureConnected();
CuratorTransactionFinal transaction =
addPathsToDelete(zookeeper.inTransaction(), pathsToDelete);
transaction = addPathsToCreate(transaction, paths);
transaction.create().forPath(path, mapper.writeValueAsBytes(value)).and().commit();
}

protected void deleteAndOverwriteInTransaction(List<String> paths, String path, Object value)
throws Exception {
ensureConnected();
CuratorTransactionFinal transaction = addPathsToDelete(zookeeper.inTransaction(), paths);
transaction.setData().forPath(path, mapper.writeValueAsBytes(value)).and().commit();
}

protected void create(String path, Object value) throws Exception {
Expand All @@ -211,4 +228,24 @@ protected void remove(String path) throws Exception {
private interface ThrowingReader<T> {
T read(byte[] data) throws IOException;
}

private CuratorTransactionFinal addPathsToDelete(
CuratorTransaction transaction, List<String> paths) throws Exception {
CuratorTransactionFinal transactionWithDelete =
transaction.delete().forPath(paths.get(0)).and();
for (int i = 1; i < paths.size(); i++) {
transaction = transaction.delete().forPath(paths.get(i)).and();
}
return transactionWithDelete;
}

private CuratorTransactionFinal addPathsToCreate(
CuratorTransaction transaction, List<String> paths) throws Exception {
CuratorTransactionFinal transactionWithCreate =
transaction.create().forPath(paths.get(0)).and();
for (int i = 1; i < paths.size(); i++) {
transaction = transaction.create().forPath(paths.get(i)).and();
}
return transactionWithCreate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,15 @@ public void removeGroup(String groupName) {
ensureGroupIsEmpty(groupName);

logger.info("Removing group: {}", groupName);
List<String> pathsToDelete = List.of(paths.topicsPath(groupName), paths.groupPath(groupName));
List<String> pathsToDelete;
String topicDeletionTimePath = paths.groupTopicDeletionTimePath(groupName);
if (!pathExists(topicDeletionTimePath)) {
pathsToDelete = List.of(paths.topicsPath(groupName), paths.groupPath(groupName));
} else {
// need to add all deletion time paths here
pathsToDelete =
List.of(paths.topicsPath(groupName), topicDeletionTimePath, paths.groupPath(groupName));
}
try {
deleteInTransaction(pathsToDelete);
} catch (Exception e) {
Expand All @@ -84,7 +92,11 @@ public void removeGroup(String groupName) {
}

private void ensureGroupIsEmpty(String groupName) {
if (!childrenOf(paths.topicsPath(groupName)).isEmpty()) {
String topicDeletionTimePath = paths.groupTopicDeletionTimePath(groupName);
if (!childrenOf(paths.topicsPath(groupName)).stream()
.filter(path -> !path.equals(topicDeletionTimePath))
.collect(Collectors.toList())
.isEmpty()) {
throw new GroupNotEmptyException(groupName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class ZookeeperPaths {
public static final String GROUPS_PATH = "groups";
public static final String SUBSCRIPTIONS_PATH = "subscriptions";
public static final String KAFKA_TOPICS_PATH = "kafka_topics";
public static final String DELETION_TIME_PATH = "deletion_time";
public static final String URL_SEPARATOR = "/";
public static final String CONSUMERS_WORKLOAD_PATH = "consumers-workload";
public static final String CONSUMER_LOAD_PATH = "consumer-load";
Expand Down Expand Up @@ -61,10 +62,19 @@ public String groupPath(String groupName) {
return Joiner.on(URL_SEPARATOR).join(groupsPath(), groupName);
}

public String groupTopicDeletionTimePath(String groupName) {
return Joiner.on(URL_SEPARATOR).join(groupPath(groupName), DELETION_TIME_PATH);
}

public String topicsPath(String groupName) {
return Joiner.on(URL_SEPARATOR).join(groupPath(groupName), TOPICS_PATH);
}

public String topicDeletionTimePath(TopicName topicName) {
return Joiner.on(URL_SEPARATOR)
.join(groupPath(topicName.getGroupName()), DELETION_TIME_PATH, topicName.getName());
}

public String topicMetricPath(TopicName topicName, String metricName) {
return topicPath(topicName, "metrics", metricName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package pl.allegro.tech.hermes.infrastructure.zookeeper;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand All @@ -15,6 +18,7 @@
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.domain.group.GroupRepository;
import pl.allegro.tech.hermes.domain.topic.TopicAlreadyExistsException;
import pl.allegro.tech.hermes.domain.topic.TopicDeletedRecentlyException;
import pl.allegro.tech.hermes.domain.topic.TopicNotExistsException;
import pl.allegro.tech.hermes.domain.topic.TopicRepository;

Expand Down Expand Up @@ -64,6 +68,7 @@ public List<Topic> listTopics(String groupName) {
@Override
public void createTopic(Topic topic) {
groupRepository.ensureGroupExists(topic.getName().getGroupName());
ensureTopicCanBeCreated(topic.getName());

String topicPath = paths.topicPath(topic.getName());
logger.info("Creating topic for path {}", topicPath);
Expand Down Expand Up @@ -144,8 +149,23 @@ public void removeTopic(TopicName topicName) {
pathsForRemoval.add(paths.subscriptionsPath(topicName));
pathsForRemoval.add(paths.topicPath(topicName));

String topicDeletionTimePath = paths.topicDeletionTimePath(topicName);

try {
deleteInTransaction(pathsForRemoval);
if (pathExists(topicDeletionTimePath)) {
deleteAndOverwriteInTransaction(pathsForRemoval, topicDeletionTimePath, Instant.now());
} else {
String groupDeletionTimePath = paths.groupTopicDeletionTimePath(topicName.getGroupName());
if (!pathExists(groupDeletionTimePath)) {
deleteAndCreateInTransaction(
pathsForRemoval,
List.of(groupDeletionTimePath),
topicDeletionTimePath,
Instant.now());
} else {
deleteAndCreateInTransaction(pathsForRemoval, topicDeletionTimePath, Instant.now());
}
}
} catch (Exception e) {
throw new InternalProcessingException(e);
}
Expand Down Expand Up @@ -192,6 +212,19 @@ private Optional<Topic> getTopicDetails(TopicName topicName, boolean quiet) {
quiet);
}

@Override
public void ensureTopicCanBeCreated(TopicName topicName) {
String topicDeletionTimePath = paths.topicDeletionTimePath(topicName);
if (pathExists(topicDeletionTimePath)) {
Instant deletionTime = readFrom(topicDeletionTimePath, Instant.class);
// TODO: make threshold configurable
Instant thresholdTime = deletionTime.plus(5, ChronoUnit.MINUTES);
if (Duration.between(thresholdTime, Instant.now()).toSeconds() > 0) {
throw new TopicDeletedRecentlyException(topicName, thresholdTime);
}
}
}

@Override
public List<Topic> getTopicsDetails(Collection<TopicName> topicNames) {
return topicNames.stream()
Expand Down

0 comments on commit d19411b

Please sign in to comment.