package io.confluent.kafka.test.cluster;

import io.confluent.kafka.link.integration.MultiTenantClusterLinkTest;
import io.confluent.kafka.multitenant.Utils;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.admin.RackAwareMode$Enforced$;
import kafka.cluster.EndPoint;
import kafka.raft.KafkaRaftManager;
import kafka.server.BrokerServer;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.MetaProperties;
import kafka.server.Server;
import kafka.server.metadata.BrokerServerMetrics$;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.zookeeper.client.ZKClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;

/* loaded from: input_file:io/confluent/kafka/test/cluster/EmbeddedKafka.class */
public class EmbeddedKafka {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafka.class);
    private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10000;
    private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8000;
    private final File logDir;
    private final Properties effectiveConfig;
    private final KafkaConfig kafkaConfig;
    private final Optional<KafkaRaftManager<ApiMessageAndVersion>> raftManager;
    private Path tmpDir;
    private volatile KafkaBroker kafka;
    private boolean isShutdown;
    private MockFaultHandler faultHandler;

    /* loaded from: input_file:io/confluent/kafka/test/cluster/EmbeddedKafka$Builder.class */
    public static class Builder {
        private final Time time;
        private Optional<KafkaRaftManager<ApiMessageAndVersion>> raftManager = Optional.empty();
        private final Properties config = new Properties();

        public Builder(Time time) {
            this.time = time;
        }

        public Builder addConfigs(Properties properties) {
            this.config.putAll(properties);
            return this;
        }

        public Builder setRaftManager(KafkaRaftManager<ApiMessageAndVersion> kafkaRaftManager) {
            this.raftManager = Optional.of(kafkaRaftManager);
            return this;
        }

        public EmbeddedKafka build() {
            return new EmbeddedKafka(this.config, this.time, this.raftManager);
        }
    }

    private EmbeddedKafka(Properties properties, Time time, Optional<KafkaRaftManager<ApiMessageAndVersion>> optional) {
        this.faultHandler = new MockFaultHandler(MultiTenantClusterLinkTest.SSL_KAFKA_CN);
        try {
            this.raftManager = optional;
            this.tmpDir = TestUtils.tempDir().toPath();
            this.logDir = Files.createTempDirectory(this.tmpDir, null, new FileAttribute[0]).toFile();
            this.effectiveConfig = brokerConfigs(properties);
            this.kafkaConfig = new KafkaConfig(this.effectiveConfig, true);
            if (optional.isPresent()) {
                log.debug("Starting embedded KRaft Kafka broker %d (with log.dirs={}).", Integer.valueOf(this.kafkaConfig.nodeId()), this.logDir);
                String clusterId = optional.get().metaProperties().clusterId();
                Metrics initializeMetrics = Server.initializeMetrics(this.kafkaConfig, time, clusterId);
                this.kafka = new BrokerServer(this.kafkaConfig, new MetaProperties(clusterId, this.kafkaConfig.nodeId()), optional.get(), Option.empty(), Time.SYSTEM, initializeMetrics, BrokerServerMetrics$.MODULE$.apply(initializeMetrics), Option.apply(String.format("Broker%02d_", Integer.valueOf(this.kafkaConfig.nodeId()))), JavaConverters.asScalaBuffer(Collections.emptyList()).toSeq(), optional.get().controllerQuorumVotersFuture(), this.faultHandler, this.faultHandler, this.faultHandler, Option.apply(ConfluentConfigs.buildMultitenantMetadata(this.kafkaConfig.values(), initializeMetrics)));
            } else {
                log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...", this.logDir, this.kafkaConfig.zkConnect());
                this.kafka = TestUtils.createServer(this.kafkaConfig, time);
                log.debug("Startup of embedded KafkaServer completed: {}", this);
            }
            this.isShutdown = false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private Properties brokerConfigs(Properties properties) {
        Properties properties2 = new Properties();
        properties2.put(KafkaConfig.BrokerIdProp(), 0);
        properties2.put(KafkaConfig.ListenersProp(), "PLAINTEXT://127.0.0.1:9092");
        properties2.put(KafkaConfig.NumPartitionsProp(), 1);
        properties2.put(KafkaConfig.AutoCreateTopicsEnableProp(), true);
        properties2.put(KafkaConfig.MessageMaxBytesProp(), 1000000);
        properties2.put(KafkaConfig.ControlledShutdownEnableProp(), true);
        properties2.put("confluent.license.topic.replication.factor", "1");
        if (isKRaft()) {
            properties2.put(KafkaConfig.ProcessRolesProp(), "broker");
            properties2.put(KafkaConfig.QuorumVotersProp(), "1000@localhost:0");
            properties2.put(KafkaConfig.ClusterLinkEnableProp(), "false");
            properties2.put(KafkaConfig.ControllerListenerNamesProp(), "CONTROLLER");
        }
        properties2.putAll(properties);
        if (isKRaft()) {
            String property = properties2.getProperty(KafkaConfig.ListenerSecurityProtocolMapProp(), "");
            ArrayList arrayList = new ArrayList();
            for (String str : property.split(",")) {
                if (!str.trim().isEmpty()) {
                    arrayList.add(str);
                }
            }
            TreeSet<String> treeSet = new TreeSet();
            for (String str2 : properties2.getProperty(KafkaConfig.ListenersProp(), "").split(",")) {
                int indexOf = str2.indexOf(58);
                if (indexOf < 0) {
                    treeSet.add(str2);
                } else {
                    treeSet.add(str2.substring(0, indexOf));
                }
            }
            treeSet.add("CONTROLLER");
            for (String str3 : treeSet) {
                if (!arrayList.stream().anyMatch(str4 -> {
                    return str4.startsWith(str3);
                })) {
                    arrayList.add(str3 + ":PLAINTEXT");
                }
            }
            properties2.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp(), (String) arrayList.stream().collect(Collectors.joining(",")));
        }
        properties2.setProperty(KafkaConfig.LogDirProp(), this.logDir.getAbsolutePath());
        return properties2;
    }

    public synchronized String brokerConnect(String str) {
        EndPoint endPoint = (EndPoint) this.kafka.advertisedListener(str).get();
        return endPoint.host() + ":" + endPoint.port();
    }

    public boolean isKRaft() {
        return this.raftManager.isPresent();
    }

    public synchronized String zkConnect() {
        if (isKRaft()) {
            return null;
        }
        return this.kafka.config().zkConnect();
    }

    public synchronized String brokerSessionUuid() {
        return this.kafkaConfig.brokerSessionUuid();
    }

    public void shutdownAndCleanup() {
        shutdown();
        log.debug("Removing logs.dir at {} ...", this.logDir);
        CoreUtils.delete(JavaConverters.asScalaBuffer(Collections.singletonList(this.logDir.getAbsolutePath())));
        try {
            Utils.deleteDir(this.tmpDir);
            log.debug("Shutdown and cleanup of embedded Kafka broker completed {}.", this);
            this.faultHandler.maybeRethrowFirstException();
        } catch (IOException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    public synchronized void shutdown() {
        if (this.isShutdown) {
            log.debug("Embedded Kafka broker {} was already shut down. Skipping shutdown", this);
            return;
        }
        if (this.kafka != null) {
            log.debug("Shutting down embedded Kafka broker {} ...", this);
            this.kafka.shutdown();
            this.kafka.awaitShutdown();
            this.isShutdown = true;
            log.debug("Shutdown of embedded Kafka broker completed {}.", this);
            this.kafka = null;
        }
    }

    public synchronized void startBroker(Time time) {
        if (this.kafka == null) {
            this.kafka = TestUtils.createServer(this.kafkaConfig, time);
            this.isShutdown = false;
            log.debug("Started embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...", this.logDir, this.kafkaConfig.zkConnect());
        }
    }

    public synchronized void startKafkaBroker() {
        this.kafka.startup();
        log.debug("Startup of embedded BrokerServer completed: {}", this);
    }

    public void createTopic(ListenerName listenerName, String str, int i, int i2, Properties properties) {
        log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", new Object[]{str, Integer.valueOf(i), Integer.valueOf(i2), properties});
        if (isKRaft()) {
            TestUtils.resource(TestUtils.createAdminClient(JavaConverters.asScalaBuffer(Collections.singletonList(this.kafka)).toSeq(), listenerName, new Properties()), admin -> {
                return TestUtils.createTopicWithAdmin(admin, str, JavaConverters.asScalaBuffer(Collections.singletonList(this.kafka)).toSeq(), i, i2, JavaConverters.mapAsScalaMap(Collections.emptyMap()), properties);
            });
            return;
        }
        KafkaZkClient createZkClient = createZkClient();
        Throwable th = null;
        try {
            try {
                new AdminZkClient(createZkClient).createTopic(str, i, i2, properties, RackAwareMode$Enforced$.MODULE$, kafkaBroker().config().usesModernTopicId(), kafkaBroker().config().usesLegacyTopicId(), Option.empty());
                if (createZkClient != null) {
                    if (0 == 0) {
                        createZkClient.close();
                        return;
                    }
                    try {
                        createZkClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createZkClient != null) {
                if (th != null) {
                    try {
                        createZkClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createZkClient.close();
                }
            }
            throw th4;
        }
    }

    private KafkaZkClient createZkClient() {
        return KafkaZkClient.apply(zkConnect(), false, DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM, "zkclient", new ZKClientConfig(), "testMetricGroup", "testMetricType", true);
    }

    public KafkaBroker kafkaBroker() {
        return this.kafka;
    }

    public synchronized EndPoint endPoint() {
        return (EndPoint) this.kafka.advertisedListeners().head();
    }

    public synchronized List<String> listeners() {
        return (List) JavaConverters.seqAsJavaList(this.kafka.config().listeners()).stream().map(endPoint -> {
            return endPoint.listenerName().value();
        }).collect(Collectors.toList());
    }

    public synchronized String toString() {
        return this.kafka == null ? "EmbeddedKafka" : String.format("EmbeddedKafka(brokerId=%d, endpoints=%s, zkConnect=%s)", Integer.valueOf(this.kafka.config().brokerId()), org.apache.kafka.common.utils.Utils.mkString((Map) listeners().stream().collect(Collectors.toMap(Function.identity(), this::brokerConnect)), "", "", ":", ","), zkConnect());
    }
}
