/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerSourceTaskContext;
import org.apache.kafka.connect.runtime.WorkerTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ShutdownableThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a single {@link SourceTask} on a dedicated worker thread.
 *
 * <p>The worker thread polls the task for {@link SourceRecord}s, converts keys and values with the
 * configured {@link Converter}s and hands the result to the Kafka producer. Every record handed to
 * the producer is tracked in an identity-keyed "outstanding" map until its send callback fires, so
 * that {@link #commitOffsets()} can wait for in-flight records before flushing source offsets.
 *
 * <p>State shared between the worker thread, producer callbacks and the thread calling
 * {@link #commitOffsets()} ({@code outstandingMessages}, {@code outstandingMessagesBacklog},
 * {@code flushing}) is guarded by this object's monitor.
 */
class WorkerSourceTask implements WorkerTask {
    private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);

    /** Time the worker thread waits before retrying after a retriable send failure. */
    private static final long SEND_FAILED_BACKOFF_MS = 100L;

    private final ConnectorTaskId id;
    private final SourceTask task;
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final KafkaProducer<byte[], byte[]> producer;
    private WorkerSourceTaskThread workThread;
    private final OffsetStorageReader offsetReader;
    private final OffsetStorageWriter offsetWriter;
    private final WorkerConfig workerConfig;
    private final Time time;

    /** Records returned by the last poll that still have to be sent; {@code null} when none are pending. */
    private List<SourceRecord> toSend;
    /** Set when the previous send attempt failed with a retriable error and the head of {@code toSend} is retried. */
    private boolean lastSendFailed;
    /** Records handed to the producer whose callback has not been seen yet (identity-keyed). */
    private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessages;
    /** Records handed to the producer while a flush is in progress; merged or swapped in when the flush ends. */
    private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog;
    /** True while {@link #commitOffsets()} is in progress. */
    private boolean flushing;
    /** Counted down by {@link #stop()} so that a back-off wait in the worker thread returns early. */
    private final CountDownLatch stopRequestedLatch;

    public WorkerSourceTask(ConnectorTaskId id, SourceTask task, Converter keyConverter, Converter valueConverter, KafkaProducer<byte[], byte[]> producer, OffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter, WorkerConfig workerConfig, Time time) {
        this.id = id;
        this.task = task;
        this.keyConverter = keyConverter;
        this.valueConverter = valueConverter;
        this.producer = producer;
        this.offsetReader = offsetReader;
        this.offsetWriter = offsetWriter;
        this.workerConfig = workerConfig;
        this.time = time;
        this.toSend = null;
        this.lastSendFailed = false;
        this.outstandingMessages = new IdentityHashMap<>();
        this.outstandingMessagesBacklog = new IdentityHashMap<>();
        this.flushing = false;
        this.stopRequestedLatch = new CountDownLatch(1);
    }

    /**
     * Creates and starts the worker thread that initializes, starts and polls the task.
     *
     * @param props properties passed to {@link SourceTask#start} on the worker thread
     */
    @Override
    public void start(Map<String, String> props) {
        this.workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + this.id, props);
        this.workThread.start();
    }

    /**
     * Requests a graceful shutdown of the worker thread and releases any pending back-off wait.
     * Does nothing if {@link #start} has not been called.
     */
    @Override
    public void stop() {
        if (this.workThread != null) {
            this.workThread.startGracefulShutdown();
            this.stopRequestedLatch.countDown();
        }
    }

    /**
     * Waits for the worker thread to shut down, forcing a shutdown if the wait does not succeed.
     *
     * @param timeoutMs maximum time to wait, in milliseconds
     * @return the result of the thread's {@code awaitShutdown}; {@code true} if no thread was ever
     *         started; {@code false} if the calling thread was interrupted while waiting
     */
    @Override
    public boolean awaitStop(long timeoutMs) {
        if (this.workThread == null) {
            return true;
        }
        try {
            boolean success = this.workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
            if (!success) {
                this.workThread.forceShutdown();
            }
            return success;
        } catch (InterruptedException e) {
            // Preserve the interrupt status so the caller can observe the interruption.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** No resources are released here. */
    @Override
    public void close() {
    }

    /**
     * Converts and sends every record in {@code toSend} to the producer.
     *
     * @return {@code true} if all pending records were handed to the producer (and {@code toSend}
     *         was cleared); {@code false} if a retriable error occurred, in which case
     *         {@code toSend} is trimmed to the records that still have to be sent
     * @throws ConnectException if the producer throws a non-retriable {@link KafkaException}
     */
    private boolean sendRecords() {
        int processed = 0;
        for (final SourceRecord record : this.toSend) {
            byte[] key = this.keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
            byte[] value = this.valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
            final ProducerRecord<byte[], byte[]> producerRecord =
                    new ProducerRecord<>(record.topic(), record.kafkaPartition(), key, value);
            log.trace("Appending record with key {}, value {}", record.key(), record.value());
            // Register the record before calling send() so the callback always finds it. When the
            // previous attempt failed, the head record was already registered on that attempt.
            // NOTE(review): on a retry a new ProducerRecord instance is created but not registered,
            // while the instance from the failed attempt stays in the identity map; recordSent()
            // would then not find the retried record. Confirm and fix separately.
            synchronized (this) {
                if (!this.lastSendFailed) {
                    if (this.flushing) {
                        this.outstandingMessagesBacklog.put(producerRecord, producerRecord);
                    } else {
                        this.outstandingMessages.put(producerRecord, producerRecord);
                    }
                    this.offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
                }
            }
            try {
                this.producer.send(producerRecord, new org.apache.kafka.clients.producer.Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) {
                            log.error("{} failed to send record to {}: {}", WorkerSourceTask.this.id, record.topic(), e);
                            log.debug("Failed record: topic {}, Kafka partition {}, key {}, value {}, source offset {}, source partition {}", record.topic(), record.kafkaPartition(), record.key(), record.value(), record.sourceOffset(), record.sourcePartition());
                        } else {
                            log.trace("Wrote record successfully: topic {} partition {} offset {}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                        }
                        // The record is removed from the outstanding set on success and on failure alike.
                        WorkerSourceTask.this.recordSent(producerRecord);
                    }
                });
                this.lastSendFailed = false;
            } catch (RetriableException e) {
                log.warn("Failed to send {}, backing off before retrying:", producerRecord, e);
                // Keep the failed record and everything after it for the next attempt.
                this.toSend = this.toSend.subList(processed, this.toSend.size());
                this.lastSendFailed = true;
                return false;
            } catch (KafkaException e) {
                throw new ConnectException("Unrecoverable exception trying to send", e);
            }
            ++processed;
        }
        this.toSend = null;
        return true;
    }

    /**
     * Removes a record from the outstanding set after its producer callback has fired and wakes up
     * a waiting {@link #commitOffsets()} once the set being flushed is empty.
     *
     * @param record the exact {@link ProducerRecord} instance that was handed to the producer
     */
    private synchronized void recordSent(ProducerRecord<byte[], byte[]> record) {
        ProducerRecord<byte[], byte[]> removed = this.outstandingMessages.remove(record);
        // While a flush is in progress, new records are registered in the backlog instead.
        if (removed == null && this.flushing) {
            removed = this.outstandingMessagesBacklog.remove(record);
        }
        if (removed == null) {
            log.error("CRITICAL Saw callback for record that was not present in the outstanding message set: {}", record);
        } else if (this.flushing && this.outstandingMessages.isEmpty()) {
            this.notifyAll();
        }
    }

    /**
     * Waits for all outstanding records to be acknowledged and then flushes the source offsets.
     * The whole operation is bounded by the {@code offset.flush.timeout.ms} worker setting.
     *
     * @return {@code true} if there was nothing to flush or the flush completed; {@code false} on
     *         timeout, interruption (the interrupt status is restored), a {@code null} flush
     *         future, or a failed flush
     */
    public boolean commitOffsets() {
        long commitTimeoutMs = this.workerConfig.getLong("offset.flush.timeout.ms");
        log.debug("{} Committing offsets", this);
        long started = this.time.milliseconds();
        long timeout = started + commitTimeoutMs;
        synchronized (this) {
            // From here on sendRecords() registers new records in the backlog, so the set we wait
            // on can only shrink.
            this.flushing = true;
            boolean flushStarted = this.offsetWriter.beginFlush();
            log.debug("{} flushing {} outstanding messages for offset commit", this, this.outstandingMessages.size());
            while (!this.outstandingMessages.isEmpty()) {
                try {
                    long timeoutMs = timeout - this.time.milliseconds();
                    if (timeoutMs <= 0L) {
                        log.error("Failed to flush {}, timed out while waiting for producer to flush outstanding messages, {} left ({})", this, this.outstandingMessages.size(), this.outstandingMessages);
                        this.finishFailedFlush();
                        return false;
                    }
                    // Woken up by recordSent() once the outstanding set is empty.
                    this.wait(timeoutMs);
                } catch (InterruptedException e) {
                    log.error("{} Interrupted while flushing messages, offsets will not be committed", this);
                    this.finishFailedFlush();
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            if (!flushStarted) {
                // beginFlush() reported that no flush was started, so only the bookkeeping is reset.
                this.finishSuccessfulFlush();
                log.debug("Finished {} offset commitOffsets successfully in {} ms", this, this.time.milliseconds() - started);
                return true;
            }
        }
        // The flush itself runs outside the monitor so producer callbacks are not blocked.
        Future<Void> flushFuture = this.offsetWriter.doFlush(new Callback<Void>() {
            @Override
            public void onCompletion(Throwable error, Void result) {
                // Log the enclosing task; a bare `this` would be the anonymous callback object.
                if (error != null) {
                    log.error("Failed to flush {} offsets to storage: ", WorkerSourceTask.this, error);
                } else {
                    log.trace("Finished flushing {} offsets to storage", WorkerSourceTask.this);
                }
            }
        });
        if (flushFuture == null) {
            this.finishFailedFlush();
            return false;
        }
        try {
            flushFuture.get(Math.max(timeout - this.time.milliseconds(), 0L), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.warn("Flush of {} offsets interrupted, cancelling", this);
            this.finishFailedFlush();
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            log.error("Flush of {} offsets threw an unexpected exception: ", this, e);
            this.finishFailedFlush();
            return false;
        } catch (TimeoutException e) {
            log.error("Timed out waiting to flush {} offsets to storage", this);
            this.finishFailedFlush();
            return false;
        }
        this.finishSuccessfulFlush();
        log.info("Finished {} commitOffsets successfully in {} ms", this, this.time.milliseconds() - started);
        return true;
    }

    /**
     * Cancels the offset flush and merges the backlog back into the outstanding set, so those
     * records are waited for by the next flush.
     */
    private synchronized void finishFailedFlush() {
        this.offsetWriter.cancelFlush();
        this.outstandingMessages.putAll(this.outstandingMessagesBacklog);
        this.outstandingMessagesBacklog.clear();
        this.flushing = false;
    }

    /**
     * Ends a flush by swapping the outstanding set with the backlog: records registered during the
     * flush become the new outstanding set, and the previous set is reused as the backlog.
     */
    private synchronized void finishSuccessfulFlush() {
        IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> temp = this.outstandingMessages;
        this.outstandingMessages = this.outstandingMessagesBacklog;
        this.outstandingMessagesBacklog = temp;
        this.flushing = false;
    }

    @Override
    public String toString() {
        return "WorkerSourceTask{id=" + this.id + '}';
    }

    /**
     * Thread that drives the task: initialize, start, then poll/send until shutdown is requested,
     * followed by one final offset commit.
     */
    private class WorkerSourceTaskThread extends ShutdownableThread {
        private final Map<String, String> workerProps;
        /** Guarded by this thread object's monitor; set once {@code task.start()} has returned. */
        private boolean finishedStart;
        /** Guarded by this thread object's monitor; set when shutdown is requested before start finished. */
        private boolean startedShutdownBeforeStartCompleted;

        public WorkerSourceTaskThread(String name, Map<String, String> workerProps) {
            super(name);
            this.workerProps = workerProps;
            this.finishedStart = false;
            this.startedShutdownBeforeStartCompleted = false;
        }

        /**
         * Main loop of the worker thread. Any {@link Throwable} other than an interruption is
         * logged and ends the loop; unless shutdown was requested before start completed, offsets
         * are committed once more before the method returns.
         */
        @Override
        public void execute() {
            try {
                WorkerSourceTask.this.task.initialize(new WorkerSourceTaskContext(WorkerSourceTask.this.offsetReader));
                WorkerSourceTask.this.task.start(this.workerProps);
                log.info("Source task {} finished initialization and start", this);
                synchronized (this) {
                    if (this.startedShutdownBeforeStartCompleted) {
                        // Shutdown was requested while the task was still starting; stop it here
                        // because startGracefulShutdown() could not do so.
                        WorkerSourceTask.this.task.stop();
                        return;
                    }
                    this.finishedStart = true;
                }
                while (this.getRunning()) {
                    if (WorkerSourceTask.this.toSend == null) {
                        WorkerSourceTask.this.toSend = WorkerSourceTask.this.task.poll();
                    }
                    if (WorkerSourceTask.this.toSend == null) {
                        continue;
                    }
                    if (!WorkerSourceTask.this.sendRecords()) {
                        // Retriable send failure: back off, but return early if stop() is called.
                        WorkerSourceTask.this.stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
                    }
                }
            } catch (InterruptedException ignored) {
                // Deliberately not re-interrupting: the final commitOffsets() below blocks in
                // wait() and would fail immediately if the interrupt flag were still set.
            } catch (Throwable t) {
                log.error("Task {} threw an uncaught and unrecoverable exception", WorkerSourceTask.this.id);
                log.error("Task is being killed and will not recover until manually restarted:", t);
            }
            WorkerSourceTask.this.commitOffsets();
        }

        /**
         * Requests shutdown and stops the task if it has finished starting; otherwise records the
         * request so that {@link #execute()} stops the task once start has completed.
         */
        @Override
        public void startGracefulShutdown() {
            super.startGracefulShutdown();
            synchronized (this) {
                if (this.finishedStart) {
                    WorkerSourceTask.this.task.stop();
                } else {
                    this.startedShutdownBeforeStartCompleted = true;
                }
            }
        }
    }
}

