Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 55 additions & 137 deletions src/main/java/com/trifork/cheetah/CheetahKRaftAuthorizer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.metadata.authorizer.AclMutator;
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
Expand All @@ -21,8 +23,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import io.strimzi.kafka.oauth.common.BearerTokenWithPayload;
Expand All @@ -35,11 +35,8 @@
import java.util.*;
import java.util.stream.Collectors;

import static org.apache.kafka.common.acl.AclOperation.*;
import scala.collection.JavaConverters;



public class CheetahKRaftAuthorizer implements ClusterMetadataAuthorizer {
static final Logger LOG = LoggerFactory.getLogger(CheetahKRaftAuthorizer.class.getName());
private String topicClaimName;
Expand All @@ -53,7 +50,8 @@ public class CheetahKRaftAuthorizer implements ClusterMetadataAuthorizer {
private static final AtomicInteger INSTANCE_NUMBER_COUNTER = new AtomicInteger(1);

/**
* An instance number used in {@link #toString()} method, to easily track the number of instances of this class
* An instance number used in {@link #toString()} method, to easily track the
* number of instances of this class
*/
private final int instanceNumber = INSTANCE_NUMBER_COUNTER.getAndIncrement();

Expand Down Expand Up @@ -111,7 +109,9 @@ private StandardAuthorizer instantiateStandardAuthorizer() {
LOG.debug("Using StandardAuthorizer (KRaft based) as a delegate");
return new StandardAuthorizer();
} catch (Exception e) {
throw new ConfigException("KRaft mode detected ('process.roles' configured), but failed to instantiate org.apache.kafka.metadata.authorizer.StandardAuthorizer", e);
throw new ConfigException(
"KRaft mode detected ('process.roles' configured), but failed to instantiate org.apache.kafka.metadata.authorizer.StandardAuthorizer",
e);
}
}

Expand Down Expand Up @@ -168,17 +168,18 @@ public Iterable<AclBinding> acls(AclBindingFilter filter) {
}

@Override
public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext,
List<AclBinding> aclBindings) {
if (delegate != null) {
return delegate.createAcls(requestContext, aclBindings);
} else {
throw new UnsupportedOperationException("ACL delegation not enabled");
}
}


@Override
public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters) {
public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext,
List<AclBindingFilter> aclBindingFilters) {
if (delegate != null) {
return delegate.deleteAcls(requestContext, aclBindingFilters);
} else {
Expand All @@ -196,7 +197,8 @@ public int aclCount() {
}

@Override
public AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
public AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op,
ResourceType resourceType) {
if (delegate != null) {
return delegate.authorizeByResourceType(requestContext, op, resourceType);
} else {
Expand All @@ -206,7 +208,6 @@ public AuthorizationResult authorizeByResourceType(AuthorizableRequestContext re

@Override
public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
List<AuthorizationResult> results = new ArrayList<>(actions.size());
if (!(requestContext.principal() instanceof OAuthKafkaPrincipal)) {
return handleSuperUsers(requestContext, actions);
}
Expand All @@ -232,20 +233,45 @@ public List<AuthorizationResult> authorize(AuthorizableRequestContext requestCon
List<TopicAccess> topicAccesses = extractTopicAccesses(topicAccessesRaw, prefix);
List<ClusterAccess> clusterAccesses = extractClusterAccesses(clusterAccessesRaw, prefix);

for (Action action : actions) {
if (checkTopicJwtClaims(topicAccesses, action) || checkClusterJwtClaims(clusterAccesses, action)) {
results.add(AuthorizationResult.ALLOWED);
continue;
}
for (TopicAccess topic : topicAccesses) {
ResourceType resourceType = ResourceType.TOPIC;
String resourceName = topic.pattern;
PatternType patternType = PatternType.MATCH; // todo
// String principal = requestContext.principal().getName();
String host = "*";
AclOperation operation = topic.operation;
AclPermissionType permissionType = AclPermissionType.ALLOW;

delegate.addAcl(Uuid.randomUuid(), new StandardAcl(
resourceType,
resourceName,
patternType,
principal.getName(),
host,
operation,
permissionType));
}

if (LOG.isDebugEnabled()) {
LOG.debug("Action was Denied");
LOG.debug(action.toString());
}
results.add(AuthorizationResult.DENIED);
for (ClusterAccess cluster : clusterAccesses) {
ResourceType resourceType = ResourceType.CLUSTER;
String resourceName = "cluster";
PatternType patternType = PatternType.MATCH;
// String principal = requestContext.principal().getName();
String host = "*";
AclOperation operation = cluster.operation;
AclPermissionType permissionType = AclPermissionType.ALLOW;

delegate.addAcl(Uuid.randomUuid(), new StandardAcl(
resourceType,
resourceName,
patternType,
principal.getName(),
host,
operation,
permissionType));
}

return results;
return delegate.authorize(requestContext, actions);
}

@Override
Expand All @@ -262,7 +288,8 @@ public String toString() {

private List<AuthorizationResult> handleSuperUsers(AuthorizableRequestContext requestContext,
List<Action> actions) {
UserSpec user = new UserSpec(requestContext.principal().getPrincipalType(), requestContext.principal().getName());
UserSpec user = new UserSpec(requestContext.principal().getPrincipalType(),
requestContext.principal().getName());
if (superUsers.contains(user)) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Granting access to superuser %s", user.getName()));
Expand Down Expand Up @@ -321,7 +348,8 @@ public static List<ClusterAccess> extractClusterAccesses(List<String> accesses,
}

// Allow only valid operations for cluster
Set<AclOperation> validOps = new HashSet<>(JavaConverters.asJava(AclEntry.supportedOperations(ResourceType.CLUSTER)));
Set<AclOperation> validOps = new HashSet<>(
JavaConverters.asJava(AclEntry.supportedOperations(ResourceType.CLUSTER)));
validOps.add(AclOperation.ALL);
validOps.add(AclOperation.ANY);
ArrayList<ClusterAccess> validAccesses = new ArrayList<>();
Expand Down Expand Up @@ -373,7 +401,8 @@ public static List<TopicAccess> extractTopicAccesses(List<String> accesses, Stri
}

// Allow only valid operations for topics
Set<AclOperation> validOps = new HashSet<>(JavaConverters.asJava(AclEntry.supportedOperations(ResourceType.TOPIC)));
Set<AclOperation> validOps = new HashSet<>(
JavaConverters.asJava(AclEntry.supportedOperations(ResourceType.TOPIC)));
validOps.add(AclOperation.ALL);
validOps.add(AclOperation.ANY);
ArrayList<TopicAccess> validAccesses = new ArrayList<>();
Expand All @@ -389,115 +418,4 @@ public static List<TopicAccess> extractTopicAccesses(List<String> accesses, Stri

return validAccesses;
}

public static boolean checkTopicJwtClaims(List<TopicAccess> topicAccesses, Action requestedAction) {
for (TopicAccess t : topicAccesses) {
switch (requestedAction.resourcePattern().resourceType()) {
case TOPIC:
if (matchTopicPattern(requestedAction, t) && checkTopicAccess(t.operation, requestedAction))
return true;
break;
case CLUSTER: // check for some default cluster actions based on topic claim
if (checkClusterAccess(t.operation, requestedAction))
return true;
break;
case GROUP: // check for some default (consumer)group actions based on topic claim
if (checkGroupAccess(t.operation, requestedAction))
return true;
break;
default:
break;
}
}
return false;
}

public static boolean checkClusterJwtClaims(List<ClusterAccess> clusterAccesses, Action requestedAction) {
for (ClusterAccess c : clusterAccesses) {
switch (requestedAction.resourcePattern().resourceType()) {
case CLUSTER:
return claimSupportsRequestedAction(c.operation, requestedAction);
default:
break;
}
}
return false;
}

private static boolean checkGroupAccess(AclOperation claimedOperation, Action requestedAction) {
switch (requestedAction.operation()) {
case READ:
return List.of(ANY, ALL, READ).contains(claimedOperation);
case DESCRIBE:
return List.of(ANY, ALL, READ, DESCRIBE).contains(claimedOperation);
case DELETE:
return List.of(ANY, ALL, DELETE).contains(claimedOperation);
default:
return false;
}
}

private static boolean checkClusterAccess(AclOperation claimedOperation, Action requestedAction) {
switch (requestedAction.operation()) {
case CREATE:
return List.of(ANY, ALL, CREATE).contains(claimedOperation);
case CLUSTER_ACTION:
return List.of(ANY, ALL, CLUSTER_ACTION).contains(claimedOperation);
case DESCRIBE_CONFIGS:
return List.of(ANY, ALL, ALTER_CONFIGS, DESCRIBE_CONFIGS).contains(claimedOperation);
case ALTER_CONFIGS:
return List.of(ANY, ALL, ALTER_CONFIGS).contains(claimedOperation);
case IDEMPOTENT_WRITE:
return List.of(ANY, ALL, WRITE).contains(claimedOperation);
case ALTER:
return List.of(ANY, ALL, ALTER).contains(claimedOperation);
case DESCRIBE: // Should this add CREATE?
return List.of(ANY, ALL, ALTER, DESCRIBE).contains(claimedOperation);
default:
return false;
}
}

private static boolean checkTopicAccess(AclOperation claimedOperation, Action requestedAction) {
switch (requestedAction.operation()) {
case READ:
return List.of(ANY, ALL, READ).contains(claimedOperation);
case WRITE:
return List.of(ANY, ALL, WRITE).contains(claimedOperation);
case CREATE:
return List.of(ANY, ALL, CREATE).contains(claimedOperation);
case DELETE:
return List.of(ANY, ALL, DELETE).contains(claimedOperation);
case ALTER:
return List.of(ANY, ALL, ALTER).contains(claimedOperation);
case DESCRIBE:
// READ, WRITE, DELETE, and ALTER implicitly allow DESCRIBE
return List.of(ANY, ALL, READ, WRITE, DELETE, ALTER, DESCRIBE).contains(claimedOperation);
case DESCRIBE_CONFIGS:
// ALTER_CONFIGS implicitly allows DESCRIBE_CONFIGS
return List.of(ANY, ALL, ALTER_CONFIGS, DESCRIBE_CONFIGS).contains(claimedOperation);
case ALTER_CONFIGS:
return List.of(ANY, ALL, ALTER_CONFIGS).contains(claimedOperation);
default:
return claimSupportsRequestedAction(claimedOperation, requestedAction);

}
}

private static boolean claimSupportsRequestedAction(AclOperation claimedOperation, Action requestedAction) {
return List.of(ANY, ALL).contains(claimedOperation)
|| requestedAction.operation().equals(claimedOperation);
}

private static boolean matchTopicPattern(Action action, TopicAccess t) {
if (t.pattern.endsWith("*")) {
return action.resourcePattern().name().startsWith(t.pattern.replace("*", ""));
} else if (t.pattern.startsWith("*")) {
// This is not part of the default patterns
// https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java#L70
return action.resourcePattern().name().endsWith((t.pattern.replace("*", "")));
} else {
return action.resourcePattern().name().equals(t.pattern);
}
}
}
10 changes: 3 additions & 7 deletions src/main/java/com/trifork/cheetah/CheetahKafkaAuthorizer.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ private CheetahConfig convertToCheetahConfig(Map<String, ?> configs) {

@Override
public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {

List<AuthorizationResult> results = new ArrayList<>(actions.size());
if (!(requestContext.principal() instanceof OAuthKafkaPrincipal)) {
return handleSuperUsers(requestContext, actions);
Expand Down Expand Up @@ -90,14 +91,12 @@ public List<AuthorizationResult> authorize(AuthorizableRequestContext requestCon

for (Action action : actions) {
if (checkTopicJwtClaims(topicAccesses, action) || checkClusterJwtClaims(clusterAccesses, action)) {
super.logAuditMessage(requestContext, action, true);
results.add(AuthorizationResult.ALLOWED);
continue;
}

if (LOG.isDebugEnabled()) {
LOG.debug("Action was Denied");
LOG.debug(action.toString());
}
super.logAuditMessage(requestContext, action, false);
results.add(AuthorizationResult.DENIED);
}

Expand All @@ -123,9 +122,6 @@ private List<String> extractAccessClaim(OAuthKafkaPrincipal principal) {
private List<AuthorizationResult> handleSuperUsers(AuthorizableRequestContext requestContext,
List<Action> actions) {
if (super.isSuperUser(requestContext.principal())) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Superuser: %s", requestContext.principal().getName()));
}
return Collections.nCopies(actions.size(), AuthorizationResult.ALLOWED);
} else {
return Collections.nCopies(actions.size(), AuthorizationResult.DENIED);
Expand Down
Loading