package com.playtika.test.kafka;

import com.playtika.test.common.utils.ContainerUtils;
import com.playtika.test.kafka.properties.KafkaConfigurationProperties;
import com.playtika.test.kafka.properties.ZookeeperConfigurationProperties;
import java.util.Collection;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;

/* loaded from: input_file:com/playtika/test/kafka/KafkaTopicsConfigurer.class */
public class KafkaTopicsConfigurer {
    private static final Logger log = LoggerFactory.getLogger(KafkaTopicsConfigurer.class);
    private final GenericContainer kafka;
    private final ZookeeperConfigurationProperties zookeeperProperties;
    private final KafkaConfigurationProperties kafkaProperties;

    @PostConstruct
    void configure() {
        createTopics(this.kafkaProperties.getTopicsToCreate());
        restrictTopics(KafkaConfigurationProperties.KAFKA_USER, this.kafkaProperties.getSecureTopics());
    }

    public void createTopics(Collection<String> collection) {
        if (collection.isEmpty()) {
            return;
        }
        log.info("Creating Kafka topics: {}", collection);
        collection.parallelStream().forEach(this::createTopic);
        log.info("Created Kafka topics: {}", collection);
    }

    private void createTopic(String str) {
        String[] createTopicCmd = getCreateTopicCmd(str, this.zookeeperProperties.getZookeeperConnect());
        log.debug("Topic={} creation cmd='{}' execResult={}", new Object[]{str, createTopicCmd, ContainerUtils.executeInContainer(this.kafka, createTopicCmd)});
    }

    private void restrictTopics(String str, Collection<String> collection) {
        if (collection.isEmpty()) {
            return;
        }
        log.info("Creating ACLs for Kafka topics: {}", collection);
        for (String str2 : collection) {
            String[] topicConsumerACLCmd = getTopicConsumerACLCmd(str, str2, this.zookeeperProperties.getZookeeperConnect());
            String[] topicProducerACLCmd = getTopicProducerACLCmd(str, str2, this.zookeeperProperties.getZookeeperConnect());
            log.debug("Topic={} consumer ACLs cmd='{}' execResult={}, producer ACLs cmd='{}' execResult={}", new Object[]{str2, topicConsumerACLCmd, ContainerUtils.executeInContainer(this.kafka, topicConsumerACLCmd), topicProducerACLCmd, Integer.valueOf(ContainerUtils.executeInContainer(this.kafka, topicProducerACLCmd).getExitCode())});
        }
        log.info("Created ACLs for Kafka topics: {}", collection);
    }

    private String[] getCreateTopicCmd(String str, String str2) {
        return new String[]{"kafka-topics", "--create", "--topic", str, "--partitions", "1", "--replication-factor", "1", "--if-not-exists", "--zookeeper", str2};
    }

    private String[] getTopicConsumerACLCmd(String str, String str2, String str3) {
        return new String[]{"kafka-acls", "--authorizer-properties", "zookeeper.connect=" + str3, "--add", "--allow-principal", "User:" + str, "--consumer", "--topic", str2, "--group", "*"};
    }

    private String[] getTopicProducerACLCmd(String str, String str2, String str3) {
        return new String[]{"kafka-acls", "--authorizer-properties", "zookeeper.connect=" + str3, "--add", "--allow-principal", "User:" + str, "--producer", "--topic", str2};
    }

    public KafkaTopicsConfigurer(GenericContainer genericContainer, ZookeeperConfigurationProperties zookeeperConfigurationProperties, KafkaConfigurationProperties kafkaConfigurationProperties) {
        this.kafka = genericContainer;
        this.zookeeperProperties = zookeeperConfigurationProperties;
        this.kafkaProperties = kafkaConfigurationProperties;
    }

    public GenericContainer getKafka() {
        return this.kafka;
    }

    public ZookeeperConfigurationProperties getZookeeperProperties() {
        return this.zookeeperProperties;
    }

    public KafkaConfigurationProperties getKafkaProperties() {
        return this.kafkaProperties;
    }
}
