/*
 * Decompiled with CFR 0.152.
 */
package kafka.test.junit;

import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.network.SocketServer;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.test.junit.ClusterInstanceParameterResolver;
import kafka.test.junit.GenericParameterResolver;
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.metadata.BrokerState;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import scala.Function0;
import scala.Option;
import scala.compat.java8.OptionConverters;

public class RaftClusterInvocationContext
implements TestTemplateInvocationContext {
    private final String baseDisplayName;
    private final ClusterConfig clusterConfig;
    private final AtomicReference<KafkaClusterTestKit> clusterReference;
    private final AtomicReference<EmbeddedZookeeper> zkReference;
    private final boolean isCombined;

    public RaftClusterInvocationContext(String baseDisplayName, ClusterConfig clusterConfig, boolean isCombined) {
        this.baseDisplayName = baseDisplayName;
        this.clusterConfig = clusterConfig;
        this.clusterReference = new AtomicReference();
        this.zkReference = new AtomicReference();
        this.isCombined = isCombined;
    }

    public String getDisplayName(int invocationIndex) {
        String clusterDesc = this.clusterConfig.nameTags().entrySet().stream().map(Object::toString).collect(Collectors.joining(", "));
        return String.format("%s [%d] Type=Raft-%s, %s", this.baseDisplayName, invocationIndex, this.isCombined ? "Combined" : "Isolated", clusterDesc);
    }

    public List<Extension> getAdditionalExtensions() {
        RaftClusterInstance clusterInstance = new RaftClusterInstance(this.clusterReference, this.zkReference, this.clusterConfig);
        return Arrays.asList(context -> {
            Map.Entry<KafkaClusterTestKit, EmbeddedZookeeper> entry = RaftClusterInvocationContext.createClusterReference(this.clusterConfig, this.isCombined, Uuid.randomUuid(), Optional.empty());
            KafkaClusterTestKit cluster = entry.getKey();
            this.clusterReference.set(entry.getKey());
            this.zkReference.set(entry.getValue());
            if (this.clusterConfig.isAutoStart()) {
                clusterInstance.start();
                kafka.utils.TestUtils.waitUntilTrue((Function0<Object>)((Function0)() -> cluster.brokers().get(0).brokerState() == BrokerState.RUNNING), (Function0<String>)((Function0)() -> "Broker never made it to RUNNING state."), 15000L, 100L);
            }
        }, context -> clusterInstance.stop(), new ClusterInstanceParameterResolver(clusterInstance), new GenericParameterResolver<ClusterConfig>(this.clusterConfig, ClusterConfig.class));
    }

    public static Map.Entry<KafkaClusterTestKit, EmbeddedZookeeper> createClusterReference(ClusterConfig clusterConfig, boolean isCombined, Uuid clusterId, Optional<EmbeddedZookeeper> embeddedZookeeper) throws Exception {
        KafkaClusterTestKit testKit;
        EmbeddedZookeeper zookeeper;
        block3: {
            zookeeper = null;
            testKit = null;
            try {
                TestKitNodes nodes = new TestKitNodes.Builder().setClusterId(clusterId).setBootstrapMetadataVersion(clusterConfig.metadataVersion()).setCombined(isCombined).setNumBrokerNodes(clusterConfig.numBrokers()).setNumControllerNodes(clusterConfig.numControllers()).build();
                nodes.brokerNodes().forEach((brokerId, brokerNode) -> clusterConfig.brokerServerProperties((int)brokerId).forEach((BiConsumer<? super Object, ? super Object>)((BiConsumer<Object, Object>)(key, value) -> brokerNode.propertyOverrides().put(key.toString(), value.toString()))));
                KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes);
                if (Boolean.parseBoolean(clusterConfig.serverProperties().getProperty("zookeeper.metadata.migration.enable", "false"))) {
                    zookeeper = embeddedZookeeper.orElseGet(EmbeddedZookeeper::new);
                    builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", zookeeper.port()));
                }
                clusterConfig.serverProperties().forEach((BiConsumer<? super Object, ? super Object>)((BiConsumer<Object, Object>)(key, value) -> builder.setConfigProp(key.toString(), value.toString())));
                testKit = builder.build();
            }
            catch (Exception e) {
                if (zookeeper == null || embeddedZookeeper.isPresent()) break block3;
                zookeeper.shutdown();
            }
        }
        return new AbstractMap.SimpleImmutableEntry<Object, EmbeddedZookeeper>(testKit, zookeeper);
    }

    public static class RaftClusterInstance
    implements ClusterInstance {
        private final AtomicReference<KafkaClusterTestKit> clusterReference;
        private final AtomicReference<EmbeddedZookeeper> zkReference;
        private final ClusterConfig clusterConfig;
        final AtomicBoolean started = new AtomicBoolean(false);
        final AtomicBoolean stopped = new AtomicBoolean(false);
        private final ConcurrentLinkedQueue<Admin> admins = new ConcurrentLinkedQueue();

        RaftClusterInstance(AtomicReference<KafkaClusterTestKit> clusterReference, AtomicReference<EmbeddedZookeeper> zkReference, ClusterConfig clusterConfig) {
            this.clusterReference = clusterReference;
            this.zkReference = zkReference;
            this.clusterConfig = clusterConfig;
        }

        @Override
        public String bootstrapServers() {
            return this.clusterReference.get().bootstrapServers();
        }

        @Override
        public String bootstrapControllers() {
            return this.clusterReference.get().bootstrapControllers();
        }

        @Override
        public Collection<SocketServer> brokerSocketServers() {
            return this.brokers().map(BrokerServer::socketServer).collect(Collectors.toList());
        }

        @Override
        public ListenerName clientListener() {
            return ListenerName.normalised((String)"EXTERNAL");
        }

        @Override
        public ListenerName controllerListener() {
            return ListenerName.normalised((String)"CONTROLLER");
        }

        @Override
        public Optional<ListenerName> controllerListenerName() {
            return OptionConverters.toJava((Option)this.controllers().findAny().get().config().controllerListenerNames().headOption().map(ListenerName::new));
        }

        @Override
        public Collection<SocketServer> controllerSocketServers() {
            return this.controllers().map(ControllerServer::socketServer).collect(Collectors.toList());
        }

        @Override
        public SocketServer anyBrokerSocketServer() {
            return this.brokers().map(BrokerServer::socketServer).findFirst().orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
        }

        @Override
        public SocketServer anyNonControllerBrokerSocketServer() {
            return this.anyBrokerSocketServer();
        }

        @Override
        public SocketServer anyControllerSocketServer() {
            return this.controllers().map(ControllerServer::socketServer).findFirst().orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
        }

        @Override
        public SocketServer activeController() throws InterruptedException {
            TestUtils.waitForCondition(() -> this.controllers().anyMatch(controller -> controller.controller().isActive()), (String)"Timed out waiting for active controller");
            return this.controllers().filter(controller -> controller.controller().isActive()).map(ControllerServer::socketServer).findFirst().orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
        }

        @Override
        public SocketServer linkCoordinator(String linkName) {
            List linkCoordinators = this.brokers().filter(broker -> broker.clusterLinkManager().isLinkCoordinator(linkName)).map(BrokerServer::socketServer).collect(Collectors.toList());
            if (linkCoordinators.size() != 1) {
                throw new IllegalStateException("Failed to find a single link coordinator for " + linkName);
            }
            return (SocketServer)linkCoordinators.get(0);
        }

        @Override
        public Map<Integer, BrokerFeatures> brokerFeatures() {
            return this.brokers().collect(Collectors.toMap(brokerServer -> brokerServer.config().nodeId(), BrokerServer::brokerFeatures));
        }

        @Override
        public String clusterId() {
            return this.controllers().findFirst().map(ControllerServer::clusterId).orElse(this.brokers().findFirst().map(BrokerServer::clusterId).orElseThrow(() -> new RuntimeException("No controllers or brokers!")));
        }

        public Collection<ControllerServer> controllerServers() {
            return this.controllers().collect(Collectors.toList());
        }

        @Override
        public ClusterInstance.ClusterType clusterType() {
            return ClusterInstance.ClusterType.RAFT;
        }

        @Override
        public ClusterConfig config() {
            return this.clusterConfig;
        }

        @Override
        public Set<Integer> controllerIds() {
            return this.controllers().map(controllerServer -> controllerServer.config().nodeId()).collect(Collectors.toSet());
        }

        @Override
        public Set<Integer> brokerIds() {
            return this.brokers().map(brokerServer -> brokerServer.config().nodeId()).collect(Collectors.toSet());
        }

        @Override
        public KafkaClusterTestKit getUnderlying() {
            return this.clusterReference.get();
        }

        @Override
        public RaftClusterInstance duplicateCluster(Consumer<ClusterConfig.Builder> builderConsumer) {
            try {
                ClusterConfig newConfig = this.clusterConfig.copyOf(builderConsumer);
                newConfig.serverProperties().put(KafkaConfig.MigrationEnabledProp(), "false");
                Map.Entry<KafkaClusterTestKit, EmbeddedZookeeper> entry = RaftClusterInvocationContext.createClusterReference(newConfig, false, Uuid.randomUuid(), Optional.empty());
                AtomicReference<KafkaClusterTestKit> newClusterRef = new AtomicReference<KafkaClusterTestKit>(entry.getKey());
                RaftClusterInstance newClusterInstance = new RaftClusterInstance(newClusterRef, new AtomicReference<EmbeddedZookeeper>(entry.getValue()), newConfig);
                if (this.clusterConfig.isAutoStart()) {
                    newClusterInstance.start();
                    try {
                        newClusterInstance.waitForReadyBrokers();
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException("Interrupted while waiting for ready brokers", e);
                    }
                }
                return newClusterInstance;
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to create duplicate cluster", e);
            }
        }

        @Override
        public Admin createAdminClient(Properties configOverrides) {
            Admin admin = Admin.create((Properties)this.clusterReference.get().newClientPropertiesBuilder(configOverrides).build());
            this.admins.add(admin);
            return admin;
        }

        @Override
        public void start() {
            if (this.started.compareAndSet(false, true)) {
                try {
                    this.clusterReference.get().format();
                    this.clusterReference.get().startup();
                }
                catch (Exception e) {
                    throw new RuntimeException("Failed to start Raft server", e);
                }
            }
        }

        @Override
        public void stop() {
            if (this.stopped.compareAndSet(false, true)) {
                this.admins.forEach(admin -> Utils.closeQuietly((AutoCloseable)admin, (String)"admin"));
                Utils.closeQuietly((AutoCloseable)this.clusterReference.get(), (String)"cluster");
                if (this.zkReference.get() != null) {
                    Utils.closeQuietly((AutoCloseable)this.zkReference.get(), (String)"zk");
                }
            }
        }

        @Override
        public void shutdownBroker(int brokerId) {
            this.findBrokerOrThrow(brokerId).shutdown();
        }

        @Override
        public void startBroker(int brokerId) {
            this.findBrokerOrThrow(brokerId).startup();
        }

        @Override
        public void waitForReadyBrokers() throws InterruptedException {
            try {
                this.clusterReference.get().waitForReadyBrokers();
            }
            catch (ExecutionException e) {
                throw new AssertionError("Failed while waiting for brokers to become ready", e);
            }
        }

        @Override
        public Map<Integer, KafkaBroker> brokersMap() {
            return this.brokers().collect(Collectors.toMap(broker -> broker.config().nodeId(), Function.identity()));
        }

        @Override
        public void rollingBrokerRestart() {
            throw new UnsupportedOperationException("Restarting Raft servers is not yet supported.");
        }

        @Override
        public void killAllBrokers() {
            throw new UnsupportedOperationException("Restarting Raft servers is not yet supported.");
        }

        public Map<Integer, MetadataImage> metadataImage() {
            return this.brokers().collect(Collectors.toMap(brokerServer -> brokerServer.config().nodeId(), brokerServer -> brokerServer.metadataCache().currentImage()));
        }

        private BrokerServer findBrokerOrThrow(int brokerId) {
            return Optional.ofNullable(this.clusterReference.get().brokers().get(brokerId)).orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
        }

        public Stream<BrokerServer> brokers() {
            return this.clusterReference.get().brokers().values().stream();
        }

        public Map<Integer, ControllerServer> controllersMap() {
            return this.controllers().collect(Collectors.toMap(controller -> controller.config().nodeId(), Function.identity()));
        }

        public Stream<ControllerServer> controllers() {
            return this.clusterReference.get().controllers().values().stream();
        }

        public Option<CompletableFuture<Integer>> interBrokerPortFuture() {
            return this.clusterReference.get().interBrokerPortFuture();
        }
    }
}

