/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.test.cluster;

import io.confluent.kafka.multitenant.KafkaLogicalClusterUtils;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.FaultHandlerFactory;
import kafka.server.K2StackBuilderWrapper;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;
import kafka.server.QuorumTestHarnessFaultHandlerFactory;
import kafka.server.Server;
import kafka.server.SharedServer;
import kafka.utils.CoreUtils;
import kafka.utils.EmptyTestInfo;
import kafka.utils.TestInfoUtils;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.server.ServerSocketFactory;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.server.multitenant.MultiTenantMetadata;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Map;
import scala.collection.Seq;

/**
 * A single embedded Kafka broker intended for tests.
 *
 * <p>Only KRaft mode is supported: a {@link ControllerServer} must be supplied through the
 * {@link Builder}, otherwise construction fails with {@link UnsupportedOperationException}.
 * When the supplied {@link TestInfo} selects combined KRaft mode (as reported by
 * {@code TestInfoUtils.isCombinedKRaft}), the broker reuses the controller's shared server and
 * broker configuration; otherwise it gets its own temporary log directory and a configuration
 * derived from built-in defaults plus the caller's overrides.
 *
 * <p>Instances are created with {@link Builder}. The broker is initialized by the constructor but
 * is only started by {@link #startKafkaBroker()} or {@link #startBroker()}.
 */
public class EmbeddedKafka {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafka.class);
    private static final String CONTROLLER_LISTENER_NAME = "CONTROLLER";
    private static final String NO_CONTROLLER_MESSAGE = "ZK Kafka Server is not supported";

    private final File logDir;
    private final KafkaConfig kafkaConfig;
    private final Optional<ControllerServer> controllerServer;
    private final TestInfo testInfo;
    private final Optional<K2StackBuilderWrapper> stackBuilderOverride;
    private final Time time;
    /** Parent of {@link #logDir}; {@code null} in combined KRaft mode, where no directory is created here. */
    private final Path tmpDir;
    /** The current broker instance; {@code null} after {@link #shutdown()} until restarted. */
    private volatile KafkaBroker kafka;
    private boolean isShutdown;
    private final MockFaultHandler faultHandler = new MockFaultHandler("kafka");

    /**
     * Resolves the broker configuration and log directory, then initializes (but does not start)
     * the broker.
     *
     * @throws UnsupportedOperationException if no controller server was supplied
     * @throws RuntimeException wrapping any {@link IOException} raised while preparing directories
     */
    private EmbeddedKafka(Properties config, Time time, Optional<ControllerServer> controllerServer, TestInfo testInfo, Optional<K2StackBuilderWrapper> stackBuilderOverride) {
        // Fail before touching the file system so a misconfigured builder does not leave
        // temporary directories behind.
        if (!controllerServer.isPresent()) {
            throw new UnsupportedOperationException(NO_CONTROLLER_MESSAGE);
        }
        try {
            this.controllerServer = controllerServer;
            this.testInfo = testInfo;
            this.stackBuilderOverride = stackBuilderOverride;
            this.time = time;
            if (TestInfoUtils.isCombinedKRaft(testInfo)) {
                // Combined mode: share the controller's broker config and its first log dir.
                this.tmpDir = null;
                this.kafkaConfig = controllerServer.get().sharedServer().brokerConfig();
                this.logDir = new File((String)this.kafkaConfig.logDirs().get(0));
            } else {
                this.tmpDir = TestUtils.tempDir().toPath();
                this.logDir = Files.createTempDirectory(this.tmpDir, null).toFile();
                // brokerConfigs reads this.logDir, so it must run after the assignment above.
                Properties effectiveConfig = this.brokerConfigs(config, controllerServer);
                this.kafkaConfig = new KafkaConfig(effectiveConfig, true);
            }
            this.initializeKafkaBroker();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Writes the meta properties for the log directory and creates a fresh {@link BrokerServer},
     * replacing any previous broker reference and clearing the shut-down flag.
     *
     * @throws UnsupportedOperationException if no controller server is available
     * @throws IOException if the log directory changes cannot be written
     */
    private void initializeKafkaBroker() throws IOException {
        if (!this.controllerServer.isPresent()) {
            throw new UnsupportedOperationException(NO_CONTROLLER_MESSAGE);
        }
        ControllerServer controller = this.controllerServer.get();
        // SLF4J only substitutes "{}" placeholders; a printf-style "%d" would be printed verbatim.
        log.debug("Starting embedded KRaft Kafka broker {} (with log.dirs={}).", this.kafkaConfig.nodeId(), this.logDir);
        String clusterId = controller.clusterId();
        Metrics metrics = Server.initializeMetrics(this.kafkaConfig, this.time, clusterId);
        MultiTenantMetadata multiTenantMetadata = ConfluentConfigs.buildMultitenantMetadata((java.util.Map)this.kafkaConfig.values(), metrics);
        MetaProperties metaProperties = new MetaProperties.Builder()
                .setClusterId(clusterId)
                .setNodeId(this.kafkaConfig.nodeId())
                .setDirectoryId(DirectoryId.random())
                .build();
        String logDirPath = this.logDir.getAbsolutePath();
        MetaPropertiesEnsemble.Copier copier = new MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY)
                .setLogDirProps(logDirPath, metaProperties)
                .setMetaLogDir(Optional.of(logDirPath));
        copier.emptyLogDirs().clear();
        copier.setPreWriteHandler((dir, isNew, props) -> {
            log.info("Formatting {}.", dir);
            Files.createDirectories(Paths.get(dir));
        });
        copier.writeLogDirChanges();
        SharedServer sharedServer;
        if (TestInfoUtils.isCombinedKRaft(this.testInfo)) {
            sharedServer = controller.sharedServer();
        } else {
            sharedServer = new SharedServer(
                    this.kafkaConfig,
                    copier.copy(),
                    this.time,
                    metrics,
                    controller.sharedServer().controllerQuorumVotersFuture(),
                    Collections.emptyList(),
                    new QuorumTestHarnessFaultHandlerFactory(this.faultHandler),
                    ServerSocketFactory.INSTANCE,
                    Option.apply(multiTenantMetadata),
                    KafkaRaftServer.configSchema(),
                    controller.sharedServer().interBrokerPortFuture());
        }
        this.kafka = new BrokerServer(sharedServer, this.stackBuilderOverride);
        this.isShutdown = false;
    }

    /**
     * Builds the effective broker configuration: built-in defaults, then {@code overrideProps},
     * then (when a controller is present) a {@code listener.security.protocol.map} that is
     * guaranteed to contain an entry for every configured listener and for the controller
     * listener. Listeners without an explicit entry are mapped to {@code PLAINTEXT}. The
     * {@code log.dir} is always forced to this instance's log directory.
     *
     * @param overrideProps caller-supplied settings that take precedence over the defaults
     * @param controllerServer the controller this broker registers with, if any
     * @return the merged configuration
     */
    private Properties brokerConfigs(Properties overrideProps, Optional<ControllerServer> controllerServer) {
        Properties brokerConfigs = new Properties();
        brokerConfigs.put("broker.id", 0);
        brokerConfigs.put("listeners", "PLAINTEXT://127.0.0.1:9092");
        brokerConfigs.put("num.partitions", 1);
        brokerConfigs.put("auto.create.topics.enable", true);
        brokerConfigs.put("message.max.bytes", 1000000);
        brokerConfigs.put("controlled.shutdown.enable", true);
        brokerConfigs.put("confluent.license.topic.replication.factor", "1");
        if (controllerServer.isPresent()) {
            brokerConfigs.put("process.roles", "broker");
            brokerConfigs.put("controller.quorum.voters", String.format("%d@localhost:0", controllerServer.get().config().brokerId()));
            brokerConfigs.put(KafkaConfig.ClusterLinkEnableProp(), "false");
            brokerConfigs.put("controller.listener.names", CONTROLLER_LISTENER_NAME);
        }
        brokerConfigs.putAll(overrideProps);
        if (controllerServer.isPresent()) {
            // Keep the caller's explicit mappings and remember which listener names they cover.
            List<String> securityEntries = new ArrayList<>();
            TreeSet<String> mappedListeners = new TreeSet<>();
            for (String rawEntry : brokerConfigs.getProperty("listener.security.protocol.map", "").split(",")) {
                String entry = rawEntry.trim();
                if (entry.isEmpty()) {
                    continue;
                }
                securityEntries.add(entry);
                mappedListeners.add(nameBeforeFirstColon(entry));
            }
            // Sorted so that the generated entries have a deterministic order.
            TreeSet<String> listeners = new TreeSet<>();
            for (String listener : brokerConfigs.getProperty("listeners", "").split(",")) {
                String name = nameBeforeFirstColon(listener);
                if (!name.isEmpty()) {
                    listeners.add(name);
                }
            }
            listeners.add(CONTROLLER_LISTENER_NAME);
            for (String listener : listeners) {
                // Compare whole names: a bare prefix test would treat "INTERNAL" as already
                // mapped by an "INTERNAL_SSL:SSL" entry.
                if (!mappedListeners.contains(listener)) {
                    securityEntries.add(listener + ":PLAINTEXT");
                }
            }
            brokerConfigs.setProperty("listener.security.protocol.map", String.join(",", securityEntries));
        }
        brokerConfigs.setProperty("log.dir", this.logDir.getAbsolutePath());
        return brokerConfigs;
    }

    /**
     * Returns the trimmed text preceding the first {@code ':'} of {@code entry}, or the whole
     * trimmed entry when it contains no colon. Used to extract the listener name from both
     * {@code NAME://host:port} and {@code NAME:PROTOCOL} entries.
     */
    private static String nameBeforeFirstColon(String entry) {
        int firstColon = entry.indexOf(':');
        String name = firstColon < 0 ? entry : entry.substring(0, firstColon);
        return name.trim();
    }

    /**
     * Returns the {@code host:port} string of the advertised listener with the given name.
     *
     * @param listenerName name of the listener to look up on the running broker
     */
    public synchronized String brokerConnect(String listenerName) {
        Endpoint endpoint = (Endpoint)this.kafka.advertisedListener(listenerName).get();
        return endpoint.host() + ":" + endpoint.port();
    }

    /** Returns {@code true} when a controller server was supplied. */
    public boolean isKRaft() {
        return this.controllerServer.isPresent();
    }

    /** Always returns {@code null}: this embedded broker never connects to ZooKeeper. */
    public synchronized String zkConnect() {
        return null;
    }

    /** Returns the broker session UUID from the broker configuration. */
    public synchronized String brokerSessionUuid() {
        return this.kafkaConfig.brokerSessionUuid();
    }

    /**
     * Shuts the broker down, deletes its log directory and (when one was created by this
     * instance) the enclosing temporary directory, then rethrows the first fault recorded by the
     * fault handler, if any.
     *
     * @throws RuntimeException if the temporary directory cannot be deleted
     */
    public void shutdownAndCleanup() {
        this.shutdown();
        log.debug("Removing logs.dir at {} ...", this.logDir);
        List<String> logDirs = Collections.singletonList(this.logDir.getAbsolutePath());
        CoreUtils.delete(logDirs);
        // tmpDir is only created outside combined KRaft mode; there is nothing to delete otherwise.
        if (this.tmpDir != null) {
            try {
                KafkaLogicalClusterUtils.deleteDir(this.tmpDir);
            }
            catch (IOException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
        log.debug("Shutdown and cleanup of embedded Kafka broker completed {}.", this);
        this.faultHandler.maybeRethrowFirstException();
    }

    /**
     * Shuts the broker down and waits for the shutdown to complete. Calling this again after a
     * completed shutdown is a no-op.
     */
    public synchronized void shutdown() {
        if (this.isShutdown) {
            log.debug("Embedded Kafka broker {} was already shut down. Skipping shutdown", this);
            return;
        }
        if (this.kafka == null) {
            return;
        }
        log.debug("Shutting down embedded Kafka broker {} ...", this);
        this.kafka.shutdown();
        this.kafka.awaitShutdown();
        this.isShutdown = true;
        log.debug("Shutdown of embedded Kafka broker completed {}.", this);
        this.kafka = null;
    }

    /**
     * Re-creates and starts the broker if it is not currently present (for example after
     * {@link #shutdown()}); does nothing when a broker instance already exists.
     *
     * @throws IOException if the log directory cannot be prepared
     */
    public synchronized void startBroker() throws IOException {
        if (this.kafka != null) {
            return;
        }
        this.initializeKafkaBroker();
        this.startKafkaBroker();
    }

    /** Starts the already initialized broker. */
    public synchronized void startKafkaBroker() {
        this.kafka.startup();
        log.debug("Startup of embedded BrokerServer completed: {}", this);
    }

    /**
     * Creates a topic through an admin client connected to this broker on the given listener.
     *
     * @param listenerName listener the admin client connects through
     * @param topic name of the topic to create
     * @param partitions number of partitions
     * @param replication replication factor
     * @param topicConfig topic-level configuration
     */
    public void createTopic(ListenerName listenerName, String topic, int partitions, int replication, Properties topicConfig) {
        log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", topic, partitions, replication, topicConfig);
        // The raw Scala casts are kept as decompiled: the generic signatures of the Scala test
        // helpers are not visible from this file.
        try (Admin admin = TestUtils.createAdminClient((Seq)JavaConverters.asScalaBuffer(Collections.singletonList(this.kafka)).toSeq(), listenerName, new Properties())) {
            TestUtils.createTopicWithAdmin(
                    admin,
                    topic,
                    (Seq)JavaConverters.asScalaBuffer(Collections.singletonList(this.kafka)).toSeq(),
                    (Seq)JavaConverters.asScalaBuffer(this.controllerServer.map(Collections::singletonList).orElse(Collections.emptyList())).toSeq(),
                    partitions,
                    replication,
                    (Map)JavaConverters.mapAsScalaMap(Collections.emptyMap()),
                    topicConfig);
        }
    }

    /** Returns the current broker instance, or {@code null} if it has been shut down. */
    public KafkaBroker kafkaBroker() {
        return this.kafka;
    }

    /** Returns the first advertised listener endpoint of the running broker. */
    public synchronized Endpoint endPoint() {
        return (Endpoint)this.kafka.advertisedListeners().head();
    }

    /** Returns the names of all listeners configured on the running broker. */
    public synchronized List<String> listeners() {
        return JavaConverters.seqAsJavaList(this.kafka.config().listeners())
                .stream()
                .map(e -> e.listener())
                .collect(Collectors.toList());
    }

    /**
     * Describes the broker by id and per-listener {@code host:port}; returns the bare class name
     * when no broker instance is present.
     */
    @Override
    public synchronized String toString() {
        if (this.kafka == null) {
            return "EmbeddedKafka";
        }
        java.util.Map<String, String> endpoints = this.listeners().stream().collect(Collectors.toMap(Function.identity(), this::brokerConnect));
        return String.format("EmbeddedKafka(brokerId=%d, endpoints=%s, zkConnect=%s)", this.kafka.config().brokerId(), Utils.mkString(endpoints, "", "", ":", ","), this.zkConnect());
    }

    /** Fluent builder for {@link EmbeddedKafka}. */
    public static class Builder {
        private final Properties config;
        private final Time time;
        private Optional<ControllerServer> controllerServer = Optional.empty();
        private Optional<K2StackBuilderWrapper> stackBuilderOverride = Optional.empty();

        /**
         * @param time clock handed to the broker and its metrics
         */
        public Builder(Time time) {
            this.config = new Properties();
            this.time = time;
        }

        /** Adds broker configuration overrides; later calls win over earlier ones for the same key. */
        public Builder addConfigs(Properties props) {
            this.config.putAll(props);
            return this;
        }

        /** Sets the KRaft controller this broker registers with (required). */
        public Builder setControllerServer(ControllerServer controllerServer) {
            this.controllerServer = Optional.of(controllerServer);
            return this;
        }

        /** Sets an optional stack builder override that is passed to the broker server. */
        public Builder setStackBuilderOverride(Optional<K2StackBuilderWrapper> stackBuilderOverride) {
            this.stackBuilderOverride = stackBuilderOverride;
            return this;
        }

        /**
         * Builds the broker with an empty {@link TestInfo}. Any stack builder override configured
         * on this builder is honored.
         */
        public EmbeddedKafka build() {
            return this.build(new EmptyTestInfo());
        }

        /**
         * Builds the broker for the given test.
         *
         * @param testInfo test metadata used to decide whether combined KRaft mode applies
         */
        public EmbeddedKafka build(TestInfo testInfo) {
            return new EmbeddedKafka(this.config, this.time, this.controllerServer, testInfo, this.stackBuilderOverride);
        }
    }
}

