/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.security.authorizer;

import io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.security.authorizer.AccessRule;
import io.confluent.security.authorizer.AclAccessRule;
import io.confluent.security.authorizer.AuthorizePolicy;
import io.confluent.security.authorizer.AuthorizeResult;
import io.confluent.security.authorizer.Scope;
import io.confluent.security.authorizer.provider.AccessRuleProvider;
import io.confluent.security.authorizer.provider.AuthorizeRule;
import io.confluent.security.authorizer.provider.ResourceAuthorizeRules;
import io.confluent.security.roledefinitions.Operation;
import io.confluent.security.roledefinitions.PermissionType;
import io.confluent.security.roledefinitions.ResourceType;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig$;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.PathAwareSniHostName;
import org.apache.kafka.server.audit.AuditLogProvider;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.server.authorizer.internals.ConfluentAuthorizerServerInfo;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class MockConfluentServerAuthorizerTest {
    private String brokerUUID;
    private ConfluentServerAuthorizer authorizer;
    private ConfluentAuthorizerServerInfo serverInfo;
    private final Endpoint interBrokerEndpoint = new Endpoint("replication", SecurityProtocol.PLAINTEXT, "localhost", 9091);
    private final Endpoint externalEndpoint = new Endpoint("external", SecurityProtocol.SASL_SSL, "localhost", 9092);
    private final Endpoint internalEndpoint = new Endpoint("internal", SecurityProtocol.PLAINTEXT, "localhost", 9093);
    private Collection<Endpoint> endpoints;
    private ExecutorService executorService;
    private volatile Map<Endpoint, ? extends CompletionStage<Void>> startFutures;
    private final PathAwareSniHostName sniHostName = new PathAwareSniHostName("pb-lkc-1234-00aa-usw2-az1-x092.us-west-2.aws.glb.confluent.cloud");

    @BeforeEach
    public void setUp() throws Exception {
        this.endpoints = Arrays.asList(this.interBrokerEndpoint, this.externalEndpoint, this.internalEndpoint);
        this.setUp(Collections.emptyMap(), this.endpoints);
    }

    private void setUp(Map<String, Object> additionalConfigs, final Collection<Endpoint> endpoints) {
        MockAclProvider.reset();
        MockAuditLogProvider.reset();
        this.authorizer = new ConfluentServerAuthorizer();
        this.executorService = Executors.newSingleThreadExecutor();
        this.brokerUUID = "uuid";
        final HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("controller.listener.names", "CONTROLLER");
        configs.put("process.roles", "broker");
        configs.put("controller.quorum.voters", "10@localhost:8092");
        configs.put("node.id", "0");
        configs.put("listeners", "PLAINTEXT://localhost:9092");
        configs.put("listeners", "INTERNAL://127.0.0.1:9092,EXTERNAL://127.0.0.1:9093");
        configs.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT");
        configs.put("inter.broker.listener.name", "INTERNAL");
        configs.put(KafkaConfig$.MODULE$.BrokerSessionUuidProp(), this.brokerUUID);
        configs.put("confluent.authorizer.access.rule.providers", "MOCK_ACL");
        configs.putAll(additionalConfigs);
        this.authorizer.configure(configs);
        this.serverInfo = new ConfluentAuthorizerServerInfo(){

            public ClusterResource clusterResource() {
                return new ClusterResource("clusterA");
            }

            public int brokerId() {
                return 1;
            }

            public Collection<Endpoint> endpoints() {
                return endpoints;
            }

            public Endpoint interBrokerEndpoint() {
                return MockConfluentServerAuthorizerTest.this.interBrokerEndpoint;
            }

            public Collection<String> earlyStartListeners() {
                List<String> earlyStartListeners = ConfluentConfigs.listenerNames((String)"early.start.listeners", (Map)configs, null).stream().map(listenerName -> Optional.of(listenerName).map(ListenerName::normalised).map(ListenerName::value).orElse("")).collect(Collectors.toList());
                List controllerListeners = ConfluentConfigs.listenerNames((String)"controller.listener.names", (Map)configs, null).stream().map(listenerName -> Optional.of(listenerName).map(ListenerName::normalised).map(ListenerName::value).orElse("")).collect(Collectors.toList());
                earlyStartListeners.addAll(controllerListeners);
                return earlyStartListeners;
            }

            public AuditLogProvider auditLogProvider() {
                MockAuditLogProvider logProvider = new MockAuditLogProvider();
                logProvider.configure(configs);
                return logProvider;
            }

            public Metrics metrics() {
                return new Metrics();
            }

            public Map<String, ?> interBrokerClientConfig() {
                return Collections.singletonMap("bootstrap.servers", "localhost:" + MockConfluentServerAuthorizerTest.this.interBrokerEndpoint.port());
            }
        };
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.executorService.shutdownNow();
        this.authorizer.close();
        MockAclProvider.reset();
        MockAuditLogProvider.reset();
        KafkaTestUtils.verifyThreadCleanup();
    }

    @Test
    public void testStartupSequenceInMdsCluster() throws Exception {
        MockAclProvider.usesMetadataFromThisKafkaCluster = true;
        this.startAuthorizer();
        TestUtils.waitForCondition(() -> this.startFutures != null, (String)"Authorizer start not complete");
        Assertions.assertTrue((boolean)this.startFutures.get(this.interBrokerEndpoint).toCompletableFuture().isDone());
        Assertions.assertFalse((boolean)this.startFutures.get(this.externalEndpoint).toCompletableFuture().isDone());
        Assertions.assertFalse((boolean)this.startFutures.get(this.internalEndpoint).toCompletableFuture().isDone());
        MockAclProvider.startFuture.complete(null);
        this.startFutures.get(this.externalEndpoint).toCompletableFuture().get(10L, TimeUnit.SECONDS);
        this.startFutures.get(this.internalEndpoint).toCompletableFuture().get(10L, TimeUnit.SECONDS);
        MockAuditLogProvider.getInstance(this.brokerUUID).ensureStarted();
    }

    @Test
    public void testStartupSequenceInCloudCluster() throws Exception {
        this.tearDown();
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("confluent.multitenant.listener.names", "external");
        this.setUp(configs, this.endpoints);
        MockAclProvider.usesMetadataFromThisKafkaCluster = true;
        this.startAuthorizer();
        TestUtils.waitForCondition(() -> this.startFutures != null, (String)"Authorizer start not complete");
        Assertions.assertTrue((boolean)this.startFutures.get(this.interBrokerEndpoint).toCompletableFuture().isDone());
        Assertions.assertTrue((boolean)this.startFutures.get(this.internalEndpoint).toCompletableFuture().isDone());
        Assertions.assertFalse((boolean)this.startFutures.get(this.externalEndpoint).toCompletableFuture().isDone());
        MockAclProvider.startFuture.complete(null);
        this.startFutures.get(this.externalEndpoint).toCompletableFuture().get(10L, TimeUnit.SECONDS);
        MockAuditLogProvider.getInstance(this.brokerUUID).ensureStarted();
    }

    @Test
    public void testStartupSequenceInKraftClusterWithoutEarlyStartListenerNamesCP() throws Exception {
        List<Endpoint> kraftEndpoints = Arrays.asList(this.interBrokerEndpoint, this.internalEndpoint, this.externalEndpoint);
        this.tearDown();
        this.setUp(new HashMap<String, Object>(), kraftEndpoints);
        MockAclProvider.usesMetadataFromThisKafkaCluster = true;
        this.startAuthorizer();
        TestUtils.waitForCondition(() -> this.startFutures != null, (String)"Authorizer start not complete");
        Assertions.assertTrue((boolean)this.startFutures.get(this.interBrokerEndpoint).toCompletableFuture().isDone());
        Assertions.assertFalse((boolean)this.startFutures.get(this.internalEndpoint).toCompletableFuture().isDone());
        Assertions.assertFalse((boolean)this.startFutures.get(this.externalEndpoint).toCompletableFuture().isDone());
        MockAclProvider.startFuture.complete(null);
        this.startFutures.get(this.internalEndpoint).toCompletableFuture().get(10L, TimeUnit.SECONDS);
        this.startFutures.get(this.externalEndpoint).toCompletableFuture().get(10L, TimeUnit.SECONDS);
        MockAuditLogProvider.getInstance(this.brokerUUID).ensureStarted();
    }

    @Test
    public void testStartupSequenceInKraftClusterWithEarlyStartListenerNamesCP() throws Exception {
        List<Endpoint> kraftEndpoints = Arrays.asList(this.interBrokerEndpoint, this.internalEndpoint, this.externalEndpoint);
        this.tearDown();
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("early.start.listeners", "EXTERNAL");
        this.setUp(configs, kraftEndpoints);
        MockAclProvider.usesMetadataFromThisKafkaCluster = true;
        this.startAuthorizer();
        TestUtils.waitForCondition(() -> this.startFutures != null, (String)"Authorizer start not complete");
        Assertions.assertTrue((boolean)this.startFutures.get(this.interBrokerEndpoint).toCompletableFuture().isDone());
        Assertions.assertFalse((boolean)this.startFutures.get(this.internalEndpoint).toCompletableFuture().isDone());
        Assertions.assertTrue((boolean)this.startFutures.get(this.externalEndpoint).toCompletableFuture().isDone());
        MockAclProvider.startFuture.complete(null);
        this.startFutures.get(this.internalEndpoint).toCompletableFuture().get(10L, TimeUnit.SECONDS);
        MockAuditLogProvider.getInstance(this.brokerUUID).ensureStarted();
    }

    @Test
    public void testStartupSequenceInKraftClusterWithoutEarlyStartListenerNamesCloud() throws Exception {
        List<Endpoint> kraftEndpoints = Arrays.asList(this.interBrokerEndpoint, this.internalEndpoint, this.externalEndpoint);
        this.tearDown();
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("confluent.multitenant.listener.names", "EXTERNAL");
        this.setUp(configs, kraftEndpoints);
        MockAclProvider.usesMetadataFromThisKafkaCluster = true;
        this.startAuthorizer();
        TestUtils.waitForCondition(() -> this.startFutures != null, (String)"Authorizer start not complete");
        Assertions.assertTrue((boolean)this.startFutures.get(this.interBrokerEndpoint).toCompletableFuture().isDone());
        Assertions.assertTrue((boolean)this.startFutures.get(this.internalEndpoint).toCompletableFuture().isDone());
        Assertions.assertFalse((boolean)this.startFutures.get(this.externalEndpoint).toCompletableFuture().isDone());
        MockAclProvider.startFuture.complete(null);
        this.startFutures.get(this.externalEndpoint).toCompletableFuture().get(10L, TimeUnit.SECONDS);
        MockAuditLogProvider.getInstance(this.brokerUUID).ensureStarted();
    }

    @Test
    public void testStartupSequenceInKraftClusterWithEarlyStartListenerNamesCloud() throws Exception {
        Endpoint internalEndpoint2 = new Endpoint("internal2", SecurityProtocol.SASL_SSL, "localhost", 9094);
        List<Endpoint> kraftEndpoints = Arrays.asList(this.interBrokerEndpoint, this.internalEndpoint, this.externalEndpoint, internalEndpoint2);
        this.tearDown();
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("early.start.listeners", "INTERNAL2");
        configs.put("listeners", "INTERNAL://127.0.0.1:9093,EXTERNAL://127.0.0.1:9092,INTERNAL2://127.0.0.1:9094");
        configs.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,INTERNAL2:PLAINTEXT");
        configs.put("confluent.multitenant.listener.names", "EXTERNAL");
        this.setUp(configs, kraftEndpoints);
        MockAclProvider.usesMetadataFromThisKafkaCluster = true;
        this.startAuthorizer();
        TestUtils.waitForCondition(() -> this.startFutures != null, (String)"Authorizer start not complete");
        Assertions.assertTrue((boolean)this.startFutures.get(this.interBrokerEndpoint).toCompletableFuture().isDone());
        Assertions.assertTrue((boolean)this.startFutures.get(this.internalEndpoint).toCompletableFuture().isDone());
        Assertions.assertFalse((boolean)this.startFutures.get(this.externalEndpoint).toCompletableFuture().isDone());
        Assertions.assertTrue((boolean)this.startFutures.get(internalEndpoint2).toCompletableFuture().isDone());
        MockAclProvider.startFuture.complete(null);
        this.startFutures.get(this.externalEndpoint).toCompletableFuture().get(10L, TimeUnit.SECONDS);
        MockAuditLogProvider.getInstance(this.brokerUUID).ensureStarted();
    }

    @Test
    public void testStartupSequenceInNonMdsCluster() throws Exception {
        MockAclProvider.usesMetadataFromThisKafkaCluster = false;
        this.startAuthorizer();
        Assertions.assertNull(this.startFutures);
        MockAclProvider.startFuture.complete(null);
        TestUtils.waitForCondition(() -> this.startFutures != null, (String)"Authorizer start not complete");
        Assertions.assertTrue((boolean)this.startFutures.values().stream().allMatch(future -> future.toCompletableFuture().isDone()));
        MockAuditLogProvider.getInstance(this.brokerUUID).ensureStarted();
    }

    @Test
    public void testAuditLogEntries() throws Exception {
        MockAclProvider.startFuture.complete(null);
        this.authorizer.start((AuthorizerServerInfo)this.serverInfo).values().forEach(future -> future.toCompletableFuture().join());
        this.authorizer.completeInitialLoad();
        RequestContext requestContext = new RequestContext(null, "", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, this.sniHostName, false);
        Action allowedWithLog = new Action(AclOperation.DESCRIBE, new ResourcePattern(org.apache.kafka.common.resource.ResourceType.TOPIC, "allowedWithLog", PatternType.LITERAL), 1, true, false);
        Action allowedNoLog = new Action(AclOperation.DESCRIBE, new ResourcePattern(org.apache.kafka.common.resource.ResourceType.TOPIC, "allowedNoLog", PatternType.LITERAL), 1, false, true);
        Action deniedWithLog = new Action(AclOperation.DESCRIBE, new ResourcePattern(org.apache.kafka.common.resource.ResourceType.TOPIC, "deniedWithLog", PatternType.LITERAL), 1, false, true);
        Action deniedNoLog = new Action(AclOperation.DESCRIBE, new ResourcePattern(org.apache.kafka.common.resource.ResourceType.TOPIC, "deniedNoLog", PatternType.LITERAL), 1, true, false);
        Assertions.assertEquals((Object)AuthorizationResult.ALLOWED, this.authorizer.authorize((AuthorizableRequestContext)requestContext, Collections.singletonList(allowedWithLog)).get(0));
        MockAuditLogProvider auditLogProvider = MockAuditLogProvider.getInstance(this.brokerUUID);
        Assertions.assertEquals((int)1, (int)auditLogProvider.authorizationLog.size());
        Assertions.assertEquals((Object)"allowedWithLog", (Object)auditLogProvider.lastAuthorizationEntry().action().resourcePattern().name());
        Assertions.assertEquals((Object)AuthorizeResult.ALLOWED, (Object)auditLogProvider.lastAuthorizationEntry().authorizeResult());
        Assertions.assertEquals((Object)KafkaPrincipal.ANONYMOUS, (Object)auditLogProvider.lastAuthorizationEntry().requestContext().principal());
        Assertions.assertEquals((Object)AuthorizePolicy.PolicyType.ALLOW_ACL, (Object)auditLogProvider.lastAuthorizationEntry().authorizePolicy().policyType());
        auditLogProvider.authorizationLog.clear();
        Assertions.assertEquals((Object)AuthorizationResult.ALLOWED, this.authorizer.authorize((AuthorizableRequestContext)requestContext, Collections.singletonList(allowedNoLog)).get(0));
        Assertions.assertTrue((boolean)auditLogProvider.authorizationLog.isEmpty());
        Assertions.assertEquals((Object)AuthorizationResult.DENIED, this.authorizer.authorize((AuthorizableRequestContext)requestContext, Collections.singletonList(deniedWithLog)).get(0));
        Assertions.assertEquals((int)1, (int)auditLogProvider.authorizationLog.size());
        Assertions.assertEquals((Object)"deniedWithLog", (Object)auditLogProvider.lastAuthorizationEntry().action().resourcePattern().name());
        Assertions.assertEquals((Object)AuthorizeResult.DENIED, (Object)auditLogProvider.lastAuthorizationEntry().authorizeResult());
        Assertions.assertEquals((Object)AuthorizePolicy.PolicyType.DENY_ON_NO_RULE, (Object)auditLogProvider.lastAuthorizationEntry().authorizePolicy().policyType());
        auditLogProvider.authorizationLog.clear();
        Assertions.assertEquals((Object)AuthorizationResult.DENIED, this.authorizer.authorize((AuthorizableRequestContext)requestContext, Collections.singletonList(deniedNoLog)).get(0));
    }

    @Test
    public void testAuditLogException() throws Exception {
        MockAclProvider.startFuture.complete(null);
        this.authorizer.start((AuthorizerServerInfo)this.serverInfo).values().forEach(future -> future.toCompletableFuture().join());
        this.authorizer.completeInitialLoad();
        MockAuditLogProvider.getInstance(this.brokerUUID).setFail(true);
        RequestContext requestContext = new RequestContext(null, "", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, this.sniHostName, false);
        Action allowedWithLog = new Action(AclOperation.DESCRIBE, new ResourcePattern(org.apache.kafka.common.resource.ResourceType.TOPIC, "allowedWithLog", PatternType.LITERAL), 1, true, false);
        Action allowedNoLog = new Action(AclOperation.DESCRIBE, new ResourcePattern(org.apache.kafka.common.resource.ResourceType.TOPIC, "allowedNoLog", PatternType.LITERAL), 1, false, true);
        Action deniedWithLog = new Action(AclOperation.DESCRIBE, new ResourcePattern(org.apache.kafka.common.resource.ResourceType.TOPIC, "deniedWithLog", PatternType.LITERAL), 1, false, true);
        Action deniedNoLog = new Action(AclOperation.DESCRIBE, new ResourcePattern(org.apache.kafka.common.resource.ResourceType.TOPIC, "deniedNoLog", PatternType.LITERAL), 1, true, false);
        Assertions.assertEquals((Object)AuthorizationResult.ALLOWED, this.authorizer.authorize((AuthorizableRequestContext)requestContext, Collections.singletonList(allowedWithLog)).get(0));
        MockAuditLogProvider auditLogProvider = MockAuditLogProvider.getInstance(this.brokerUUID);
        Assertions.assertTrue((boolean)auditLogProvider.authorizationLog.isEmpty());
    }

    private void startAuthorizer() {
        this.executorService.submit(() -> {
            this.startFutures = this.authorizer.start((AuthorizerServerInfo)this.serverInfo);
            this.authorizer.completeInitialLoad();
        });
    }

    public static final class MockAclProvider
    implements AccessRuleProvider {
        static boolean usesMetadataFromThisKafkaCluster;
        static CompletableFuture<Void> startFuture;

        public String providerName() {
            return "MOCK_ACL";
        }

        public void configure(Map<String, ?> configs) {
        }

        public CompletionStage<Void> start(ConfluentAuthorizerServerInfo serverInfo) {
            Assertions.assertTrue((!serverInfo.interBrokerClientConfig().containsKey("broker.id") ? 1 : 0) != 0);
            return startFuture;
        }

        public boolean usesMetadataFromThisKafkaCluster() {
            return usesMetadataFromThisKafkaCluster;
        }

        public boolean isSuperUser(KafkaPrincipal principal, Scope scope) {
            return false;
        }

        public AuthorizeRule findRule(KafkaPrincipal principal, Set<KafkaPrincipal> groupPrincipals, String host, io.confluent.security.authorizer.Action action) {
            io.confluent.security.authorizer.ResourcePattern resource = action.resourcePattern();
            AuthorizeRule authorizeRule = new AuthorizeRule();
            if (resource.name().startsWith("allowed")) {
                AclAccessRule rule = new AclAccessRule(resource, principal, PermissionType.ALLOW, "*", Operation.ALL, AuthorizePolicy.PolicyType.ALLOW_ACL, new AclBinding(io.confluent.security.authorizer.ResourcePattern.to((io.confluent.security.authorizer.ResourcePattern)resource), new AccessControlEntry(principal.getName(), "*", AclOperation.ALL, AclPermissionType.ALLOW)));
                authorizeRule.addRuleIfNotExist((AccessRule)rule);
            }
            return authorizeRule;
        }

        public void addMatchingRules(ResourceAuthorizeRules matchingRules, KafkaPrincipal sessionPrincipal, Set<KafkaPrincipal> groupPrincipals, String host, Operation operation, Scope resourceScope, ResourceType resourceType) {
        }

        public boolean mayDeny() {
            return false;
        }

        public void close() {
        }

        static void reset() {
            usesMetadataFromThisKafkaCluster = true;
            startFuture = new CompletableFuture();
        }
    }
}

