/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.elasticsearch;

import io.confluent.connect.elasticsearch.ConfigCallbackHandler;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
import io.confluent.connect.elasticsearch.Mapping;
import io.confluent.connect.elasticsearch.OffsetState;
import io.confluent.connect.elasticsearch.RetryUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.http.HttpHost;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestHighLevelClientBuilder;
import org.elasticsearch.client.core.MainResponse;
import org.elasticsearch.client.indices.CreateDataStreamRequest;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetMappingsRequest;
import org.elasticsearch.client.indices.GetMappingsResponse;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.VersionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Wrapper around a {@link RestHighLevelClient} and a {@link BulkProcessor} used to write
 * sink records to Elasticsearch in bulk.
 *
 * <p>Records are handed over via {@link #index(SinkRecord, DocWriteRequest, OffsetState)}, buffered
 * by the bulk processor and executed on a dedicated executor. Failures are recorded in an internal
 * error reference and surfaced to the caller through {@link #throwIfFailed()}.
 */
public class ElasticsearchClient {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchClient.class);

    /** Sleep interval (ms) between checks while waiting for buffer space. */
    private static final long WAIT_TIME_MS = 10L;
    /** Maximum time (ms) to wait for the bulk executor to terminate on close. */
    private static final long CLOSE_WAIT_TIME_MS = 5000L;
    private static final String RESOURCE_ALREADY_EXISTS_EXCEPTION = "resource_already_exists_exception";
    private static final String VERSION_CONFLICT_EXCEPTION = "version_conflict_engine_exception";
    /** Failure-message fragments that identify a document Elasticsearch rejected as malformed. */
    private static final Set<String> MALFORMED_DOC_ERRORS = new HashSet<>(Arrays.asList(
            "strict_dynamic_mapping_exception",
            "mapper_parsing_exception",
            "illegal_argument_exception",
            "action_request_validation_exception",
            "document_parsing_exception"));
    /** Version reported when the server version could not be retrieved. */
    private static final String UNKNOWN_VERSION_TAG = "Unknown";

    protected final AtomicInteger numBufferedRecords;
    private final AtomicReference<ConnectException> error;
    protected final BulkProcessor bulkProcessor;
    private final ConcurrentMap<DocWriteRequest<?>, SinkRecordAndOffset> requestToSinkRecord;
    /** Records per bulk execution id; {@code null} when no errant record reporter is configured. */
    private final ConcurrentMap<Long, List<SinkRecordAndOffset>> inFlightRequests;
    private final ElasticsearchSinkConnectorConfig config;
    private final ErrantRecordReporter reporter;
    private final RestHighLevelClient client;
    private final ExecutorService bulkExecutorService;
    private final Time clock;
    private final Lock inFlightRequestLock = new ReentrantLock();
    private final Condition inFlightRequestsUpdated = this.inFlightRequestLock.newCondition();
    private final String esVersion;

    /**
     * Creates the client, queries the server version and sets up the bulk processor.
     *
     * @param config            connector configuration
     * @param reporter          errant record reporter, or {@code null} if none is available
     * @param afterBulkCallback callback run after every bulk request that received a response
     */
    public ElasticsearchClient(ElasticsearchSinkConnectorConfig config, ErrantRecordReporter reporter, Runnable afterBulkCallback) {
        this.bulkExecutorService = Executors.newFixedThreadPool(config.maxInFlightRequests());
        this.numBufferedRecords = new AtomicInteger(0);
        this.error = new AtomicReference<>();
        this.requestToSinkRecord = new ConcurrentHashMap<>();
        // Only track in-flight records when there is a reporter to hand failed records to.
        this.inFlightRequests = reporter != null ? new ConcurrentHashMap<>() : null;
        this.config = config;
        this.reporter = reporter;
        this.clock = Time.SYSTEM;

        ConfigCallbackHandler configCallbackHandler = new ConfigCallbackHandler(config);
        HttpHost[] hosts = config.connectionUrls().stream()
                .map(HttpHost::create)
                .toArray(HttpHost[]::new);
        RestClient lowLevelClient = RestClient.builder(hosts)
                .setHttpClientConfigCallback(configCallbackHandler)
                .build();

        this.esVersion = this.getServerVersion(lowLevelClient);
        RestHighLevelClientBuilder clientBuilder = new RestHighLevelClientBuilder(lowLevelClient);
        if (this.shouldSetCompatibilityToES8()) {
            log.info("Starting client in ES 8 compatibility mode");
            clientBuilder.setApiCompatibilityMode(true);
        }
        this.client = clientBuilder.build();

        this.bulkProcessor = BulkProcessor.builder(this.buildConsumer(), this.buildListener(afterBulkCallback))
                .setBulkActions(config.batchSize())
                .setBulkSize(config.bulkSize())
                .setConcurrentRequests(config.maxInFlightRequests() - 1)
                .setFlushInterval(TimeValue.timeValueMillis(config.lingerMs()))
                .setBackoffPolicy(BackoffPolicy.noBackoff())
                .build();
    }

    /**
     * Decides whether the high level client should be built in API compatibility mode.
     *
     * @return {@code true} if the major server version could be parsed and is at least 8;
     *         {@code false} if the version is unknown or cannot be parsed
     */
    private boolean shouldSetCompatibilityToES8() {
        String version = this.version();
        if (version == null || UNKNOWN_VERSION_TAG.equals(version)) {
            return false;
        }
        try {
            return Integer.parseInt(version.split("\\.")[0]) >= 8;
        } catch (NumberFormatException e) {
            // An unparsable version must not abort client construction.
            log.warn("Could not parse major version from ES server version '{}'", version, e);
            return false;
        }
    }

    /**
     * Queries the server for its version number.
     *
     * @param client low level REST client used for the request
     * @return the version number reported by the server, or {@code "Unknown"} if the request failed
     */
    private String getServerVersion(RestClient client) {
        RestHighLevelClient highLevelClient = new RestHighLevelClientBuilder(client).build();
        String esVersionNumber = UNKNOWN_VERSION_TAG;
        try {
            MainResponse response = highLevelClient.info(RequestOptions.DEFAULT);
            esVersionNumber = response.getVersion().getNumber();
        } catch (Exception e) {
            log.warn("Failed to get ES server version", e);
        }
        return esVersionNumber;
    }

    /**
     * Builds the consumer that executes a bulk request (with retries) on the bulk executor and
     * notifies the given listener of the outcome.
     */
    private BiConsumer<BulkRequest, ActionListener<BulkResponse>> buildConsumer() {
        return (req, lis) -> this.bulkExecutorService.submit(() -> {
            try {
                BulkResponse bulkResponse = this.callWithRetries(
                        "execute bulk request",
                        () -> this.client.bulk(req, RequestOptions.DEFAULT));
                lis.onResponse(bulkResponse);
            } catch (Exception ex) {
                lis.onFailure(ex);
            } catch (Throwable ex) {
                // The listener only accepts exceptions, so wrap anything else.
                lis.onFailure(new ConnectException("Bulk request failed", ex));
            }
        });
    }

    /**
     * @return the underlying high level REST client
     */
    public RestHighLevelClient client() {
        return this.client;
    }

    /**
     * Waits up to the configured flush timeout for the bulk processor to close, then releases the
     * executor and the REST client regardless of the outcome.
     *
     * @throws ConnectException if outstanding requests did not finish in time or the wait was
     *                          interrupted
     */
    public void close() {
        try {
            if (!this.bulkProcessor.awaitClose(this.config.flushTimeoutMs(), TimeUnit.MILLISECONDS)) {
                throw new ConnectException("Failed to process outstanding requests in time while closing the ElasticsearchClient.");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ConnectException("Interrupted while processing all in-flight requests on ElasticsearchClient close.", e);
        } finally {
            this.closeResources();
        }
    }

    /**
     * Creates an index, or a data stream when the connector is configured for data streams,
     * unless something with that name already exists.
     *
     * @param name name of the index or data stream
     * @return {@code true} if it was created, {@code false} if it already existed
     */
    public boolean createIndexOrDataStream(String name) {
        if (this.indexExists(name)) {
            return false;
        }
        return this.config.isDataStream() ? this.createDataStream(name) : this.createIndex(name);
    }

    /**
     * Puts a mapping built from the given schema onto the index.
     *
     * @param index  index to update
     * @param schema schema the mapping is built from
     */
    public void createMapping(String index, Schema schema) {
        PutMappingRequest request = new PutMappingRequest(index).source(Mapping.buildMapping(schema));
        this.callWithRetries(
                String.format("create mapping for index %s with schema %s", index, schema),
                () -> this.client.indices().putMapping(request, RequestOptions.DEFAULT));
    }

    /**
     * @return the server version determined at construction time, or {@code "Unknown"}
     */
    public String version() {
        return this.esVersion;
    }

    /** Flushes the bulk processor. */
    public void flush() {
        this.bulkProcessor.flush();
    }

    /**
     * Blocks until the buffered record count drops to zero.
     *
     * @throws ConnectException if the waiting thread is interrupted
     */
    public void waitForInFlightRequests() {
        this.inFlightRequestLock.lock();
        try {
            while (this.numBufferedRecords.get() > 0) {
                this.inFlightRequestsUpdated.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ConnectException(e);
        } finally {
            this.inFlightRequestLock.unlock();
        }
    }

    /**
     * @param index index to inspect
     * @return {@code true} if the index has a non-empty mapping
     */
    public boolean hasMapping(String index) {
        MappingMetadata mapping = this.mapping(index);
        if (mapping == null) {
            return false;
        }
        // Fetch the source map once instead of once per check.
        java.util.Map<String, Object> source = mapping.sourceAsMap();
        return source != null && !source.isEmpty();
    }

    /**
     * Registers the record and adds the request to the bulk processor.
     *
     * <p>Fails fast if an earlier request failed, and waits for buffer space when the maximum
     * number of buffered records has been reached.
     *
     * @param record      sink record the request was built from
     * @param request     write request to execute
     * @param offsetState offset state to mark as processed once the request succeeds
     * @throws ConnectException if a previous request failed or no buffer space became available
     */
    public void index(SinkRecord record, DocWriteRequest<?> request, OffsetState offsetState) {
        this.throwIfFailed();
        this.verifyNumBufferedRecords();
        this.requestToSinkRecord.put(request, new SinkRecordAndOffset(record, offsetState));
        this.numBufferedRecords.incrementAndGet();
        this.bulkProcessor.add(request);
    }

    /**
     * Closes the client and rethrows the recorded error if a failure has been recorded.
     *
     * @throws ConnectException the first recorded failure
     */
    public void throwIfFailed() {
        if (!this.isFailed()) {
            return;
        }
        try {
            this.close();
        } catch (ConnectException e) {
            // The recorded error is the one reported to the caller; a close failure is only logged.
            log.warn("Couldn't close elasticsearch client", e);
        }
        throw this.error.get();
    }

    /**
     * Waits until the number of buffered records is below the configured maximum.
     *
     * @throws ConnectException if no space became available within the flush timeout
     */
    private void verifyNumBufferedRecords() {
        long maxWaitTime = this.clock.milliseconds() + this.config.flushTimeoutMs();
        while (this.numBufferedRecords.get() >= this.config.maxBufferedRecords()) {
            this.clock.sleep(WAIT_TIME_MS);
            if (this.clock.milliseconds() > maxWaitTime) {
                throw new ConnectException(String.format(
                        "Could not make space in the internal buffer fast enough. Consider increasing %s or %s.",
                        "flush.timeout.ms", "max.buffered.records"));
            }
        }
    }

    /**
     * @param index name to look up
     * @return whether the exists-check for the given name succeeded
     */
    public boolean indexExists(String index) {
        GetIndexRequest request = new GetIndexRequest(index);
        return this.callWithRetries(
                "check if index " + index + " exists",
                () -> this.client.indices().exists(request, RequestOptions.DEFAULT));
    }

    /**
     * Builds the bulk listener that tracks in-flight records, processes item responses and keeps
     * the buffered record count up to date.
     *
     * @param afterBulkCallback callback run after the items of a bulk response were handled
     */
    private BulkProcessor.Listener buildListener(final Runnable afterBulkCallback) {
        return new BulkProcessor.Listener() {

            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                if (inFlightRequests != null) {
                    // Snapshot the records in request order so failed items can be found by item id.
                    List<SinkRecordAndOffset> sinkRecords = request.requests().stream()
                            .map(requestToSinkRecord::get)
                            .collect(Collectors.toList());
                    inFlightRequests.put(executionId, sinkRecords);
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                List<DocWriteRequest<?>> requests = request.requests();
                int idx = 0;
                for (BulkItemResponse bulkItemResponse : response) {
                    DocWriteRequest<?> req = idx < requests.size() ? requests.get(idx) : null;
                    boolean failed = handleResponse(bulkItemResponse, req, executionId);
                    if (!failed && req != null) {
                        requestToSinkRecord.get(req).offsetState.markProcessed();
                    }
                    ++idx;
                }
                afterBulkCallback.run();
                this.bulkFinished(executionId, request);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                log.warn("Bulk request {} failed", executionId, failure);
                error.compareAndSet(null, new ConnectException("Bulk request failed", failure));
                this.bulkFinished(executionId, request);
            }

            /**
             * Drops the bookkeeping for a finished bulk request and wakes up threads waiting
             * for in-flight requests.
             */
            private void bulkFinished(long executionId, BulkRequest request) {
                request.requests().forEach(requestToSinkRecord::remove);
                removeFromInFlightRequests(executionId);
                inFlightRequestLock.lock();
                try {
                    numBufferedRecords.addAndGet(-request.requests().size());
                    inFlightRequestsUpdated.signalAll();
                } finally {
                    inFlightRequestLock.unlock();
                }
            }
        };
    }

    /**
     * Runs the function through {@link RetryUtil#callWithRetries}, passing the configured maximum
     * retries plus one and the configured retry backoff.
     */
    private <T> T callWithRetries(String description, Callable<T> function) {
        return RetryUtil.callWithRetries(description, function, this.config.maxRetries() + 1, this.config.retryBackoffMs());
    }

    /** Shuts down the bulk executor and closes the REST client; close failures are only logged. */
    private void closeResources() {
        this.bulkExecutorService.shutdown();
        try {
            if (!this.bulkExecutorService.awaitTermination(CLOSE_WAIT_TIME_MS, TimeUnit.MILLISECONDS)) {
                this.bulkExecutorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            this.bulkExecutorService.shutdownNow();
            Thread.currentThread().interrupt();
            log.warn("Interrupted while awaiting for executor service shutdown.", e);
        }
        try {
            this.client.close();
        } catch (IOException e) {
            log.warn("Failed to close Elasticsearch client.", e);
        }
    }

    /**
     * Tells whether the exception message indicates that the resource already exists.
     * A {@code null} message is treated as "no", so the original exception is rethrown
     * by the caller instead of being masked by a {@link NullPointerException}.
     */
    private static boolean isResourceAlreadyExists(Exception e) {
        String message = e.getMessage();
        return message != null && message.contains(RESOURCE_ALREADY_EXISTS_EXCEPTION);
    }

    /**
     * Creates the data stream.
     *
     * @param dataStream name of the data stream
     * @return {@code true} if it was created, {@code false} if the server reported it already exists
     */
    private boolean createDataStream(String dataStream) {
        CreateDataStreamRequest request = new CreateDataStreamRequest(dataStream);
        return this.callWithRetries("create data stream " + dataStream, () -> {
            try {
                this.client.indices().createDataStream(request, RequestOptions.DEFAULT);
            } catch (IOException | ElasticsearchStatusException e) {
                if (!isResourceAlreadyExists(e)) {
                    throw e;
                }
                return false;
            }
            return true;
        });
    }

    /**
     * Creates the index.
     *
     * @param index name of the index
     * @return {@code true} if it was created, {@code false} if the server reported it already exists
     */
    private boolean createIndex(String index) {
        CreateIndexRequest request = new CreateIndexRequest(index);
        return this.callWithRetries("create index " + index, () -> {
            try {
                this.client.indices().create(request, RequestOptions.DEFAULT);
            } catch (IOException | ElasticsearchStatusException e) {
                if (!isResourceAlreadyExists(e)) {
                    throw e;
                }
                return false;
            }
            return true;
        });
    }

    /**
     * Handles a single item of a bulk response.
     *
     * <p>Malformed-document failures are reported and handled according to the configured
     * behavior. Version conflicts never fail the client; they are reported unless the request
     * used external versioning. Any other failure is reported and recorded as the client error.
     *
     * @param response    item response
     * @param request     request the item belongs to, or {@code null} if unknown
     * @param executionId id of the bulk execution
     * @return {@code true} if the item is considered failed
     */
    protected boolean handleResponse(BulkItemResponse response, DocWriteRequest<?> request, long executionId) {
        if (!response.isFailed()) {
            return false;
        }

        for (String malformedError : MALFORMED_DOC_ERRORS) {
            if (response.getFailureMessage().contains(malformedError)) {
                this.reportBadRecordAndError(response, executionId);
                return this.handleMalformedDocResponse();
            }
        }

        if (response.getFailureMessage().contains(VERSION_CONFLICT_EXCEPTION)) {
            if (request == null || request.versionType() != VersionType.EXTERNAL) {
                log.warn("{} version conflict for operation {} on document '{}' version {} in index '{}'.",
                        request != null ? request.versionType() : "UNKNOWN",
                        response.getOpType(), response.getId(), response.getVersion(), response.getIndex());
                log.trace("{} version conflict for operation {} on document '{}' version {} in index '{}'",
                        request != null ? request.versionType() : "UNKNOWN",
                        response.getOpType(), response.getId(), response.getVersion(), response.getIndex());
                this.reportBadRecordAndError(response, executionId);
            } else {
                log.debug("Ignoring EXTERNAL version conflict for operation {} on document '{}' version {} in index '{}'.",
                        response.getOpType(), response.getId(), request.version(), response.getIndex());
            }
            return false;
        }

        this.reportBadRecordAndError(response, executionId);
        this.error.compareAndSet(null, new ConnectException("Indexing record failed. Please check DLQ topic for errors."));
        return true;
    }

    /**
     * Applies the configured behavior for malformed documents.
     *
     * @return {@code false} when the failure is ignored or only warned about; {@code true} when it
     *         is recorded as the client error
     */
    private boolean handleMalformedDocResponse() {
        String errorMsg = "Encountered an illegal document error. Ignoring and will not index record. Please check DLQ topic for errors.";
        switch (this.config.behaviorOnMalformedDoc()) {
            case IGNORE:
                log.debug(errorMsg);
                return false;
            case WARN:
                log.warn(errorMsg);
                return false;
            default:
                log.error(String.format(
                        "Encountered an illegal document error. Please check DLQ topic for errors. To ignore future records like this, change the configuration '%s' to '%s'.",
                        "behavior.on.malformed.documents",
                        ElasticsearchSinkConnectorConfig.BehaviorOnMalformedDoc.IGNORE));
                this.error.compareAndSet(null, new ConnectException("Indexing record failed. Please check DLQ topic for errors."));
                return true;
        }
    }

    /**
     * @return {@code true} once a failure has been recorded
     */
    public boolean isFailed() {
        return this.error.get() != null;
    }

    /**
     * Fetches the mapping metadata of the index.
     *
     * @return the mapping for the index, or {@code null} if the response contains none for it
     */
    private MappingMetadata mapping(String index) {
        GetMappingsRequest request = new GetMappingsRequest().indices(index);
        GetMappingsResponse response = this.callWithRetries(
                "get mapping for index " + index,
                () -> this.client.indices().getMapping(request, RequestOptions.DEFAULT));
        return response.mappings().get(index);
    }

    /** Forgets the records tracked for the given bulk execution, if tracking is enabled. */
    private void removeFromInFlightRequests(long executionId) {
        if (this.inFlightRequests != null) {
            this.inFlightRequests.remove(executionId);
        }
    }

    /**
     * Hands the record behind a failed item to the errant record reporter, if one is configured.
     * Version conflicts are not reported when the connector writes to a data stream.
     *
     * @param response    failed item response
     * @param executionId id of the bulk execution the item belongs to
     */
    private synchronized void reportBadRecordAndError(BulkItemResponse response, long executionId) {
        if (response.getFailureMessage().contains(VERSION_CONFLICT_EXCEPTION) && this.config.isDataStream()) {
            log.debug("Skipping DLQ insertion for DataStream type.");
            return;
        }
        if (this.reporter == null) {
            return;
        }
        List<SinkRecordAndOffset> sinkRecords = this.inFlightRequests.getOrDefault(executionId, new ArrayList<>());
        // The item id is used as the position of the record within the bulk request.
        SinkRecordAndOffset original = sinkRecords.size() > response.getItemId()
                ? sinkRecords.get(response.getItemId())
                : null;
        if (original != null) {
            this.reporter.report(
                    original.sinkRecord,
                    new ReportingException("Indexing failed: " + response.getFailureMessage()));
        }
    }

    /**
     * Exception passed to the errant record reporter. It does not capture a stack trace.
     */
    public static class ReportingException
    extends RuntimeException {
        public ReportingException(String message) {
            super(message);
        }

        /** Skips stack trace capture; only the message is of interest. */
        @Override
        public synchronized Throwable fillInStackTrace() {
            return this;
        }
    }

    /** Pairs a sink record with the offset state to mark once the record has been written. */
    private static class SinkRecordAndOffset {
        private final SinkRecord sinkRecord;
        private final OffsetState offsetState;

        public SinkRecordAndOffset(SinkRecord sinkRecord, OffsetState offsetState) {
            this.sinkRecord = sinkRecord;
            this.offsetState = offsetState;
        }
    }
}

