package com.amazonaws.services.kinesis.connectors;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/amazonaws/services/kinesis/connectors/KinesisClientLibraryPipelinedRecordProcessor.class */
/**
 * An {@link IRecordProcessor} decorator that decouples record delivery from record processing.
 *
 * <p>Records handed to {@link #processRecords(List, IRecordProcessorCheckpointer)} are placed on a
 * bounded queue. A single background thread (the {@code QueueConsumer}) takes records off that
 * queue and forwards them in batches to the wrapped processor. When the queue is full, the
 * delivering thread blocks until the consumer makes room.
 *
 * <p>The checkpointer passed to the wrapped processor from the consumer thread only supports
 * {@code checkpoint(String)}; the other {@code checkpoint} overloads throw
 * {@link UnsupportedOperationException} (see {@link #protectCheckpointer}).
 */
public class KinesisClientLibraryPipelinedRecordProcessor implements IRecordProcessor {
    /** Default time, in milliseconds, the consumer waits on an empty queue before polling again. */
    public static final long DEFAULT_MAXIMUM_QUEUE_WAIT_TIME_MS = 5000;
    /** Default time, in milliseconds, {@code shutdown} waits for the consumer thread to finish. */
    public static final long DEFAULT_MAXIMUM_PROCESS_RECORDS_WAIT_TIME_MS = 60000;
    private static final Log LOG = LogFactory.getLog(KinesisClientLibraryPipelinedRecordProcessor.class);
    private final long maxQueueWaitTimeMs;
    private final long maxProcessRecordsWaitTimeMs;
    private final BlockingQueue<Record> recordQueue;
    private final IRecordProcessor recordProcessor;
    private final ExecutorService queueConsumerExecutor;
    private QueueConsumer queueConsumer;
    private String shardId;

    /**
     * Background task that drains the record queue and forwards batches to the wrapped processor
     * until it is asked to stop or its thread is interrupted.
     */
    private class QueueConsumer implements Runnable {
        /** Set to {@code true} by the enclosing processor's {@code shutdown} to stop the loop. */
        volatile boolean shutdown;
        /** Restricted checkpointer handed to the wrapped processor; {@code null} until first set. */
        private volatile IRecordProcessorCheckpointer checkpointer;

        private QueueConsumer() {
            this.shutdown = false;
            this.checkpointer = null;
        }

        /**
         * Stores a restricted view of the given checkpointer for use by the consumer thread.
         *
         * @param iRecordProcessorCheckpointer checkpointer supplied with the latest delivery
         */
        public void setCheckpointer(IRecordProcessorCheckpointer iRecordProcessorCheckpointer) {
            this.checkpointer = protectCheckpointer(iRecordProcessorCheckpointer);
        }

        @Override
        public void run() {
            LOG.info("Starting queue consumer for shard: " + shardId);
            // Also stop when interrupted: consumeQueue() restores the interrupt flag, after which
            // every poll would throw immediately and this loop would spin without consuming.
            while (!shutdown && !Thread.currentThread().isInterrupted()) {
                consumeQueue();
            }
            LOG.info("Queue consumer terminated for shard: " + shardId);
        }

        /**
         * Waits up to {@code maxQueueWaitTimeMs} for one record, then drains whatever else is
         * queued and forwards the whole batch to the wrapped processor.
         */
        private void consumeQueue() {
            Record first = null;
            try {
                first = recordQueue.poll(maxQueueWaitTimeMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                LOG.error(e);
                Thread.currentThread().interrupt();
            }
            if (first == null) {
                // Nothing arrived (timeout or interrupt). Do NOT re-enter the enclosing
                // processRecords here: with an empty batch its only effect is to wrap the
                // already-protected checkpointer again, growing the delegation chain on every
                // idle timeout and possibly clobbering a newer checkpointer.
                return;
            }
            List<Record> batch = new ArrayList<Record>();
            batch.add(first);
            int consumed = 1 + recordQueue.drainTo(batch);
            recordProcessor.processRecords(batch, checkpointer);
            LOG.info("Consumed " + consumed + " records");
        }
    }

    /**
     * Creates a pipelined processor using the default wait times.
     *
     * @param recordProcessor processor that receives the queued records
     * @param maxQueueSize    capacity of the record queue
     */
    public KinesisClientLibraryPipelinedRecordProcessor(IRecordProcessor recordProcessor, int maxQueueSize) {
        this(recordProcessor, maxQueueSize,
                Long.valueOf(DEFAULT_MAXIMUM_QUEUE_WAIT_TIME_MS),
                Long.valueOf(DEFAULT_MAXIMUM_PROCESS_RECORDS_WAIT_TIME_MS));
    }

    /**
     * Creates a pipelined processor.
     *
     * @param recordProcessor             processor that receives the queued records
     * @param maxQueueSize                capacity of the record queue
     * @param maxQueueWaitTimeMs          how long the consumer waits on an empty queue, in
     *                                    milliseconds; {@code null} selects
     *                                    {@link #DEFAULT_MAXIMUM_QUEUE_WAIT_TIME_MS}
     * @param maxProcessRecordsWaitTimeMs how long {@code shutdown} waits for the consumer to
     *                                    finish, in milliseconds; {@code null} selects
     *                                    {@link #DEFAULT_MAXIMUM_PROCESS_RECORDS_WAIT_TIME_MS}
     */
    public KinesisClientLibraryPipelinedRecordProcessor(IRecordProcessor recordProcessor, int maxQueueSize,
            Long maxQueueWaitTimeMs, Long maxProcessRecordsWaitTimeMs) {
        this.queueConsumerExecutor = Executors.newSingleThreadExecutor();
        this.recordProcessor = recordProcessor;
        this.recordQueue = new LinkedBlockingQueue<Record>(maxQueueSize);
        this.maxQueueWaitTimeMs = maxQueueWaitTimeMs == null
                ? DEFAULT_MAXIMUM_QUEUE_WAIT_TIME_MS
                : maxQueueWaitTimeMs.longValue();
        this.maxProcessRecordsWaitTimeMs = maxProcessRecordsWaitTimeMs == null
                ? DEFAULT_MAXIMUM_PROCESS_RECORDS_WAIT_TIME_MS
                : maxProcessRecordsWaitTimeMs.longValue();
    }

    /**
     * Initializes the wrapped processor and starts the queue consumer thread.
     *
     * @param str shard id; must not be {@code null}
     * @throws IllegalArgumentException if {@code str} is {@code null}
     */
    @Override
    public void initialize(String str) {
        if (str == null) {
            throw new IllegalArgumentException("ShardId cannot be null");
        }
        this.shardId = str;
        this.recordProcessor.initialize(str);
        this.queueConsumer = new QueueConsumer();
        this.queueConsumerExecutor.submit(this.queueConsumer);
        // No further tasks are accepted; the executor terminates once the consumer returns,
        // which is what shutdown() later waits for via awaitTermination.
        this.queueConsumerExecutor.shutdown();
        LOG.info("Initialized pipelined record processor for shard: " + str);
    }

    /**
     * Enqueues the given records for the consumer thread, blocking while the queue is full.
     *
     * @param list                         records to enqueue
     * @param iRecordProcessorCheckpointer checkpointer the consumer should use from now on
     */
    @Override
    public void processRecords(List<Record> list, IRecordProcessorCheckpointer iRecordProcessorCheckpointer) {
        this.queueConsumer.setCheckpointer(iRecordProcessorCheckpointer);
        for (Record record : list) {
            try {
                this.recordQueue.put(record);
            } catch (InterruptedException e) {
                LOG.error("Interrupted while adding a record to the queue", e);
                Thread.currentThread().interrupt();
                // With the interrupt flag restored every further put() would fail immediately,
                // so stop here instead of logging one error per remaining record.
                return;
            }
        }
    }

    /**
     * Stops the consumer thread, then passes any records still queued to the wrapped processor
     * and shuts it down. If the consumer does not finish within
     * {@code maxProcessRecordsWaitTimeMs}, or the wait is interrupted, the wrapped processor is
     * neither drained into nor shut down.
     *
     * @param iRecordProcessorCheckpointer checkpointer passed through to the wrapped processor
     * @param shutdownReason               reason passed through to the wrapped processor
     */
    @Override
    public void shutdown(IRecordProcessorCheckpointer iRecordProcessorCheckpointer, ShutdownReason shutdownReason) {
        LOG.info("Shutting down pipelined processor for shard: " + this.shardId + " with reason:" + shutdownReason);
        this.queueConsumer.shutdown = true;
        try {
            if (this.queueConsumerExecutor.awaitTermination(this.maxProcessRecordsWaitTimeMs, TimeUnit.MILLISECONDS)) {
                List<Record> remaining = new ArrayList<Record>();
                this.recordQueue.drainTo(remaining);
                this.recordProcessor.processRecords(remaining, iRecordProcessorCheckpointer);
                this.recordProcessor.shutdown(iRecordProcessorCheckpointer, shutdownReason);
            } else {
                LOG.warn("Queue consumer took longer than " + this.maxProcessRecordsWaitTimeMs + " ms to complete. Shutdown task failed.");
            }
        } catch (InterruptedException e) {
            LOG.error("Interrupted while draining queue", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Wraps a checkpointer so that only {@code checkpoint(String)} is forwarded to it; every
     * other {@code checkpoint} overload throws {@link UnsupportedOperationException}.
     *
     * @param iRecordProcessorCheckpointer checkpointer to wrap
     * @return the restricted wrapper
     */
    IRecordProcessorCheckpointer protectCheckpointer(final IRecordProcessorCheckpointer iRecordProcessorCheckpointer) {
        return new IRecordProcessorCheckpointer() {
            private final IRecordProcessorCheckpointer internalCheckpointer = iRecordProcessorCheckpointer;

            @Override
            public void checkpoint(String str) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
                this.internalCheckpointer.checkpoint(str);
            }

            @Override
            public void checkpoint() throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
                throw new UnsupportedOperationException();
            }

            @Override
            public void checkpoint(Record record) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
                throw new UnsupportedOperationException();
            }

            @Override
            public void checkpoint(String str, long j) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
                throw new UnsupportedOperationException();
            }
        };
    }
}
