package io.confluent.license;

import io.confluent.command.record.Command;
import io.confluent.license.validator.LicenseConfig;
import io.confluent.serializers.ProtoSerde;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/license/LicenseStore.class */
public class LicenseStore {
    // Class-wide logger; also used by the nested ConsumeCallback.
    private static final Logger log = LoggerFactory.getLogger((Class<?>) LicenseStore.class);
    // GUID placed into the record key under which the license is written.
    private static final String KEY_PREFIX = "CONFLUENT_LICENSE";
    // Fixed key (config type LICENSE_INFO + KEY_PREFIX guid) used for every license record sent by registerLicense.
    private static final Command.CommandKey KEY = Command.CommandKey.newBuilder().setConfigType(Command.CommandConfigType.LICENSE_INFO).setGuid(KEY_PREFIX).build();
    // Base unit of the exponential backoff applied between topic-creation attempts.
    private static final Duration TOPIC_CREATE_RETRY_BACKOFF = Duration.ofSeconds(1);
    // Overall time budget for waiting until the license topic reports partitions.
    private static final Duration PARTITIONS_FOR_TIMEOUT = Duration.ofMinutes(1);
    // Pause between successive partition-metadata polls.
    private static final long PARTITIONS_FOR_INTERVAL_MS = Duration.ofSeconds(1).toMillis();
    // Name of the topic actually backing the store (may differ from the requested name, see chooseFinalLicenseTopic).
    private final String topic;
    // Keys looked up in the topic/admin configuration map when describing the license topic.
    public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
    public static final String MIN_INSYNC_REPLICAS_CONFIG = "min.insync.replicas";
    // Read-to-end wait in milliseconds; same value as the wait applied in licenseScan.
    public static final long READ_TO_END_TIMEOUT_MS = 120000;
    // Value forced onto the consumer's "request.timeout.ms" setting in createKafkaBasedLog.
    private static final int CONSUMER_REQ_TIMEOUT_MS = 120000;
    // Log through which license records are produced and consumed.
    protected final KafkaBasedLog<Command.CommandKey, Command.CommandMessage> licenseLog;
    // Guards start()/stop() so each transition happens at most once.
    private final AtomicBoolean running;
    // JWT of the most recently consumed license record; written by ConsumeCallback.
    private final AtomicReference<String> latestLicense;
    // Time budget for creating the license topic, including retries.
    private final Duration topicCreateTimeout;
    // Lower and upper bounds applied to the topic-creation retry backoff.
    private final Duration retryBackoffMinMs;
    private final Duration retryBackoffMaxMs;
    // Clock used for deadlines and sleeps.
    private final Time time;

    /* loaded from: input_file:io/confluent/license/LicenseStore$ConsumeCallback.class */
    /**
     * Callback handed to the license log: remembers the JWT carried by the most
     * recently consumed {@code LICENSE_INFO} record in a shared reference.
     */
    public static class ConsumeCallback implements Callback<ConsumerRecord<Command.CommandKey, Command.CommandMessage>> {
        private final AtomicReference<String> latestLicenseRef;

        /**
         * @param atomicReference holder that receives the JWT of each license record
         */
        ConsumeCallback(AtomicReference<String> atomicReference) {
            this.latestLicenseRef = atomicReference;
        }

        /**
         * Logs consumer errors; otherwise stores the record's JWT when the record
         * key is of type {@code LICENSE_INFO}. Records of any other type are ignored.
         *
         * @param th             error reported by the log, or {@code null} on success
         * @param consumerRecord the consumed record (only read when {@code th} is null)
         */
        @Override // org.apache.kafka.connect.util.Callback
        public void onCompletion(Throwable th, ConsumerRecord<Command.CommandKey, Command.CommandMessage> consumerRecord) {
            if (th != null) {
                LicenseStore.log.error("Unexpected error in consumer callback for LicenseStore: ", th);
                return;
            }
            boolean licenseRecord = consumerRecord.key().getConfigType() == Command.CommandConfigType.LICENSE_INFO;
            if (!licenseRecord) {
                return;
            }
            String jwt = consumerRecord.value().getLicenseInfo().getJwt();
            this.latestLicenseRef.set(jwt);
        }
    }

    /* loaded from: input_file:io/confluent/license/LicenseStore$LicenseKeySerde.class */
    /**
     * Serde for license record keys, built on {@link ProtoSerde} with the default
     * {@code Command.CommandKey} instance. Registered by class name as both the
     * producer key serializer and the consumer key deserializer, hence the public
     * no-arg constructor.
     */
    public static class LicenseKeySerde extends ProtoSerde<Command.CommandKey> {
        public LicenseKeySerde() {
            super(Command.CommandKey.getDefaultInstance());
        }
    }

    /* loaded from: input_file:io/confluent/license/LicenseStore$LicenseMessageSerde.class */
    /**
     * Serde for license record values, built on {@link ProtoSerde} with the default
     * {@code Command.CommandMessage} instance. Registered by class name as both the
     * producer value serializer and the consumer value deserializer, hence the
     * public no-arg constructor.
     */
    public static class LicenseMessageSerde extends ProtoSerde<Command.CommandMessage> {
        public LicenseMessageSerde() {
            super(Command.CommandMessage.getDefaultInstance());
        }
    }

    /**
     * Creates a store that uses the system clock.
     *
     * @param str  requested license topic name
     * @param map  producer configuration
     * @param map2 consumer configuration
     * @param map3 admin-client / topic configuration
     */
    public LicenseStore(String str, Map<String, Object> map, Map<String, Object> map2, Map<String, Object> map3) {
        this(str, map, map2, map3, Time.SYSTEM);
    }

    /**
     * Creates a store with an explicit clock and zero topic-creation timeout and
     * retry backoff bounds.
     *
     * @param str  requested license topic name
     * @param map  producer configuration
     * @param map2 consumer configuration
     * @param map3 admin-client / topic configuration
     * @param time clock used for deadlines and sleeps
     */
    protected LicenseStore(String str, Map<String, Object> map, Map<String, Object> map2, Map<String, Object> map3, Time time) {
        // Forward the caller's clock; hard-coding Time.SYSTEM here would silently
        // discard an injected (e.g. mock) clock.
        this(str, map, map2, map3, Duration.ZERO, Duration.ZERO, Duration.ZERO, time);
    }

    /**
     * Fully-parameterised constructor: resolves the topic to use, then builds the
     * Kafka-backed log that stores license records.
     *
     * @param str       requested license topic name
     * @param map       producer configuration
     * @param map2      consumer configuration
     * @param map3      admin-client / topic configuration
     * @param duration  time budget for creating the license topic
     * @param duration2 lower bound for the topic-creation retry backoff
     * @param duration3 upper bound for the topic-creation retry backoff
     * @param time      clock used for deadlines and sleeps
     */
    public LicenseStore(String str, Map<String, Object> map, Map<String, Object> map2, Map<String, Object> map3, Duration duration, Duration duration2, Duration duration3, Time time) {
        this.time = time;
        this.topicCreateTimeout = duration;
        this.retryBackoffMinMs = duration2;
        this.retryBackoffMaxMs = duration3;
        this.running = new AtomicBoolean();
        this.latestLicense = new AtomicReference<>();
        // The topic name must be fixed before the log is built, because the log's
        // initializer reads this.topic.
        final NewTopic topicDescription = createTopicDescription(str, map3);
        this.topic = topicDescription.name();
        this.licenseLog = setupAndCreateKafkaBasedLog(map, map2, map3, topicDescription, this.latestLicense, this.time);
    }

    /**
     * Creates a store around an already-built log and license holder; no topic
     * lookup or creation is performed here. Timeout and backoff bounds are zero.
     *
     * @param str             name of the topic backing the log
     * @param atomicReference holder for the most recently read license
     * @param kafkaBasedLog   log used to read and write license records
     * @param time            clock used for deadlines and sleeps
     */
    public LicenseStore(String str, AtomicReference<String> atomicReference, KafkaBasedLog<Command.CommandKey, Command.CommandMessage> kafkaBasedLog, Time time) {
        this.topic = str;
        this.licenseLog = kafkaBasedLog;
        this.latestLicense = atomicReference;
        this.time = time;
        this.running = new AtomicBoolean();
        this.topicCreateTimeout = Duration.ZERO;
        this.retryBackoffMinMs = Duration.ZERO;
        this.retryBackoffMaxMs = Duration.ZERO;
    }

    /**
     * Copies the supplied producer and consumer configurations, pins the settings
     * this store depends on (idempotence disabled, protobuf key/value serdes), and
     * builds the Kafka-backed log. The caller's maps are not modified.
     *
     * @param map             producer configuration
     * @param map2            consumer configuration
     * @param map3            admin-client / topic configuration
     * @param newTopic        description of the license topic
     * @param atomicReference holder updated with each consumed license
     * @param time            clock handed to the log
     * @return the configured, not yet started, log
     */
    KafkaBasedLog<Command.CommandKey, Command.CommandMessage> setupAndCreateKafkaBasedLog(Map<String, Object> map, Map<String, Object> map2, Map<String, Object> map3, NewTopic newTopic, AtomicReference<String> atomicReference, Time time) {
        Map<String, Object> producerProps = new HashMap<>(map);
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LicenseKeySerde.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LicenseMessageSerde.class.getName());

        Map<String, Object> consumerProps = new HashMap<>(map2);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LicenseKeySerde.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LicenseMessageSerde.class.getName());

        return createKafkaBasedLog(producerProps, consumerProps, new ConsumeCallback(atomicReference), newTopic, map3, time);
    }

    /**
     * Builds the Kafka-backed log together with the initializer it is given.
     * The initializer first ensures the license topic exists (using a short-lived
     * admin client) and then waits until the topic's partition metadata is
     * visible to a short-lived consumer.
     *
     * @param map      producer configuration
     * @param map2     consumer configuration; {@code request.timeout.ms} is overwritten here
     * @param callback invoked for every consumed record
     * @param newTopic description of the license topic to create if missing
     * @param map3     admin-client configuration
     * @param time     clock handed to the log
     * @return the configured, not yet started, log
     */
    private KafkaBasedLog<Command.CommandKey, Command.CommandMessage> createKafkaBasedLog(Map<String, Object> map, Map<String, Object> map2, Callback<ConsumerRecord<Command.CommandKey, Command.CommandMessage>> callback, NewTopic newTopic, Map<String, Object> map3, Time time) {
        // Applied before the log is constructed; the metadata-check consumer in the
        // initializer shares this map and therefore the same timeout.
        map2.put("request.timeout.ms", Integer.valueOf(CONSUMER_REQ_TIMEOUT_MS));

        Runnable initializer = () -> {
            // Each client is closed exactly once, even when the step it serves fails.
            try (AdminClient admin = AdminClient.create(map3)) {
                createConfluentLicenseTopic(admin, this.topic, newTopic);
            }
            try (KafkaConsumer<Command.CommandKey, Command.CommandMessage> consumer = new KafkaConsumer<>(map2)) {
                verifyConfluentLicenseTopicMetadata(consumer);
            }
        };
        return new KafkaBasedLog<>(this.topic, map, map2, callback, time, initializer);
    }

    /**
     * Returns {@code th} itself when it already is a {@link KafkaException};
     * otherwise wraps it in a new one.
     */
    private KafkaException toKafkaException(Throwable th) {
        if (th instanceof KafkaException) {
            return (KafkaException) th;
        }
        return new KafkaException(th);
    }

    /**
     * Starts the store. Does nothing if it is already running.
     */
    public void start() {
        boolean transitioned = this.running.compareAndSet(false, true);
        if (!transitioned) {
            return;
        }
        log.info("Starting License Store");
        startLog();
        log.info("Started License Store");
    }

    /**
     * Stops the store. Does nothing if it is not running.
     */
    public void stop() {
        boolean transitioned = this.running.compareAndSet(true, false);
        if (!transitioned) {
            return;
        }
        log.info("Closing License Store");
        stopLog();
        log.info("Closed License Store");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts the underlying license log. Called by {@link #start()} once the
     * running flag has been set.
     */
    public void startLog() {
        this.licenseLog.start();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops the underlying license log. Called by {@link #stop()} once the
     * running flag has been cleared.
     */
    public void stopLog() {
        this.licenseLog.stop();
    }

    /**
     * Reads the license log to its current end and returns the latest license.
     *
     * @return the JWT of the most recently consumed license record, or
     *         {@code null} if none has been consumed
     * @throws IllegalStateException if the read is interrupted, fails, or does not
     *         finish within {@link #READ_TO_END_TIMEOUT_MS} milliseconds; the
     *         original exception is the cause
     */
    public String licenseScan() {
        try {
            this.licenseLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            return this.latestLicense.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            log.error("Failed to read license from Kafka: ", e);
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            log.error("Failed to read license from Kafka: ", e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Writes the given license to the license log without a completion callback.
     *
     * @param str the license JWT to store
     */
    public void registerLicense(String str) {
        registerLicense(str, null);
    }

    /**
     * Writes the given license to the license log under the fixed license key.
     *
     * @param str      the license JWT to store
     * @param callback producer callback passed to the send; may be {@code null}
     */
    public void registerLicense(String str, org.apache.kafka.clients.producer.Callback callback) {
        Command.LicenseInfo licenseInfo = Command.LicenseInfo.newBuilder().setJwt(str).build();
        Command.CommandMessage message = Command.CommandMessage.newBuilder().setLicenseInfo(licenseInfo).build();
        this.licenseLog.send(KEY, message, callback);
    }

    /**
     * Creates the license topic unless it already exists, retrying failures that
     * may clear up on their own until the topic-creation timeout elapses.
     *
     * <p>Retried causes are {@link RetriableException},
     * {@link InvalidReplicationFactorException} and
     * {@link UnsupportedVersionException}. A {@link TopicExistsException} is
     * treated as success. The wait between attempts doubles from
     * {@code TOPIC_CREATE_RETRY_BACKOFF}, is clamped to the configured min/max
     * backoff, and never exceeds the time remaining before the deadline.
     *
     * @param adminClient client used to describe and create the topic
     * @param str         name of the topic, used for the existence check and logging
     * @param newTopic    description of the topic to create
     * @throws InterruptException if the thread is interrupted while waiting
     * @throws org.apache.kafka.common.errors.TimeoutException if retries are
     *         exhausted; the last failure is the cause
     * @throws KafkaException for any non-retriable failure
     */
    protected void createConfluentLicenseTopic(AdminClient adminClient, String str, NewTopic newTopic) {
        // The deadline covers the existence check as well as all creation attempts.
        long deadlineMs = this.time.milliseconds() + this.topicCreateTimeout.toMillis();
        if (isConfluentLicenseTopicOrTopicAlreadyExists(adminClient, str)) {
            return;
        }
        log.debug("Attempting to create topic {}", str);
        Throwable lastFailure = null;
        int attempt = 0;
        while (true) {
            try {
                adminClient.createTopics(Collections.singleton(newTopic)).all().get();
                log.debug("License topic {} created", str);
                return;
            } catch (InterruptedException e) {
                throw new InterruptException(e);
            } catch (ExecutionException e2) {
                Throwable cause = e2.getCause();
                // Announce these conditions at INFO only on the first failure to avoid log spam.
                if (lastFailure == null) {
                    if (cause instanceof InvalidReplicationFactorException) {
                        log.info("Creating topic {} with replication factor {}. At least {} brokers must be started concurrently to complete license registration.", str, Short.valueOf(newTopic.replicationFactor()), Short.valueOf(newTopic.replicationFactor()));
                    } else if (cause instanceof UnsupportedVersionException) {
                        log.info("Topic {} could not be created due to UnsupportedVersionException. This may be indicate that a rolling upgrade from an older version is in progress. The request will be retried.", str);
                    }
                }
                lastFailure = cause;
                if (cause instanceof TopicExistsException) {
                    log.debug("License topic {} was created by different node", str);
                    return;
                }
                if (!(cause instanceof RetriableException) && !(cause instanceof InvalidReplicationFactorException) && !(cause instanceof UnsupportedVersionException)) {
                    throw toKafkaException(cause);
                }
                long remainingMs = deadlineMs - this.time.milliseconds();
                if (remainingMs <= 0) {
                    throw new org.apache.kafka.common.errors.TimeoutException("License topic could not be created", lastFailure);
                }
                // Multiply in double and cast afterwards: the double-to-long cast
                // saturates at Long.MAX_VALUE, whereas a long multiplication would
                // wrap negative after ~54 attempts and collapse the backoff to its minimum.
                long exponentialMs = (long) (Math.pow(2.0d, attempt) * TOPIC_CREATE_RETRY_BACKOFF.toMillis());
                long backoffMs = Math.min(this.retryBackoffMaxMs.toMillis(), Math.max(this.retryBackoffMinMs.toMillis(), exponentialMs));
                attempt++;
                long sleepMs = Math.min(remainingMs, backoffMs);
                log.debug("Topic could not be created, retrying after {}ms: {}", Long.valueOf(sleepMs), lastFailure.getMessage());
                this.time.sleep(sleepMs);
            }
        }
    }

    /**
     * Polls until the consumer reports at least one partition for the license
     * topic, or the {@code PARTITIONS_FOR_TIMEOUT} budget runs out.
     *
     * <p>Lookup failures are logged and retried after
     * {@code PARTITIONS_FOR_INTERVAL_MS}; an interrupt is propagated immediately.
     *
     * @param consumer consumer used to query partition metadata
     * @throws InterruptException if the lookup is interrupted
     * @throws org.apache.kafka.common.errors.TimeoutException if no partitions
     *         become visible in time; the last lookup failure (if any) is the cause
     */
    protected void verifyConfluentLicenseTopicMetadata(Consumer<Command.CommandKey, Command.CommandMessage> consumer) {
        long deadlineMs = this.time.milliseconds() + PARTITIONS_FOR_TIMEOUT.toMillis();
        Exception lastError = null;
        while (deadlineMs - this.time.milliseconds() > 0) {
            try {
                // The lookup must sit inside the try so that failures are retried
                // rather than escaping on the first attempt.
                java.util.List<?> partitions = consumer.partitionsFor(this.topic);
                if (partitions != null && !partitions.isEmpty()) {
                    return;
                }
            } catch (InterruptException e) {
                // Retrying after an interrupt would only spin until the deadline.
                throw e;
            } catch (Exception e) {
                if (e.getCause() instanceof InvalidReplicationFactorException) {
                    log.info("Topic metadata may not have been propagated yet. Brokers may be starting up. Retrying.");
                } else {
                    log.warn("Unexpected exception.", e.getCause());
                }
                lastError = e;
                log.debug("Failed to retrieve license topic metadata, retrying after {}ms: {}", Long.valueOf(PARTITIONS_FOR_INTERVAL_MS), e.getMessage());
            }
            this.time.sleep(PARTITIONS_FOR_INTERVAL_MS);
        }
        throw new org.apache.kafka.common.errors.TimeoutException("Timed out retrieving license topic metadata.", lastError);
    }

    /**
     * Builds the description of the license topic: compacted, one partition, with
     * replication factor and min-ISR taken from the configuration.
     *
     * <p>Defaults are replication factor 3 and min-ISR 2; min-ISR is capped at the
     * replication factor. Config values may be strings or numbers. The topic name
     * is resolved through {@link #chooseFinalLicenseTopic} using a short-lived
     * admin client.
     *
     * @param str requested license topic name
     * @param map admin-client / topic configuration
     * @return the topic description
     * @throws NumberFormatException if a configured value is not a valid short
     */
    private NewTopic createTopicDescription(String str, Map<String, Object> map) {
        // toString() accepts both "3" and a boxed number; for a String it is the identity.
        Object replicationValue = map.get(REPLICATION_FACTOR_CONFIG);
        short replicationFactor = replicationValue == null ? (short) 3 : Short.parseShort(replicationValue.toString());
        Object minIsrValue = map.get(MIN_INSYNC_REPLICAS_CONFIG);
        short requestedMinIsr = minIsrValue == null ? (short) 2 : Short.parseShort(minIsrValue.toString());
        short minIsr = (short) Math.min(requestedMinIsr, replicationFactor);

        try (AdminClient admin = AdminClient.create(map)) {
            String finalTopic = chooseFinalLicenseTopic(admin, str);
            return TopicAdmin.defineTopic(finalTopic).compacted().partitions(1).replicationFactor(replicationFactor).minInSyncReplicas(minIsr).build();
        }
    }

    /**
     * Picks the topic to store licenses in: the requested topic when it already
     * exists, otherwise {@code LicenseConfig.CONFLUENT_COMMAND_TOPIC}.
     */
    private String chooseFinalLicenseTopic(AdminClient adminClient, String str) {
        if (topicExists(adminClient, str)) {
            return str;
        }
        return LicenseConfig.CONFLUENT_COMMAND_TOPIC;
    }

    /**
     * Tells whether topic creation can be skipped: true when {@code str} equals
     * {@code LicenseConfig.TOPIC_DEFAULT} (no broker lookup is made in that case)
     * or when the topic already exists.
     */
    private boolean isConfluentLicenseTopicOrTopicAlreadyExists(AdminClient adminClient, String str) {
        if (str.equals(LicenseConfig.TOPIC_DEFAULT)) {
            return true;
        }
        return topicExists(adminClient, str);
    }

    /**
     * Checks whether a topic exists by describing it.
     *
     * @param adminClient client used for the describe call
     * @param str         topic name
     * @return {@code true} if the describe succeeds; {@code false} if the topic is
     *         unknown, the failure is retriable, or the topic is
     *         {@code LicenseConfig.TOPIC_DEFAULT} and the describe is not authorized
     * @throws InterruptException if interrupted while waiting for the result
     * @throws KafkaException for any other describe failure
     */
    private boolean topicExists(AdminClient adminClient, String str) {
        try {
            DescribeTopicsResult description = adminClient.describeTopics(Collections.singleton(str));
            try {
                description.allTopicNames().get();
            } catch (NoSuchMethodError e) {
                // Presumably a client library without allTopicNames(); use all() instead.
                description.all().get();
            }
            log.debug("Topic {} already exists.", str);
            return true;
        } catch (InterruptedException interrupted) {
            throw new InterruptException(interrupted);
        } catch (ExecutionException failure) {
            Throwable cause = failure.getCause();
            // Checked before the retriable case so an unknown topic gets its own log message.
            if (cause instanceof UnknownTopicOrPartitionException) {
                log.debug("Topic {} does not exist", str);
                return false;
            }
            if (cause instanceof RetriableException) {
                log.debug("Topic could not be described", (Throwable) failure);
                return false;
            }
            boolean defaultTopicUnauthorized = LicenseConfig.TOPIC_DEFAULT.equals(str) && cause instanceof TopicAuthorizationException;
            if (defaultTopicUnauthorized) {
                log.debug("{} Topic could not be described due to TopicAuthorization error.", LicenseConfig.TOPIC_DEFAULT);
                return false;
            }
            throw toKafkaException(cause);
        }
    }

    /**
     * @return the name of the topic backing this store
     */
    public String topic() {
        return this.topic;
    }
}
