package com.playtika.test.kafka.configuration;

import com.github.dockerjava.api.model.Capability;
import com.playtika.test.common.utils.ContainerUtils;
import com.playtika.test.kafka.KafkaTopicsConfigurer;
import com.playtika.test.kafka.checks.KafkaStatusCheck;
import com.playtika.test.kafka.properties.KafkaConfigurationProperties;
import com.playtika.test.kafka.properties.ZookeeperConfigurationProperties;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.MountableFile;

@EnableConfigurationProperties({KafkaConfigurationProperties.class, ZookeeperConfigurationProperties.class})
@Configuration
@ConditionalOnProperty(value = {"embedded.kafka.enabled"}, havingValue = "true", matchIfMissing = true)
/* loaded from: input_file:com/playtika/test/kafka/configuration/KafkaContainerConfiguration.class */
public class KafkaContainerConfiguration {
    private static final Logger log = LoggerFactory.getLogger(KafkaContainerConfiguration.class);
    public static final String KAFKA_HOST_NAME = "kafka-broker.testcontainer.docker";

    @ConditionalOnMissingBean({Network.class})
    @Bean(destroyMethod = "close")
    public Network kafkaNetwork() {
        Network newNetwork = Network.newNetwork();
        log.info("Created docker Network id={}", newNetwork.getId());
        return newNetwork;
    }

    @ConditionalOnMissingBean
    @Bean
    public KafkaStatusCheck kafkaStartupCheckStrategy(KafkaConfigurationProperties kafkaConfigurationProperties) {
        return new KafkaStatusCheck(kafkaConfigurationProperties);
    }

    /* JADX WARN: Type inference failed for: r0v9, types: [com.playtika.test.kafka.configuration.KafkaContainerConfiguration$1] */
    @Bean(name = {KafkaConfigurationProperties.KAFKA_BEAN_NAME}, destroyMethod = "stop")
    public GenericContainer kafka(KafkaStatusCheck kafkaStatusCheck, KafkaConfigurationProperties kafkaConfigurationProperties, ZookeeperConfigurationProperties zookeeperConfigurationProperties, ConfigurableEnvironment configurableEnvironment, Network network) {
        final int containerBrokerPort = kafkaConfigurationProperties.getContainerBrokerPort();
        final int brokerPort = kafkaConfigurationProperties.getBrokerPort();
        final int saslPlaintextBrokerPort = kafkaConfigurationProperties.getSaslPlaintextBrokerPort();
        String dockerImageVersion = kafkaConfigurationProperties.getDockerImageVersion();
        log.info("Starting kafka broker. Docker image version: {}", dockerImageVersion);
        KafkaContainer kafkaContainer = (KafkaContainer) new KafkaContainer(dockerImageVersion) { // from class: com.playtika.test.kafka.configuration.KafkaContainerConfiguration.1
            public String getBootstrapServers() {
                super.getBootstrapServers();
                return "EXTERNAL_PLAINTEXT://" + getHost() + ":" + getMappedPort(brokerPort) + ",EXTERNAL_SASL_PLAINTEXT://" + getHost() + ":" + getMappedPort(saslPlaintextBrokerPort) + ",INTERNAL_PLAINTEXT://" + KafkaContainerConfiguration.KAFKA_HOST_NAME + ":" + containerBrokerPort;
            }
        }.withCreateContainerCmdModifier(createContainerCmd -> {
            createContainerCmd.withHostName(KAFKA_HOST_NAME);
        }).withCreateContainerCmdModifier(createContainerCmd2 -> {
            createContainerCmd2.getHostConfig().withCapAdd(new Capability[]{Capability.NET_ADMIN});
        }).withEmbeddedZookeeper().withEnv("KAFKA_BROKER_ID", "-1").withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "EXTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT,INTERNAL_PLAINTEXT:PLAINTEXT,BROKER:PLAINTEXT").withEnv("KAFKA_LISTENERS", "EXTERNAL_PLAINTEXT://0.0.0.0:" + brokerPort + ",EXTERNAL_SASL_PLAINTEXT://0.0.0.0:" + saslPlaintextBrokerPort + ",INTERNAL_PLAINTEXT://0.0.0.0:" + containerBrokerPort + ",BROKER://0.0.0.0:9092").withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "INTERNAL_PLAINTEXT").withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1").withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", String.valueOf(kafkaConfigurationProperties.getReplicationFactor())).withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1").withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false").withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1").withEnv("KAFKA_LOG_FLUSH_INTERVAL_MS", String.valueOf(kafkaConfigurationProperties.getLogFlushIntervalMs())).withEnv("KAFKA_REPLICA_SOCKET_TIMEOUT_MS", String.valueOf(kafkaConfigurationProperties.getReplicaSocketTimeoutMs())).withEnv("KAFKA_CONTROLLER_SOCKET_TIMEOUT_MS", String.valueOf(kafkaConfigurationProperties.getControllerSocketTimeoutMs())).withEnv("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN").withEnv("ZOOKEEPER_SASL_ENABLED", "false").withCopyFileToContainer(MountableFile.forClasspathResource("kafka_server_jaas.conf"), "/etc/kafka/kafka_server_jaas.conf").withEnv("KAFKA_OPTS", "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf").withEnv("KAFKA_GC_LOG_OPTS", "-Dnogclog").withExposedPorts(new Integer[]{Integer.valueOf(containerBrokerPort), Integer.valueOf(brokerPort), Integer.valueOf(saslPlaintextBrokerPort), 9093}).withNetwork(network).withNetworkAliases(new String[]{KAFKA_HOST_NAME}).withExtraHost(KAFKA_HOST_NAME, "127.0.0.1").waitingFor(kafkaStatusCheck);
        kafkaFileSystemBind(kafkaConfigurationProperties, kafkaContainer);
        zookeperFileSystemBind(zookeeperConfigurationProperties, kafkaContainer);
        KafkaContainer configureCommonsAndStart = ContainerUtils.configureCommonsAndStart(kafkaContainer, kafkaConfigurationProperties, log);
        registerKafkaEnvironment(configureCommonsAndStart, configurableEnvironment, kafkaConfigurationProperties);
        return configureCommonsAndStart;
    }

    private void kafkaFileSystemBind(KafkaConfigurationProperties kafkaConfigurationProperties, KafkaContainer kafkaContainer) {
        KafkaConfigurationProperties.FileSystemBind fileSystemBind = kafkaConfigurationProperties.getFileSystemBind();
        if (fileSystemBind.isEnabled()) {
            String path = Paths.get(fileSystemBind.getDataFolder(), LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH-mm-ss-nnnnnnnnn"))).toAbsolutePath().toString();
            log.info("Writing kafka data to: {}", path);
            kafkaContainer.withFileSystemBind(path, "/var/lib/kafka/data", BindMode.READ_WRITE);
        }
    }

    private void zookeperFileSystemBind(ZookeeperConfigurationProperties zookeeperConfigurationProperties, KafkaContainer kafkaContainer) {
        ZookeeperConfigurationProperties.FileSystemBind fileSystemBind = zookeeperConfigurationProperties.getFileSystemBind();
        if (fileSystemBind.isEnabled()) {
            String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH-mm-ss-nnnnnnnnn"));
            String path = Paths.get(fileSystemBind.getDataFolder(), format).toAbsolutePath().toString();
            log.info("Writing zookeeper data to: {}", path);
            String path2 = Paths.get(fileSystemBind.getTxnLogsFolder(), format).toAbsolutePath().toString();
            log.info("Writing zookeeper transaction logs to: {}", path2);
            kafkaContainer.withFileSystemBind(path, "/var/lib/zookeeper/data", BindMode.READ_WRITE).withFileSystemBind(path2, "/var/lib/zookeeper/log", BindMode.READ_WRITE);
        }
    }

    private void registerKafkaEnvironment(GenericContainer genericContainer, ConfigurableEnvironment configurableEnvironment, KafkaConfigurationProperties kafkaConfigurationProperties) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        String containerIpAddress = genericContainer.getContainerIpAddress();
        linkedHashMap.put("embedded.kafka.brokerList", String.format("%s:%d", containerIpAddress, genericContainer.getMappedPort(kafkaConfigurationProperties.getBrokerPort())));
        linkedHashMap.put("embedded.kafka.saslPlaintext.brokerList", String.format("%s:%d", containerIpAddress, genericContainer.getMappedPort(kafkaConfigurationProperties.getSaslPlaintextBrokerPort())));
        linkedHashMap.put("embedded.kafka.saslPlaintext.user", KafkaConfigurationProperties.KAFKA_USER);
        linkedHashMap.put("embedded.kafka.saslPlaintext.password", KafkaConfigurationProperties.KAFKA_PASSWORD);
        linkedHashMap.put("embedded.kafka.containerBrokerList", String.format("%s:%d", KAFKA_HOST_NAME, Integer.valueOf(kafkaConfigurationProperties.getContainerBrokerPort())));
        MapPropertySource mapPropertySource = new MapPropertySource("embeddedKafkaInfo", linkedHashMap);
        log.info("Started kafka broker. Connection details: {}", linkedHashMap);
        configurableEnvironment.getPropertySources().addFirst(mapPropertySource);
    }

    @Bean
    public KafkaTopicsConfigurer kafkaConfigurer(GenericContainer genericContainer, KafkaConfigurationProperties kafkaConfigurationProperties, ZookeeperConfigurationProperties zookeeperConfigurationProperties) {
        return new KafkaTopicsConfigurer(genericContainer, zookeeperConfigurationProperties, kafkaConfigurationProperties);
    }
}
