/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.license;

import com.google.protobuf.Message;
import io.confluent.command.record.Command;
import io.confluent.serializers.ProtoSerde;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Persists the Confluent license in a Kafka topic and reads it back.
 *
 * <p>The store wraps a {@link KafkaBasedLog} keyed by {@link Command.CommandKey}. Licenses are
 * written as {@link Command.CommandMessage} records under a single fixed key, and the most recently
 * consumed license JWT is cached in an {@link AtomicReference}. When the log is started, the
 * backing topic is created if it does not exist yet; it is defined as a compacted topic with a
 * single partition.
 */
public class LicenseStore {
    private static final Logger log = LoggerFactory.getLogger(LicenseStore.class);

    private static final String KEY_PREFIX = "CONFLUENT_LICENSE";
    /** The single record key under which every license is written. */
    private static final Command.CommandKey KEY = Command.CommandKey.newBuilder()
        .setConfigType(Command.CommandConfigType.LICENSE_INFO)
        .setGuid(KEY_PREFIX)
        .build();
    /** Base delay of the exponential backoff between topic creation attempts. */
    private static final Duration TOPIC_CREATE_RETRY_BACKOFF = Duration.ofMillis(1000L);
    /** Replication factor used when {@link #REPLICATION_FACTOR_CONFIG} is not configured. */
    private static final short DEFAULT_REPLICATION_FACTOR = 3;
    /** Upper bound for min.insync.replicas when {@link #MIN_INSYNC_REPLICAS_CONFIG} is not configured. */
    private static final short DEFAULT_MIN_INSYNC_REPLICAS = 2;

    /** Key in the topic config map holding the replication factor of the license topic. */
    public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
    /** Key in the topic config map holding the min.insync.replicas of the license topic. */
    public static final String MIN_INSYNC_REPLICAS_CONFIG = "min.insync.replicas";
    /** Maximum time, in milliseconds, {@link #licenseScan()} waits for the log to be read to its end. */
    public static final long READ_TO_END_TIMEOUT_MS = 120000L;

    private final String topic;
    private final KafkaBasedLog<Command.CommandKey, Command.CommandMessage> licenseLog;
    private final AtomicBoolean running = new AtomicBoolean();
    private final AtomicReference<String> latestLicense;
    private final Duration topicCreateTimeout;
    private final Duration retryBackoffMinMs;
    private final Duration retryBackoffMaxMs;
    private final Time time;

    /**
     * Creates a store that uses the system clock and zero topic-creation timeout and backoff.
     *
     * @param topic          name of the license topic
     * @param producerConfig producer settings; the key/value serializers are overridden
     * @param consumerConfig consumer settings; the key/value deserializers are overridden
     * @param topicConfig    admin client settings, optionally including
     *                       {@link #REPLICATION_FACTOR_CONFIG} and {@link #MIN_INSYNC_REPLICAS_CONFIG}
     */
    public LicenseStore(String topic, Map<String, Object> producerConfig, Map<String, Object> consumerConfig, Map<String, Object> topicConfig) {
        this(topic, producerConfig, consumerConfig, topicConfig, Time.SYSTEM);
    }

    /**
     * Creates a store with the given clock and zero topic-creation timeout and backoff.
     *
     * <p>With a zero timeout, a retriable topic creation failure is not retried: the first such
     * failure ends in a timeout error.
     *
     * @param topic          name of the license topic
     * @param producerConfig producer settings; the key/value serializers are overridden
     * @param consumerConfig consumer settings; the key/value deserializers are overridden
     * @param topicConfig    admin client settings, optionally including
     *                       {@link #REPLICATION_FACTOR_CONFIG} and {@link #MIN_INSYNC_REPLICAS_CONFIG}
     * @param time           clock used for timeouts and sleeping
     */
    protected LicenseStore(String topic, Map<String, Object> producerConfig, Map<String, Object> consumerConfig, Map<String, Object> topicConfig, Time time) {
        // Forward the caller's clock; it used to be dropped in favour of Time.SYSTEM.
        this(topic, producerConfig, consumerConfig, topicConfig, Duration.ZERO, Duration.ZERO, Duration.ZERO, time);
    }

    /**
     * Creates a store with explicit topic-creation timeout and retry backoff bounds.
     *
     * @param topic              name of the license topic
     * @param producerConfig     producer settings; the key/value serializers are overridden
     * @param consumerConfig     consumer settings; the key/value deserializers are overridden
     * @param topicConfig        admin client settings, optionally including
     *                           {@link #REPLICATION_FACTOR_CONFIG} and {@link #MIN_INSYNC_REPLICAS_CONFIG}
     * @param topicCreateTimeout total time budget for creating the license topic
     * @param retryBackoffMinMs  lower bound of the delay between topic creation attempts
     * @param retryBackoffMaxMs  upper bound of the delay between topic creation attempts
     * @param time               clock used for timeouts and sleeping
     */
    public LicenseStore(String topic, Map<String, Object> producerConfig, Map<String, Object> consumerConfig, Map<String, Object> topicConfig, Duration topicCreateTimeout, Duration retryBackoffMinMs, Duration retryBackoffMaxMs, Time time) {
        this.topic = topic;
        this.latestLicense = new AtomicReference<>();
        this.time = time;
        this.topicCreateTimeout = topicCreateTimeout;
        this.retryBackoffMinMs = retryBackoffMinMs;
        this.retryBackoffMaxMs = retryBackoffMaxMs;
        // Must stay last: the log's topic initializer reads the fields assigned above.
        this.licenseLog = this.setupAndCreateKafkaBasedLog(this.topic, producerConfig, consumerConfig, topicConfig, this.latestLicense, this.time);
    }

    /**
     * Creates a store around an already constructed log (for example a test double). Topic-creation
     * timeout and backoff bounds are all zero.
     *
     * @param topic         name of the license topic
     * @param latestLicense reference that holds the most recently consumed license
     * @param licenseLog    log used to read and write license records
     * @param time          clock used for timeouts and sleeping
     */
    public LicenseStore(String topic, AtomicReference<String> latestLicense, KafkaBasedLog<Command.CommandKey, Command.CommandMessage> licenseLog, Time time) {
        this.topic = topic;
        this.latestLicense = latestLicense;
        this.licenseLog = licenseLog;
        this.time = time;
        this.topicCreateTimeout = Duration.ZERO;
        this.retryBackoffMinMs = Duration.ZERO;
        this.retryBackoffMaxMs = Duration.ZERO;
    }

    /**
     * Builds the client configurations and the topic definition, then creates the backing log.
     *
     * <p>The replication factor defaults to 3. min.insync.replicas defaults to 2 and is always
     * capped at the replication factor. Both settings may be supplied as a string or as a number.
     *
     * @param topic          name of the license topic
     * @param producerConfig producer settings; copied, then the serializers are overridden
     * @param consumerConfig consumer settings; copied, then the deserializers are overridden
     * @param topicConfig    admin client settings and optional topic settings
     * @param latestLicense  reference updated with every consumed license
     * @param time           clock handed to the log
     * @return the (not yet started) log
     * @throws NumberFormatException if a configured topic setting is not a valid short
     */
    KafkaBasedLog<Command.CommandKey, Command.CommandMessage> setupAndCreateKafkaBasedLog(String topic, Map<String, Object> producerConfig, Map<String, Object> consumerConfig, Map<String, Object> topicConfig, AtomicReference<String> latestLicense, Time time) {
        Map<String, Object> producerProps = new HashMap<>(producerConfig);
        producerProps.put("key.serializer", LicenseKeySerde.class.getName());
        producerProps.put("value.serializer", LicenseMessageSerde.class.getName());

        Map<String, Object> consumerProps = new HashMap<>(consumerConfig);
        consumerProps.put("key.deserializer", LicenseKeySerde.class.getName());
        consumerProps.put("value.deserializer", LicenseMessageSerde.class.getName());

        short replicationFactor = shortConfig(topicConfig, REPLICATION_FACTOR_CONFIG, DEFAULT_REPLICATION_FACTOR);
        short configuredMinIsr = shortConfig(topicConfig, MIN_INSYNC_REPLICAS_CONFIG, DEFAULT_MIN_INSYNC_REPLICAS);
        // min.insync.replicas can never usefully exceed the number of replicas.
        short minInSyncReplicas = (short) Math.min(configuredMinIsr, replicationFactor);

        NewTopic topicDescription = TopicAdmin.defineTopic(topic)
            .compacted()
            .partitions(1)
            .replicationFactor(replicationFactor)
            .minInSyncReplicas(minInSyncReplicas)
            .build();
        return this.createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(latestLicense), topicDescription, topicConfig, time);
    }

    /**
     * Reads a short-valued setting that may be stored either as a string or as a number.
     *
     * @param config       configuration map to read from
     * @param key          name of the setting
     * @param defaultValue value returned when the setting is absent
     * @return the parsed setting or {@code defaultValue}
     * @throws NumberFormatException if the setting is present but is not a valid short
     */
    private static short shortConfig(Map<String, Object> config, String key, short defaultValue) {
        Object raw = config.get(key);
        return raw == null ? defaultValue : Short.parseShort(raw.toString().trim());
    }

    /**
     * Creates the log together with an initializer that creates the license topic if necessary.
     * The initializer opens a short-lived admin client from {@code topicConfig} and closes it when
     * done.
     */
    private KafkaBasedLog<Command.CommandKey, Command.CommandMessage> createKafkaBasedLog(String topic, Map<String, Object> producerProps, Map<String, Object> consumerProps, Callback<ConsumerRecord<Command.CommandKey, Command.CommandMessage>> consumedCallback, NewTopic topicDescription, Map<String, Object> topicConfig, Time time) {
        Runnable createTopics = () -> {
            try (AdminClient admin = AdminClient.create(topicConfig)) {
                this.createConfluentLicenseTopic(admin, topic, topicDescription);
            }
        };
        return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, time, createTopics);
    }

    /** Returns {@code t} itself if it already is a {@link KafkaException}, otherwise wraps it in one. */
    private KafkaException toKafkaException(Throwable t) {
        return t instanceof KafkaException ? (KafkaException) t : new KafkaException(t);
    }

    /** Starts the underlying log. Does nothing if the store is already running. */
    public void start() {
        if (this.running.compareAndSet(false, true)) {
            log.info("Starting License Store");
            this.startLog();
            log.info("Started License Store");
        }
    }

    /** Stops the underlying log. Does nothing if the store is not running. */
    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            log.info("Closing License Store");
            this.stopLog();
            log.info("Closed License Store");
        }
    }

    /** Starts the backing log; hook for subclasses. */
    protected void startLog() {
        this.licenseLog.start();
    }

    /** Stops the backing log; hook for subclasses. */
    protected void stopLog() {
        this.licenseLog.stop();
    }

    /**
     * Waits until the log has been read to its end and returns the latest license seen.
     *
     * @return the most recently consumed license JWT, or {@code null} if none has been consumed
     * @throws IllegalStateException if reading fails, is interrupted, or does not finish within
     *                               {@link #READ_TO_END_TIMEOUT_MS}; when interrupted, the thread's
     *                               interrupt status is restored before throwing
     */
    public String licenseScan() {
        try {
            this.licenseLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            return this.latestLicense.get();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            log.error("Failed to read license from Kafka: ", e);
            throw new IllegalStateException(e);
        } catch (ExecutionException | java.util.concurrent.TimeoutException e) {
            log.error("Failed to read license from Kafka: ", e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Writes the license to the log without a completion callback.
     *
     * @param license license JWT to store
     */
    public void registerLicense(String license) {
        this.registerLicense(license, null);
    }

    /**
     * Writes the license to the log under the fixed license key.
     *
     * @param license  license JWT to store
     * @param callback producer callback passed through to the log's send; may be {@code null}
     */
    public void registerLicense(String license, org.apache.kafka.clients.producer.Callback callback) {
        Command.LicenseInfo licenseInfo = Command.LicenseInfo.newBuilder().setJwt(license).build();
        Command.CommandMessage licenseMsg = Command.CommandMessage.newBuilder().setLicenseInfo(licenseInfo).build();
        this.licenseLog.send(KEY, licenseMsg, callback);
    }

    /**
     * Creates the license topic unless it already exists.
     *
     * <p>Creation is retried with exponential backoff while the failure is a
     * {@link RetriableException}, an {@link InvalidReplicationFactorException} or an
     * {@link UnsupportedVersionException}, until the topic creation timeout has elapsed. A
     * {@link TopicExistsException} is treated as success.
     *
     * @param admin            admin client used to describe and create the topic
     * @param topic            name of the license topic
     * @param topicDescription definition of the topic to create
     * @throws TimeoutException   if the topic could not be created within the timeout
     * @throws InterruptException if the calling thread is interrupted while waiting
     * @throws KafkaException     if describing or creating the topic fails with any other error
     */
    protected void createConfluentLicenseTopic(AdminClient admin, String topic, NewTopic topicDescription) {
        long endTimeMs = this.time.milliseconds() + this.topicCreateTimeout.toMillis();
        if (this.topicExists(admin, topic)) {
            return;
        }
        Throwable lastException = null;
        int numCreateTopicRetries = 0;
        while (true) {
            try {
                admin.createTopics(Collections.singleton(topicDescription)).all().get();
                log.debug("License topic {} created", topic);
                return;
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                // Explain the likely reason at info level once, on the first failure only.
                if (lastException == null) {
                    if (cause instanceof InvalidReplicationFactorException) {
                        log.info("Creating topic {} with replication factor {}. At least {} brokers must be started concurrently to complete license registration.", topic, topicDescription.replicationFactor(), topicDescription.replicationFactor());
                    } else if (cause instanceof UnsupportedVersionException) {
                        log.info("Topic {} could not be created due to UnsupportedVersionException. This may be indicate that a rolling upgrade from an older version is in progress. The request will be retried.", topic);
                    }
                }
                lastException = cause;
                if (cause instanceof TopicExistsException) {
                    log.debug("License topic {} was created by different node", topic);
                    return;
                }
                if (!isRetriableCreateFailure(cause)) {
                    throw this.toKafkaException(cause);
                }
            } catch (InterruptedException e) {
                throw new InterruptException(e);
            }
            long remainingMs = endTimeMs - this.time.milliseconds();
            if (remainingMs <= 0L) {
                throw new TimeoutException("License topic could not be created", lastException);
            }
            long sleepMs = Math.min(remainingMs, this.backoffMs(numCreateTopicRetries));
            ++numCreateTopicRetries;
            log.debug("Topic could not be created, retrying after {}ms: {}", sleepMs, lastException.getMessage());
            this.time.sleep(sleepMs);
        }
    }

    /**
     * Checks whether the topic can be described.
     *
     * @return {@code true} if the topic exists; {@code false} if it is unknown or the describe
     *         request failed with a retriable error, in which case creation should be attempted
     * @throws InterruptException if the calling thread is interrupted while waiting
     * @throws KafkaException     if describing fails with a non-retriable error
     */
    private boolean topicExists(AdminClient admin, String topic) {
        try {
            admin.describeTopics(Collections.singleton(topic)).all().get();
            log.debug("Topic {} already exists.", topic);
            return true;
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof UnknownTopicOrPartitionException) {
                log.debug("Topic {} does not exist, will attempt to create", topic);
            } else if (cause instanceof RetriableException) {
                log.debug("Topic could not be described, will attempt to create", e);
            } else {
                throw this.toKafkaException(cause);
            }
            return false;
        } catch (InterruptedException e) {
            throw new InterruptException(e);
        }
    }

    /** Tells whether a failed topic creation attempt is worth retrying. */
    private static boolean isRetriableCreateFailure(Throwable cause) {
        return cause instanceof RetriableException
            || cause instanceof InvalidReplicationFactorException
            || cause instanceof UnsupportedVersionException;
    }

    /**
     * Computes the delay before the next topic creation attempt: the base backoff doubled per
     * retry, raised to at least the configured minimum and then capped at the configured maximum.
     *
     * @param retries number of retries performed so far
     * @return delay in milliseconds
     */
    private long backoffMs(int retries) {
        // Multiply in floating point and cast once: a double-to-long cast saturates at
        // Long.MAX_VALUE, whereas multiplying two longs would silently wrap around.
        long exponentialMs = (long) (Math.pow(2.0, retries) * TOPIC_CREATE_RETRY_BACKOFF.toMillis());
        long atLeastMinMs = Math.max(this.retryBackoffMinMs.toMillis(), exponentialMs);
        return Math.min(this.retryBackoffMaxMs.toMillis(), atLeastMinMs);
    }

    /** Consumer callback that records the JWT of every consumed license record. */
    public static class ConsumeCallback
    implements Callback<ConsumerRecord<Command.CommandKey, Command.CommandMessage>> {
        private final AtomicReference<String> latestLicenseRef;

        /**
         * @param latestLicenseRef reference updated with the JWT of each consumed license record
         */
        ConsumeCallback(AtomicReference<String> latestLicenseRef) {
            this.latestLicenseRef = latestLicenseRef;
        }

        /**
         * Stores the JWT of a consumed license record. Errors are logged and otherwise ignored, and
         * records of any other config type are skipped.
         *
         * @param error  failure reported by the consumer, or {@code null} on success
         * @param record consumed record; only read when {@code error} is {@code null}
         */
        @Override
        public void onCompletion(Throwable error, ConsumerRecord<Command.CommandKey, Command.CommandMessage> record) {
            if (error != null) {
                log.error("Unexpected error in consumer callback for LicenseStore: ", error);
                return;
            }
            Command.CommandKey key = record.key();
            Command.CommandMessage value = record.value();
            // Records without a key or value (e.g. tombstones on the compacted topic) carry no
            // license; skip them instead of failing with a NullPointerException.
            // NOTE(review): confirm that a tombstone should leave the cached license untouched.
            if (key == null || value == null) {
                return;
            }
            if (key.getConfigType() == Command.CommandConfigType.LICENSE_INFO) {
                this.latestLicenseRef.set(value.getLicenseInfo().getJwt());
            }
        }
    }

    /** Serde for license record values. */
    public static class LicenseMessageSerde
    extends ProtoSerde<Command.CommandMessage> {
        public LicenseMessageSerde() {
            super((Message) Command.CommandMessage.getDefaultInstance());
        }
    }

    /** Serde for license record keys. */
    public static class LicenseKeySerde
    extends ProtoSerde<Command.CommandKey> {
        public LicenseKeySerde() {
            super((Message) Command.CommandKey.getDefaultInstance());
        }
    }
}

