/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry;

import com.google.common.base.Preconditions;
import io.confluent.kafka.schemaregistry.CompatibilityLevel;
import io.confluent.kafka.schemaregistry.RestApp;
import io.vavr.Tuple2;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.QuorumTestHarness;
import kafka.utils.TestUtils;
import org.apache.kafka.common.network.ConnectionMode;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Java;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.test.TestSslUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayNameGeneration;
import org.junit.jupiter.api.DisplayNameGenerator;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;

@Tag(value="IntegrationTest")
@DisplayNameGeneration(value=AddKraftQuorum.class)
public abstract class ClusterTestHarness {
    // Class-wide SLF4J logger used for the setup/teardown progress messages.
    private static final Logger log = LoggerFactory.getLogger(ClusterTestHarness.class);
    // Default broker count; the no-argument constructor passes the same value.
    public static final int DEFAULT_NUM_BROKERS = 1;
    // 0x1400000 = 20,971,520 bytes (20 MiB); injectProperties writes the same value to "message.max.bytes".
    public static final int MAX_MESSAGE_SIZE = 0x1400000;
    // Topic name handed to the RestApp constructor in setupRestApp.
    public static final String KAFKASTORE_TOPIC = "_schemas";
    // Shared empty Optional that getKafkaConfig passes as the SASL properties argument.
    protected static final Optional<Properties> EMPTY_SASL_PROPERTIES = Optional.empty();
    // Number of brokers that setUp() starts.
    private final int numBrokers;
    // When true, setUp() also creates and starts a RestApp.
    private final boolean setupRestApp;
    // Compatibility setting passed to the RestApp constructor; the shorter constructors default it
    // to CompatibilityLevel.NONE.name.
    protected String compatibilityType;
    // Stored by setUpTest; setUp() requires it to be non-null and hands it to the quorum harness.
    private TestInfo testInfo;
    // Created in setUp(); used to create the brokers and torn down in tearDownMethod().
    private QuorumTestHarness quorumTestHarness;
    // One KafkaConfig per broker; assigned by startBrokersConcurrently.
    protected List<KafkaConfig> configs = null;
    // The started brokers; assigned by startBrokersConcurrently only after all of them were created.
    protected List<KafkaBroker> servers = null;
    // Broker list string computed in setUp() from the servers and the broker security protocol.
    protected String brokerList = null;
    // RestApp port; if still null when the RestApp is set up, setUp() picks one via choosePort().
    protected Integer schemaRegistryPort;
    // Created by setupRestApp; tearDown() stops it when non-null.
    protected RestApp restApp = null;
    // True when Java.isIbmJdk() holds and Java.isIbmJdkSemeru() does not; saslConfigs uses it to
    // decide whether to add "sasl.kerberos.service.name".
    private static final boolean IS_IBM_SECURITY = Java.isIbmJdk() && !Java.isIbmJdkSemeru();

    /**
     * Picks {@code count} distinct, currently free TCP ports.
     *
     * <p>All sockets are bound to an ephemeral port (port 0) on {@code 0.0.0.0} and kept open
     * until every port has been chosen, so the returned ports differ from each other. The
     * sockets are closed before returning, so another process may grab a port before the
     * caller binds it.
     *
     * <p>Every socket that was opened is closed, even when opening a later socket fails;
     * previously such a failure leaked the sockets opened so far.
     *
     * @param count number of ports to choose
     * @return an array of {@code count} port numbers
     * @throws RuntimeException wrapping the first {@link IOException} raised while opening or
     *     closing a socket (later ones are attached as suppressed exceptions)
     */
    public static int[] choosePorts(int count) {
        ServerSocket[] sockets = new ServerSocket[count];
        int[] ports = new int[count];
        IOException failure = null;
        try {
            for (int i = 0; i < count; ++i) {
                sockets[i] = new ServerSocket(0, 0, InetAddress.getByName("0.0.0.0"));
                ports[i] = sockets[i].getLocalPort();
            }
        }
        catch (IOException e) {
            failure = e;
        }
        finally {
            // Release everything that was opened, whether or not the loop above completed.
            for (ServerSocket socket : sockets) {
                if (socket == null) {
                    continue;
                }
                try {
                    socket.close();
                }
                catch (IOException e) {
                    if (failure == null) {
                        failure = e;
                    } else {
                        failure.addSuppressed(e);
                    }
                }
            }
        }
        if (failure != null) {
            throw new RuntimeException(failure);
        }
        return ports;
    }

    /**
     * Picks a single free TCP port.
     *
     * @return the only element of {@code choosePorts(1)}
     */
    public static int choosePort() {
        final int[] chosen = ClusterTestHarness.choosePorts(1);
        return chosen[0];
    }

    /**
     * Creates a harness with one broker; delegates to {@link #ClusterTestHarness(int)}.
     */
    public ClusterTestHarness() {
        this(1);
    }

    /**
     * Creates a harness with the given number of brokers and no RestApp.
     *
     * @param numBrokers number of brokers that {@code setUp()} starts
     */
    public ClusterTestHarness(int numBrokers) {
        this(numBrokers, false);
    }

    /**
     * Creates a harness whose compatibility type defaults to {@code CompatibilityLevel.NONE.name}.
     *
     * @param numBrokers number of brokers that {@code setUp()} starts
     * @param setupRestApp whether {@code setUp()} also creates and starts a RestApp
     */
    public ClusterTestHarness(int numBrokers, boolean setupRestApp) {
        this(numBrokers, setupRestApp, CompatibilityLevel.NONE.name);
    }

    /**
     * Stores the harness settings; nothing is started until {@code setUp()} runs.
     *
     * @param numBrokers number of brokers that {@code setUp()} starts
     * @param setupRestApp whether {@code setUp()} also creates and starts a RestApp
     * @param compatibilityType compatibility setting later passed to the RestApp constructor
     */
    public ClusterTestHarness(int numBrokers, boolean setupRestApp, String compatibilityType) {
        this.numBrokers = numBrokers;
        this.setupRestApp = setupRestApp;
        this.compatibilityType = compatibilityType;
    }

    /**
     * JUnit per-test entry point: remembers the {@link TestInfo} and runs {@link #setUp()},
     * logging before and after.
     *
     * @param testInfo test metadata supplied by JUnit; stored for use by {@code setUp()}
     * @throws Exception whatever {@code setUp()} throws
     */
    @BeforeEach
    public void setUpTest(TestInfo testInfo) throws Exception {
        final String harnessName = getClass().getSimpleName();
        this.testInfo = testInfo;
        log.info("Starting setup of {}", harnessName);
        setUp();
        log.info("Completed setup of {}", harnessName);
    }

    /**
     * Starts the cluster for one test: the quorum harness (controller), then the brokers, then
     * the ACL hook, and finally, if requested at construction time, the RestApp.
     *
     * <p>When the RestApp is enabled, a port is chosen if {@code schemaRegistryPort} is still
     * null, a {@code listeners} entry is added unless {@link #getSchemaRegistryProperties()}
     * already supplied one, and {@code mode.mutability} is always set to {@code true}.
     *
     * @throws IllegalStateException if no {@link TestInfo} has been stored yet
     * @throws Exception if any component fails to start
     */
    protected void setUp() throws Exception {
        Preconditions.checkState(testInfo != null);
        log.info("Starting controller of {}", getClass().getSimpleName());
        final SecurityProtocol controllerProtocol = overrideKraftControllerSecurityProtocol();
        final Properties controllerConfig = overrideKraftControllerConfig();
        quorumTestHarness = new DefaultQuorumTestHarness(controllerProtocol, controllerConfig);
        quorumTestHarness.setUp(testInfo);

        startBrokersConcurrently(numBrokers);
        brokerList = TestUtils.getBrokerListStrFromServers((Seq)JavaConverters.asScalaBuffer(servers), (SecurityProtocol)getBrokerSecurityProtocol());
        setupAcls();

        if (!this.setupRestApp) {
            return;
        }
        if (schemaRegistryPort == null) {
            schemaRegistryPort = ClusterTestHarness.choosePort();
        }
        final Properties registryProps = getSchemaRegistryProperties();
        if (!registryProps.containsKey("listeners")) {
            final String listener = getSchemaRegistryProtocol() + "://0.0.0.0:" + schemaRegistryPort;
            registryProps.put("listeners", listener);
        }
        registryProps.put("mode.mutability", Boolean.TRUE);
        setupRestApp(registryProps);
    }

    /**
     * Creates the RestApp from the current port, broker list, {@code KAFKASTORE_TOPIC} and
     * compatibility type, stores it in {@code restApp} and starts it.
     *
     * @param schemaRegistryProps properties passed through to the RestApp constructor
     * @throws Exception if the RestApp cannot be created or started
     */
    protected void setupRestApp(Properties schemaRegistryProps) throws Exception {
        final RestApp app = new RestApp(this.schemaRegistryPort, null, this.brokerList, KAFKASTORE_TOPIC, this.compatibilityType, true, schemaRegistryProps);
        this.restApp = app;
        app.start();
    }

    /**
     * Supplies the base properties for the RestApp; {@code setUp()} adds {@code listeners}
     * (if absent) and {@code mode.mutability} on top of them. Returns a fresh, empty
     * {@link Properties} here.
     *
     * @return properties to pass to {@code setupRestApp}
     * @throws Exception declared in the signature; this implementation throws nothing
     */
    protected Properties getSchemaRegistryProperties() throws Exception {
        return new Properties();
    }

    /**
     * Overwrites a fixed set of broker settings on the given properties; called by
     * {@code getKafkaConfig} after the base configuration has been built, so these values
     * win over whatever was set before.
     *
     * @param props broker properties to modify in place
     */
    protected void injectProperties(Properties props) {
        final String[][] overrides = {
            {"process.roles", "broker"},
            // 20 MiB, i.e. 0x1400000 bytes.
            {"message.max.bytes", String.valueOf(20 * 1024 * 1024)},
            {"auto.create.topics.enable", "true"},
            {"num.partitions", "1"},
            {"cleanup.policy", "compact"},
        };
        for (String[] entry : overrides) {
            props.setProperty(entry[0], entry[1]);
        }
    }

    /**
     * Builds the {@link KafkaConfig} for one broker: a base configuration from
     * {@code createBrokerConfig} with only the plaintext listener flag enabled, the broker
     * security protocol as inter-broker protocol and no trust store, followed by the
     * overrides from {@link #injectProperties(Properties)}.
     *
     * @param brokerId id used as node/broker id
     * @return the parsed broker configuration
     */
    protected KafkaConfig getKafkaConfig(int brokerId) {
        // Ports and protocol are obtained in the same order as the positional arguments below.
        final int plaintextPort = TestUtils.RandomPort();
        final Optional<SecurityProtocol> interBrokerProtocol = Optional.of(this.getBrokerSecurityProtocol());
        final int saslPlaintextPort = TestUtils.RandomPort();
        final int sslPort = TestUtils.RandomPort();
        final int saslSslPort = TestUtils.RandomPort();
        final Optional<File> noTrustStore = Optional.empty();
        final Optional<String> noRack = Optional.empty();

        final Properties brokerProps = ClusterTestHarness.createBrokerConfig(
                brokerId,
                /* enableControlledShutdown */ false,
                /* enableDeleteTopic */ true,
                plaintextPort,
                interBrokerProtocol,
                noTrustStore,
                EMPTY_SASL_PROPERTIES,
                /* enablePlaintext */ true,
                /* enableSaslPlaintext */ false,
                saslPlaintextPort,
                /* enableSsl */ false,
                sslPort,
                /* enableSaslSsl */ false,
                saslSslPort,
                noRack,
                /* enableToken */ false,
                /* numPartitions */ 1,
                /* defaultReplicationFactor */ (short)1,
                /* enableFetchFromFollower */ false);
        this.injectProperties(brokerProps);
        return KafkaConfig.fromProps(brokerProps);
    }

    /**
     * Security protocol used for the brokers: {@code getKafkaConfig} passes it as the
     * inter-broker protocol and {@code setUp()} uses it when computing the broker list.
     *
     * @return {@code SecurityProtocol.PLAINTEXT} in this implementation
     */
    protected SecurityProtocol getBrokerSecurityProtocol() {
        return SecurityProtocol.PLAINTEXT;
    }

    /**
     * Scheme that {@code setUp()} puts in front of {@code ://0.0.0.0:<port>} when it builds
     * the default {@code listeners} value for the RestApp.
     *
     * @return {@code "http"} in this implementation
     */
    protected String getSchemaRegistryProtocol() {
        return "http";
    }

    /**
     * Clock passed to the quorum harness when the given broker is created.
     *
     * @param brokerId id of the broker being created; ignored by this implementation
     * @return {@code Time.SYSTEM} in this implementation
     */
    protected Time brokerTime(int brokerId) {
        return Time.SYSTEM;
    }

    /**
     * Builds one {@link KafkaConfig} per broker id in {@code [0, numBrokers)}, asks the quorum
     * harness to create each broker on an asynchronous task, and blocks until all of them are
     * done. {@code configs} is assigned before the brokers are created; {@code servers} is
     * assigned only once every creation task has completed successfully.
     *
     * @param numBrokers number of brokers to create
     */
    private void startBrokersConcurrently(int numBrokers) {
        final String harnessName = getClass().getSimpleName();
        log.info("Starting concurrently {} brokers for {}", Integer.valueOf(numBrokers), harnessName);

        final List<KafkaConfig> brokerConfigs = new ArrayList<>();
        for (int brokerId = 0; brokerId < numBrokers; brokerId++) {
            brokerConfigs.add(getKafkaConfig(brokerId));
        }
        this.configs = brokerConfigs;

        // Submit every creation task first, then wait for all of them together.
        final List<CompletableFuture<KafkaBroker>> pending = new ArrayList<>();
        for (KafkaConfig config : this.configs) {
            pending.add(CompletableFuture.supplyAsync(() -> this.quorumTestHarness.createBroker(config, this.brokerTime(config.brokerId()), true, Option.empty())));
        }
        this.servers = ClusterTestHarness.allAsList(pending).join();

        log.info("Started all {} brokers for {}", Integer.valueOf(numBrokers), harnessName);
    }

    /**
     * Combines several futures into one future of their results, in input order.
     *
     * <p>The returned future completes once all inputs have completed; if any input completes
     * exceptionally, so does the returned future.
     *
     * @param futures futures to wait for
     * @param <T> result type of each future
     * @return a future yielding the results in the same order as {@code futures}
     */
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        final CompletableFuture<?>[] pending = futures.toArray(new CompletableFuture[0]);
        final CompletableFuture<Void> allDone = CompletableFuture.allOf(pending);
        return allDone.thenApply(ignored -> {
            // Every future is already complete here, so join() does not block.
            final List<T> results = new ArrayList<>();
            for (CompletableFuture<T> future : futures) {
                results.add(future.join());
            }
            return results;
        });
    }

    /**
     * Hook that {@code setUp()} calls after the brokers are started and the broker list is
     * computed, and before the optional RestApp is set up. Does nothing in this implementation.
     */
    protected void setupAcls() {
    }

    /**
     * Supplies the properties that {@code setUp()} hands to {@code DefaultQuorumTestHarness},
     * which returns them from {@code kraftControllerConfigs}.
     *
     * @return a fresh, empty {@link Properties} in this implementation
     */
    protected Properties overrideKraftControllerConfig() {
        return new Properties();
    }

    /**
     * Supplies the security protocol that {@code setUp()} hands to
     * {@code DefaultQuorumTestHarness}, which reports it as the controller listener protocol.
     *
     * @return {@code SecurityProtocol.PLAINTEXT} in this implementation
     */
    protected SecurityProtocol overrideKraftControllerSecurityProtocol() {
        return SecurityProtocol.PLAINTEXT;
    }

    /**
     * JUnit per-test cleanup: stops the RestApp (if one was created) and then shuts down the
     * brokers and the controller.
     *
     * <p>The broker/controller teardown runs in a {@code finally} block, so a failure while
     * stopping the RestApp no longer leaves the brokers and the controller running.
     *
     * @throws Exception if stopping any component fails; if both the RestApp stop and the
     *     cluster teardown fail, the cluster teardown exception is the one propagated
     */
    @AfterEach
    public void tearDown() throws Exception {
        log.info("Starting teardown of {}", (Object)this.getClass().getSimpleName());
        try {
            if (this.restApp != null) {
                this.restApp.stop();
            }
        }
        finally {
            this.tearDownMethod();
        }
        log.info("Completed teardown of {}", (Object)this.getClass().getSimpleName());
    }

    /**
     * Shuts down the brokers and then tears down the quorum harness (controller).
     *
     * <p>{@code servers} is only assigned once every broker was created, so it is still null
     * when broker startup failed; in that case the broker shutdown is skipped instead of
     * failing with a {@link NullPointerException}. The controller teardown runs in a
     * {@code finally} block so it happens even when shutting down the brokers throws.
     *
     * @throws IllegalStateException if the quorum harness was never created
     * @throws Exception if shutting down the brokers or the controller fails
     */
    private void tearDownMethod() throws Exception {
        Preconditions.checkState(this.quorumTestHarness != null);
        try {
            if (this.servers != null) {
                TestUtils.shutdownServers((Seq)JavaConverters.asScalaBuffer(this.servers), (boolean)true);
            }
        }
        finally {
            log.info("Stopping controller of {}", (Object)this.getClass().getSimpleName());
            this.quorumTestHarness.tearDown();
        }
    }

    /**
     * Builds the broker {@link Properties} for one broker node.
     *
     * <p>A listener on {@code localhost} is configured for every protocol whose enable flag is
     * set, plus the inter-broker protocol if one is given. Listener names equal the protocol
     * names, and a {@code CONTROLLER:PLAINTEXT} entry is appended to the protocol map. SSL
     * settings are added when any listener uses SSL or SASL_SSL, and SASL settings when any
     * listener uses SASL_PLAINTEXT or SASL_SSL.
     *
     * @param nodeId value for both {@code node.id} and {@code broker.id}
     * @param enableControlledShutdown value for {@code controlled.shutdown.enable}
     * @param enableDeleteTopic value for {@code delete.topic.enable}
     * @param port port of the PLAINTEXT listener
     * @param interBrokerSecurityProtocol optional inter-broker protocol; always gets a listener
     * @param trustStoreFile trust store file, required when an SSL-based listener is configured
     * @param saslProperties optional SASL settings, used when a SASL listener is configured
     * @param enablePlaintext whether to configure a PLAINTEXT listener
     * @param enableSaslPlaintext whether to configure a SASL_PLAINTEXT listener
     * @param saslPlaintextPort port of the SASL_PLAINTEXT listener
     * @param enableSsl whether to configure an SSL listener
     * @param sslPort port of the SSL listener
     * @param enableSaslSsl whether to configure a SASL_SSL listener
     * @param saslSslPort port of the SASL_SSL listener
     * @param rack optional value for {@code broker.rack}
     * @param enableToken whether to set {@code delegation.token.secret.key}
     * @param numPartitions value for {@code num.partitions}
     * @param defaultReplicationFactor value for {@code default.replication.factor}
     * @param enableFetchFromFollower when true, sets {@code broker.rack} to the node id
     *     (replacing any {@code rack} value) and selects the rack-aware replica selector
     * @return the assembled broker properties
     * @throws RuntimeException wrapping any exception raised while assembling the properties,
     *     e.g. when no listener at all is enabled or an SSL-based listener is configured
     *     without a trust store file
     */
    public static Properties createBrokerConfig(int nodeId, boolean enableControlledShutdown, boolean enableDeleteTopic, int port, Optional<SecurityProtocol> interBrokerSecurityProtocol, Optional<File> trustStoreFile, Optional<Properties> saslProperties, boolean enablePlaintext, boolean enableSaslPlaintext, int saslPlaintextPort, boolean enableSsl, int sslPort, boolean enableSaslSsl, int saslSslPort, Optional<String> rack, boolean enableToken, int numPartitions, short defaultReplicationFactor, boolean enableFetchFromFollower) {
        try {
            // null means "no inter-broker protocol given"; an Optional never holds null itself.
            final SecurityProtocol interBroker = interBrokerSecurityProtocol.orElse(null);

            // (protocol, port) pairs in a fixed order: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
            final List<Tuple2<SecurityProtocol, Integer>> endpoints = new ArrayList<>();
            if (enablePlaintext || interBroker == SecurityProtocol.PLAINTEXT) {
                endpoints.add(new Tuple2<>(SecurityProtocol.PLAINTEXT, port));
            }
            if (enableSsl || interBroker == SecurityProtocol.SSL) {
                endpoints.add(new Tuple2<>(SecurityProtocol.SSL, sslPort));
            }
            if (enableSaslPlaintext || interBroker == SecurityProtocol.SASL_PLAINTEXT) {
                endpoints.add(new Tuple2<>(SecurityProtocol.SASL_PLAINTEXT, saslPlaintextPort));
            }
            if (enableSaslSsl || interBroker == SecurityProtocol.SASL_SSL) {
                endpoints.add(new Tuple2<>(SecurityProtocol.SASL_SSL, saslSslPort));
            }

            // get() deliberately fails when no endpoint was configured at all.
            final String listeners = endpoints.stream()
                    .map(endpoint -> endpoint._1.name() + "://localhost:" + endpoint._2)
                    .reduce((joined, next) -> joined + "," + next)
                    .get();
            final String protocolMap = endpoints.stream()
                    .map(endpoint -> endpoint._1.name() + ":" + endpoint._1)
                    .reduce((joined, next) -> joined + "," + next)
                    .get();

            final Properties props = new Properties();
            props.put("unstable.feature.versions.enable", "true");
            props.put("unstable.api.versions.enable", "true");
            props.setProperty("server.max.startup.time.ms", String.valueOf(TimeUnit.MINUTES.toMillis(10L)));

            final String nodeIdText = String.valueOf(nodeId);
            props.put("node.id", nodeIdText);
            props.put("broker.id", nodeIdText);
            props.put("advertised.listeners", listeners);
            props.put("listeners", listeners);
            props.put("controller.listener.names", "CONTROLLER");
            props.put("listener.security.protocol.map", protocolMap + ",CONTROLLER:PLAINTEXT");
            props.put("log.dir", TestUtils.tempDir().getAbsolutePath());
            props.put("process.roles", "broker");
            props.put("controller.quorum.voters", "1000@localhost:0");
            props.put("replica.socket.timeout.ms", "1500");
            props.put("controller.socket.timeout.ms", "1500");
            props.put("controlled.shutdown.enable", String.valueOf(enableControlledShutdown));
            props.put("delete.topic.enable", String.valueOf(enableDeleteTopic));
            props.put(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000");
            props.put("log.cleaner.dedupe.buffer.size", "2097152");
            props.put("offsets.topic.replication.factor", "1");
            props.put("log.initial.task.delay.ms", "100");
            props.putIfAbsent("offsets.topic.num.partitions", "5");
            props.putIfAbsent("group.initial.rebalance.delay.ms", "0");
            if (rack.isPresent()) {
                props.put("broker.rack", rack.get());
            }
            props.put("num.network.threads", "2");
            props.put("background.threads", "2");

            boolean needsSsl = false;
            boolean needsSasl = false;
            for (Tuple2<SecurityProtocol, Integer> endpoint : endpoints) {
                needsSsl |= ClusterTestHarness.usesSslTransportLayer(endpoint._1);
                needsSasl |= ClusterTestHarness.usesSaslAuthentication(endpoint._1);
            }
            if (needsSsl) {
                props.putAll(ClusterTestHarness.sslConfigs(ConnectionMode.SERVER, false, trustStoreFile, "server" + nodeId));
            }
            if (needsSasl) {
                props.putAll(ClusterTestHarness.saslConfigs(saslProperties));
            }

            if (interBroker != null) {
                props.put("security.inter.broker.protocol", interBroker.name);
            }
            if (enableToken) {
                props.put("delegation.token.secret.key", "secretkey");
            }
            props.put("num.partitions", String.valueOf(numPartitions));
            props.put("default.replication.factor", String.valueOf(defaultReplicationFactor));
            if (enableFetchFromFollower) {
                // Replaces any rack value set above.
                props.put("broker.rack", nodeIdText);
                props.put("replica.selector.class", "org.apache.kafka.common.replica.RackAwareReplicaSelector");
            }
            return props;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Tells whether the given protocol is one of the two SSL-based ones.
     *
     * @param securityProtocol protocol to classify; must not be null
     * @return {@code true} for {@code SSL} and {@code SASL_SSL}, {@code false} otherwise
     */
    public static boolean usesSslTransportLayer(SecurityProtocol securityProtocol) {
        final boolean sslBased;
        switch (securityProtocol) {
            case SSL:
            case SASL_SSL:
                sslBased = true;
                break;
            default:
                sslBased = false;
                break;
        }
        return sslBased;
    }

    /**
     * Tells whether the given protocol is one of the two SASL-based ones.
     *
     * @param securityProtocol protocol to classify; must not be null
     * @return {@code true} for {@code SASL_SSL} and {@code SASL_PLAINTEXT}, {@code false} otherwise
     */
    public static boolean usesSaslAuthentication(SecurityProtocol securityProtocol) {
        final boolean saslBased;
        switch (securityProtocol) {
            case SASL_SSL:
            case SASL_PLAINTEXT:
                saslBased = true;
                break;
            default:
                saslBased = false;
                break;
        }
        return saslBased;
    }

    /**
     * Convenience overload of the six-argument {@code sslConfigs} that uses
     * {@code "localhost"} as certificate CN and {@code "TLSv1.3"} as TLS protocol.
     *
     * @param mode connection mode passed to the SSL config builder
     * @param clientCert whether to use a client certificate
     * @param trustStoreFile trust store file; must be present
     * @param certAlias certificate alias
     * @return the generated SSL properties
     * @throws Exception if no trust store file is provided or the SSL configuration cannot be built
     */
    public static Properties sslConfigs(ConnectionMode mode, boolean clientCert, Optional<File> trustStoreFile, String certAlias) throws Exception {
        final String defaultCertCn = "localhost";
        final String defaultTlsProtocol = "TLSv1.3";
        return ClusterTestHarness.sslConfigs(mode, clientCert, trustStoreFile, certAlias, defaultCertCn, defaultTlsProtocol);
    }

    /**
     * Generates SSL settings with {@code TestSslUtils.SslConfigsBuilder} and copies them into
     * a new {@link Properties}.
     *
     * @param mode connection mode passed to the SSL config builder
     * @param clientCert whether to use a client certificate
     * @param trustStoreFile trust store file handed to the builder; must be present
     * @param certAlias certificate alias
     * @param certCn certificate common name
     * @param tlsProtocol TLS protocol name
     * @return the generated SSL properties
     * @throws Exception if {@code trustStoreFile} is empty or the builder fails
     */
    public static Properties sslConfigs(ConnectionMode mode, boolean clientCert, Optional<File> trustStoreFile, String certAlias, String certCn, String tlsProtocol) throws Exception {
        if (!trustStoreFile.isPresent()) {
            throw new Exception("SSL enabled but no trustStoreFile provided");
        }
        final Map<?, ?> generated = new TestSslUtils.SslConfigsBuilder(mode)
                .useClientCert(clientCert)
                .createNewTrustStore(trustStoreFile.get())
                .certAlias(certAlias)
                .cn(certCn)
                .tlsProtocol(tlsProtocol)
                .build();
        final Properties sslProps = new Properties();
        sslProps.putAll(generated);
        return sslProps;
    }

    /**
     * Returns the SASL settings to merge into a broker configuration.
     *
     * <p>The caller's {@link Properties} instance is returned as-is when present (a new empty
     * one otherwise). When {@code IS_IBM_SECURITY} is true and the key is missing,
     * {@code sasl.kerberos.service.name=kafka} is added to that same instance.
     *
     * @param saslProperties optional SASL settings supplied by the caller
     * @return the supplied properties, or a new instance if none were supplied
     */
    public static Properties saslConfigs(Optional<Properties> saslProperties) {
        final Properties result = saslProperties.isPresent() ? saslProperties.get() : new Properties();
        if (!IS_IBM_SECURITY) {
            return result;
        }
        if (!result.containsKey("sasl.kerberos.service.name")) {
            result.put("sasl.kerberos.service.name", "kafka");
        }
        return result;
    }

    /**
     * {@link QuorumTestHarness} whose controller listener protocol and controller
     * configuration are fixed at construction time.
     */
    static class DefaultQuorumTestHarness extends QuorumTestHarness {
        private final SecurityProtocol securityProtocol;
        private final Properties kraftControllerConfig;

        /**
         * @param securityProtocol protocol reported by {@code controllerListenerSecurityProtocol()}
         * @param kraftControllerConfig properties returned by {@code kraftControllerConfigs}
         */
        DefaultQuorumTestHarness(SecurityProtocol securityProtocol, Properties kraftControllerConfig) {
            this.kraftControllerConfig = kraftControllerConfig;
            this.securityProtocol = securityProtocol;
        }

        /** Returns the security protocol given to the constructor. */
        public SecurityProtocol controllerListenerSecurityProtocol() {
            return securityProtocol;
        }

        /** Returns a one-element sequence holding the properties given to the constructor. */
        public Seq<Properties> kraftControllerConfigs(TestInfo testInfo) {
            final List<Properties> single = Collections.singletonList(kraftControllerConfig);
            return JavaConverters.asScalaBuffer(single);
        }
    }

    /**
     * Display-name generator that appends {@code ",quorum=kraft"} to the standard JUnit
     * display name of classes, nested classes and methods.
     */
    static class AddKraftQuorum extends DisplayNameGenerator.Standard {
        private static final String QUORUM_SUFFIX = ",quorum=kraft";

        AddKraftQuorum() {
        }

        public String generateDisplayNameForClass(Class<?> testClass) {
            final String standardName = super.generateDisplayNameForClass(testClass);
            return addKraftQuorum(standardName);
        }

        public String generateDisplayNameForNestedClass(Class<?> nestedClass) {
            final String standardName = super.generateDisplayNameForNestedClass(nestedClass);
            return addKraftQuorum(standardName);
        }

        public String generateDisplayNameForMethod(Class<?> testClass, Method testMethod) {
            final String standardName = super.generateDisplayNameForMethod(testClass, testMethod);
            return addKraftQuorum(standardName);
        }

        /** Appends the quorum suffix to the given display name. */
        String addKraftQuorum(String name) {
            return name + QUORUM_SUFFIX;
        }
    }
}

