package io.debezium.relational.history;

import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.util.Collect;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;

@NotThreadSafe
/* loaded from: input_file:io/debezium/relational/history/KafkaDatabaseHistory.class */
public class KafkaDatabaseHistory extends AbstractDatabaseHistory {
    private static final String DEFAULT_TOPIC_REPLICATION_FACTOR_PROP_NAME = "default.replication.factor";
    private static final String CONSUMER_PREFIX = "database.history.consumer.";
    private static final String PRODUCER_PREFIX = "database.history.producer.";
    private String topicName;
    private Configuration consumerConfig;
    private Configuration producerConfig;
    private volatile KafkaProducer<String, String> producer;
    private int maxRecoveryAttempts;
    public static final Field TOPIC = Field.create("database.history.kafka.topic").withDisplayName("Database history topic name").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("The name of the topic for the database schema history").withValidation(Field::isRequired);
    public static final Field BOOTSTRAP_SERVERS = Field.create("database.history.kafka.bootstrap.servers").withDisplayName("Kafka broker addresses").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("A list of host/port pairs that the connector will use for establishing the initial connection to the Kafka cluster for retrieving database schema history previously stored by the connector. This should point to the same Kafka cluster used by the Kafka Connect process.").withValidation(Field::isRequired);
    public static final Field RECOVERY_POLL_INTERVAL_MS = Field.create("database.history.kafka.recovery.poll.interval.ms").withDisplayName("Poll interval during database history recovery (ms)").withType(ConfigDef.Type.INT).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDescription("The number of milliseconds to wait while polling for persisted data during recovery.").withDefault(100).withValidation(Field::isNonNegativeInteger);
    public static final Field RECOVERY_POLL_ATTEMPTS = Field.create("database.history.kafka.recovery.attempts").withDisplayName("Max attempts to recovery database history").withType(ConfigDef.Type.INT).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDescription("The number of attempts in a row that no data are returned from Kafka before recover completes. The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms).").withDefault(100).withValidation(Field::isInteger);
    public static Field.Set ALL_FIELDS = Field.setOf(TOPIC, BOOTSTRAP_SERVERS, DatabaseHistory.NAME, RECOVERY_POLL_INTERVAL_MS, RECOVERY_POLL_ATTEMPTS);
    private static final Duration KAFKA_QUERY_TIMEOUT = Duration.ofSeconds(3);
    private static final Integer PARTITION = 0;
    private final DocumentReader reader = DocumentReader.defaultReader();
    private int pollIntervalMs = -1;

    @Override // io.debezium.relational.history.AbstractDatabaseHistory, io.debezium.relational.history.DatabaseHistory
    public void configure(Configuration configuration, HistoryRecordComparator historyRecordComparator) {
        super.configure(configuration, historyRecordComparator);
        Field.Set set = ALL_FIELDS;
        Logger logger = this.logger;
        logger.getClass();
        if (!configuration.validateAndRecord(set, logger::error)) {
            throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
        }
        this.topicName = configuration.getString(TOPIC);
        this.pollIntervalMs = configuration.getInteger(RECOVERY_POLL_INTERVAL_MS);
        this.maxRecoveryAttempts = configuration.getInteger(RECOVERY_POLL_ATTEMPTS);
        String string = configuration.getString(BOOTSTRAP_SERVERS);
        String string2 = configuration.getString(DatabaseHistory.NAME, UUID.randomUUID().toString());
        this.consumerConfig = configuration.subset(CONSUMER_PREFIX, true).edit().withDefault("bootstrap.servers", string).withDefault("client.id", string2).withDefault("group.id", string2).withDefault("fetch.min.bytes", 1).withDefault("enable.auto.commit", false).withDefault("session.timeout.ms", 10000).withDefault("auto.offset.reset", OffsetResetStrategy.EARLIEST.toString().toLowerCase()).withDefault("key.deserializer", StringDeserializer.class).withDefault("value.deserializer", StringDeserializer.class).build();
        this.producerConfig = configuration.subset(PRODUCER_PREFIX, true).edit().withDefault("bootstrap.servers", string).withDefault("client.id", string2).withDefault("acks", 1).withDefault("retries", 1).withDefault("batch.size", 32768).withDefault("linger.ms", 0).withDefault("buffer.memory", 1048576).withDefault("key.serializer", StringSerializer.class).withDefault("value.serializer", StringSerializer.class).withDefault("max.block.ms", 10000).build();
        this.logger.info("KafkaDatabaseHistory Consumer config: " + this.consumerConfig.withMaskedPasswords());
        this.logger.info("KafkaDatabaseHistory Producer config: " + this.producerConfig.withMaskedPasswords());
    }

    @Override // io.debezium.relational.history.AbstractDatabaseHistory, io.debezium.relational.history.DatabaseHistory
    public synchronized void start() {
        super.start();
        if (this.producer == null) {
            this.producer = new KafkaProducer<>(this.producerConfig.asProperties());
        }
    }

    @Override // io.debezium.relational.history.AbstractDatabaseHistory
    protected void storeRecord(HistoryRecord historyRecord) throws DatabaseHistoryException {
        if (this.producer == null) {
            throw new IllegalStateException("No producer is available. Ensure that 'start()' is called before storing database history records.");
        }
        this.logger.trace("Storing record into database history: {}", historyRecord);
        try {
            Future send = this.producer.send(new ProducerRecord(this.topicName, PARTITION, (Object) null, historyRecord.toString()));
            this.producer.flush();
            RecordMetadata recordMetadata = (RecordMetadata) send.get();
            if (recordMetadata != null) {
                this.logger.debug("Stored record in topic '{}' partition {} at offset {} ", new Object[]{recordMetadata.topic(), Integer.valueOf(recordMetadata.partition()), Long.valueOf(recordMetadata.offset())});
            }
        } catch (InterruptedException e) {
            this.logger.trace("Interrupted before record was written into database history: {}", historyRecord);
            Thread.interrupted();
            throw new DatabaseHistoryException(e);
        } catch (ExecutionException e2) {
            throw new DatabaseHistoryException(e2);
        }
    }

    @Override // io.debezium.relational.history.AbstractDatabaseHistory
    protected void recoverRecords(Consumer<HistoryRecord> consumer) {
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(this.consumerConfig.asProperties());
        Throwable th = null;
        try {
            this.logger.debug("Subscribing to database history topic '{}'", this.topicName);
            kafkaConsumer.subscribe(Collect.arrayListOf(this.topicName, new String[0]));
            long j = -1;
            Long l = null;
            int i = 0;
            while (i <= this.maxRecoveryAttempts) {
                l = getEndOffsetOfDbHistoryTopic(l, kafkaConsumer);
                this.logger.debug("End offset of database history topic is {}", l);
                int i2 = 0;
                Iterator it = kafkaConsumer.poll(this.pollIntervalMs).iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    try {
                        if (j < consumerRecord.offset()) {
                            if (consumerRecord.value() == null) {
                                this.logger.warn("Skipping null database history record. This is often not an issue, but if it happens repeatedly please check the '{}' topic.", this.topicName);
                            } else {
                                HistoryRecord historyRecord = new HistoryRecord(this.reader.read((String) consumerRecord.value()));
                                this.logger.trace("Recovering database history: {}", historyRecord);
                                if (historyRecord == null || !historyRecord.isValid()) {
                                    this.logger.warn("Skipping invalid database history record '{}'. This is often not an issue, but if it happens repeatedly please check the '{}' topic.", historyRecord, this.topicName);
                                } else {
                                    consumer.accept(historyRecord);
                                    this.logger.trace("Recovered database history: {}", historyRecord);
                                }
                            }
                            j = consumerRecord.offset();
                            i2++;
                        }
                    } catch (IOException e) {
                        this.logger.error("Error while deserializing history record '{}'", consumerRecord, e);
                    } catch (Exception e2) {
                        this.logger.error("Unexpected exception while processing record '{}'", consumerRecord, e2);
                        throw e2;
                    }
                }
                if (i2 == 0) {
                    this.logger.debug("No new records found in the database history; will retry");
                } else {
                    this.logger.debug("Processed {} records from database history", Integer.valueOf(i2));
                }
                i++;
                if (j >= l.longValue() - 1) {
                    if (kafkaConsumer != null) {
                        if (0 == 0) {
                            kafkaConsumer.close();
                            return;
                        }
                        try {
                            kafkaConsumer.close();
                            return;
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                            return;
                        }
                    }
                    return;
                }
            }
            throw new IllegalStateException("The database history couldn't be recovered. Consider to increase the value for " + RECOVERY_POLL_INTERVAL_MS.name());
        } catch (Throwable th3) {
            if (kafkaConsumer != null) {
                if (0 != 0) {
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaConsumer.close();
                }
            }
            throw th3;
        }
    }

    private Long getEndOffsetOfDbHistoryTopic(Long l, KafkaConsumer<String, String> kafkaConsumer) {
        Long l2 = (Long) ((Map.Entry) kafkaConsumer.endOffsets(Collections.singleton(new TopicPartition(this.topicName, PARTITION.intValue()))).entrySet().iterator().next()).getValue();
        if (l == null || l.equals(l2)) {
            return l2;
        }
        throw new IllegalStateException("Detected changed end offset of database history topic (previous: " + l + ", current: " + l2 + "). Make sure that the same history topic isn't shared by multiple connector instances.");
    }

    @Override // io.debezium.relational.history.DatabaseHistory
    public boolean exists() {
        boolean z = false;
        KafkaConsumer kafkaConsumer = new KafkaConsumer(this.consumerConfig.asProperties());
        Throwable th = null;
        try {
            try {
                if (kafkaConsumer.listTopics().keySet().contains(this.topicName)) {
                    Set singleton = Collections.singleton(new TopicPartition(this.topicName, PARTITION.intValue()));
                    z = ((Long) ((Map.Entry) kafkaConsumer.endOffsets(singleton).entrySet().iterator().next()).getValue()).longValue() > ((Long) ((Map.Entry) kafkaConsumer.beginningOffsets(singleton).entrySet().iterator().next()).getValue()).longValue();
                }
                if (kafkaConsumer != null) {
                    if (0 != 0) {
                        try {
                            kafkaConsumer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        kafkaConsumer.close();
                    }
                }
                return z;
            } finally {
            }
        } catch (Throwable th3) {
            if (kafkaConsumer != null) {
                if (th != null) {
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Override // io.debezium.relational.history.AbstractDatabaseHistory, io.debezium.relational.history.DatabaseHistory
    public synchronized void stop() {
        try {
            if (this.producer != null) {
                try {
                    this.producer.flush();
                    this.producer.close();
                } catch (Throwable th) {
                    this.producer.close();
                    throw th;
                }
            }
        } finally {
            this.producer = null;
            super.stop();
        }
    }

    public String toString() {
        return this.topicName != null ? "Kafka topic " + this.topicName + ":" + PARTITION + " using brokers at " + this.producerConfig.getString(BOOTSTRAP_SERVERS) : "Kafka topic";
    }

    protected static String consumerConfigPropertyName(String str) {
        return CONSUMER_PREFIX + str;
    }

    @Override // io.debezium.relational.history.AbstractDatabaseHistory, io.debezium.relational.history.DatabaseHistory
    public void initializeStorage() {
        super.initializeStorage();
        AdminClient create = AdminClient.create(this.producerConfig.asProperties());
        try {
            NewTopic newTopic = new NewTopic(this.topicName, 1, Short.parseShort(getKafkaBrokerConfig(create).get(DEFAULT_TOPIC_REPLICATION_FACTOR_PROP_NAME).value()));
            newTopic.configs(Collect.hashMapOf("cleanup.policy", "delete", "retention.ms", Long.toString(Long.MAX_VALUE), "retention.bytes", "-1"));
            create.createTopics(Collections.singleton(newTopic));
            this.logger.info("Database history topic '{}' created", newTopic);
        } catch (Exception e) {
            throw new ConnectException("Creation of database history topic failed, please create the topic manually", e);
        }
    }

    private Config getKafkaBrokerConfig(AdminClient adminClient) throws Exception {
        Collection collection = (Collection) adminClient.describeCluster().nodes().get(KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        if (collection.isEmpty()) {
            throw new ConnectException("No brokers available to obtain default settings");
        }
        Map map = (Map) adminClient.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, ((Node) collection.iterator().next()).idString()))).all().get(KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        if (map.isEmpty()) {
            throw new ConnectException("No configs have been received");
        }
        return (Config) map.values().iterator().next();
    }
}
