/*
 * Decompiled with CFR 0.152.
 */
package kafka.test.junit;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.network.SocketServer;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.test.junit.ClusterInstanceParameterResolver;
import kafka.test.junit.GenericParameterResolver;
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.metadata.BrokerState;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import scala.Function0;
import scala.Option;
import scala.compat.java8.OptionConverters;

public class RaftClusterInvocationContext
implements TestTemplateInvocationContext {
    private final ClusterConfig clusterConfig;
    private final AtomicReference<KafkaClusterTestKit> clusterReference;

    /**
     * Creates an invocation context for the given cluster configuration.
     *
     * <p>No cluster is built here: the cluster reference starts out empty. As a side effect, the
     * server properties of {@code clusterConfig} get {@code KafkaConfig.ClusterLinkEnableProp()}
     * set to {@code false}.
     *
     * @param clusterConfig configuration of the cluster this context will create
     */
    public RaftClusterInvocationContext(ClusterConfig clusterConfig) {
        this.clusterReference = new AtomicReference<>();
        this.clusterConfig = clusterConfig;
        // Mutates the caller-supplied config; the value is autoboxed to Boolean.FALSE.
        clusterConfig.serverProperties().put(KafkaConfig.ClusterLinkEnableProp(), false);
    }

    /**
     * Builds the display name for one invocation of the test template.
     *
     * @param invocationIndex index of the invocation, rendered in square brackets
     * @return {@code "[<index>] Type=Raft, <tags>"}, where {@code <tags>} is the string form of
     *         every name-tag entry of the cluster config joined with {@code ", "}
     */
    public String getDisplayName(int invocationIndex) {
        String joinedTags = clusterConfig.nameTags()
            .entrySet()
            .stream()
            .map(tag -> tag.toString())
            .collect(Collectors.joining(", "));
        return String.format("[%d] Type=Raft, %s", invocationIndex, joinedTags);
    }

    /**
     * Returns the extensions that manage the cluster around each test execution.
     *
     * <p>The list contains, in order: a before-execution callback that builds the cluster, a
     * after-execution callback that stops it, a parameter resolver for the cluster instance and a
     * parameter resolver for the {@link ClusterConfig}.
     *
     * @return the extensions to register for this invocation
     */
    public List<Extension> getAdditionalExtensions() {
        RaftClusterInstance clusterInstance = new RaftClusterInstance(this.clusterReference, this.clusterConfig);

        // The lambdas need an explicit functional-interface type: Extension itself declares no
        // method, so a lambda cannot target it directly.
        BeforeTestExecutionCallback buildAndStart = context -> {
            KafkaClusterTestKit cluster = RaftClusterInvocationContext.createClusterReference(this.clusterConfig);
            this.clusterReference.set(cluster);
            if (this.clusterConfig.isAutoStart()) {
                clusterInstance.start();
                // Only wait when the cluster was actually started; with auto-start disabled the
                // broker cannot reach RUNNING here and the wait could only time out.
                // The arguments 15000 and 100 are presumably the maximum wait and the poll
                // interval in milliseconds -- confirm against kafka.utils.TestUtils.
                kafka.utils.TestUtils.waitUntilTrue(
                    () -> cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
                    () -> "Broker never made it to RUNNING state.",
                    15000L,
                    100L);
            }
        };
        AfterTestExecutionCallback stopCluster = context -> clusterInstance.stop();

        return Arrays.asList(
            buildAndStart,
            stopCluster,
            new ClusterInstanceParameterResolver(clusterInstance),
            new GenericParameterResolver<ClusterConfig>(this.clusterConfig, ClusterConfig.class));
    }

    /**
     * Builds (but does not format or start) a {@link KafkaClusterTestKit} for a cluster config.
     *
     * <p>The node layout uses the broker and controller counts of the config. Each broker node
     * receives the per-broker server properties as property overrides, and the config-wide server
     * properties are applied to the test-kit builder. Keys and values are converted with
     * {@code toString()}.
     *
     * @param clusterConfig configuration to derive the cluster from
     * @return the built test kit
     * @throws Exception if building the test kit fails
     */
    public static KafkaClusterTestKit createClusterReference(ClusterConfig clusterConfig) throws Exception {
        TestKitNodes nodes = new TestKitNodes.Builder()
            .setNumBrokerNodes(clusterConfig.numBrokers())
            .setNumControllerNodes(clusterConfig.numControllers())
            .build();

        // Per-broker overrides go onto the individual broker nodes.
        nodes.brokerNodes().forEach((brokerId, brokerNode) ->
            clusterConfig.brokerServerProperties(brokerId).forEach((name, setting) ->
                brokerNode.propertyOverrides().put(name.toString(), setting.toString())));

        // Cluster-wide properties go onto the test-kit builder.
        KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes);
        clusterConfig.serverProperties().forEach((name, setting) ->
            builder.setConfigProp(name.toString(), setting.toString()));

        return builder.build();
    }

    public static class RaftClusterInstance
    implements ClusterInstance {
        private final AtomicReference<KafkaClusterTestKit> clusterReference;
        private final ClusterConfig clusterConfig;
        final AtomicBoolean started = new AtomicBoolean(false);
        final AtomicBoolean stopped = new AtomicBoolean(false);
        private final ConcurrentLinkedQueue<Admin> admins = new ConcurrentLinkedQueue();

        /**
         * Creates an instance backed by a shared cluster reference.
         *
         * @param clusterReference holder of the underlying test kit; it is read on every call,
         *                         so it may be populated after this constructor runs
         * @param clusterConfig    configuration the cluster was (or will be) created from
         */
        RaftClusterInstance(AtomicReference<KafkaClusterTestKit> clusterReference, ClusterConfig clusterConfig) {
            this.clusterConfig = clusterConfig;
            this.clusterReference = clusterReference;
        }

        /**
         * Returns the {@code bootstrap.servers} entry of the cluster's client properties.
         *
         * @return the bootstrap servers string, or {@code null} if the property is not set
         */
        @Override
        public String bootstrapServers() {
            Properties clientProps = clusterReference.get().clientProperties();
            return clientProps.getProperty("bootstrap.servers");
        }

        /**
         * Collects the socket server of every broker in the cluster.
         *
         * @return a list with one socket server per broker
         */
        @Override
        public Collection<SocketServer> brokerSocketServers() {
            List<SocketServer> socketServers = brokerStream()
                .map(broker -> broker.socketServer())
                .collect(Collectors.toList());
            return socketServers;
        }

        /**
         * Returns the listener name used by clients.
         *
         * @return the normalised listener name for {@code "EXTERNAL"}
         */
        @Override
        public ListenerName clientListener() {
            final String listener = "EXTERNAL";
            return ListenerName.normalised(listener);
        }

        /**
         * Returns the listener name used by controllers.
         *
         * @return the normalised listener name for {@code "CONTROLLER"}
         */
        @Override
        public ListenerName controllerListener() {
            final String listener = "CONTROLLER";
            return ListenerName.normalised(listener);
        }

        /**
         * Returns the first controller listener name configured on any controller.
         *
         * @return the first entry of that controller's configured controller listener names,
         *         or an empty {@link Optional} if the controller has none configured
         * @throws NoSuchElementException if the cluster has no controllers
         */
        @Override
        public Optional<ListenerName> controllerListenerName() {
            // Same exception type an unchecked Optional.get() would raise, but with a message
            // that says what is actually missing.
            ControllerServer anyController = this.controllers()
                .findAny()
                .orElseThrow(() -> new NoSuchElementException("No ControllerServers found"));
            Option<ListenerName> firstListener = anyController.config()
                .controllerListenerNames()
                .headOption()
                .map(ListenerName::new);
            return OptionConverters.toJava(firstListener);
        }

        /**
         * Collects the socket server of every controller in the cluster.
         *
         * @return a list with one socket server per controller
         */
        @Override
        public Collection<SocketServer> controllerSocketServers() {
            List<SocketServer> socketServers = controllers()
                .map(controller -> controller.socketServer())
                .collect(Collectors.toList());
            return socketServers;
        }

        /**
         * Returns the socket server of the first broker in the broker stream.
         *
         * @return a broker socket server
         * @throws RuntimeException if the cluster has no brokers
         */
        @Override
        public SocketServer anyBrokerSocketServer() {
            Optional<SocketServer> first = brokerStream()
                .map(broker -> broker.socketServer())
                .findFirst();
            return first.orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
        }

        /**
         * Returns the socket server of the first controller in the controller stream.
         *
         * @return a controller socket server
         * @throws RuntimeException if the cluster has no controllers
         */
        @Override
        public SocketServer anyControllerSocketServer() {
            Optional<SocketServer> first = controllers()
                .map(controller -> controller.socketServer())
                .findFirst();
            return first.orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
        }

        /**
         * Waits until some controller reports itself active and returns its socket server.
         *
         * <p>The controllers are queried a second time after the wait completes, so the lookup
         * can still come up empty if no controller is active at that moment.
         *
         * @return the socket server of the first controller that is active
         * @throws InterruptedException if interrupted while waiting
         * @throws RuntimeException if no controller is active after the wait completed
         */
        @Override
        public SocketServer activeController() throws InterruptedException {
            TestUtils.waitForCondition(
                () -> controllers().anyMatch(server -> server.controller().isActive()),
                "Timed out waiting for active controller");

            Optional<SocketServer> active = controllers()
                .filter(server -> server.controller().isActive())
                .map(server -> server.socketServer())
                .findFirst();
            return active.orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
        }

        /**
         * Identifies the kind of cluster this instance represents.
         *
         * @return always {@code ClusterType.RAFT}
         */
        @Override
        public ClusterInstance.ClusterType clusterType() {
            final ClusterInstance.ClusterType type = ClusterInstance.ClusterType.RAFT;
            return type;
        }

        /**
         * Returns the cluster id as reported by the first broker in the broker stream.
         *
         * @return the cluster id
         * @throws RuntimeException if the cluster has no brokers
         */
        @Override
        public String clusterId() {
            Optional<String> firstId = brokerStream()
                .map(broker -> broker.clusterId())
                .findFirst();
            return firstId.orElseThrow(() -> new RuntimeException("No BrokerServers found"));
        }

        /**
         * Returns the configuration this instance was constructed with.
         *
         * @return the cluster config (the same object, not a copy)
         */
        @Override
        public ClusterConfig config() {
            return clusterConfig;
        }

        /**
         * Exposes the underlying test kit.
         *
         * @return the current value of the cluster reference; {@code null} if it has not been set
         */
        @Override
        public KafkaClusterTestKit getUnderlying() {
            KafkaClusterTestKit underlying = clusterReference.get();
            return underlying;
        }

        /**
         * Creates a second, independent cluster from a modified copy of this cluster's config.
         *
         * <p>If this instance's config has auto-start enabled, the new cluster is started and
         * this method blocks until its brokers are ready. If anything fails after the new test
         * kit has been built, the test kit is closed before the failure propagates, so a failed
         * duplication does not leak a cluster.
         *
         * @param builderConsumer callback that customises the copied config
         * @return the new cluster instance
         * @throws RuntimeException if the cluster cannot be created, started, or the thread is
         *                          interrupted while waiting for the brokers
         */
        @Override
        public RaftClusterInstance duplicateCluster(Consumer<ClusterConfig.Builder> builderConsumer) {
            KafkaClusterTestKit newCluster = null;
            boolean succeeded = false;
            boolean interrupted = false;
            try {
                ClusterConfig newConfig = this.clusterConfig.copyOf(builderConsumer);
                newCluster = RaftClusterInvocationContext.createClusterReference(newConfig);
                AtomicReference<KafkaClusterTestKit> newClusterRef = new AtomicReference<KafkaClusterTestKit>(newCluster);
                RaftClusterInstance newClusterInstance = new RaftClusterInstance(newClusterRef, newConfig);
                // NOTE(review): auto-start is read from the ORIGINAL config, not from newConfig;
                // kept as-is, but confirm this is intended when the consumer changes auto-start.
                if (this.clusterConfig.isAutoStart()) {
                    newClusterInstance.start();
                    try {
                        newClusterInstance.waitForReadyBrokers();
                    }
                    catch (InterruptedException e) {
                        interrupted = true;
                        throw new RuntimeException("Interrupted while waiting for ready brokers", e);
                    }
                }
                succeeded = true;
                return newClusterInstance;
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to create duplicate cluster", e);
            }
            finally {
                // Covers Errors too (waitForReadyBrokers may throw AssertionError), which the
                // catch block above does not see.
                if (!succeeded && newCluster != null) {
                    Utils.closeQuietly(newCluster, "cluster");
                }
                // Restore the interrupt flag only after cleanup so that closing the cluster is
                // not itself cut short by the pending interrupt.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Creates an admin client for this cluster and registers it for cleanup.
         *
         * <p>The client is remembered by this instance and closed by {@link #stop()}.
         *
         * @param configOverrides overrides passed to the cluster when it builds the client
         *                        properties
         * @return the new admin client
         */
        @Override
        public Admin createAdminClient(Properties configOverrides) {
            Properties adminProps = clusterReference.get().clientProperties(configOverrides);
            Admin client = Admin.create(adminProps);
            admins.add(client);
            return client;
        }

        /**
         * Formats and starts the underlying cluster.
         *
         * <p>Only the first call does anything; later calls return immediately because the
         * {@code started} flag has already been flipped. The flag stays set even if startup fails.
         *
         * @throws RuntimeException if formatting or startup throws
         */
        @Override
        public void start() {
            if (!started.compareAndSet(false, true)) {
                return;
            }
            try {
                KafkaClusterTestKit cluster = clusterReference.get();
                cluster.format();
                cluster.startup();
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to start Raft server", e);
            }
        }

        /**
         * Closes every admin client created through this instance, then the cluster itself.
         *
         * <p>Only the first call does anything; later calls return immediately. All resources
         * are closed with {@code Utils.closeQuietly}.
         */
        @Override
        public void stop() {
            if (!stopped.compareAndSet(false, true)) {
                return;
            }
            for (Admin admin : admins) {
                Utils.closeQuietly(admin, "admin");
            }
            Utils.closeQuietly(clusterReference.get(), "cluster");
        }

        /**
         * Shuts down a single broker.
         *
         * @param brokerId id of the broker to shut down
         * @throws IllegalArgumentException if the cluster has no broker with that id
         */
        @Override
        public void shutdownBroker(int brokerId) {
            BrokerServer broker = findBrokerOrThrow(brokerId);
            broker.shutdown();
        }

        /**
         * Starts a single broker.
         *
         * @param brokerId id of the broker to start
         * @throws IllegalArgumentException if the cluster has no broker with that id
         */
        @Override
        public void startBroker(int brokerId) {
            BrokerServer broker = findBrokerOrThrow(brokerId);
            broker.startup();
        }

        /**
         * Delegates to the underlying test kit's {@code waitForReadyBrokers()}.
         *
         * @throws InterruptedException if interrupted while waiting
         * @throws AssertionError if the underlying wait fails with an {@link ExecutionException}
         */
        @Override
        public void waitForReadyBrokers() throws InterruptedException {
            KafkaClusterTestKit cluster = clusterReference.get();
            try {
                cluster.waitForReadyBrokers();
            }
            catch (ExecutionException failure) {
                throw new AssertionError("Failed while waiting for brokers to become ready", failure);
            }
        }

        /**
         * Returns the brokers of this cluster keyed by node id.
         *
         * @return a new map from each broker's configured node id to the broker
         */
        @Override
        public Map<Integer, KafkaBroker> brokers() {
            Stream<BrokerServer> allBrokers = brokerStream();
            return allBrokers.collect(Collectors.toMap(server -> server.config().nodeId(), Function.identity()));
        }

        /**
         * Not supported for Raft clusters.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void rollingBrokerRestart() {
            final String reason = "Restarting Raft servers is not yet supported.";
            throw new UnsupportedOperationException(reason);
        }

        /**
         * Returns the current metadata image of each broker.
         *
         * @return a new map from each broker's configured node id to the current image of that
         *         broker's metadata cache
         */
        public Map<Integer, MetadataImage> metadataImage() {
            Stream<BrokerServer> allBrokers = brokerStream();
            return allBrokers.collect(Collectors.toMap(
                server -> server.config().nodeId(),
                server -> server.metadataCache().currentImage()));
        }

        /**
         * Looks up a broker of the underlying cluster by id.
         *
         * @param brokerId id of the broker to find
         * @return the broker registered under that id
         * @throws IllegalArgumentException if no broker is registered under that id
         */
        private BrokerServer findBrokerOrThrow(int brokerId) {
            BrokerServer broker = clusterReference.get().brokers().get(brokerId);
            if (broker == null) {
                throw new IllegalArgumentException("Unknown brokerId " + brokerId);
            }
            return broker;
        }

        /**
         * Streams the brokers of the underlying cluster.
         *
         * @return a fresh stream over the cluster's current broker values
         */
        private Stream<BrokerServer> brokerStream() {
            Collection<BrokerServer> allBrokers = clusterReference.get().brokers().values();
            return allBrokers.stream();
        }

        /**
         * Streams the controllers of the underlying cluster.
         *
         * @return a fresh stream over the cluster's current controller values
         */
        private Stream<ControllerServer> controllers() {
            Collection<ControllerServer> allControllers = clusterReference.get().controllers().values();
            return allControllers.stream();
        }
    }
}

