package io.confluent.kafka.security.authorizer;

import io.confluent.kafka.multitenant.MultiTenantRequestContextTest;
import io.confluent.security.authorizer.AccessRule;
import io.confluent.security.authorizer.provider.ConfluentBuiltInProviders;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.PathAwareSniHostName;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.ControllerRequestContext;
import org.apache.kafka.metadata.authorizer.AclMutator;
import org.apache.kafka.metadata.authorizer.StandardAcl;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.internals.ConfluentAuthorizerServerInfo;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/confluent/kafka/security/authorizer/ConfluentServerAuthorizerKRaftUnitTest.class */
/**
 * Unit tests for {@link ConfluentServerAuthorizer} configured as a KRaft broker.
 *
 * <p>The tests cover three areas: rejection of invalid access-rule-provider combinations at
 * {@code configure} time, basic ACL based authorization, and delegation of ACL creation/deletion
 * to an {@link AclMutator}.
 */
public class ConfluentServerAuthorizerKRaftUnitTest {
    /** Principal used for every sample request issued by these tests. */
    private static final KafkaPrincipal USER_ERNIE = new KafkaPrincipal("User", "ernie");

    /** {@code configure} must fail when both the ZK and the KRaft ACL providers are requested. */
    @Test
    public void testZkAndKraftProvidersNotAllowedTogether() {
        Map<String, Object> config = kraftBrokerConfig(0, plaintextEndpoint(),
                Arrays.asList(ConfluentBuiltInProviders.AccessRuleProviders.ZK_ACL, ConfluentBuiltInProviders.AccessRuleProviders.KRAFT_ACL));
        ConfluentServerAuthorizer authorizer = new ConfluentServerAuthorizer();
        Assertions.assertThrows(ConfigException.class, () -> authorizer.configure(config));
    }

    /** {@code configure} must fail when the KRaft ACL provider is used with a ZK broker config. */
    @Test
    public void testKRaftProviderNotSupportedWithZkConfiguration() {
        Map<String, Object> config = zkBrokerConfig(0, plaintextEndpoint(),
                Collections.singletonList(ConfluentBuiltInProviders.AccessRuleProviders.KRAFT_ACL));
        ConfluentServerAuthorizer authorizer = new ConfluentServerAuthorizer();
        Assertions.assertThrows(ConfigException.class, () -> authorizer.configure(config));
    }

    /** {@code configure} must fail when the CONFLUENT provider is combined with KRaft ACLs. */
    @Test
    public void testCentralizedAclsNotSupported() {
        Map<String, Object> config = kraftBrokerConfig(0, plaintextEndpoint(),
                Arrays.asList(ConfluentBuiltInProviders.AccessRuleProviders.CONFLUENT, ConfluentBuiltInProviders.AccessRuleProviders.KRAFT_ACL));
        ConfluentServerAuthorizer authorizer = new ConfluentServerAuthorizer();
        Assertions.assertThrows(ConfigException.class, () -> authorizer.configure(config));
    }

    /**
     * After adding an ALLOW READ ACL on topic "foo" for the wildcard user principal, READ is
     * allowed and WRITE is denied for the same resource.
     */
    @Test
    public void testBasicAuthorization() throws Exception {
        ConfluentServerAuthorizer authorizer = createAndStartBrokerAuthorizer(Uuid.randomUuid().toString(), 0);
        ResourcePattern topic = new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL);
        StandardAcl allowRead = new StandardAcl(topic.resourceType(), topic.name(), topic.patternType(),
                AccessRule.WILDCARD_USER_PRINCIPAL.toString(), "*", AclOperation.READ, AclPermissionType.ALLOW);
        authorizer.addAcl(Uuid.randomUuid(), allowRead);

        List<Action> actions = Arrays.asList(
                new Action(AclOperation.READ, topic, 1, true, true),
                new Action(AclOperation.WRITE, topic, 1, true, true));
        Assertions.assertEquals(
                Arrays.asList(AuthorizationResult.ALLOWED, AuthorizationResult.DENIED),
                authorizer.authorize(sampleRequestContext(ApiKeys.FETCH, USER_ERNIE), actions));
    }

    /** Without an {@link AclMutator} installed, {@code createAcls} throws {@link NotControllerException}. */
    @Test
    public void testAclCreationReturnsNotControllerIfNoAclMutator() throws Exception {
        ConfluentServerAuthorizer authorizer = createAndStartBrokerAuthorizer(Uuid.randomUuid().toString(), 0);
        RequestContext requestContext = sampleRequestContext(ApiKeys.CREATE_ACLS, USER_ERNIE);
        AclBinding binding = allowDescribeOnFoo();
        Assertions.assertThrows(NotControllerException.class,
                () -> authorizer.createAcls(requestContext, Collections.singletonList(binding)));
    }

    /** Without an {@link AclMutator} installed, {@code deleteAcls} throws {@link NotControllerException}. */
    @Test
    public void testAclDeletionReturnsNotControllerIfNoAclMutator() throws Exception {
        ConfluentServerAuthorizer authorizer = createAndStartBrokerAuthorizer(Uuid.randomUuid().toString(), 0);
        RequestContext requestContext = sampleRequestContext(ApiKeys.DELETE_ACLS, USER_ERNIE);
        AclBindingFilter filter = allowDescribeOnFooFilter();
        Assertions.assertThrows(NotControllerException.class,
                () -> authorizer.deleteAcls(requestContext, Collections.singletonList(filter)));
    }

    /**
     * {@code createAcls} delegates to the mocked {@link AclMutator}: the returned stage stays
     * incomplete until the mutator's future completes, and a mutator failure is reported through
     * {@link AclCreateResult#exception()} instead of an exceptionally completed stage.
     *
     * @param errors outcome the mocked mutator produces ({@code NONE} means success)
     */
    @ParameterizedTest
    @EnumSource(value = Errors.class, names = {"NONE", "CLUSTER_AUTHORIZATION_FAILED"})
    public void testAclCreation(Errors errors) throws Exception {
        ConfluentServerAuthorizer authorizer = createAndStartBrokerAuthorizer(Uuid.randomUuid().toString(), 0);
        AclMutator aclMutator = Mockito.mock(AclMutator.class);
        authorizer.setAclMutator(aclMutator);
        RequestContext requestContext = sampleRequestContext(ApiKeys.CREATE_ACLS, USER_ERNIE);
        AclBinding binding = allowDescribeOnFoo();

        CompletableFuture<List<AclCreateResult>> mutatorFuture = new CompletableFuture<>();
        Mockito.when(aclMutator.createAcls(
                ArgumentMatchers.any(ControllerRequestContext.class),
                ArgumentMatchers.eq(Collections.singletonList(binding))))
            .thenReturn(mutatorFuture);

        List<? extends CompletionStage<AclCreateResult>> results =
                authorizer.createAcls(requestContext, Collections.singletonList(binding));
        Assertions.assertEquals(1, results.size());
        CompletableFuture<AclCreateResult> resultFuture = results.get(0).toCompletableFuture();
        Assertions.assertFalse(resultFuture.isDone());

        if (errors == Errors.NONE) {
            mutatorFuture.complete(Collections.singletonList(AclCreateResult.SUCCESS));
            Assertions.assertTrue(resultFuture.isDone());
            Assertions.assertFalse(resultFuture.isCompletedExceptionally());
            Assertions.assertSame(AclCreateResult.SUCCESS, resultFuture.get());
        } else {
            // Fail the mutator with the very instance we later expect inside the result.
            ApiException exception = errors.exception();
            mutatorFuture.completeExceptionally(exception);
            Assertions.assertTrue(resultFuture.isDone());
            Assertions.assertFalse(resultFuture.isCompletedExceptionally());
            Assertions.assertEquals(Optional.of(exception), resultFuture.get().exception());
        }
    }

    /**
     * {@code deleteAcls} delegates to the mocked {@link AclMutator}: the returned stage stays
     * incomplete until the mutator's future completes, and a mutator failure is reported through
     * {@link AclDeleteResult#exception()} instead of an exceptionally completed stage.
     *
     * @param errors outcome the mocked mutator produces ({@code NONE} means success)
     */
    @ParameterizedTest
    @EnumSource(value = Errors.class, names = {"NONE", "CLUSTER_AUTHORIZATION_FAILED"})
    public void testAclDeletion(Errors errors) throws Exception {
        ConfluentServerAuthorizer authorizer = createAndStartBrokerAuthorizer(Uuid.randomUuid().toString(), 0);
        AclMutator aclMutator = Mockito.mock(AclMutator.class);
        authorizer.setAclMutator(aclMutator);
        RequestContext requestContext = sampleRequestContext(ApiKeys.DELETE_ACLS, USER_ERNIE);
        AclBindingFilter filter = allowDescribeOnFooFilter();

        CompletableFuture<List<AclDeleteResult>> mutatorFuture = new CompletableFuture<>();
        Mockito.when(aclMutator.deleteAcls(
                ArgumentMatchers.any(ControllerRequestContext.class),
                ArgumentMatchers.eq(Collections.singletonList(filter))))
            .thenReturn(mutatorFuture);

        List<? extends CompletionStage<AclDeleteResult>> results =
                authorizer.deleteAcls(requestContext, Collections.singletonList(filter));
        Assertions.assertEquals(1, results.size());
        CompletableFuture<AclDeleteResult> resultFuture = results.get(0).toCompletableFuture();
        Assertions.assertFalse(resultFuture.isDone());

        if (errors == Errors.NONE) {
            AclDeleteResult expected = new AclDeleteResult(
                    Collections.singletonList(new AclDeleteResult.AclBindingDeleteResult(allowDescribeOnFoo())));
            mutatorFuture.complete(Collections.singletonList(expected));
            Assertions.assertTrue(resultFuture.isDone());
            Assertions.assertFalse(resultFuture.isCompletedExceptionally());
            Assertions.assertEquals(expected, resultFuture.get());
        } else {
            // Fail the mutator with the very instance we later expect inside the result.
            ApiException exception = errors.exception();
            mutatorFuture.completeExceptionally(exception);
            Assertions.assertTrue(resultFuture.isDone());
            Assertions.assertFalse(resultFuture.isCompletedExceptionally());
            Assertions.assertEquals(Optional.of(exception), resultFuture.get().exception());
        }
    }

    /** Builds the PLAINTEXT endpoint shared by all broker configurations in this class. */
    private static Endpoint plaintextEndpoint() {
        return new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, MultiTenantRequestContextTest.LOCALHOST, MultiTenantRequestContextTest.KAFKA_PORT);
    }

    /** ALLOW DESCRIBE on literal topic "foo" for the wildcard user principal from any host. */
    private static AclBinding allowDescribeOnFoo() {
        return new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL),
                new AccessControlEntry(AccessRule.WILDCARD_USER_PRINCIPAL.toString(), "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
    }

    /** Filter built from the same field values as {@link #allowDescribeOnFoo()}. */
    private static AclBindingFilter allowDescribeOnFooFilter() {
        return new AclBindingFilter(
                new ResourcePatternFilter(ResourceType.TOPIC, "foo", PatternType.LITERAL),
                new AccessControlEntryFilter(AccessRule.WILDCARD_USER_PRINCIPAL.toString(), "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
    }

    /**
     * Creates an authorizer configured as a KRaft broker with only the KRAFT_ACL provider and
     * starts it against a stub server info.
     *
     * @param clusterId cluster id reported by the stub server info
     * @param nodeId    node id used both in the config and in the stub server info
     * @return the configured and started authorizer
     */
    private ConfluentServerAuthorizer createAndStartBrokerAuthorizer(String clusterId, int nodeId) {
        Endpoint endpoint = plaintextEndpoint();
        Map<String, Object> config = kraftBrokerConfig(nodeId, endpoint,
                Collections.singletonList(ConfluentBuiltInProviders.AccessRuleProviders.KRAFT_ACL));
        ConfluentServerAuthorizer authorizer = new ConfluentServerAuthorizer();
        authorizer.configure(config);
        authorizer.start(serverInfo(clusterId, nodeId, endpoint));
        return authorizer;
    }

    /**
     * Builds a PLAINTEXT request context for the given API at its latest version.
     *
     * @param apiKeys        API the request header is created for
     * @param kafkaPrincipal principal attached to the context
     * @return a request context originating from the local host address
     * @throws Exception if the local host address cannot be resolved
     */
    private RequestContext sampleRequestContext(ApiKeys apiKeys, KafkaPrincipal kafkaPrincipal) throws Exception {
        RequestHeader header = new RequestHeader(apiKeys, apiKeys.latestVersion(), "ClientId", 0);
        return new RequestContext(
                header,
                "localhost:92342",
                InetAddress.getLocalHost(),
                kafkaPrincipal,
                ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
                SecurityProtocol.PLAINTEXT,
                ClientInformation.EMPTY,
                (PathAwareSniHostName) null,
                true);
    }

    /**
     * Settings common to the ZK and KRaft broker configurations: listener, authorizer class,
     * multi-tenant listener name and the comma-joined access rule providers.
     */
    private Map<String, Object> baseBrokerConfig(Endpoint endpoint, List<ConfluentBuiltInProviders.AccessRuleProviders> providers) {
        Map<String, Object> config = new HashMap<>();
        config.put(KafkaConfig.ListenersProp(), "PLAINTEXT://" + endpoint.host() + ":" + endpoint.port());
        config.put(KafkaConfig.AuthorizerClassNameProp(), ConfluentServerAuthorizer.class.getName());
        config.put("confluent.multitenant.listener.names", "EXTERNAL");
        config.put("confluent.authorizer.access.rule.providers",
                Utils.join(providers.stream().map(ConfluentBuiltInProviders.AccessRuleProviders::name).toArray(), ","));
        return config;
    }

    /** Base config plus a ZooKeeper connect string and the broker id. */
    private Map<String, Object> zkBrokerConfig(int brokerId, Endpoint endpoint, List<ConfluentBuiltInProviders.AccessRuleProviders> providers) {
        Map<String, Object> config = baseBrokerConfig(endpoint, providers);
        config.put(KafkaConfig.ZkConnectProp(), "localhost:2181");
        config.put(KafkaConfig.BrokerIdProp(), brokerId + "");
        return config;
    }

    /** Base config plus the KRaft settings: "broker" process role, controller listener, quorum voters and node id. */
    private Map<String, Object> kraftBrokerConfig(int nodeId, Endpoint endpoint, List<ConfluentBuiltInProviders.AccessRuleProviders> providers) {
        Map<String, Object> config = baseBrokerConfig(endpoint, providers);
        config.put(KafkaConfig.ControllerListenerNamesProp(), "CONTROLLER");
        config.put("confluent.cluster.link.enable", "false");
        config.put(KafkaConfig.ProcessRolesProp(), "broker");
        config.put(KafkaConfig.QuorumVotersProp(), "10@localhost:8092");
        config.put(KafkaConfig.NodeIdProp(), nodeId + "");
        return config;
    }

    /**
     * Stub server info exposing a single endpoint, which also serves as the inter-broker
     * endpoint. Every call to {@code metrics()} returns a fresh {@link Metrics} instance.
     */
    private ConfluentAuthorizerServerInfo serverInfo(final String clusterId, final int nodeId, final Endpoint endpoint) {
        return new ConfluentAuthorizerServerInfo() {
            @Override
            public ClusterResource clusterResource() {
                return new ClusterResource(clusterId);
            }

            @Override
            public int brokerId() {
                return nodeId;
            }

            @Override
            public Collection<Endpoint> endpoints() {
                return Collections.singleton(endpoint);
            }

            @Override
            public Endpoint interBrokerEndpoint() {
                return endpoint;
            }

            @Override
            public Metrics metrics() {
                return new Metrics();
            }
        };
    }
}
