/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.jms.core.source;

import io.confluent.connect.jms.core.source.BaseJmsConnectorConfig;
import io.confluent.connect.jms.core.source.BaseJmsSourceConnectorConfig;
import io.confluent.connect.jms.core.source.JmsClientHelper;
import io.confluent.connect.jms.core.source.JmsSourceRecord;
import io.confluent.connect.jms.core.source.RecordConverter;
import io.confluent.connect.utils.retry.BackoffPolicies;
import io.confluent.connect.utils.retry.RetryCondition;
import io.confluent.connect.utils.retry.RetryPolicy;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for Kafka Connect source tasks that read messages from a JMS destination.
 *
 * <p>Each {@link #poll()} call receives up to {@code batchSize} messages through the
 * {@link JmsClientHelper}, converts them with the {@link RecordConverter} and remembers them in
 * {@link #pendingMessages} until {@link #commitRecord(SourceRecord)} is invoked for them. Once a
 * poll ends early (no message available, poll duration exceeded, or the pending limit reached) the
 * task stops consuming until every pending record has been committed, at which point the last
 * committed record's JMS message is acknowledged.
 *
 * <p>NOTE(review): acknowledging only the last message presumably relies on a JMS
 * client-acknowledge session where one acknowledge covers all previously delivered messages —
 * confirm against {@code JmsClientHelper}.
 *
 * @param <C> concrete connector configuration type
 */
public abstract class BaseJmsSourceTask<C extends BaseJmsSourceConnectorConfig>
extends SourceTask {
    /** Setting key under which the task id is passed to {@link #start(Map)}. */
    public static final String TASK_ID_CONFIG_NAME = "task.jms.id";
    /** Receive timeout used for the first message of a poll. */
    private static final long JMS_RECV_LONG_DURATION_MS = 5000L;
    /** Receive timeout used for every further message of a poll once one was received. */
    private static final long JMS_RECV_SHORT_DURATION_MS = 10L;
    /** Initial retry backoff; also the minimum configured retry time for which retries are enabled. */
    private static final long INITIAL_BACKOFF_TIME_MS = 100L;
    /** Upper bound for the exponential retry backoff. */
    private static final long MAX_BACKOFF_TIME_MS = TimeUnit.MINUTES.toMillis(10L);
    static final Logger LOG = LoggerFactory.getLogger(BaseJmsSourceTask.class);

    public JmsClientHelper<C> clientHelper;
    public RecordConverter converter;
    /** Non-retriable failure raised while acknowledging; rethrown by the next {@link #poll()}. */
    public final AtomicReference<JMSException> commitException = new AtomicReference<>(null);
    protected C config;
    private String connectorName;
    private String taskId;
    private String connectorNameAndTaskId;
    private ConnectionFactory connectionFactory;
    private int batchSize;
    private int maxPending;
    private long maxPollDuration;
    private RetryPolicy retryPolicy;
    private RetryCondition retryCondition;
    /** Receive timeout for the next receive attempt; a field so the retry lambda can read it. */
    private long duration;
    private final Lock pendingCommitsLock = new ReentrantLock();
    /** Signalled by {@link #commitRecord(SourceRecord)} once all pending records are committed. */
    private final Condition pendingCommits = this.pendingCommitsLock.newCondition();
    /** Number of records handed to the framework since the last acknowledge. */
    protected final AtomicLong numProduced = new AtomicLong();
    /** When set, {@link #poll()} waits for pending records before consuming more. */
    protected final AtomicBoolean waitForAllPendingToCommit = new AtomicBoolean();
    /** Records returned from {@link #poll()} that have not been committed yet, by internal id. */
    protected ConcurrentMap<String, JmsSourceRecord> pendingMessages = new ConcurrentHashMap<>();

    /**
     * Supplies the JMS connection factory handed to the {@link JmsClientHelper} in
     * {@link #start(Map)}.
     *
     * @return the connection factory to use for this task
     */
    protected abstract ConnectionFactory connectionFactory();

    /**
     * Builds the task configuration from the raw settings passed to {@link #start(Map)}.
     *
     * @param var1 raw task settings
     * @return the parsed configuration
     */
    protected abstract C config(Map<String, String> var1);

    /**
     * Initializes the task: resolves names, parses the configuration, creates the converter and
     * client helper, and builds the retry policy.
     *
     * @param settings raw task settings; {@code "name"} and {@link #TASK_ID_CONFIG_NAME} are read
     *     for identification
     */
    public void start(Map<String, String> settings) {
        this.connectorName = settings.get("name");
        this.taskId = settings.get(TASK_ID_CONFIG_NAME);
        this.connectorNameAndTaskId = String.format("%s-%s", this.connectorName, this.taskId);
        LOG.trace("{} start()", this);
        this.config = this.config(settings);
        this.converter = new RecordConverter((BaseJmsSourceConnectorConfig)((Object)this.config));
        this.connectionFactory = this.connectionFactory();
        this.clientHelper = new JmsClientHelper<C>(this.config, this.connectionFactory, this.connectorNameAndTaskId);
        BaseJmsConnectorConfig connectorConfig = (BaseJmsConnectorConfig)((Object)this.config);
        this.batchSize = connectorConfig.batchSize();
        this.maxPending = connectorConfig.maxPending();
        this.maxPollDuration = connectorConfig.maxPollDuration();
        this.retryCondition = RetryCondition.retryOnConnectRetriable();
        this.retryPolicy = this.buildRetryPolicy(connectorConfig.maxRetryTimeMs());
    }

    /**
     * Creates the retry policy: no retries when the configured retry time is below the initial
     * backoff, otherwise unlimited attempts bounded by the retry time with exponential jitter.
     */
    private RetryPolicy buildRetryPolicy(long maxRetryTimeMs) {
        if (maxRetryTimeMs < INITIAL_BACKOFF_TIME_MS) {
            return RetryPolicy.builder().withNoRetries().build();
        }
        return RetryPolicy.builder()
                .maxAttempts(Integer.MAX_VALUE)
                .maxRetryTimeout(Duration.ofMillis(maxRetryTimeMs))
                .backoffPolicy(BackoffPolicies.exponentialJitter(Duration.ofMillis(INITIAL_BACKOFF_TIME_MS), Duration.ofMillis(MAX_BACKOFF_TIME_MS)))
                .when(this.retryCondition)
                .build();
    }

    /**
     * Hook deciding whether the JMS connection is closed after the given failure, before the
     * failure is rethrown. The default never closes.
     *
     * @param t the failure that occurred
     * @return {@code true} to close the connection; this implementation returns {@code false}
     */
    protected boolean closeConnectionBeforeRetry(Throwable t) {
        return false;
    }

    /**
     * Hook deciding whether the given failure is rethrown as a {@link RetriableException}.
     * The default treats nothing as retriable.
     *
     * @param t the failure that occurred
     * @return {@code true} if retriable; this implementation returns {@code false}
     */
    protected boolean isRetriable(Throwable t) {
        return false;
    }

    /**
     * Receives and converts a single JMS message, connecting first if the helper is closed.
     *
     * @param timeout receive timeout passed to the client helper
     * @return the converted record, or {@code null} if no message was received
     * @throws RetriableException if a {@link JMSException} occurs and {@link #isRetriable} accepts it
     * @throws ConnectException if a {@link JMSException} occurs that is not retriable
     */
    protected JmsSourceRecord receive(long timeout) {
        LOG.trace("receive()");
        try {
            if (this.clientHelper.isClosed()) {
                LOG.debug("Connection is closed. Connecting...");
                this.clientHelper.connect();
            }
            Message message = this.clientHelper.receive(timeout);
            if (message == null) {
                LOG.trace("No message received.");
                return null;
            }
            LOG.trace("Received message with id='{}'", message.getJMSMessageID());
            return this.converter.record(message);
        }
        catch (JMSException e) {
            if (this.closeConnectionBeforeRetry(e)) {
                this.clientHelper.close();
            }
            if (this.isRetriable(e)) {
                throw new RetriableException(e);
            }
            throw new ConnectException(e);
        }
    }

    /**
     * Receives up to {@code batchSize} messages and returns them as source records.
     *
     * <p>The loop ends early, and consumption is paused until all pending records are committed,
     * when no message arrives within the receive timeout, when the max poll duration is exceeded,
     * or when the number of produced records reaches {@code maxPending}.
     *
     * @return the received records, or {@code null} when none were received or the task is still
     *     waiting for pending records to be committed
     * @throws ConnectException if an earlier acknowledge failed with a non-retriable error
     * @throws InterruptedException if interrupted while waiting for pending commits
     */
    public List<SourceRecord> poll() throws InterruptedException {
        // Read and clear in one atomic step so a failure stored concurrently is never lost.
        JMSException commitFailure = this.commitException.getAndSet(null);
        if (commitFailure != null) {
            throw new ConnectException("Encountered an unrecoverable exception while committing a record.", commitFailure);
        }
        LOG.trace("{} poll()", this);
        if (this.waitForAllPendingToCommit.get() && this.pendingCommitsOutstanding()) {
            return null;
        }
        List<SourceRecord> records = null;
        // Wait longer for the first message; once one arrived, drain with short timeouts.
        this.duration = JMS_RECV_LONG_DURATION_MS;
        long exceedTimeMs = System.currentTimeMillis() + this.maxPollDuration;
        for (int i = 0; i < this.batchSize; ++i) {
            if (i != 0 && this.exceededPollDuration(exceedTimeMs)) {
                LOG.debug("{} Returning {} records after exceeded max poll duration of {}ms", this, this.recordCount(records), this.maxPollDuration);
                this.waitForAllPendingToCommit.set(true);
                return records;
            }
            JmsSourceRecord record = (JmsSourceRecord)((Object)this.retryPolicy.call("receive JMS message", () -> this.receive(this.duration)));
            if (record == null) {
                LOG.debug("{} Returning {} records after receiving no new messages within {}ms", this, this.recordCount(records), this.duration);
                this.waitForAllPendingToCommit.set(true);
                return records;
            }
            if (records == null) {
                records = new ArrayList<SourceRecord>();
            }
            records.add(record);
            this.pendingMessages.put(record.internalMessageId, record);
            this.duration = JMS_RECV_SHORT_DURATION_MS;
            if (this.numProduced.incrementAndGet() >= (long)this.maxPending) {
                LOG.debug("{} Returning {} records after reaching max pending records {}", this, this.recordCount(records), this.maxPending);
                this.waitForAllPendingToCommit.set(true);
                return records;
            }
        }
        LOG.debug("Returning {} records (full batch), with total of {} pending", this.recordCount(records), this.pendingMessages.size());
        return records;
    }

    /**
     * Waits up to one second for pending records to be committed.
     *
     * @return {@code true} if records are still pending after the wait, {@code false} if nothing
     *     was produced or everything has been committed
     */
    private boolean pendingCommitsOutstanding() throws InterruptedException {
        this.pendingCommitsLock.lock();
        try {
            if (this.numProduced.get() <= 0L) {
                return false;
            }
            long start = System.currentTimeMillis();
            int pendingCountStart = this.pendingMessages.size();
            LOG.debug("{} Reached {} pending records set by '{}', so waiting up to 1000ms for them to be committed to Kafka and acknowledged to JMS before consuming more", this, this.maxPending, "max.pending.messages");
            this.pendingCommits.await(1L, TimeUnit.SECONDS);
            long millis = System.currentTimeMillis() - start;
            int pendingCountRemain = this.pendingMessages.size();
            if (pendingCountRemain > 0) {
                int pendingCountCommitted = pendingCountStart - pendingCountRemain;
                LOG.debug("{} Exceeded {}ms wait time for {} pending messages to be committed: {} committed and {} remain, so returning null from poll()", this, millis, pendingCountStart, pendingCountCommitted, pendingCountRemain);
                return true;
            }
            LOG.debug("{} All {} pending messages have been committed in {}ms. Consuming messages", this, pendingCountStart, millis);
            return false;
        }
        finally {
            this.pendingCommitsLock.unlock();
        }
    }

    /** Returns the size of {@code records}, treating {@code null} as empty. */
    private int recordCount(List<SourceRecord> records) {
        return records == null ? 0 : records.size();
    }

    /** Returns whether the current time is past the given deadline (epoch millis). */
    private boolean exceededPollDuration(long exceedTimeMs) {
        return System.currentTimeMillis() > exceedTimeMs;
    }

    /**
     * Marks the record as committed. When the task is waiting for pending commits and this was
     * the last pending record, acknowledges its JMS message, resets the counters and wakes up a
     * waiting {@link #poll()}.
     *
     * @param record the committed record; must be a {@link JmsSourceRecord}
     */
    public void commitRecord(SourceRecord record) {
        LOG.trace("{} commitRecord()", this);
        this.pendingMessages.remove(((JmsSourceRecord)record).internalMessageId);
        if (!this.waitForAllPendingToCommit.get() || !this.pendingMessages.isEmpty()) {
            return;
        }
        this.retryPolicy.call("Acknowledge Jms Message", () -> this.acknowledge(record));
        this.pendingCommitsLock.lock();
        try {
            this.numProduced.set(0L);
            this.waitForAllPendingToCommit.set(false);
            this.pendingCommits.signalAll();
        }
        finally {
            this.pendingCommitsLock.unlock();
        }
    }

    /**
     * Acknowledges the JMS message carried by the given record, connecting first if the helper is
     * closed.
     *
     * @param record the record whose message is acknowledged; must be a {@link JmsSourceRecord}
     * @return the acknowledged JMS message
     * @throws RetriableException if a {@link JMSException} occurs and {@link #isRetriable} accepts it
     * @throws ConnectException if a {@link JMSException} occurs that is not retriable; the cause
     *     is also stored in {@link #commitException}
     */
    public Message acknowledge(SourceRecord record) {
        try {
            if (this.clientHelper.isClosed()) {
                LOG.debug("Connection is closed. Connecting...");
                this.clientHelper.connect();
            }
            Message jmsMessage = ((JmsSourceRecord)record).message;
            this.clientHelper.acknowledge(jmsMessage);
            LOG.trace("{} Acknowledged message with id='{}'", this, jmsMessage.getJMSMessageID());
            return jmsMessage;
        }
        catch (JMSException e) {
            if (this.closeConnectionBeforeRetry(e)) {
                this.clientHelper.close();
            }
            if (this.isRetriable(e)) {
                throw new RetriableException(e);
            }
            this.commitException.set(e);
            throw new ConnectException(e);
        }
    }

    /**
     * Deactivates and closes the client helper. Safe to call when {@link #start(Map)} never
     * created the helper.
     */
    public void stop() {
        LOG.trace("{} stop()", this);
        // Check for null before touching the helper: start() may have failed before creating it.
        if (this.clientHelper == null) {
            LOG.info("{} JMS connection is null, skipping closing", this);
            return;
        }
        this.clientHelper.deactivate();
        LOG.info("Closing JMS connection");
        this.clientHelper.close();
    }

    /** Returns the connector name read from the {@code "name"} setting. */
    public String connectorName() {
        return this.connectorName;
    }

    /**
     * Returns the task id parsed as an integer.
     *
     * @throws NumberFormatException if the configured task id is missing or not an integer
     */
    public int taskId() {
        return Integer.parseInt(this.taskId);
    }

    /** Returns the client id reported by the client helper. */
    public String clientId() {
        return this.clientHelper.clientId();
    }

    /** Returns {@code "<connectorName>-<taskId>"} as assigned in {@link #start(Map)}. */
    public String toString() {
        return this.connectorNameAndTaskId;
    }
}

