/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.security.authorizer;

import io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.security.authorizer.AclMigrationAware;
import io.confluent.security.authorizer.RequestContext;
import io.confluent.security.authorizer.utils.AuthorizerUtils;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import kafka.server.KafkaConfig$;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.server.authorizer.internals.ConfluentAuthorizerServerInfo;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class AclMigrationTest {
    /** Principal whose string form is used in every ACL entry built by {@code aclBinding}. */
    private static final KafkaPrincipal USER_PRINCIPAL = KafkaPrincipal.ANONYMOUS;

    /** Embedded ZooKeeper instance; created in {@code setUp} and shut down in {@code tearDown}. */
    private EmbeddedZookeeper zookeeper;

    /** Connect string of the form {@code localhost:<port>} for the embedded ZooKeeper. */
    private String zkConnect;

    /**
     * Creates a fresh embedded ZooKeeper before each test and records the
     * {@code localhost:<port>} connect string derived from its port.
     */
    @Before
    public void setUp() {
        EmbeddedZookeeper embedded = new EmbeddedZookeeper();
        // Publish the instance first so tearDown() can shut it down even if port() fails.
        zookeeper = embedded;
        zkConnect = "localhost:" + embedded.port();
    }

    /**
     * Shuts down the embedded ZooKeeper after each test and then runs the
     * thread-cleanup verification provided by {@link KafkaTestUtils}.
     */
    @After
    public void tearDown() {
        zookeeper.shutdown();
        KafkaTestUtils.verifyThreadCleanup();
    }

    /**
     * Exercises ACL migration through the test subclasses of {@link ConfluentServerAuthorizer}
     * in three phases:
     * <ol>
     *   <li>With only the ZooKeeper connect string and the migrate-from-ZK flag configured,
     *       {@code configureServerInfo} is expected to throw {@link IllegalArgumentException}.</li>
     *   <li>After {@code confluent.metadata.bootstrap.server.urls} is added, a migration
     *       authorizer can be configured and closed without error.</li>
     *   <li>After {@code start}, the destination cache holds as many bindings as the source
     *       cache, and stays equal to it across {@code createAcls} and {@code deleteAcls}.</li>
     * </ol>
     *
     * <p>Every authorizer is opened in its own try-with-resources block so that it is closed
     * even when an assertion fails; the blocks are sequential so each authorizer is closed
     * before the next one is created.
     *
     * @throws Exception if configuring, starting or closing an authorizer fails
     */
    @Test
    public void testAclMigrationLogic() throws Exception {
        Map<String, String> authorizerConfigs = new HashMap<>();
        authorizerConfigs.put(KafkaConfig$.MODULE$.ZkConnectProp(), this.zkConnect);
        authorizerConfigs.put("confluent.authorizer.migrate.acls.from.zk", "true");
        ConfluentAuthorizerServerInfo serverInfo = KafkaTestUtils.serverInfo("clusterA", SecurityProtocol.SSL);

        // Phase 1: migration flag set but no metadata bootstrap URLs configured yet.
        try (TestAuthorizer authorizer1 = new TestAuthorizer()) {
            authorizer1.configure(authorizerConfigs);
            Assert.assertThrows(IllegalArgumentException.class, () -> authorizer1.configureServerInfo(serverInfo));
        }

        // Phase 2: with the bootstrap URLs present, configuration is expected to succeed.
        // NOTE(review): "locahost" looks like a typo for "localhost"; left unchanged because
        // the value is a runtime config string and this test does not appear to connect to it.
        authorizerConfigs.put("confluent.metadata.bootstrap.server.urls", "http://locahost:8090");
        try (TestMigrationAuthorizer authorizer2 = new TestMigrationAuthorizer()) {
            authorizer2.configure(authorizerConfigs);
            authorizer2.configureServerInfo(serverInfo);
        }

        // Phase 3: start the authorizer and compare source and destination caches.
        try (TestMigrationAuthorizer authorizer3 = new TestMigrationAuthorizer()) {
            authorizer3.configure(authorizerConfigs);
            authorizer3.configureServerInfo(serverInfo);

            // sourceBindings is the same list object as srcAclCache, not a snapshot of it.
            List<AclBinding> sourceBindings = authorizer3.aclAuthorizer.srcAclCache;
            Assert.assertFalse(authorizer3.aclAuthorizer.srcAclCache.isEmpty());
            Assert.assertTrue(authorizer3.secondAuthorizer.destAclCache.isEmpty());

            authorizer3.start((AuthorizerServerInfo) serverInfo);
            Assert.assertEquals(sourceBindings.size(), authorizer3.secondAuthorizer.destAclCache.size());

            AclBinding newBinding1 = AclMigrationTest.aclBinding("test5", AclOperation.DESCRIBE);
            AclBinding newBinding2 = AclMigrationTest.aclBinding("test6", AclOperation.DESCRIBE);

            List<AclBinding> newBindings = new LinkedList<>();
            newBindings.add(newBinding1);
            newBindings.add(newBinding2);
            authorizer3.createAcls((AuthorizableRequestContext) this.newRequestContext(), newBindings);
            // NOTE(review): the next assertion compares srcAclCache with its own alias and is
            // therefore trivially true; the destination-cache assertions carry the real check.
            Assert.assertEquals(sourceBindings.size(), authorizer3.aclAuthorizer.srcAclCache.size());
            Assert.assertEquals(sourceBindings.size(), authorizer3.secondAuthorizer.destAclCache.size());
            Assert.assertEquals(sourceBindings, authorizer3.secondAuthorizer.destAclCache);

            List<AclBindingFilter> deleteFilters = new LinkedList<>();
            deleteFilters.add(newBinding1.toFilter());
            deleteFilters.add(newBinding2.toFilter());
            authorizer3.deleteAcls((AuthorizableRequestContext) this.newRequestContext(), deleteFilters);
            // NOTE(review): same aliasing caveat as above for the srcAclCache size assertion.
            Assert.assertEquals(sourceBindings.size(), authorizer3.aclAuthorizer.srcAclCache.size());
            Assert.assertEquals(sourceBindings.size(), authorizer3.secondAuthorizer.destAclCache.size());
            Assert.assertEquals(sourceBindings, authorizer3.secondAuthorizer.destAclCache);
        }
    }

    /**
     * Builds a request context via {@link AuthorizerUtils#newRequestContext} using the
     * fixed arguments {@code "kafka"}, a {@code User:test} principal and {@code "localhost"}.
     *
     * @return a new request context for the test principal
     */
    private RequestContext newRequestContext() {
        return AuthorizerUtils.newRequestContext("kafka", new KafkaPrincipal("User", "test"), "localhost");
    }

    /**
     * Creates an ALLOW binding for {@code USER_PRINCIPAL} on a literal topic pattern.
     *
     * @param topicName name used for the literal {@link ResourceType#TOPIC} pattern
     * @param operation ACL operation placed in the access control entry
     * @return the binding combining the topic pattern and the access control entry
     */
    private static AclBinding aclBinding(String topicName, AclOperation operation) {
        ResourcePattern topicPattern = new ResourcePattern(ResourceType.TOPIC, topicName, PatternType.LITERAL);
        AccessControlEntry entry =
                new AccessControlEntry(USER_PRINCIPAL.toString(), "*", operation, AclPermissionType.ALLOW);
        return new AclBinding(topicPattern, entry);
    }

    /**
     * Removes from {@code aclCache} every binding matched by at least one of the given filters.
     *
     * <p>Removal goes through {@link java.util.Collection#removeIf} instead of calling
     * {@code remove} inside a for-each over the same list: structurally modifying a list
     * while iterating it can throw {@link java.util.ConcurrentModificationException} or
     * silently skip elements, depending on where the removed element sits.
     *
     * @param aclBindingFilters filters selecting the bindings to delete
     * @param aclCache          mutable list of bindings, modified in place
     */
    private static void deleteBindings(List<AclBindingFilter> aclBindingFilters, List<AclBinding> aclCache) {
        for (AclBindingFilter aclBindingFilter : aclBindingFilters) {
            aclCache.removeIf(aclBindingFilter::matches);
        }
    }

    /**
     * Decompiler-emitted lambda body: casts the given authorizer to
     * {@link ConfluentServerAuthorizer} and forwards {@code serverInfo} to its
     * {@code configureServerInfo} method.
     */
    private static /* synthetic */ void lambda$testAclMigrationLogic$0(Authorizer authorizer1, ConfluentAuthorizerServerInfo serverInfo) throws Throwable {
        ConfluentServerAuthorizer confluentAuthorizer = (ConfluentServerAuthorizer) authorizer1;
        confluentAuthorizer.configureServerInfo(serverInfo);
    }

    /**
     * In-memory {@link Authorizer} that also implements {@link AclMigrationAware}; the test
     * uses it as the authorizer returned by {@code centralizedAclProvider()}.
     *
     * <p>All bindings live in {@link #destAclCache}. {@code start} and {@code authorize}
     * return {@code null}, and {@code configure} and {@code close} do nothing.
     */
    private static class TestSecondAuthorizer implements Authorizer, AclMigrationAware {
        /** Bindings held by this authorizer; read directly by the test's assertions. */
        final List<AclBinding> destAclCache = new LinkedList<AclBinding>();

        private TestSecondAuthorizer() {
        }

        /** Not exercised with a real result in this test; returns {@code null}. */
        public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo) {
            return null;
        }

        /** Not exercised with a real result in this test; returns {@code null}. */
        public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
            return null;
        }

        /** Appends all given bindings to {@link #destAclCache} and returns an empty result list. */
        public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
            this.destAclCache.addAll(aclBindings);
            return Collections.emptyList();
        }

        /** Deletes matching bindings from {@link #destAclCache} and returns an empty result list. */
        public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters) {
            AclMigrationTest.deleteBindings(aclBindingFilters, this.destAclCache);
            return Collections.emptyList();
        }

        /** Returns the backing list itself; the {@code filter} argument is ignored. */
        public Iterable<AclBinding> acls(AclBindingFilter filter) {
            return this.destAclCache;
        }

        public void close() throws IOException {
        }

        public void configure(Map<String, ?> configs) {
        }

        /**
         * Returns a task that, when run, copies every binding reported by
         * {@code sourceAuthorizer.acls(AclBindingFilter.ANY)} into {@link #destAclCache}.
         *
         * @param sourceAuthorizer authorizer whose bindings are copied
         * @return the copy task; nothing is copied until it is run
         */
        public Runnable migrationTask(Authorizer sourceAuthorizer) {
            return () -> {
                // Typed Iterable: a raw Iterable would yield Object and not compile in the loop below.
                Iterable<AclBinding> bindings = sourceAuthorizer.acls(AclBindingFilter.ANY);
                for (AclBinding aclBinding : bindings) {
                    this.destAclCache.add(aclBinding);
                }
            };
        }
    }

    /**
     * In-memory {@link Authorizer} pre-populated with four topic bindings; the test uses it
     * as the authorizer returned by {@code zkAclProvider()}.
     *
     * <p>{@code start} and {@code authorize} return {@code null}, and {@code configure} and
     * {@code close} do nothing.
     */
    private static class TestAclAuthorizer implements Authorizer {
        /** Bindings held by this authorizer; read directly by the test's assertions. */
        List<AclBinding> srcAclCache = new LinkedList<AclBinding>();

        /** Seeds the cache with one binding each for topic1..topic4, in that order. */
        TestAclAuthorizer() {
            seed("topic1", AclOperation.WRITE);
            seed("topic2", AclOperation.READ);
            seed("topic3", AclOperation.DELETE);
            seed("topic4", AclOperation.DESCRIBE);
        }

        /** Appends a binding for the given topic and operation to the cache. */
        private void seed(String topicName, AclOperation operation) {
            srcAclCache.add(AclMigrationTest.aclBinding(topicName, operation));
        }

        /** Returns {@code null}; no endpoint futures are produced by this stub. */
        public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo) {
            return null;
        }

        /** Returns {@code null}; this stub performs no authorization. */
        public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
            return null;
        }

        /** Adds every given binding to the cache and reports an empty result list. */
        public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
            srcAclCache.addAll(aclBindings);
            return Collections.emptyList();
        }

        /** Removes matching bindings from the cache and reports an empty result list. */
        public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters) {
            deleteBindings(aclBindingFilters, srcAclCache);
            return Collections.emptyList();
        }

        /** Returns the backing list itself; the {@code filter} argument is ignored. */
        public Iterable<AclBinding> acls(AclBindingFilter filter) {
            return srcAclCache;
        }

        public void close() throws IOException {
        }

        public void configure(Map<String, ?> configs) {
        }
    }

    /**
     * {@link TestAuthorizer} variant that supplies in-memory stubs from
     * {@code zkAclProvider()} and {@code centralizedAclProvider()}.
     */
    private static class TestMigrationAuthorizer extends TestAuthorizer {
        /** Stub returned by {@code zkAclProvider()}. */
        TestAclAuthorizer aclAuthorizer;
        /** Stub returned by {@code centralizedAclProvider()}. */
        TestSecondAuthorizer secondAuthorizer;

        private TestMigrationAuthorizer() {
            aclAuthorizer = new TestAclAuthorizer();
            secondAuthorizer = new TestSecondAuthorizer();
        }

        /** Always returns the {@link TestAclAuthorizer} stub. */
        protected Optional<Authorizer> zkAclProvider() {
            Authorizer provider = aclAuthorizer;
            return Optional.of(provider);
        }

        /** Always returns the {@link TestSecondAuthorizer} stub. */
        protected Optional<Authorizer> centralizedAclProvider() {
            Authorizer provider = secondAuthorizer;
            return Optional.of(provider);
        }
    }

    /**
     * {@link ConfluentServerAuthorizer} subclass that remembers the last server info passed
     * to {@code configureServerInfo} and re-applies it on every later {@code configure} call.
     */
    private static class TestAuthorizer extends ConfluentServerAuthorizer {
        /** Last server info given to {@code configureServerInfo}; {@code null} until then. */
        volatile ConfluentAuthorizerServerInfo serverInfo;

        private TestAuthorizer() {
        }

        /**
         * Delegates to the superclass, then re-applies the remembered server info if one
         * has been recorded.
         */
        public void configure(Map<String, ?> configs) {
            super.configure(configs);
            if (serverInfo == null) {
                return;
            }
            configureServerInfo(serverInfo);
        }

        /** Records {@code serverInfo} before delegating to the superclass. */
        public void configureServerInfo(ConfluentAuthorizerServerInfo serverInfo) {
            this.serverInfo = serverInfo;
            super.configureServerInfo(serverInfo);
        }
    }
}

