package com.consol.citrus.kafka.embedded;

import com.consol.citrus.common.InitializingPhase;
import com.consol.citrus.common.ShutdownPhase;
import com.consol.citrus.exceptions.CitrusRuntimeException;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.BrokerState;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.SocketUtils;
import org.springframework.util.StringUtils;
import scala.Option;

/* loaded from: input_file:com/consol/citrus/kafka/embedded/EmbeddedKafkaServer.class */
public class EmbeddedKafkaServer implements InitializingPhase, ShutdownPhase {
    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaServer.class);
    private ZooKeeperServer zookeeper;
    private ServerCnxnFactory serverFactory;
    private KafkaServer kafkaServer;
    private String logDirPath;
    private int zookeeperPort = SocketUtils.findAvailableTcpPort();
    private int kafkaServerPort = SocketUtils.findAvailableTcpPort(9092);
    private int partitions = 1;
    private String topics = "citrus";
    private boolean autoDeleteLogs = true;
    private Map<String, String> brokerProperties = Collections.emptyMap();

    public void start() {
        if (this.kafkaServer != null) {
            LOG.debug("Found instance of Kafka server - avoid duplicate Kafka server startup");
            return;
        }
        File createLogDir = createLogDir();
        this.zookeeper = createZookeeperServer(createLogDir);
        this.serverFactory = createServerFactory();
        try {
            this.serverFactory.startup(this.zookeeper);
            Properties createBrokerProperties = createBrokerProperties("localhost:" + this.zookeeperPort, this.kafkaServerPort, createLogDir);
            createBrokerProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
            createBrokerProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
            createBrokerProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
            createBrokerProperties.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(), String.valueOf(Long.MAX_VALUE));
            if (this.brokerProperties != null) {
                Map<String, String> map = this.brokerProperties;
                Objects.requireNonNull(createBrokerProperties);
                map.forEach((v1, v2) -> {
                    r1.put(v1, v2);
                });
            }
            this.kafkaServer = new KafkaServer(new KafkaConfig(createBrokerProperties), Time.SYSTEM, Option.apply((Object) null), false);
            this.kafkaServer.startup();
            this.kafkaServer.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT));
            createKafkaTopics(StringUtils.commaDelimitedListToSet(this.topics));
        } catch (IOException | InterruptedException e) {
            throw new CitrusRuntimeException("Failed to start embedded zookeeper server", e);
        }
    }

    public void stop() {
        if (this.kafkaServer != null) {
            try {
                if (this.kafkaServer.brokerState() != BrokerState.NOT_RUNNING) {
                    this.kafkaServer.shutdown();
                    this.kafkaServer.awaitShutdown();
                }
            } catch (Exception e) {
                LOG.warn("Failed to shutdown Kafka embedded server", e);
            }
            try {
                CoreUtils.delete(this.kafkaServer.config().logDirs());
            } catch (Exception e2) {
                LOG.warn("Failed to remove logs on Kafka embedded server", e2);
            }
        }
        if (this.serverFactory != null) {
            try {
                this.serverFactory.shutdown();
            } catch (Exception e3) {
                LOG.warn("Failed to shutdown Zookeeper instance", e3);
            }
        }
    }

    public void destroy() {
        stop();
    }

    public void initialize() {
        start();
    }

    protected ZooKeeperServer createZookeeperServer(File file) {
        try {
            return new ZooKeeperServer(file, file, 2000);
        } catch (IOException e) {
            throw new CitrusRuntimeException("Failed to create embedded zookeeper server", e);
        }
    }

    protected File createLogDir() {
        File file = (File) Optional.ofNullable(this.logDirPath).map(str -> {
            return Paths.get(str, new String[0]);
        }).map((v0) -> {
            return v0.toFile();
        }).orElse(new File(System.getProperty("java.io.tmpdir")));
        if (!file.exists() && !file.mkdirs()) {
            LOG.warn("Unable to create log directory: " + file.getAbsolutePath());
            file = new File(System.getProperty("java.io.tmpdir"));
            LOG.info("Using default log directory: " + file.getAbsolutePath());
        }
        File absoluteFile = new File(file, "zookeeper" + System.currentTimeMillis()).getAbsoluteFile();
        if (this.autoDeleteLogs) {
            absoluteFile.deleteOnExit();
        }
        return absoluteFile;
    }

    protected ServerCnxnFactory createServerFactory() {
        try {
            NIOServerCnxnFactory nIOServerCnxnFactory = new NIOServerCnxnFactory();
            nIOServerCnxnFactory.configure(new InetSocketAddress(this.zookeeperPort), 5000);
            return nIOServerCnxnFactory;
        } catch (IOException e) {
            throw new CitrusRuntimeException("Failed to create default zookeeper server factory", e);
        }
    }

    protected void createKafkaTopics(Set<String> set) {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", "localhost:" + this.kafkaServerPort);
        AdminClient create = AdminClient.create(hashMap);
        try {
            try {
                create.createTopics((List) set.stream().map(str -> {
                    return new NewTopic(str, this.partitions, (short) 1);
                }).collect(Collectors.toList())).all().get();
            } catch (Exception e) {
                LOG.warn("Failed to create Kafka topics", e);
            }
            if (create != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    protected Properties createBrokerProperties(String str, int i, File file) {
        Properties properties = new Properties();
        properties.put(KafkaConfig.BrokerIdProp(), "0");
        properties.put(KafkaConfig.ZkConnectProp(), str);
        properties.put(KafkaConfig.ZkConnectionTimeoutMsProp(), "10000");
        properties.put(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1500");
        properties.put(KafkaConfig.ControllerSocketTimeoutMsProp(), "1500");
        properties.put(KafkaConfig.ControlledShutdownEnableProp(), "false");
        properties.put(KafkaConfig.DeleteTopicEnableProp(), "true");
        properties.put(KafkaConfig.LogDeleteDelayMsProp(), "1000");
        properties.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), "100");
        properties.put(KafkaConfig.LogCleanerDedupeBufferSizeProp(), "2097152");
        properties.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp(), Long.MAX_VALUE);
        properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
        properties.put(KafkaConfig.OffsetsTopicPartitionsProp(), "5");
        properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0");
        properties.put(KafkaConfig.LogDirProp(), file.getAbsolutePath());
        properties.put(KafkaConfig.ListenersProp(), SecurityProtocol.PLAINTEXT.name + "://localhost:" + i);
        if (LOG.isDebugEnabled()) {
            properties.forEach((obj, obj2) -> {
                LOG.debug(String.format("Using default Kafka broker property %s='%s'", obj, obj2));
            });
        }
        return properties;
    }

    public int getZookeeperPort() {
        return this.zookeeperPort;
    }

    public void setZookeeperPort(int i) {
        this.zookeeperPort = i;
    }

    public int getKafkaServerPort() {
        return this.kafkaServerPort;
    }

    public void setKafkaServerPort(int i) {
        this.kafkaServerPort = i;
    }

    public int getPartitions() {
        return this.partitions;
    }

    public void setPartitions(int i) {
        this.partitions = i;
    }

    public String getTopics() {
        return this.topics;
    }

    public void setTopics(String str) {
        this.topics = str;
    }

    public Map<String, String> getBrokerProperties() {
        return this.brokerProperties;
    }

    public void setBrokerProperties(Map<String, String> map) {
        this.brokerProperties = map;
    }

    public String getLogDirPath() {
        return this.logDirPath;
    }

    public void setLogDirPath(String str) {
        this.logDirPath = str;
    }

    public boolean isAutoDeleteLogs() {
        return this.autoDeleteLogs;
    }

    public void setAutoDeleteLogs(boolean z) {
        this.autoDeleteLogs = z;
    }
}
