/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.test.cluster;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.admin.RackAwareMode;
import kafka.cluster.EndPoint;
import kafka.raft.KafkaRaftManager;
import kafka.raft.RaftManager;
import kafka.server.BrokerServer;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.MetaProperties;
import kafka.server.Server;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.multitenant.MultiTenantMetadata;
import org.apache.zookeeper.client.ZKClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Map;
import scala.collection.Seq;

/**
 * Runs a single Kafka broker inside the current JVM for tests.
 *
 * <p>The broker is started by the constructor (reached through {@link Builder#build()}).
 * When a {@link KafkaRaftManager} has been supplied to the builder the broker is created as a
 * KRaft {@link BrokerServer}; otherwise a server is created with
 * {@code TestUtils.createServer}. Log data lives in a temporary directory that
 * {@link #shutdownAndCleanup()} removes.
 *
 * <p>Most accessors are {@code synchronized} on this instance because the underlying broker
 * reference is replaced by {@link #shutdown()} and {@link #startBroker(Time)}.
 */
public class EmbeddedKafka {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafka.class);
    private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10000;
    private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8000;
    /** Listener name that is always registered in the security protocol map in KRaft mode. */
    private static final String CONTROLLER_LISTENER_NAME = "CONTROLLER";

    private final File logDir;
    private final Properties effectiveConfig;
    private final KafkaConfig kafkaConfig;
    private final Optional<KafkaRaftManager<ApiMessageAndVersion>> raftManager;
    private final Path tmpDir;
    /** The running broker, or {@code null} after {@link #shutdown()}. */
    private KafkaBroker kafka;
    private boolean isShutdown;

    /**
     * Creates the temporary directories, builds the effective broker configuration and starts
     * the broker.
     *
     * @param config      broker configuration overrides applied on top of the built-in defaults
     * @param time        clock passed to metrics initialisation and to the ZooKeeper-mode server
     * @param raftManager when present, the broker is started in KRaft mode using this manager
     * @throws RuntimeException wrapping any {@link IOException} raised during setup or startup
     */
    private EmbeddedKafka(Properties config, Time time, Optional<KafkaRaftManager<ApiMessageAndVersion>> raftManager) {
        try {
            // raftManager and logDir must be assigned before brokerConfigs() runs: it reads both.
            this.raftManager = raftManager;
            this.tmpDir = TestUtils.tempDir().toPath();
            this.logDir = Files.createTempDirectory(this.tmpDir, (String) null).toFile();
            this.effectiveConfig = this.brokerConfigs(config);
            this.kafkaConfig = new KafkaConfig(this.effectiveConfig, true);
            if (raftManager.isPresent()) {
                KafkaRaftManager<ApiMessageAndVersion> manager = raftManager.get();
                log.debug("Starting embedded KRaft Kafka broker {} (with log.dirs={}).", this.kafkaConfig.nodeId(), this.logDir);
                String clusterId = manager.metaProperties().clusterId();
                Metrics metrics = Server.initializeMetrics(this.kafkaConfig, time, clusterId);
                // Raw casts below are retained from the original: the generic signatures of the
                // callees are not visible from this file.
                MultiTenantMetadata multiTenantMetadata = ConfluentConfigs.buildMultitenantMetadata((java.util.Map)this.kafkaConfig.values(), metrics);
                this.kafka = new BrokerServer(
                        this.kafkaConfig,
                        new MetaProperties(clusterId, this.kafkaConfig.nodeId()),
                        (RaftManager)manager,
                        Option.empty(),
                        Time.SYSTEM,
                        metrics,
                        Option.apply(String.format("Broker%02d_", this.kafkaConfig.nodeId())),
                        (Seq)JavaConverters.asScalaBuffer(Collections.emptyList()).toSeq(),
                        manager.controllerQuorumVotersFuture(),
                        Option.apply(multiTenantMetadata));
                this.kafka.startup();
                log.debug("Startup of embedded BrokerServer completed: {}", this);
            } else {
                log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...", this.logDir, this.kafkaConfig.zkConnect());
                this.kafka = TestUtils.createServer(this.kafkaConfig, time);
                log.debug("Startup of embedded KafkaServer completed: {}", this);
            }
            this.isShutdown = false;
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to start embedded Kafka broker", e);
        }
    }

    /**
     * Builds the effective broker configuration: built-in defaults, then {@code overrideProps},
     * then (KRaft only) a completed listener security protocol map, and finally the log
     * directory, which always points at this instance's temporary log directory.
     *
     * <p>Called from the constructor; relies on {@code raftManager} and {@code logDir} having
     * been assigned already.
     *
     * @param overrideProps caller-supplied settings that take precedence over the defaults
     * @return a new {@link Properties} holding the merged configuration
     */
    private Properties brokerConfigs(Properties overrideProps) {
        Properties brokerConfigs = new Properties();
        brokerConfigs.put(KafkaConfig.BrokerIdProp(), 0);
        brokerConfigs.put(KafkaConfig.ListenersProp(), "PLAINTEXT://127.0.0.1:9092");
        brokerConfigs.put(KafkaConfig.NumPartitionsProp(), 1);
        brokerConfigs.put(KafkaConfig.AutoCreateTopicsEnableProp(), true);
        brokerConfigs.put(KafkaConfig.MessageMaxBytesProp(), 1000000);
        brokerConfigs.put(KafkaConfig.ControlledShutdownEnableProp(), true);
        brokerConfigs.put("confluent.license.topic.replication.factor", "1");
        if (this.isKRaft()) {
            brokerConfigs.put(KafkaConfig.ProcessRolesProp(), "broker");
            brokerConfigs.put(KafkaConfig.QuorumVotersProp(), "1000@localhost:0");
            brokerConfigs.put(KafkaConfig.ClusterLinkEnableProp(), "false");
            brokerConfigs.put(KafkaConfig.ControllerListenerNamesProp(), CONTROLLER_LISTENER_NAME);
        }
        brokerConfigs.putAll(overrideProps);
        if (this.isKRaft()) {
            brokerConfigs.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp(), completeSecurityProtocolMap(brokerConfigs));
        }
        brokerConfigs.setProperty(KafkaConfig.LogDirProp(), this.logDir.getAbsolutePath());
        return brokerConfigs;
    }

    /**
     * Returns the listener security protocol map from {@code brokerConfigs}, extended with a
     * {@code <listener>:PLAINTEXT} entry for every configured listener (plus {@code CONTROLLER})
     * that does not already have one.
     *
     * <p>A listener counts as mapped only when an entry starts with exactly
     * {@code "<listener>:"}, so a listener whose name is a prefix of another mapped listener
     * still gets its own entry. Entries and listener names are trimmed and blank tokens ignored.
     *
     * @param brokerConfigs merged broker configuration to read listeners and the existing map from
     * @return the comma-joined, completed security protocol map
     */
    private static String completeSecurityProtocolMap(Properties brokerConfigs) {
        List<String> securityEntries = new ArrayList<>();
        for (String rawEntry : brokerConfigs.getProperty(KafkaConfig.ListenerSecurityProtocolMapProp(), "").split(",")) {
            String entry = rawEntry.trim();
            if (!entry.isEmpty()) {
                securityEntries.add(entry);
            }
        }

        // TreeSet keeps the appended entries in a deterministic (sorted) order.
        TreeSet<String> listenerNames = new TreeSet<>();
        for (String rawListener : brokerConfigs.getProperty(KafkaConfig.ListenersProp(), "").split(",")) {
            String listener = rawListener.trim();
            if (listener.isEmpty()) {
                continue;
            }
            int firstColon = listener.indexOf(':');
            listenerNames.add(firstColon < 0 ? listener : listener.substring(0, firstColon));
        }
        listenerNames.add(CONTROLLER_LISTENER_NAME);

        for (String listenerName : listenerNames) {
            String entryPrefix = listenerName + ":";
            boolean alreadyMapped = securityEntries.stream().anyMatch(entry -> entry.startsWith(entryPrefix));
            if (!alreadyMapped) {
                securityEntries.add(entryPrefix + "PLAINTEXT");
            }
        }
        return String.join(",", securityEntries);
    }

    /**
     * Returns the {@code host:port} of the advertised listener with the given name.
     *
     * @param listenerName name of the listener to look up
     * @return the advertised endpoint formatted as {@code host:port}
     * @throws java.util.NoSuchElementException if the broker has no advertised listener with that name
     */
    public synchronized String brokerConnect(String listenerName) {
        Option<?> advertised = this.kafka.advertisedListener(listenerName);
        if (advertised.isEmpty()) {
            throw new java.util.NoSuchElementException("No advertised listener named '" + listenerName + "'");
        }
        EndPoint endpoint = (EndPoint)advertised.get();
        return endpoint.host() + ":" + endpoint.port();
    }

    /**
     * @return {@code true} when this instance was built with a raft manager
     */
    public boolean isKRaft() {
        return this.raftManager.isPresent();
    }

    /**
     * @return the ZooKeeper connect string of the running broker, or {@code null} in KRaft mode
     */
    public synchronized String zkConnect() {
        return this.isKRaft() ? null : this.kafka.config().zkConnect();
    }

    /**
     * @return the broker session UUID taken from this instance's {@link KafkaConfig}
     */
    public synchronized String brokerSessionUuid() {
        return this.kafkaConfig.brokerSessionUuid();
    }

    /**
     * Shuts the broker down (if still running) and deletes the log directory and the enclosing
     * temporary directory.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the temporary directory
     *                          cannot be deleted
     */
    public void shutdownAndCleanup() {
        this.shutdown();
        log.debug("Removing logs.dir at {} ...", this.logDir);
        List<String> logDirs = Collections.singletonList(this.logDir.getAbsolutePath());
        CoreUtils.delete((Seq)JavaConverters.asScalaBuffer(logDirs));
        try {
            io.confluent.kafka.multitenant.Utils.deleteDir(this.tmpDir);
        }
        catch (IOException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
        log.debug("Shutdown and cleanup of embedded Kafka broker completed {}.", this);
    }

    /**
     * Stops the broker and waits for the shutdown to complete. Calling this again after a
     * completed shutdown is a no-op.
     */
    public synchronized void shutdown() {
        if (this.isShutdown) {
            log.debug("Embedded Kafka broker {} was already shut down. Skipping shutdown", this);
            return;
        }
        if (this.kafka != null) {
            log.debug("Shutting down embedded Kafka broker {} ...", this);
            this.kafka.shutdown();
            this.kafka.awaitShutdown();
            this.isShutdown = true;
            log.debug("Shutdown of embedded Kafka broker completed {}.", this);
            // Cleared last so the log lines above can still describe the broker via toString().
            this.kafka = null;
        }
    }

    /**
     * Starts a new broker with the original configuration if none is currently held
     * (i.e. after {@link #shutdown()}); does nothing otherwise.
     *
     * <p>NOTE(review): this always goes through {@code TestUtils.createServer}, the same call
     * the constructor uses for the non-KRaft case, even when a raft manager is configured.
     * Confirm whether restarting is expected to be supported in KRaft mode.
     *
     * @param time clock handed to the newly created server
     */
    public synchronized void startBroker(Time time) {
        if (this.kafka == null) {
            this.kafka = TestUtils.createServer(this.kafkaConfig, time);
            this.isShutdown = false;
            log.debug("Started embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...", this.logDir, this.kafkaConfig.zkConnect());
        }
    }

    /**
     * Creates a topic on the embedded broker. In KRaft mode the topic is created through an
     * admin client connected via {@code listenerName}; otherwise it is created through a
     * short-lived ZooKeeper client.
     *
     * @param listenerName listener the admin client connects to (used in KRaft mode only)
     * @param topic        topic name
     * @param partitions   number of partitions
     * @param replication  replication factor
     * @param topicConfig  topic-level configuration
     */
    public void createTopic(ListenerName listenerName, String topic, int partitions, int replication, Properties topicConfig) {
        log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", topic, partitions, replication, topicConfig);
        if (this.isKRaft()) {
            Seq<KafkaBroker> brokers = JavaConverters.asScalaBuffer(Collections.singletonList(this.kafka)).toSeq();
            // TestUtils.resource is given the admin client together with the action to run on it.
            TestUtils.resource(
                    (AutoCloseable)TestUtils.createAdminClient(brokers, listenerName, new Properties()),
                    admin -> TestUtils.createTopicWithAdmin((Admin)admin, topic, brokers, partitions, replication, (Map)JavaConverters.mapAsScalaMap(Collections.emptyMap()), topicConfig));
        } else {
            try (KafkaZkClient kafkaZkClient = this.createZkClient()) {
                AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
                KafkaConfig brokerConfig = this.kafkaBroker().config();
                adminZkClient.createTopic(topic, partitions, replication, topicConfig, (RackAwareMode)RackAwareMode.Enforced$.MODULE$, brokerConfig.usesModernTopicId(), brokerConfig.usesLegacyTopicId(), Option.empty());
            }
        }
    }

    /**
     * Opens a ZooKeeper client against {@link #zkConnect()} using the default session and
     * connection timeouts declared on this class. The caller is responsible for closing it.
     */
    private KafkaZkClient createZkClient() {
        return KafkaZkClient.apply(
                this.zkConnect(),
                false,
                DEFAULT_ZK_SESSION_TIMEOUT_MS,
                DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
                Integer.MAX_VALUE,
                Time.SYSTEM,
                "zkclient",
                new ZKClientConfig(),
                "testMetricGroup",
                "testMetricType",
                true);
    }

    /**
     * @return the underlying broker, or {@code null} after {@link #shutdown()}
     */
    public synchronized KafkaBroker kafkaBroker() {
        return this.kafka;
    }

    /**
     * @return the first advertised listener endpoint of the running broker
     */
    public synchronized EndPoint endPoint() {
        return (EndPoint)this.kafka.advertisedListeners().head();
    }

    /**
     * @return the names of all listeners in the running broker's configuration
     */
    public synchronized List<String> listeners() {
        return JavaConverters.seqAsJavaList(this.kafka.config().listeners()).stream()
                .map(endPoint -> endPoint.listenerName().value())
                .collect(Collectors.toList());
    }

    /**
     * Describes the broker by id, per-listener {@code host:port} and ZooKeeper connect string;
     * returns the bare class name when no broker is currently held.
     */
    @Override
    public synchronized String toString() {
        if (this.kafka == null) {
            return "EmbeddedKafka";
        }
        java.util.Map<String, String> endpoints = this.listeners().stream()
                .collect(Collectors.toMap(Function.identity(), this::brokerConnect));
        return String.format(
                "EmbeddedKafka(brokerId=%d, endpoints=%s, zkConnect=%s)",
                this.kafka.config().brokerId(),
                Utils.mkString(endpoints, "", "", ":", ","),
                this.zkConnect());
    }

    /**
     * Fluent builder for {@link EmbeddedKafka}. {@link #build()} starts the broker.
     */
    public static class Builder {
        private final Properties config;
        private final Time time;
        private Optional<KafkaRaftManager<ApiMessageAndVersion>> raftManager = Optional.empty();

        /**
         * @param time clock handed to the broker created by {@link #build()}
         */
        public Builder(Time time) {
            this.config = new Properties();
            this.time = time;
        }

        /**
         * Adds broker configuration overrides; later calls win over earlier ones for the same key.
         *
         * @param props settings to merge into the builder's configuration
         * @return this builder
         */
        public Builder addConfigs(Properties props) {
            this.config.putAll(props);
            return this;
        }

        /**
         * Switches the broker to KRaft mode using the given raft manager.
         *
         * @param raftManager raft manager to use; must not be {@code null}
         * @return this builder
         */
        public Builder setRaftManager(KafkaRaftManager<ApiMessageAndVersion> raftManager) {
            this.raftManager = Optional.of(raftManager);
            return this;
        }

        /**
         * Creates the {@link EmbeddedKafka} instance, which starts the broker immediately.
         *
         * @return the started embedded broker
         */
        public EmbeddedKafka build() {
            return new EmbeddedKafka(this.config, this.time, this.raftManager);
        }
    }
}

