package io.confluent.kafka.test.cluster;

import io.confluent.common.EndPoint;
import io.confluent.kafka.multitenant.integration.test.FileBasedPlainSaslAuthHostNameValidationIntegrationTest;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.admin.RackAwareMode$Enforced$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;

/**
 * Runs a single Kafka broker inside the current JVM for use in tests.
 *
 * <p>The broker's log directory is created inside a JUnit {@link TemporaryFolder} owned by this
 * instance. The broker is started by the constructor (reached through {@link Builder#build()}),
 * can be stopped with {@link #shutdown()} and restarted with {@link #startBroker(Time)}, and is
 * stopped and removed from disk with {@link #shutdownAndCleanup()}.
 *
 * <p>Methods that touch the mutable broker reference are {@code synchronized} on this instance.
 */
public class EmbeddedKafka {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafka.class);
    private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10000;
    private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8000;
    private final File logDir;
    private final Properties effectiveConfig;
    public final TemporaryFolder tmpFolder;
    private final KafkaConfig kafkaConfig;
    // Null while the broker is shut down; guarded by this instance's monitor.
    private KafkaServer kafka;
    private boolean isShutdown;

    /** Collects broker configuration overrides and creates (and starts) an {@link EmbeddedKafka}. */
    public static class Builder {
        private final Properties config = new Properties();
        private final Time time;

        /**
         * @param time the clock handed to the broker when it is created
         */
        public Builder(Time time) {
            this.time = time;
        }

        /**
         * Adds broker configuration overrides; later calls win over earlier ones for the same key.
         *
         * @param properties the overrides to copy into this builder
         * @return this builder, for chaining
         */
        public Builder addConfigs(Properties properties) {
            this.config.putAll(properties);
            return this;
        }

        /**
         * Creates the embedded broker; the broker is started before this method returns.
         *
         * @return the running embedded broker
         */
        public EmbeddedKafka build() {
            return new EmbeddedKafka(this.config, this.time);
        }
    }

    /**
     * Creates the temporary log directory, builds the effective broker configuration and starts
     * the broker.
     *
     * @param overrides broker configuration that overrides the built-in defaults
     * @param time the clock handed to the broker
     * @throws RuntimeException wrapping the {@link IOException} if the temporary folder cannot be
     *     created
     */
    private EmbeddedKafka(Properties overrides, Time time) {
        this.tmpFolder = new TemporaryFolder();
        boolean started = false;
        try {
            this.tmpFolder.create();
            this.logDir = this.tmpFolder.newFolder();
            this.effectiveConfig = brokerConfigs(overrides);
            this.kafkaConfig = new KafkaConfig(this.effectiveConfig, true);
            log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...", this.logDir, this.kafkaConfig.zkConnect());
            this.kafka = TestUtils.createServer(this.kafkaConfig, time);
            this.isShutdown = false;
            started = true;
            log.debug("Startup of embedded Kafka broker completed: {}", this);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (!started) {
                // Nobody will ever get a reference to this instance, so the temporary folder
                // (which contains the log directory) would otherwise be left behind on disk.
                this.tmpFolder.delete();
            }
        }
    }

    /**
     * Builds the effective broker configuration: built-in defaults, then the caller's overrides,
     * then the log directory, which is always forced to {@link #logDir}.
     *
     * <p>Must only be called after {@link #logDir} has been assigned.
     *
     * @param overrides caller-supplied configuration that takes precedence over the defaults
     * @return a new {@link Properties} holding the merged configuration
     */
    private Properties brokerConfigs(Properties overrides) {
        Properties merged = new Properties();
        merged.put(KafkaConfig.BrokerIdProp(), 0);
        merged.put(KafkaConfig.HostNameProp(), FileBasedPlainSaslAuthHostNameValidationIntegrationTest.LOCAL_HOST_IP);
        merged.put(KafkaConfig.PortProp(), "9092");
        merged.put(KafkaConfig.NumPartitionsProp(), 1);
        merged.put(KafkaConfig.AutoCreateTopicsEnableProp(), true);
        merged.put(KafkaConfig.MessageMaxBytesProp(), 1000000);
        merged.put(KafkaConfig.ControlledShutdownEnableProp(), true);
        merged.putAll(overrides);
        // Applied last so that callers cannot redirect the logs outside the temporary folder.
        merged.setProperty(KafkaConfig.LogDirProp(), this.logDir.getAbsolutePath());
        return merged;
    }

    /**
     * Returns the {@code host:port} the broker is bound to for the given listener.
     *
     * @param listenerName name of the listener whose bound port is looked up
     * @return the configured host name and the bound port, separated by a colon
     * @throws IllegalStateException if the broker is currently shut down
     */
    public synchronized String brokerConnect(String listenerName) {
        KafkaServer broker = runningBroker();
        return this.kafkaConfig.hostName() + ":" + broker.boundPort(new ListenerName(listenerName));
    }

    /**
     * Returns the ZooKeeper connect string from the broker configuration. Available whether or
     * not the broker is currently running.
     *
     * @return the configured ZooKeeper connect string
     */
    public synchronized String zkConnect() {
        return this.kafkaConfig.zkConnect();
    }

    /** Shuts the broker down (if it is running) and deletes its log directory and temporary folder. */
    public void shutdownAndCleanup() {
        shutdown();
        log.debug("Removing logs.dir at {} ...", this.logDir);
        CoreUtils.delete(JavaConverters.asScalaBuffer(Collections.singletonList(this.logDir.getAbsolutePath())));
        this.tmpFolder.delete();
        log.debug("Shutdown and cleanup of embedded Kafka broker completed {}.", this);
    }

    /**
     * Shuts the broker down and waits for the shutdown to finish. Calling this while the broker
     * is already shut down only logs a message.
     */
    public synchronized void shutdown() {
        if (this.isShutdown) {
            log.debug("Embedded Kafka broker {} was already shut down. Skipping shutdown", this);
            return;
        }
        log.debug("Shutting down embedded Kafka broker {} ...", this);
        this.kafka.shutdown();
        this.kafka.awaitShutdown();
        this.isShutdown = true;
        log.debug("Shutdown of embedded Kafka broker completed {}.", this);
        this.kafka = null;
    }

    /**
     * Starts a new broker with the original configuration if none is currently running; does
     * nothing otherwise.
     *
     * @param time the clock handed to the new broker
     */
    public synchronized void startBroker(Time time) {
        if (this.kafka == null) {
            this.kafka = TestUtils.createServer(this.kafkaConfig, time);
            this.isShutdown = false;
            log.debug("Started embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...", this.logDir, this.kafkaConfig.zkConnect());
        }
    }

    /**
     * Creates a topic through a short-lived ZooKeeper client, which is closed before this method
     * returns, whether or not topic creation succeeds.
     *
     * @param topic name of the topic
     * @param partitions number of partitions
     * @param replication replication factor
     * @param topicConfig topic-level configuration
     */
    public void createTopic(String topic, int partitions, int replication, Properties topicConfig) {
        log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", topic, partitions, replication, topicConfig);
        try (KafkaZkClient zkClient = createZkClient()) {
            new AdminZkClient(zkClient).createTopic(topic, partitions, replication, topicConfig, RackAwareMode$Enforced$.MODULE$, this.kafkaConfig.usesModernTopicId(), this.kafkaConfig.usesLegacyTopicId(), Option.empty());
        }
    }

    /**
     * Opens a new ZooKeeper client against {@link #zkConnect()}; the caller is responsible for
     * closing it.
     */
    private KafkaZkClient createZkClient() {
        return KafkaZkClient.apply(zkConnect(), false, DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM, "testMetricGroup", "testMetricType", Option.empty(), Option.empty());
    }

    /**
     * Returns the underlying broker.
     *
     * @return the broker instance, or {@code null} while the broker is shut down
     */
    public synchronized KafkaServer kafkaServer() {
        return this.kafka;
    }

    /**
     * Returns the host, bound port and security protocol of the broker's endpoint.
     *
     * <p>The security protocol defaults to {@code PLAINTEXT}; when the inter-broker listener name
     * property is present in the effective configuration, its value is parsed as a security
     * protocol name instead.
     *
     * <p>NOTE(review): a listener name is passed to {@code SecurityProtocol.forName}; this looks
     * like it only works when the listener is named after its protocol - confirm that custom
     * listener names are not expected here.
     *
     * @return the endpoint of the running broker
     * @throws IllegalStateException if the broker is currently shut down
     */
    public synchronized EndPoint endPoint() {
        KafkaServer broker = runningBroker();
        Object interBrokerListener = this.effectiveConfig.get(KafkaConfig.InterBrokerListenerNameProp());
        SecurityProtocol securityProtocol = interBrokerListener == null
                ? SecurityProtocol.PLAINTEXT
                : SecurityProtocol.forName(interBrokerListener.toString());
        int port = broker.boundPort(ListenerName.forSecurityProtocol(securityProtocol));
        return new EndPoint(this.kafkaConfig.hostName(), port, securityProtocol);
    }

    /**
     * Returns the names of the listeners in the broker configuration. Available whether or not
     * the broker is currently running.
     *
     * @return the configured listener names
     */
    public synchronized List<String> listeners() {
        return JavaConverters.seqAsJavaList(this.kafkaConfig.listeners()).stream()
                .map(listener -> listener.listenerName().value())
                .collect(Collectors.toList());
    }

    /**
     * Describes the broker by id, per-listener {@code host:port} and ZooKeeper connect string.
     * Safe to call while the broker is shut down, in which case the endpoints are reported as
     * {@code <not running>} because bound ports are only known for a live broker.
     */
    @Override
    public synchronized String toString() {
        String endpoints;
        if (this.kafka == null) {
            endpoints = "<not running>";
        } else {
            Map<String, String> connectByListener = listeners().stream()
                    .collect(Collectors.toMap(Function.identity(), this::brokerConnect));
            endpoints = Utils.mkString(connectByListener, "", "", ":", ",");
        }
        return String.format("Kafka brokerId=%d, endpoints=%s, zkConnect=%s", this.kafkaConfig.brokerId(), endpoints, zkConnect());
    }

    /**
     * Returns the live broker or fails with a descriptive error. Callers must hold this
     * instance's monitor.
     */
    private KafkaServer runningBroker() {
        if (this.kafka == null) {
            throw new IllegalStateException("Embedded Kafka broker is not running (it has been shut down)");
        }
        return this.kafka;
    }
}
