package io.confluent.ksql.rest.server.computation;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.rest.server.computation.ConfigTopicKey;
import io.confluent.ksql.util.KsqlConfig;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/confluent/ksql/rest/server/computation/KafkaConfigStore.class */
public class KafkaConfigStore implements ConfigStore {
    private static final Logger log = LogManager.getLogger(KafkaConfigStore.class);
    public static final String CONFIG_MSG_KEY = "ksql-standalone-configs";
    private final KsqlConfig ksqlConfig;

    /* loaded from: input_file:io/confluent/ksql/rest/server/computation/KafkaConfigStore$KafkaWriteOnceStore.class */
    private static class KafkaWriteOnceStore<V> {
        private final String topicName;
        private final ConfigTopicKey.StringKey key;
        private final Deserializer<ConfigTopicKey> keyDeserializer;
        private final Deserializer<V> deserializer;
        private final Supplier<KafkaConsumer<byte[], byte[]>> consumerSupplier;
        private final Supplier<KafkaProducer<ConfigTopicKey.StringKey, V>> producerSupplier;

        KafkaWriteOnceStore(String str, ConfigTopicKey.StringKey stringKey, Deserializer<ConfigTopicKey> deserializer, Deserializer<V> deserializer2, Supplier<KafkaConsumer<byte[], byte[]>> supplier, Supplier<KafkaProducer<ConfigTopicKey.StringKey, V>> supplier2) {
            this.topicName = str;
            this.key = stringKey;
            this.keyDeserializer = deserializer;
            this.deserializer = deserializer2;
            this.consumerSupplier = supplier;
            this.producerSupplier = supplier2;
        }

        private boolean matchKey(ConsumerRecord<byte[], byte[]> consumerRecord) {
            try {
                return this.key.equals((ConfigTopicKey) this.keyDeserializer.deserialize(this.topicName, (byte[]) consumerRecord.key()));
            } catch (SerializationException e) {
                return false;
            }
        }

        private Optional<V> read() {
            TopicPartition topicPartition = new TopicPartition(this.topicName, 0);
            List singletonList = Collections.singletonList(topicPartition);
            KafkaConsumer<byte[], byte[]> kafkaConsumer = this.consumerSupplier.get();
            try {
                kafkaConsumer.assign(singletonList);
                kafkaConsumer.seekToBeginning(singletonList);
                long longValue = ((Long) kafkaConsumer.endOffsets(singletonList).get(topicPartition)).longValue();
                while (kafkaConsumer.position(topicPartition) < longValue) {
                    KafkaConfigStore.log.debug("Reading from topic {}. Position({}) End({})", this.topicName, Long.valueOf(kafkaConsumer.position(topicPartition)), Long.valueOf(longValue));
                    Optional findFirst = kafkaConsumer.poll(Duration.of(5L, ChronoUnit.SECONDS)).records(topicPartition).stream().filter(this::matchKey).findFirst();
                    if (findFirst.isPresent()) {
                        byte[] bArr = (byte[]) ((ConsumerRecord) findFirst.get()).value();
                        KafkaConfigStore.log.debug("Found existing value in topic {}", this.topicName);
                        Optional<V> of = Optional.of(this.deserializer.deserialize(this.topicName, bArr));
                        if (kafkaConsumer != null) {
                            kafkaConsumer.close();
                        }
                        return of;
                    }
                }
                KafkaConfigStore.log.debug("No value found on topic {}", this.topicName);
                Optional<V> empty = Optional.empty();
                if (kafkaConsumer != null) {
                    kafkaConsumer.close();
                }
                return empty;
            } catch (Throwable th) {
                if (kafkaConsumer != null) {
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }

        private void write(String str, V v) {
            KafkaProducer<ConfigTopicKey.StringKey, V> kafkaProducer = this.producerSupplier.get();
            try {
                kafkaProducer.send(new ProducerRecord(str, this.key, v));
                kafkaProducer.flush();
                if (kafkaProducer != null) {
                    kafkaProducer.close();
                }
            } catch (Throwable th) {
                if (kafkaProducer != null) {
                    try {
                        kafkaProducer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }

        V readMaybeWrite(V v) {
            Optional<V> read = read();
            if (read.isPresent()) {
                return read.get();
            }
            KafkaConfigStore.log.debug("Writing current config to config topic");
            write(this.topicName, v);
            return read().get();
        }
    }

    /* loaded from: input_file:io/confluent/ksql/rest/server/computation/KafkaConfigStore$KsqlProperties.class */
    public static class KsqlProperties {
        private final Map<String, String> ksqlProperties;

        @JsonCreator
        KsqlProperties(@JsonProperty("ksqlProperties") Optional<Map<String, String>> optional) {
            this.ksqlProperties = optional.isPresent() ? (Map) optional.get().entrySet().stream().filter(entry -> {
                return entry.getValue() != null;
            }).collect(ImmutableMap.toImmutableMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            })) : Collections.emptyMap();
        }

        @JsonInclude(content = JsonInclude.Include.ALWAYS)
        @SuppressFBWarnings({"EI_EXPOSE_REP"})
        public Map<String, String> getKsqlProperties() {
            return this.ksqlProperties;
        }

        static KsqlProperties createFor(KsqlConfig ksqlConfig) {
            return new KsqlProperties(Optional.of(ksqlConfig.getAllConfigPropsWithSecretsObfuscated()));
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return Objects.equals(this.ksqlProperties, ((KsqlProperties) obj).ksqlProperties);
        }

        public int hashCode() {
            return Objects.hash(this.ksqlProperties);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static KafkaConsumer<byte[], byte[]> createConsumer(KsqlConfig ksqlConfig) {
        return new KafkaConsumer<>(ksqlConfig.getKsqlStreamConfigProps(), Serdes.ByteArray().deserializer(), Serdes.ByteArray().deserializer());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static KafkaProducer<ConfigTopicKey.StringKey, KsqlProperties> createProducer(KsqlConfig ksqlConfig) {
        return new KafkaProducer<>(ksqlConfig.getKsqlStreamConfigProps(), InternalTopicSerdes.serializer(), InternalTopicSerdes.serializer());
    }

    public KafkaConfigStore(String str, KsqlConfig ksqlConfig) {
        this(str, ksqlConfig, () -> {
            return createConsumer(ksqlConfig);
        }, () -> {
            return createProducer(ksqlConfig);
        });
    }

    KafkaConfigStore(String str, KsqlConfig ksqlConfig, Supplier<KafkaConsumer<byte[], byte[]>> supplier, Supplier<KafkaProducer<ConfigTopicKey.StringKey, KsqlProperties>> supplier2) {
        this.ksqlConfig = ksqlConfig.overrideBreakingConfigsWithOriginalValues(((KsqlProperties) new KafkaWriteOnceStore(str, new ConfigTopicKey.StringKey(CONFIG_MSG_KEY), InternalTopicSerdes.deserializer(ConfigTopicKey.class), InternalTopicSerdes.deserializer(KsqlProperties.class), supplier, supplier2).readMaybeWrite(KsqlProperties.createFor(ksqlConfig))).getKsqlProperties());
    }

    @Override // io.confluent.ksql.rest.server.computation.ConfigStore
    @SuppressFBWarnings({"EI_EXPOSE_REP"})
    public KsqlConfig getKsqlConfig() {
        return this.ksqlConfig;
    }
}
