/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.elasticsearch;

import io.confluent.connect.elasticsearch.AsyncOffsetTracker;
import io.confluent.connect.elasticsearch.DataConverter;
import io.confluent.connect.elasticsearch.ElasticsearchClient;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
import io.confluent.connect.elasticsearch.OffsetState;
import io.confluent.connect.elasticsearch.OffsetTracker;
import io.confluent.connect.elasticsearch.SyncOffsetTracker;
import io.confluent.connect.elasticsearch.Version;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BooleanSupplier;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.elasticsearch.action.DocWriteRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink task that writes Kafka Connect records to Elasticsearch.
 *
 * <p>For every record the task derives the target index (or data stream) name from the
 * record's topic, makes sure the index exists and - unless schemas are ignored for the
 * topic - that a mapping exists, converts the record to a {@link DocWriteRequest} and hands
 * it to the {@link ElasticsearchClient}. Per-record progress is tracked through an
 * {@link OffsetTracker}, and a {@link PartitionPauser} pauses/resumes the assigned
 * partitions based on how many offset-state entries are outstanding.
 */
public class ElasticsearchSinkTask
extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);

    /** Pause once outstanding offset entries exceed this multiple of maxBufferedRecords. */
    private static final long OFFSET_HIGH_WATER_MARK_FACTOR = 10L;
    /** Resume once outstanding offset entries drop to this multiple of maxBufferedRecords. */
    private static final long OFFSET_LOW_WATER_MARK_FACTOR = 5L;
    /** Index names are cut to this many characters. */
    private static final int MAX_INDEX_NAME_LENGTH = 255;
    /** The namespace part of a data stream name is cut to this many characters. */
    private static final int MAX_DATA_STREAM_NAMESPACE_LENGTH = 100;

    private DataConverter converter;
    private ElasticsearchClient client;
    private ElasticsearchSinkConnectorConfig config;
    // Stays null when no reporter is configured or the runtime does not provide one.
    private ErrantRecordReporter reporter;
    // Indices whose mapping has already been verified/created by this task.
    private Set<String> existingMappings;
    // Indices / data streams this task has already asked the client to create.
    private Set<String> indexCache;
    private OffsetTracker offsetTracker;
    private PartitionPauser partitionPauser;

    /**
     * Starts the task with a client built from the given configuration.
     *
     * @param props connector configuration properties
     */
    public void start(Map<String, String> props) {
        this.start(props, null);
    }

    /**
     * Starts the task, optionally with an externally supplied client.
     *
     * @param props  connector configuration properties
     * @param client client to use; when {@code null} a new {@link ElasticsearchClient} is
     *               created from the configuration
     */
    protected void start(Map<String, String> props, ElasticsearchClient client) {
        log.info("Starting ElasticsearchSinkTask.");
        this.config = new ElasticsearchSinkConnectorConfig(props);
        this.converter = new DataConverter(this.config);
        this.existingMappings = new HashSet<String>();
        this.indexCache = new HashSet<String>();

        // Multiply in long arithmetic: an int product overflows for large buffer settings,
        // which would yield a negative high-water mark and keep the partitions paused.
        final long offsetHighWaterMark =
            OFFSET_HIGH_WATER_MARK_FACTOR * this.config.maxBufferedRecords();
        final long offsetLowWaterMark =
            OFFSET_LOW_WATER_MARK_FACTOR * this.config.maxBufferedRecords();
        // The suppliers read this.offsetTracker lazily; it is assigned further down, before
        // put() can evaluate them.
        this.partitionPauser = new PartitionPauser(
            this.context,
            () -> this.offsetTracker.numOffsetStateEntries() > offsetHighWaterMark,
            () -> this.offsetTracker.numOffsetStateEntries() <= offsetLowWaterMark);

        this.reporter = null;
        try {
            // Look the reporter up once; it may legitimately be null.
            this.reporter = this.context.errantRecordReporter();
            if (this.reporter == null) {
                log.info("Errant record reporter not configured.");
            }
        }
        catch (NoClassDefFoundError | NoSuchMethodError e) {
            // The reporter API is missing from the runtime; continue without one.
            log.warn("AK versions prior to 2.6 do not support the errant record reporter.");
        }

        Runnable afterBulkCallback = () -> this.offsetTracker.updateOffsets();
        this.client = client != null
            ? client
            : new ElasticsearchClient(this.config, this.reporter, afterBulkCallback);
        this.offsetTracker = !this.config.flushSynchronously()
            ? new AsyncOffsetTracker(this.context)
            : new SyncOffsetTracker(this.client);
        log.info("Started ElasticsearchSinkTask. Connecting to ES server version: {}", this.client.version());
    }

    /**
     * Registers every record with the offset tracker and writes it to Elasticsearch, or
     * skips it when it has a null value and null values are configured to be ignored.
     *
     * @param records records delivered by the framework
     * @throws ConnectException if the client reports a failure or a record cannot be
     *                          converted and invalid messages are not dropped
     */
    public void put(Collection<SinkRecord> records) throws ConnectException {
        log.debug("Putting {} records to Elasticsearch.", records.size());
        // NOTE(review): presumably rethrows a failure recorded by an earlier bulk request.
        this.client.throwIfFailed();
        this.partitionPauser.maybeResumePartitions();
        for (SinkRecord record : records) {
            OffsetState offsetState = this.offsetTracker.addPendingRecord(record);
            if (this.shouldSkipRecord(record)) {
                this.logTrace("Ignoring {} with null value.", record);
                offsetState.markProcessed();
                this.reportBadRecord(record, new ConnectException("Cannot write null valued record."));
                continue;
            }
            this.logTrace("Writing {} to Elasticsearch.", record);
            this.tryWriteRecord(record, offsetState);
        }
        this.partitionPauser.maybePausePartitions();
    }

    /**
     * Flushes the client and returns the offsets the offset tracker reports for commit.
     *
     * @param currentOffsets offsets the framework proposes to commit
     * @return the offsets returned by the offset tracker for {@code currentOffsets}
     */
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        try {
            this.client.flush();
        }
        catch (IllegalStateException e) {
            // Tolerated: the flush was attempted after the bulk processor was closed.
            log.debug("Tried to flush data to Elasticsearch, but BulkProcessor is already closed.", e);
        }
        Map<TopicPartition, OffsetAndMetadata> offsets = this.offsetTracker.offsets(currentOffsets);
        log.debug("preCommitting offsets {}", offsets);
        return offsets;
    }

    /**
     * Closes the Elasticsearch client, if one was created.
     */
    public void stop() {
        log.debug("Stopping Elasticsearch client.");
        // The client is still null when start() threw before assigning it; closing
        // unconditionally would raise a NullPointerException in that case.
        if (this.client != null) {
            this.client.close();
        }
    }

    /**
     * @return the connector version as reported by {@link Version#getVersion()}
     */
    public String version() {
        return Version.getVersion();
    }

    /**
     * Ensures a mapping exists for {@code index}, unless schemas are ignored for the
     * record's topic or the index was already checked by this task.
     *
     * @param index  target index name
     * @param record record whose value schema is used when a mapping has to be created
     */
    private void checkMapping(String index, SinkRecord record) {
        if (!this.config.shouldIgnoreSchema(record.topic()) && !this.existingMappings.contains(index)) {
            if (!this.client.hasMapping(index)) {
                this.client.createMapping(index, record.valueSchema());
            }
            log.debug("Caching mapping for index '{}' locally.", index);
            this.existingMappings.add(index);
        }
    }

    /**
     * Derives an index name from a topic name: lower-cases it, cuts it to
     * {@value #MAX_INDEX_NAME_LENGTH} characters, strips one leading {@code '-'} or
     * {@code '_'}, and rewrites the names {@code "."} and {@code ".."} using the word "dot".
     *
     * @param topic topic name
     * @return the derived index name
     */
    private String convertTopicToIndexName(String topic) {
        // NOTE(review): toLowerCase() uses the JVM default locale; consider Locale.ROOT
        // after confirming no deployment depends on the locale-specific result.
        String index = topic.toLowerCase();
        if (index.length() > MAX_INDEX_NAME_LENGTH) {
            index = index.substring(0, MAX_INDEX_NAME_LENGTH);
        }
        // Only a single leading character is removed.
        if (index.startsWith("-") || index.startsWith("_")) {
            index = index.substring(1);
        }
        if (index.equals(".") || index.equals("..")) {
            index = index.replace(".", "dot");
            log.warn("Elasticsearch cannot have indices named {}. Index will be named {}.", topic, index);
        }
        if (!topic.equals(index)) {
            log.trace("Topic '{}' was translated to index '{}'.", topic, index);
        }
        return index;
    }

    /**
     * Builds a data stream name of the form {@code <type>-<dataset>-<namespace>}. The
     * namespace is the configured namespace with {@code ${topic}} replaced by the
     * lower-cased topic, cut to {@value #MAX_DATA_STREAM_NAMESPACE_LENGTH} characters.
     *
     * @param topic topic name
     * @return the data stream name
     */
    private String convertTopicToDataStreamName(String topic) {
        // NOTE(review): both toLowerCase() calls use the JVM default locale.
        String namespace = this.config.dataStreamNamespace().replace("${topic}", topic.toLowerCase());
        if (namespace.length() > MAX_DATA_STREAM_NAMESPACE_LENGTH) {
            namespace = namespace.substring(0, MAX_DATA_STREAM_NAMESPACE_LENGTH);
        }
        return String.format(
            "%s-%s-%s",
            this.config.dataStreamType().toLowerCase(),
            this.config.dataStreamDataset(),
            namespace);
    }

    /**
     * Picks the data stream or plain index naming scheme depending on the configuration.
     *
     * @param topic topic name
     * @return the index or data stream name for the topic
     */
    private String createIndexName(String topic) {
        return this.config.isDataStream() ? this.convertTopicToDataStreamName(topic) : this.convertTopicToIndexName(topic);
    }

    /**
     * Asks the client to create the index or data stream the first time this task sees it.
     * The name is cached only after the create call returns.
     *
     * @param index index or data stream name
     */
    private void ensureIndexExists(String index) {
        if (!this.indexCache.contains(index)) {
            log.info("Creating index {}.", index);
            this.client.createIndexOrDataStream(index);
            this.indexCache.add(index);
        }
    }

    /**
     * Logs a trace message about a record; the record description is only built when
     * trace logging is enabled.
     *
     * @param formatMsg message with a single {@code {}} placeholder for the record
     * @param record    record to describe
     */
    private void logTrace(String formatMsg, SinkRecord record) {
        if (log.isTraceEnabled()) {
            log.trace(formatMsg, ElasticsearchSinkTask.recordString(record));
        }
    }

    /**
     * Forwards a failed record to the errant record reporter, if one is available.
     *
     * @param record the record that could not be written
     * @param error  the reason it could not be written
     */
    private void reportBadRecord(SinkRecord record, Throwable error) {
        if (this.reporter != null) {
            this.reporter.report(record, error);
        }
    }

    /**
     * @param record record to inspect
     * @return {@code true} when the record value is null and null values are configured to
     *         be ignored
     */
    private boolean shouldSkipRecord(SinkRecord record) {
        return record.value() == null && this.config.behaviorOnNullValues() == ElasticsearchSinkConnectorConfig.BehaviorOnNullValues.IGNORE;
    }

    /**
     * Converts a record and passes the resulting request to the client.
     *
     * <p>A record that fails conversion is always reported. When invalid messages are
     * configured to be dropped, the failure is logged and the record's offset state is
     * marked processed; otherwise the conversion exception is rethrown.
     *
     * @param sinkRecord  record to write
     * @param offsetState offset state registered for the record
     * @throws DataException if conversion fails and invalid messages are not dropped
     */
    private void tryWriteRecord(SinkRecord sinkRecord, OffsetState offsetState) {
        String indexName = this.createIndexName(sinkRecord.topic());
        this.ensureIndexExists(indexName);
        this.checkMapping(indexName, sinkRecord);
        DocWriteRequest<?> docWriteRequest = null;
        try {
            docWriteRequest = this.converter.convertRecord(sinkRecord, indexName);
        }
        catch (DataException convertException) {
            this.reportBadRecord(sinkRecord, convertException);
            // Rethrow only when dropping is disabled; rethrowing unconditionally would fail
            // the task even though the record was just logged and marked as handled.
            if (!this.config.dropInvalidMessage()) {
                throw convertException;
            }
            log.error("Can't convert {}.", ElasticsearchSinkTask.recordString(sinkRecord), convertException);
            offsetState.markProcessed();
        }
        // NOTE(review): when the converter returns null without throwing, the offset state
        // is not marked processed here - confirm that this is intended.
        if (docWriteRequest != null) {
            this.logTrace("Adding {} to bulk processor.", sinkRecord);
            this.client.index(sinkRecord, docWriteRequest, offsetState);
        }
    }

    /**
     * @param record record to describe
     * @return a short description containing the record's topic, partition and offset
     */
    private static String recordString(SinkRecord record) {
        return String.format("record from topic=%s partition=%s offset=%s", record.topic(), record.kafkaPartition(), record.kafkaOffset());
    }

    /**
     * Tells the offset tracker that the given partitions are being closed.
     *
     * @param partitions partitions being closed
     */
    public void close(Collection<TopicPartition> partitions) {
        this.offsetTracker.closePartitions(partitions);
    }

    /**
     * Pauses and resumes all assigned partitions based on two caller-supplied conditions.
     */
    static class PartitionPauser {
        /** Timeout, in milliseconds, requested from the context while partitions are paused. */
        private static final long PAUSE_POLL_TIMEOUT_MS = 100L;
        private final SinkTaskContext context;
        private final BooleanSupplier pauseCondition;
        private final BooleanSupplier resumeCondition;
        private boolean partitionsPaused;

        /**
         * @param context         task context used to pause/resume partitions and set timeouts
         * @param pauseCondition  evaluated while not paused; {@code true} triggers a pause
         * @param resumeCondition evaluated while paused; {@code true} triggers a resume
         */
        public PartitionPauser(SinkTaskContext context, BooleanSupplier pauseCondition, BooleanSupplier resumeCondition) {
            this.context = context;
            this.pauseCondition = pauseCondition;
            this.resumeCondition = resumeCondition;
        }

        /**
         * While paused: resumes every assigned partition once the resume condition holds,
         * otherwise requests another timeout from the context. Does nothing when not paused.
         */
        void maybeResumePartitions() {
            if (!this.partitionsPaused) {
                return;
            }
            if (this.resumeCondition.getAsBoolean()) {
                log.debug("Resuming all partitions");
                this.context.resume(this.context.assignment().toArray(new TopicPartition[0]));
                this.partitionsPaused = false;
            } else {
                this.context.timeout(PAUSE_POLL_TIMEOUT_MS);
            }
        }

        /**
         * While not paused: pauses every assigned partition and requests a timeout from the
         * context once the pause condition holds.
         */
        void maybePausePartitions() {
            if (!this.partitionsPaused && this.pauseCondition.getAsBoolean()) {
                log.debug("Pausing all partitions");
                this.context.pause(this.context.assignment().toArray(new TopicPartition[0]));
                this.context.timeout(PAUSE_POLL_TIMEOUT_MS);
                this.partitionsPaused = true;
            }
        }
    }
}

