/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.aws.inbound.kinesis;

import java.math.BigInteger;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.AttributeAccessor;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.serializer.support.DeserializingConverter;
import org.springframework.integration.aws.event.KinesisShardEndedEvent;
import org.springframework.integration.aws.inbound.kinesis.CheckpointMode;
import org.springframework.integration.aws.inbound.kinesis.KinesisShardOffset;
import org.springframework.integration.aws.inbound.kinesis.ListenerMode;
import org.springframework.integration.aws.inbound.kinesis.ShardCheckpointer;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mapping.InboundMessageMapper;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.metadata.SimpleMetadataStore;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.integration.support.locks.RenewableLockRegistry;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

@ManagedResource
@IntegrationManagedResource
public class KinesisMessageDrivenChannelAdapter
extends MessageProducerSupport
implements DisposableBean,
ApplicationEventPublisherAware {
    // Per-thread holder for error-message attributes: written by setAttributesIfNecessary(),
    // read by getErrorMessageAttributes().
    private static final ThreadLocal<AttributeAccessor> attributesHolder = new ThreadLocal();
    // Kinesis client; used in this class at least for the listShards calls.
    private final KinesisAsyncClient amazonKinesis;
    // Stream names to consume; null when the adapter was built from explicit shard offsets.
    private final String[] streams;
    // Shard offsets this adapter operates on. Several methods synchronize on this set
    // while iterating or mutating it.
    private final Set<KinesisShardOffset> shardOffsets = new HashSet<KinesisShardOffset>();
    // Currently registered shard consumers, keyed by their shard offset.
    private final Map<KinesisShardOffset, ShardConsumer> shardConsumers = new ConcurrentHashMap<KinesisShardOffset, ShardConsumer>();
    // Stream names; an entry is removed once populateShardsForStream() finishes for that stream.
    private final Set<String> inResharding = new ConcurrentSkipListSet<String>();
    // Consumer invokers; populateConsumer() mutates this list while synchronized on it.
    private final List<ConsumerInvoker> consumerInvokers = new ArrayList<ConsumerInvoker>();
    // Runnable submitted to shardLocksExecutor in doStart(); handles shard locks and consumer creation.
    private final ShardConsumerManager shardConsumerManager = new ShardConsumerManager();
    // Single-thread executor for the ShardConsumerManager. The thread-name prefix is computed
    // in this field initializer, i.e. at construction time.
    private final ExecutorService shardLocksExecutor = Executors.newSingleThreadExecutor((ThreadFactory)new CustomizableThreadFactory((this.getComponentName() == null ? "" : this.getComponentName()) + "-kinesis-shard-locks-"));
    // First segment of every checkpoint/lock key (see buildCheckpointKeyForShard()).
    private String consumerGroup = "SpringIntegration";
    // Store the shard checkpoints are read from; in-memory by default.
    private ConcurrentMetadataStore checkpointStore = new SimpleMetadataStore();
    // Executor for dispatcher tasks; a cached thread pool is created in onInit() when unset.
    private Executor dispatcherExecutor;
    // Set by setDispatcherExecutor(); destroy() skips shutting the executor down when true.
    private boolean dispatcherExecutorExplicitlySet;
    // Executor for consumer invokers; a cached thread pool is created in onInit() when unset.
    private Executor consumerExecutor;
    // Set by setConsumerExecutor(); destroy() skips shutting the executor down when true.
    private boolean consumerExecutorExplicitlySet;
    // Value supplied through setConcurrency().
    private int maxConcurrency;
    // Effective concurrency: min(maxConcurrency, number of shard offsets), computed in doStart().
    private int concurrency;
    // Template offset copied for every shard discovered via stream listing.
    private KinesisShardOffset streamInitialSequence = KinesisShardOffset.latest();
    // Converter for record payload bytes; Java deserialization by default.
    private Converter<byte[], Object> converter = new DeserializingConverter();
    private ListenerMode listenerMode = ListenerMode.record;
    // Forced to 'batch' in doStart() when listenerMode is 'batch' and this is 'record'.
    private CheckpointMode checkpointMode = CheckpointMode.batch;
    private long checkpointsInterval = 5000L;
    // Capped at 10000 by setRecordsLimit().
    private int recordsLimit = 10000;
    // Floored at 250 by setIdleBetweenPolls().
    private int idleBetweenPolls = 1000;
    // Floored at 1000 by setConsumerBackoff().
    private int consumerBackoff = 1000;
    // Milliseconds to wait for the initial shard discovery in populateShardsForStreams().
    private int startTimeout = 60000;
    // Milliseconds to sleep between shard-listing retries; floored at 1000 by its setter.
    private int describeStreamBackoff = 1000;
    // Upper bound for the retry counter in readShardList().
    private int describeStreamRetries = 50;
    private long lockRenewalTimeout = 10000L;
    // When true, newly created consumers get their offset marked for reset (see populateConsumer()).
    private boolean resetCheckpoints;
    private InboundMessageMapper<byte[]> embeddedHeadersMapper;
    // Optional registry for per-shard locks; nulled in onInit() when explicit shards are configured.
    private LockRegistry lockRegistry;
    private boolean bindSourceRecord;
    // True between doStart() and doStop().
    private volatile boolean active;
    private volatile int consumerInvokerMaxCapacity;
    // Handle of the running ShardConsumerManager task; cancelled in doStop().
    private volatile Future<?> shardConsumerManagerFuture;
    private ApplicationEventPublisher applicationEventPublisher;
    // Optional post-filter applied to the shards selected for consumption.
    @Nullable
    private Function<List<Shard>, List<Shard>> shardListFilter;

    /**
     * Create an adapter that discovers and consumes the shards of the given streams.
     *
     * @param amazonKinesis the Kinesis client; must not be {@code null}
     * @param streams the stream names; must not be {@code null} or empty (a private copy is kept)
     */
    public KinesisMessageDrivenChannelAdapter(KinesisAsyncClient amazonKinesis, String ... streams) {
        Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null.");
        Assert.notEmpty(streams, "'streams' must not be null.");
        this.amazonKinesis = amazonKinesis;
        // Defensive copy so later changes to the caller's array are not observed.
        this.streams = streams.clone();
    }

    /**
     * Create an adapter that consumes an explicit set of shards.
     *
     * @param amazonKinesis the Kinesis client; must not be {@code null}
     * @param shardOffsets the offsets to consume; must be non-empty, contain no {@code null}
     * elements, and each must name both a stream and a shard (each offset is copied)
     */
    public KinesisMessageDrivenChannelAdapter(KinesisAsyncClient amazonKinesis, KinesisShardOffset ... shardOffsets) {
        Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null.");
        Assert.notEmpty(shardOffsets, "'shardOffsets' must not be null.");
        Assert.noNullElements(shardOffsets, "'shardOffsets' must not contain null elements.");
        for (KinesisShardOffset candidate : shardOffsets) {
            boolean fullyQualified = StringUtils.hasText(candidate.getStream()) && StringUtils.hasText(candidate.getShard());
            Assert.isTrue(fullyQualified, "The 'shardOffsets' must be provided with particular 'stream' and 'shard' values.");
            this.shardOffsets.add(new KinesisShardOffset(candidate));
        }
        this.amazonKinesis = amazonKinesis;
        // No stream-level discovery in this mode.
        this.streams = null;
    }

    /**
     * Store the publisher for application events.
     *
     * @param applicationEventPublisher the publisher to use
     */
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    /**
     * Set the consumer group name, which is the first segment of every checkpoint key.
     *
     * @param consumerGroup the group name; must contain text
     */
    public void setConsumerGroup(String consumerGroup) {
        Assert.hasText(consumerGroup, "'consumerGroup' must not be empty");
        this.consumerGroup = consumerGroup;
    }

    /**
     * Replace the metadata store used for shard checkpoints.
     *
     * @param checkpointStore the store; must not be {@code null}
     */
    public void setCheckpointStore(ConcurrentMetadataStore checkpointStore) {
        Assert.notNull(checkpointStore, "'checkpointStore' must not be null");
        this.checkpointStore = checkpointStore;
    }

    /**
     * Supply the executor for consumer invokers. An executor set here is treated as
     * externally owned and is not shut down by {@link #destroy()}.
     *
     * @param executor the executor; must not be {@code null}
     */
    public void setConsumerExecutor(Executor executor) {
        Assert.notNull(executor, "'executor' must not be null");
        this.consumerExecutorExplicitlySet = true;
        this.consumerExecutor = executor;
    }

    /**
     * Supply the executor for dispatcher tasks. A non-null executor is treated as
     * externally owned and is not shut down by {@link #destroy()}.
     *
     * @param dispatcherExecutor the executor; {@code null} reverts to the internally
     * managed default created in {@code onInit()}
     */
    public void setDispatcherExecutor(Executor dispatcherExecutor) {
        this.dispatcherExecutor = dispatcherExecutor;
        // Only mark the executor as caller-owned when one was actually provided. Otherwise
        // the default pool created in onInit() would be flagged as external and never be
        // shut down by destroy().
        this.dispatcherExecutorExplicitlySet = dispatcherExecutor != null;
    }

    /**
     * Set the template offset copied for every shard discovered through stream listing.
     *
     * @param streamInitialSequence the template offset; must not be {@code null}
     */
    public void setStreamInitialSequence(KinesisShardOffset streamInitialSequence) {
        Assert.notNull(streamInitialSequence, "'streamInitialSequence' must not be null");
        this.streamInitialSequence = streamInitialSequence;
    }

    /**
     * Replace the converter applied to record payload bytes.
     * No validation is performed here, so {@code null} is accepted.
     *
     * @param converter the converter from {@code byte[]} to the payload object
     */
    public void setConverter(Converter<byte[], Object> converter) {
        this.converter = converter;
    }

    /**
     * Set the listener mode.
     *
     * @param listenerMode the mode; must not be {@code null}
     */
    public void setListenerMode(ListenerMode listenerMode) {
        Assert.notNull(listenerMode, "'listenerMode' must not be null");
        this.listenerMode = listenerMode;
    }

    /**
     * Set the checkpoint mode. {@code doStart()} overrides {@code record} with
     * {@code batch} when the listener mode is {@code batch}.
     *
     * @param checkpointMode the mode; must not be {@code null}
     */
    public void setCheckpointMode(CheckpointMode checkpointMode) {
        Assert.notNull(checkpointMode, "'checkpointMode' must not be null");
        this.checkpointMode = checkpointMode;
    }

    /**
     * Set the checkpoints interval (the field default is 5000). The value is stored as-is.
     *
     * @param checkpointsInterval the interval; presumably milliseconds — TODO confirm against its consumer
     */
    public void setCheckpointsInterval(long checkpointsInterval) {
        this.checkpointsInterval = checkpointsInterval;
    }

    /**
     * Set the records limit; values above 10000 are clamped to 10000.
     *
     * @param recordsLimit the limit; must be greater than 0
     */
    public void setRecordsLimit(int recordsLimit) {
        Assert.isTrue(recordsLimit > 0, "'recordsLimit' must be more than 0");
        this.recordsLimit = Math.min(10000, recordsLimit);
    }

    /**
     * Set the consumer back-off; values below 1000 are raised to 1000.
     *
     * @param consumerBackoff the requested back-off (presumably milliseconds — TODO confirm)
     */
    public void setConsumerBackoff(int consumerBackoff) {
        this.consumerBackoff = Math.max(1000, consumerBackoff);
    }

    /**
     * Set the pause, in milliseconds, between shard-listing retries; values below 1000
     * are raised to 1000.
     *
     * @param describeStreamBackoff the requested back-off in milliseconds
     */
    public void setDescribeStreamBackoff(int describeStreamBackoff) {
        this.describeStreamBackoff = Math.max(1000, describeStreamBackoff);
    }

    /**
     * Set the upper bound for the shard-listing retry counter.
     *
     * @param describeStreamRetries the retry limit; must be greater than 0
     */
    public void setDescribeStreamRetries(int describeStreamRetries) {
        Assert.isTrue(describeStreamRetries > 0, "'describeStreamRetries' must be more than 0");
        this.describeStreamRetries = describeStreamRetries;
    }

    /**
     * Set how long, in milliseconds, start-up waits for the initial shard discovery.
     *
     * @param startTimeout the timeout in milliseconds; must be greater than 0
     */
    public void setStartTimeout(int startTimeout) {
        Assert.isTrue(startTimeout > 0, "'startTimeout' must be more than 0");
        this.startTimeout = startTimeout;
    }

    /**
     * Set the lock renewal timeout.
     *
     * @param lockRenewalTimeout the timeout; must be greater than 0
     */
    public void setLockRenewalTimeout(long lockRenewalTimeout) {
        Assert.isTrue(lockRenewalTimeout > 0L, "'lockRenewalTimeout' must be more than 0");
        this.lockRenewalTimeout = lockRenewalTimeout;
    }

    /**
     * Set the maximum concurrency. The value is stored in {@code maxConcurrency}; the
     * effective concurrency is derived in {@code doStart()} as the smaller of this value
     * and the number of shard offsets.
     *
     * @param concurrency the maximum concurrency (not validated here)
     */
    public void setConcurrency(int concurrency) {
        this.maxConcurrency = concurrency;
    }

    /**
     * Set the idle time between polls; values below 250 are raised to 250.
     *
     * @param idleBetweenPolls the requested idle time (presumably milliseconds — TODO confirm)
     */
    public void setIdleBetweenPolls(int idleBetweenPolls) {
        this.idleBetweenPolls = Math.max(250, idleBetweenPolls);
    }

    /**
     * Store the mapper that builds messages from raw record bytes.
     * NOTE(review): presumably used to extract headers embedded in the payload — confirm
     * against the consumer code.
     *
     * @param embeddedHeadersMapper the mapper; not validated here
     */
    public void setEmbeddedHeadersMapper(InboundMessageMapper<byte[]> embeddedHeadersMapper) {
        this.embeddedHeadersMapper = embeddedHeadersMapper;
    }

    /**
     * Store the registry used to obtain per-shard locks. {@code onInit()} discards it
     * (with a warning) when the adapter was created with explicit shard offsets.
     *
     * @param lockRegistry the lock registry; {@code null} disables shard locking
     */
    public void setLockRegistry(LockRegistry lockRegistry) {
        this.lockRegistry = lockRegistry;
    }

    /**
     * Store the {@code bindSourceRecord} flag.
     * NOTE(review): presumably controls whether the raw record is attached to produced
     * messages — confirm against the consumer code.
     *
     * @param bindSourceRecord the flag value
     */
    public void setBindSourceRecord(boolean bindSourceRecord) {
        this.bindSourceRecord = bindSourceRecord;
    }

    /**
     * Store an optional filter applied to the list of shards selected for consumption
     * before it is returned by shard detection.
     *
     * @param shardListFilter the filter function; {@code null} means no filtering
     */
    public void setShardListFilter(Function<List<Shard>, List<Shard>> shardListFilter) {
        this.shardListFilter = shardListFilter;
    }

    /**
     * Finish initialization: create default cached thread pools for any executor that
     * was not supplied, and drop the lock registry when explicit shards are configured.
     */
    @Override
    protected void onInit() {
        super.onInit();
        String componentName = this.getComponentName();
        String threadNamePrefix = componentName == null ? "" : componentName;
        if (this.consumerExecutor == null) {
            this.consumerExecutor = Executors.newCachedThreadPool(new CustomizableThreadFactory(threadNamePrefix + "-kinesis-consumer-"));
        }
        if (this.dispatcherExecutor == null) {
            this.dispatcherExecutor = Executors.newCachedThreadPool(new CustomizableThreadFactory(threadNamePrefix + "-kinesis-dispatcher-"));
        }
        if (this.streams != null) {
            return;
        }
        // Explicit shard configuration: shard locking does not apply.
        if (this.lockRegistry != null) {
            this.logger.warn("The LockRegistry is ignored when explicit shards configuration is used.");
        }
        this.lockRegistry = null;
    }

    /**
     * Release the thread pools owned by this adapter.
     * <p>
     * Executors supplied through the setters are left alone. The internally created
     * consumer and dispatcher pools, and the always-internal shard-locks executor, are
     * shut down in an orderly way: already submitted tasks still run, and no new tasks
     * are accepted.
     */
    @Override
    public void destroy() {
        // The instanceof checks cover destroy() being called before onInit() (executor still
        // null) and avoid a ClassCastException should the executor not be an ExecutorService.
        if (!this.consumerExecutorExplicitlySet && this.consumerExecutor instanceof ExecutorService) {
            ((ExecutorService) this.consumerExecutor).shutdown();
        }
        if (!this.dispatcherExecutorExplicitlySet && this.dispatcherExecutor instanceof ExecutorService) {
            ((ExecutorService) this.dispatcherExecutor).shutdown();
        }
        // This executor is created in a field initializer and is always owned by the adapter.
        // Without a shutdown its worker thread would stay alive for the life of the JVM.
        this.shardLocksExecutor.shutdown();
    }

    /**
     * Stop and unregister the consumer for one shard, if there is one.
     *
     * @param stream the stream name
     * @param shard the shard id
     */
    @ManagedOperation
    public void stopConsumer(String stream, String shard) {
        ShardConsumer removed = this.shardConsumers.remove(KinesisShardOffset.latest(stream, shard));
        if (removed == null) {
            this.logger.debug(() -> "There is no ShardConsumer for shard [" + shard + "] in stream [" + stream + "] to stop.");
            return;
        }
        removed.stop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Schedule a consumer for one shard, unless one is already registered.
     * Only shards already present in the operated shard offsets are scheduled; unknown
     * shards are silently ignored.
     *
     * @param stream the stream name
     * @param shard the shard id
     */
    @ManagedOperation
    public void startConsumer(String stream, String shard) {
        KinesisShardOffset lookupKey = KinesisShardOffset.latest(stream, shard);
        ShardConsumer existing = this.shardConsumers.get(lookupKey);
        if (existing != null) {
            this.logger.debug(() -> "The [" + existing + "] has been started before.");
            return;
        }
        synchronized (this.shardOffsets) {
            for (KinesisShardOffset known : this.shardOffsets) {
                if (lookupKey.equals(known)) {
                    // Hand the stored offset (not the lookup key) to the manager.
                    this.shardConsumerManager.addShardToConsume(known);
                    break;
                }
            }
        }
    }

    /**
     * Restart the consumer of the given shard from a {@code latest} offset.
     *
     * @param stream the stream name
     * @param shard the shard id
     */
    @ManagedOperation
    public void resetCheckpointForShardToLatest(String stream, String shard) {
        KinesisShardOffset latestOffset = KinesisShardOffset.latest(stream, shard);
        this.restartShardConsumerForOffset(latestOffset);
    }

    /**
     * Restart the consumer of the given shard from a {@code trimHorizon} offset.
     *
     * @param stream the stream name
     * @param shard the shard id
     */
    @ManagedOperation
    public void resetCheckpointForShardToTrimHorizon(String stream, String shard) {
        KinesisShardOffset trimHorizonOffset = KinesisShardOffset.trimHorizon(stream, shard);
        this.restartShardConsumerForOffset(trimHorizonOffset);
    }

    /**
     * Restart the consumer of the given shard from an offset at a sequence number.
     *
     * @param stream the stream name
     * @param shard the shard id
     * @param sequenceNumber the sequence number to restart at
     */
    @ManagedOperation
    public void resetCheckpointForShardToSequenceNumber(String stream, String shard, String sequenceNumber) {
        KinesisShardOffset sequenceOffset = KinesisShardOffset.atSequenceNumber(stream, shard, sequenceNumber);
        this.restartShardConsumerForOffset(sequenceOffset);
    }

    /**
     * Restart the consumer of the given shard from an offset at a point in time.
     *
     * @param stream the stream name
     * @param shard the shard id
     * @param timestamp the point in time, interpreted as seconds since the epoch
     */
    @ManagedOperation
    public void resetCheckpointForShardAtTimestamp(String stream, String shard, long timestamp) {
        Instant pointInTime = Instant.ofEpochSecond(timestamp);
        this.restartShardConsumerForOffset(KinesisShardOffset.atTimestamp(stream, shard, pointInTime));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replace the stored offset for a shard and, when the adapter is active, close the
     * current consumer and schedule a new one flagged for reset.
     *
     * @param shardOffset the new offset; its stream/shard must already be operated by this adapter
     * @throws IllegalArgumentException if the shard is not among the operated shard offsets
     */
    private void restartShardConsumerForOffset(KinesisShardOffset shardOffset) {
        boolean operated = this.shardOffsets.contains(shardOffset);
        Assert.isTrue(operated, "The [" + this + "] doesn't operate shard [" + shardOffset.getShard() + "] for stream [" + shardOffset.getStream() + "]");
        this.logger.debug(() -> "Resetting consumer for [" + shardOffset + "]...");
        shardOffset.reset();
        synchronized (this.shardOffsets) {
            // Remove-then-add swaps the equal stored instance for the new offset object.
            this.shardOffsets.remove(shardOffset);
            this.shardOffsets.add(shardOffset);
        }
        if (!this.active) {
            return;
        }
        ShardConsumer previous = this.shardConsumers.remove(shardOffset);
        if (previous != null) {
            previous.close();
        }
        shardOffset.setReset(true);
        this.shardConsumerManager.addShardToConsume(shardOffset);
    }

    /**
     * Raise the reset-checkpoints flag and, when the adapter is active, stop all current
     * consumers and re-schedule a consumer for every operated shard.
     */
    @ManagedOperation
    public void resetCheckpoints() {
        this.resetCheckpoints = true;
        if (!this.active) {
            return;
        }
        this.stopConsumers();
        this.populateConsumers();
    }

    /**
     * Start the adapter: reconcile the checkpoint mode with the listener mode, discover
     * shards (stream mode only), schedule consumers, mark the adapter active, and launch
     * the dispatcher and the shard consumer manager.
     */
    @Override
    protected void doStart() {
        super.doStart();
        boolean recordCheckpointsInBatchMode = ListenerMode.batch.equals(this.listenerMode) && CheckpointMode.record.equals(this.checkpointMode);
        if (recordCheckpointsInBatchMode) {
            this.checkpointMode = CheckpointMode.batch;
            this.logger.warn("The 'checkpointMode' is overridden from [CheckpointMode.record] to [CheckpointMode.batch] because it does not make sense in case of [ListenerMode.batch].");
        }
        if (this.streams != null) {
            this.populateShardsForStreams();
        }
        this.populateConsumers();
        this.active = true;
        // Never run more invokers than there are shards.
        this.concurrency = Math.min(this.maxConcurrency, this.shardOffsets.size());
        this.dispatcherExecutor.execute(new ConsumerDispatcher());
        this.shardConsumerManagerFuture = this.shardLocksExecutor.submit(this.shardConsumerManager);
    }

    /**
     * Select the shard consumers for the invoker with index {@code i}, from a snapshot
     * of the current consumers.
     * <ul>
     * <li>concurrency 1: all consumers;</li>
     * <li>as many consumers as invokers: the single consumer at index {@code i};</li>
     * <li>otherwise: a contiguous slice of {@code size / concurrency} consumers, with the
     * last invoker also taking the remainder.</li>
     * </ul>
     *
     * @param i the zero-based invoker index
     * @return the consumers assigned to that invoker
     */
    private Collection<ShardConsumer> shardConsumerSubset(int i) {
        List<ShardConsumer> snapshot = new ArrayList<>(this.shardConsumers.values());
        int invokerCount = this.concurrency;
        if (invokerCount == 1) {
            return snapshot;
        }
        int total = snapshot.size();
        if (total == invokerCount) {
            return Collections.singleton(snapshot.get(i));
        }
        int sliceSize = total / invokerCount;
        int from = i * sliceSize;
        boolean lastInvoker = i == invokerCount - 1;
        int to = lastInvoker ? total : from + sliceSize;
        return snapshot.subList(from, to);
    }

    /**
     * List all shards of a stream, starting with a retry counter of 0.
     *
     * @param stream the stream name
     * @return the shards reported for the stream
     */
    private List<Shard> readShardList(String stream) {
        return this.readShardList(stream, 0);
    }

    /**
     * List all shards of a stream, following pagination tokens, and retry with a
     * back-off when Kinesis reports {@link LimitExceededException}.
     *
     * @param stream the stream name
     * @param retryCount the number of retries already performed
     * @return the complete shard list from the first attempt that succeeds
     * @throws IllegalStateException if {@code retryCount} exceeds the configured retry
     * limit, or if the thread is interrupted during the back-off
     * @throws CompletionException if listing fails for any reason other than a limit violation
     */
    private List<Shard> readShardList(String stream, int retryCount) {
        if (retryCount > this.describeStreamRetries) {
            throw new IllegalStateException("Kinesis could not read shards from stream with name [" + stream + "] ");
        }
        List<Shard> shardList = new ArrayList<Shard>();
        String nextToken = null;
        try {
            do {
                ListShardsRequest.Builder listShardsRequest = ListShardsRequest.builder();
                // ListShards does not accept the stream name together with a pagination
                // token: the first page names the stream, later pages carry only the token.
                if (nextToken == null) {
                    listShardsRequest.streamName(stream);
                } else {
                    listShardsRequest.nextToken(nextToken);
                }
                ListShardsResponse listShardsResult = this.amazonKinesis.listShards(listShardsRequest.build()).join();
                shardList.addAll(listShardsResult.shards());
                nextToken = listShardsResult.nextToken();
            } while (nextToken != null);
        }
        catch (CompletionException ex) {
            if (!(ex.getCause() instanceof LimitExceededException)) {
                throw ex;
            }
            this.logger.info(() -> "Got LimitExceededException when listing stream [" + stream + "]. Backing off for [" + this.describeStreamBackoff + "] millis.");
            try {
                Thread.sleep(this.describeStreamBackoff);
            }
            catch (InterruptedException interrupt) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("The [describeStream] thread for the stream [" + stream + "] has been interrupted.", interrupt);
            }
            // Return the retry's result. Pages gathered before the failure are dropped
            // because the retry lists the stream again from the start.
            return this.readShardList(stream, retryCount + 1);
        }
        return shardList;
    }

    /**
     * Rebuild the operated shard offsets from scratch: clear them, start an asynchronous
     * shard discovery per configured stream, and wait until every discovery has
     * finished or the start timeout elapses.
     *
     * @throws IllegalStateException if discovery does not finish within the start
     * timeout, or if the waiting thread is interrupted
     */
    private void populateShardsForStreams() {
        // Same monitor the other mutators of this set use.
        synchronized (this.shardOffsets) {
            this.shardOffsets.clear();
        }
        CountDownLatch shardsGatherLatch = new CountDownLatch(this.streams.length);
        for (String stream : this.streams) {
            this.populateShardsForStream(stream, shardsGatherLatch);
        }
        try {
            if (!shardsGatherLatch.await(this.startTimeout, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("The [ " + this + "] could not start during timeout: " + this.startTimeout);
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption,
            // and keep the original exception as the cause.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("The [ " + this + "] has been interrupted from start.", e);
        }
    }

    /**
     * Determine which shards of a stream should be consumed, starting with a retry
     * counter of 0.
     *
     * @param stream the stream name
     * @return the shards to consume
     */
    private List<Shard> detectShardsToConsume(String stream) {
        return this.detectShardsToConsume(stream, 0);
    }

    /**
     * Determine which shards of a stream should be consumed.
     * <p>
     * A shard with an ending sequence number is skipped when the stored checkpoint for
     * it is at or beyond that ending sequence number. The optional shard list filter is
     * applied to the result. If processing the shard list fails, the method backs off
     * and retries, returning the retry's result.
     *
     * @param stream the stream name
     * @param retry the number of retries already performed
     * @return the (optionally filtered) shards to consume
     * @throws IllegalStateException if processing still fails once {@code retry} exceeds 5,
     * or if the back-off sleep is interrupted
     */
    private List<Shard> detectShardsToConsume(String stream, int retry) {
        List<Shard> shardsToConsume = new ArrayList<Shard>();
        List<Shard> shards = this.readShardList(stream);
        try {
            for (Shard shard : shards) {
                String key = this.buildCheckpointKeyForShard(stream, shard.shardId());
                String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
                if (endingSequenceNumber != null) {
                    String checkpoint = this.checkpointStore.get(key);
                    boolean skipClosedAndExhaustedShard = checkpoint != null && new BigInteger(endingSequenceNumber).compareTo(new BigInteger(checkpoint)) <= 0;
                    this.logger.trace(() -> "The shard [" + shard + "] in stream [" + stream + "] is closed CLOSED and exhausted with endingSequenceNumber [" + endingSequenceNumber + "].\nThe last processed checkpoint is [" + checkpoint + "]." + (skipClosedAndExhaustedShard ? "\nThe shard will be skipped." : ""));
                    if (skipClosedAndExhaustedShard) {
                        continue;
                    }
                }
                shardsToConsume.add(shard);
            }
        }
        catch (Exception ex) {
            String exceptionMessage = "Got an exception when processing shards in stream [" + stream + "]";
            this.logger.info(ex, () -> exceptionMessage + ".\n Retrying... ");
            if (retry > 5) {
                throw new IllegalStateException(exceptionMessage, ex);
            }
            // Back off before retrying, then return the retry's result. It is already
            // filtered, and the list built so far may be incomplete.
            this.sleep(this.describeStreamBackoff, new IllegalStateException(exceptionMessage), false);
            return this.detectShardsToConsume(stream, retry + 1);
        }
        return this.shardListFilter != null ? this.shardListFilter.apply(shardsToConsume) : shardsToConsume;
    }

    /**
     * Sleep for the given time and translate any failure of the sleep into the
     * supplied exception.
     *
     * @param sleepAmount the time to sleep, in milliseconds
     * @param error the exception to throw (and whose message is logged) when the sleep fails
     * @param interruptThread whether to set the current thread's interrupt flag before throwing
     */
    private void sleep(long sleepAmount, RuntimeException error, boolean interruptThread) {
        try {
            Thread.sleep(sleepAmount);
        }
        catch (Exception failure) {
            if (interruptThread) {
                Thread.currentThread().interrupt();
            }
            // A failed sleep while the adapter is stopping is expected, so it is logged
            // at a lower level.
            if (!this.active) {
                this.logger.info(failure, () -> error.getMessage() + " while adapter was inactive");
            } else {
                this.logger.error(failure, error.getMessage());
            }
            throw error;
        }
    }

    /**
     * Asynchronously discover the shards of a stream on the dispatcher executor and
     * record an offset for each of them.
     * <p>
     * With a latch (initial start-up) the offsets are only recorded and the latch is
     * counted down at the end. Without a latch, every newly recorded offset is also
     * handed to the shard consumer manager, provided the adapter is active. In both
     * cases the stream is removed from the in-resharding set when the task finishes.
     *
     * @param stream the stream name
     * @param shardsGatherLatch the start-up latch to count down, or {@code null}
     */
    private void populateShardsForStream(String stream, CountDownLatch shardsGatherLatch) {
        boolean initialGathering = shardsGatherLatch != null;
        this.dispatcherExecutor.execute(() -> {
            try {
                for (Shard shard : this.detectShardsToConsume(stream)) {
                    KinesisShardOffset offset = new KinesisShardOffset(this.streamInitialSequence);
                    offset.setShard(shard.shardId());
                    offset.setStream(stream);
                    boolean newlyRecorded;
                    synchronized (this.shardOffsets) {
                        newlyRecorded = this.shardOffsets.add(offset);
                    }
                    if (newlyRecorded && !initialGathering && this.active) {
                        this.shardConsumerManager.addShardToConsume(offset);
                    }
                }
            }
            catch (Exception ex) {
                this.logger.error(ex, () -> "Error population shards for stream: " + stream);
            }
            finally {
                if (initialGathering) {
                    shardsGatherLatch.countDown();
                }
                this.inResharding.remove(stream);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Hand every operated shard offset to the shard consumer manager, then clear the
     * reset-checkpoints flag.
     */
    private void populateConsumers() {
        synchronized (this.shardOffsets) {
            this.shardOffsets.forEach(this.shardConsumerManager::addShardToConsume);
        }
        this.resetCheckpoints = false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Create a {@code ShardConsumer} for the offset, assign it to a consumer invoker
     * when the adapter is active, and register it in {@code shardConsumers}.
     * <p>
     * NOTE(review): the first statement overwrites the offset's reset flag with the
     * current value of {@code resetCheckpoints}. {@code restartShardConsumerForOffset()}
     * sets the flag to {@code true} beforehand, and {@code populateConsumers()} clears
     * {@code resetCheckpoints} right after queueing. Because this method is called from
     * the manager's {@code run()} loop, a requested reset may be lost — confirm against
     * {@code ShardConsumer}.
     *
     * @param shardOffset the offset to create a consumer for
     */
    private void populateConsumer(KinesisShardOffset shardOffset) {
        shardOffset.setReset(this.resetCheckpoints);
        ShardConsumer shardConsumer = new ShardConsumer(shardOffset);
        if (this.active) {
            List<ConsumerInvoker> list = this.consumerInvokers;
            synchronized (list) {
                if (this.consumerInvokers.size() < this.maxConcurrency) {
                    // Still below the configured maximum: dedicate a new invoker to this consumer.
                    ConsumerInvoker consumerInvoker = new ConsumerInvoker(Collections.singleton(shardConsumer));
                    this.consumerInvokers.add(consumerInvoker);
                    this.consumerExecutor.execute((Runnable)((Object)consumerInvoker));
                } else {
                    // Reuse the first invoker that holds fewer consumers than the current capacity.
                    boolean consumerAdded = false;
                    for (ConsumerInvoker consumerInvoker : this.consumerInvokers) {
                        if (consumerInvoker.consumers.size() >= this.consumerInvokerMaxCapacity) continue;
                        consumerInvoker.addConsumer(shardConsumer);
                        consumerAdded = true;
                        break;
                    }
                    // All invokers are at capacity: add to the first one and raise the
                    // capacity to its new size. Skipped when concurrency is 0.
                    if (this.concurrency != 0 && !consumerAdded) {
                        ConsumerInvoker firstConsumerInvoker = this.consumerInvokers.get(0);
                        firstConsumerInvoker.addConsumer(shardConsumer);
                        this.consumerInvokerMaxCapacity = firstConsumerInvoker.consumers.size();
                    }
                }
            }
        }
        this.shardConsumers.put(shardOffset, shardConsumer);
    }

    /**
     * Build the key, of the form {@code consumerGroup:stream:shardId}, used for the
     * shard's checkpoint and lock.
     *
     * @param stream the stream name
     * @param shardId the shard id
     * @return the colon-separated key
     */
    private String buildCheckpointKeyForShard(String stream, String shardId) {
        String streamAndShard = stream + ":" + shardId;
        return this.consumerGroup + ":" + streamAndShard;
    }

    /**
     * Stop the adapter: notify every consumer invoker's barrier, stop all shard
     * consumers, mark the adapter inactive, and cancel the shard consumer manager task.
     */
    @Override
    protected void doStop() {
        // populateConsumer() adds to consumerInvokers from the manager thread while holding
        // the list's monitor. Iterate a snapshot taken under the same monitor to avoid a
        // ConcurrentModificationException, and notify outside the lock.
        List<ConsumerInvoker> invokersSnapshot;
        synchronized (this.consumerInvokers) {
            invokersSnapshot = new ArrayList<ConsumerInvoker>(this.consumerInvokers);
        }
        for (ConsumerInvoker consumerInvoker : invokersSnapshot) {
            consumerInvoker.notifyBarrier();
        }
        super.doStop();
        this.stopConsumers();
        this.active = false;
        // The future is only assigned at the end of doStart(); guard against a stop after
        // a start that failed part-way.
        Future<?> managerFuture = this.shardConsumerManagerFuture;
        if (managerFuture != null) {
            managerFuture.cancel(true);
        }
    }

    /**
     * Stop every registered shard consumer and empty the consumer registry.
     */
    private void stopConsumers() {
        this.shardConsumers.values().forEach(ShardConsumer::stop);
        this.shardConsumers.clear();
    }

    /**
     * When an error channel is configured, create error-message attributes for the
     * message, publish them to the thread-local holder, and store the raw record under
     * the {@code aws_rawRecord} attribute.
     *
     * @param record the raw record (or records) the message was built from
     * @param message the message about to be sent
     */
    private void setAttributesIfNecessary(Object record, Message<?> message) {
        if (this.getErrorChannel() == null) {
            return;
        }
        AttributeAccessor errorAttributes = ErrorMessageUtils.getAttributeAccessor(message, null);
        attributesHolder.set(errorAttributes);
        errorAttributes.setAttribute("aws_rawRecord", record);
    }

    /**
     * Return the attributes held for the current thread, falling back to the superclass
     * implementation when none are set.
     *
     * @param message the message being processed
     * @return the error-message attributes to use
     */
    @Override
    protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
        AttributeAccessor held = attributesHolder.get();
        return held != null ? held : super.getErrorMessageAttributes(message);
    }

    /**
     * Describe this adapter by its operated shard offsets and consumer group.
     */
    @Override
    public String toString() {
        StringBuilder description = new StringBuilder("KinesisMessageDrivenChannelAdapter{shardOffsets=");
        description.append(this.shardOffsets);
        description.append(", consumerGroup='").append(this.consumerGroup).append("'}");
        return description.toString();
    }

    private final class ShardConsumerManager
    implements SchedulingAwareRunnable {
        // Offsets waiting for a consumer, keyed by the shard's checkpoint/lock key.
        private final Map<String, KinesisShardOffset> shardOffsetsToConsumer = new ConcurrentHashMap<String, KinesisShardOffset>();
        // Locks currently held, keyed by lock key. Plain HashMap: in the code shown it is
        // only touched from run().
        private final Map<String, Lock> locks = new HashMap<String, Lock>();
        // Unlock requests queued by unlock(), drained by run().
        private final Queue<LockCompletableFuture> forUnlocking = new ConcurrentLinkedQueue<LockCompletableFuture>();
        // Renewal requests queued by renewLock(), drained by run().
        private final Queue<LockCompletableFuture> forRenewing = new ConcurrentLinkedQueue<LockCompletableFuture>();

        ShardConsumerManager() {
        }

        /**
         * Queue a shard offset for consumer creation by the manager loop. A later request
         * for the same shard key replaces an earlier pending one.
         *
         * @param kinesisShardOffset the offset to create a consumer for
         */
        void addShardToConsume(KinesisShardOffset kinesisShardOffset) {
            String stream = kinesisShardOffset.getStream();
            String shard = kinesisShardOffset.getShard();
            this.shardOffsetsToConsumer.put(buildCheckpointKeyForShard(stream, shard), kinesisShardOffset);
        }

        /**
         * Queue an unlock request to be processed by the manager loop.
         *
         * @param unlockFuture the request; completed by the loop once it has been handled
         */
        void unlock(LockCompletableFuture unlockFuture) {
            this.forUnlocking.offer(unlockFuture);
        }

        /**
         * Queue a lock-renewal request to be processed by the manager loop.
         *
         * @param renewLockFuture the request; completed by the loop with the renewal outcome
         */
        void renewLock(LockCompletableFuture renewLockFuture) {
            this.forRenewing.offer(renewLockFuture);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Manager loop. Until the thread is interrupted, each iteration:
         * 1. creates consumers for pending shard offsets (taking the shard lock first
         *    when a lock registry is configured);
         * 2. processes queued unlock requests;
         * 3. processes queued lock-renewal requests;
         * 4. sleeps for one second.
         * On exit, every lock still held is released.
         */
        public void run() {
            Lock lock;
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    LockCompletableFuture lockFuture;
                    LockCompletableFuture forUnlocking;
                    // Step 1: an entry leaves the pending map once a consumer was created for it.
                    this.shardOffsetsToConsumer.entrySet().removeIf(entry -> {
                        boolean remove = true;
                        if (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) {
                            String key = (String)entry.getKey();
                            Lock lock = KinesisMessageDrivenChannelAdapter.this.lockRegistry.obtain((Object)key);
                            try {
                                if (lock.tryLock()) {
                                    this.locks.put(key, lock);
                                } else {
                                    // Lock not acquired: keep the entry pending and retry on the next iteration.
                                    remove = false;
                                }
                            }
                            catch (Exception ex) {
                                // NOTE(review): 'remove' stays true here, so a consumer is still
                                // created although the lock was not acquired — confirm this is intended.
                                KinesisMessageDrivenChannelAdapter.this.logger.error((Throwable)ex, (CharSequence)("Error during locking: " + lock));
                            }
                        }
                        if (remove) {
                            KinesisMessageDrivenChannelAdapter.this.populateConsumer((KinesisShardOffset)entry.getValue());
                        }
                        return remove;
                    });
                    // Step 2: release locks on request. The request completes with true even
                    // when no lock was held or the unlock failed.
                    while (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null && (forUnlocking = this.forUnlocking.poll()) != null) {
                        lock = this.locks.remove(forUnlocking.lockKey);
                        if (lock != null) {
                            try {
                                lock.unlock();
                            }
                            catch (Exception ex) {
                                KinesisMessageDrivenChannelAdapter.this.logger.error((Throwable)ex, (CharSequence)("Error during unlocking: " + lock));
                            }
                        }
                        forUnlocking.complete(true);
                    }
                    // Step 3: renew locks on request. A request for a lock that is not held
                    // completes with false.
                    while (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null && (lockFuture = this.forRenewing.poll()) != null) {
                        lock = this.locks.get(lockFuture.lockKey);
                        if (lock != null) {
                            try {
                                if (this.renewLockInRegistry(lockFuture)) continue;
                                // Renewal reported failure: complete with false and forget the lock.
                                lockFuture.complete(false);
                                this.locks.remove(lockFuture.lockKey);
                            }
                            catch (Exception ex) {
                                lockFuture.complete(false);
                                KinesisMessageDrivenChannelAdapter.this.logger.error((Throwable)ex, () -> "Error during locking: " + lock);
                            }
                            continue;
                        }
                        lockFuture.complete(false);
                    }
                    // Step 4: pause. An interrupted sleep restores the interrupt flag and
                    // throws, which ends the loop through the finally block below.
                    KinesisMessageDrivenChannelAdapter.this.sleep(1000L, new IllegalStateException("ShardConsumerManager Thread [" + this + "] has been interrupted"), true);
                }
            }
            finally {
                // Release every lock still held. Failures are logged at a lower level when
                // the adapter is already inactive.
                Iterator<Lock> iterator = this.locks.values().iterator();
                while (iterator.hasNext()) {
                    lock = iterator.next();
                    try {
                        lock.unlock();
                    }
                    catch (Exception ex) {
                        if (KinesisMessageDrivenChannelAdapter.this.active) {
                            KinesisMessageDrivenChannelAdapter.this.logger.error((Throwable)ex, () -> "Error during unlocking: " + lock);
                            continue;
                        }
                        KinesisMessageDrivenChannelAdapter.this.logger.info((Throwable)ex, () -> "Error during unlocking: " + lock + " while adapter was inactive");
                    }
                    finally {
                        iterator.remove();
                    }
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Tries to renew the lock behind the given future and, on success, completes the
         * future with {@code true}.
         * <p>
         * When the configured registry is a {@link RenewableLockRegistry} the renewal is
         * delegated to it; an {@link IllegalStateException} from the registry is reported
         * as a failed renewal. Otherwise the lock already stored in {@code locks} is probed
         * with {@code tryLock()} and released again straight away.
         *
         * @param renewLockFuture the future carrying the lock key to renew
         * @return the outcome of completing the future with {@code true}, or {@code false}
         * when the lock could not be renewed (the future is left untouched in that case)
         */
        private boolean renewLockInRegistry(LockCompletableFuture renewLockFuture) {
            LockRegistry registry = KinesisMessageDrivenChannelAdapter.this.lockRegistry;
            if (!(registry instanceof RenewableLockRegistry)) {
                // Plain registry: re-acquire the lock we already track to prove we still own it.
                Lock trackedLock = this.locks.get(renewLockFuture.lockKey);
                if (!trackedLock.tryLock()) {
                    return false;
                }
                try {
                    return renewLockFuture.complete(true);
                }
                finally {
                    trackedLock.unlock();
                }
            }
            try {
                ((RenewableLockRegistry)registry).renewLock(renewLockFuture.lockKey);
                return renewLockFuture.complete(true);
            }
            catch (IllegalStateException ex) {
                return false;
            }
        }

        /**
         * Always reports {@code true}.
         * NOTE(review): presumably the scheduling hint for a long-running runnable -
         * confirm against the interface declared on the enclosing class.
         *
         * @return {@code true}
         */
        public boolean isLongLived() {
            return true;
        }
    }

    private final class ShardConsumer {
        private final KinesisShardOffset shardOffset;
        private final ShardCheckpointer checkpointer;
        private final String key;
        private long nextCheckpointTimeInMillis;
        private Runnable notifier;
        private volatile ConsumerState state = ConsumerState.NEW;
        private volatile Runnable task;
        private volatile String shardIterator;
        private volatile long sleepUntil;
        private final Runnable processTask = this.processTask();

        /**
         * Creates a consumer for one shard.
         * <p>
         * The supplied offset is copied, the checkpoint key is derived from its stream and
         * shard, and a {@link ShardCheckpointer} bound to that key is created on top of the
         * adapter's checkpoint store.
         *
         * @param shardOffset the offset describing the shard to consume
         */
        ShardConsumer(KinesisShardOffset shardOffset) {
            KinesisMessageDrivenChannelAdapter adapter = KinesisMessageDrivenChannelAdapter.this;
            this.shardOffset = new KinesisShardOffset(shardOffset);
            this.key = adapter.buildCheckpointKeyForShard(shardOffset.getStream(), shardOffset.getShard());
            this.checkpointer = new ShardCheckpointer(adapter.checkpointStore, this.key);
        }

        /**
         * Registers the callback that this consumer runs when it has a task ready in
         * {@code execute()}, when it stops, and when a lock renewal fails.
         *
         * @param notifier the callback to run; {@code null} checks guard every invocation
         */
        void setNotifier(Runnable notifier) {
            this.notifier = notifier;
        }

        /**
         * Moves this consumer to {@code STOP}.
         * <p>
         * When a lock registry is configured, an unlock request for this consumer's key is
         * handed to the shard consumer manager and awaited for at most
         * {@code lockRenewalTimeout} milliseconds; any failure (including a timeout) is only
         * logged, and an interruption re-sets the thread's interrupt flag. Finally the
         * notifier, if any, is run.
         */
        void stop() {
            this.state = ConsumerState.STOP;
            if (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) {
                LockCompletableFuture releaseRequest = new LockCompletableFuture(this.key);
                KinesisMessageDrivenChannelAdapter.this.shardConsumerManager.unlock(releaseRequest);
                long timeoutMillis = KinesisMessageDrivenChannelAdapter.this.lockRenewalTimeout;
                try {
                    releaseRequest.get(timeoutMillis, TimeUnit.MILLISECONDS);
                }
                catch (Exception failure) {
                    if (failure instanceof InterruptedException) {
                        // Preserve the interruption for whoever owns this thread.
                        Thread.currentThread().interrupt();
                    }
                    KinesisMessageDrivenChannelAdapter.this.logger.info((Throwable)failure, () -> "The lock for key '" + this.key + "' was not unlocked in time");
                }
            }
            if (this.notifier == null) {
                return;
            }
            this.notifier.run();
        }

        /**
         * Stops this consumer via {@code stop()} and then closes its checkpointer.
         */
        void close() {
            this.stop();
            this.checkpointer.close();
        }

        /**
         * Advances this consumer's state machine by one step.
         * <p>
         * Nothing happens while a previously assigned task is still pending
         * ({@code task != null}) or when {@code renewLockIfAny()} reports a failed renewal.
         * Otherwise a task is chosen from the current state:
         * <ul>
         * <li>{@code NEW} / {@code EXPIRED} - a task that resolves the starting position
         * (removing the checkpoint on reset, or resuming after the stored checkpoint),
         * requests a shard iterator and then moves to {@code CONSUME}, or to {@code STOP}
         * when no iterator was returned;</li>
         * <li>{@code CONSUME} - the shared record-processing task;</li>
         * <li>{@code SLEEP} - no task; switches back to {@code CONSUME} once
         * {@code sleepUntil} has passed;</li>
         * <li>{@code STOP} - no task; only logs why the consumer stops.</li>
         * </ul>
         * When a task was assigned, the notifier (if any) is run, and if the adapter's
         * {@code concurrency} is {@code 0} the task is submitted to the consumer executor.
         */
        void execute() {
            if (this.task == null) {
                if (!this.renewLockIfAny()) {
                    return;
                }
                switch (this.state) {
                    case NEW: 
                    case EXPIRED: {
                        this.task = () -> {
                            try {
                                if (this.shardOffset.isReset()) {
                                    // A reset offset discards whatever checkpoint is stored.
                                    this.checkpointer.remove();
                                } else {
                                    String checkpoint = this.checkpointer.getCheckpoint();
                                    if (checkpoint != null) {
                                        // Resume right after the last check-pointed record.
                                        this.shardOffset.setSequenceNumber(checkpoint);
                                        this.shardOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
                                    }
                                }
                                if (this.state == ConsumerState.NEW) {
                                    KinesisMessageDrivenChannelAdapter.this.logger.info(() -> "The [" + this + "] has been started.");
                                }
                                GetShardIteratorRequest shardIteratorRequest = this.shardOffset.toShardIteratorRequest();
                                try {
                                    this.shardIterator = (String)((CompletableFuture)KinesisMessageDrivenChannelAdapter.this.amazonKinesis.getShardIterator(shardIteratorRequest).thenApply(GetShardIteratorResponse::shardIterator)).join();
                                }
                                catch (CompletionException ex) {
                                    InvalidArgumentException cause;
                                    Throwable patt32684$temp = ex.getCause();
                                    // The closed-shard case is only logged here; the exception is rethrown either way.
                                    if (patt32684$temp instanceof InvalidArgumentException && (cause = (InvalidArgumentException)patt32684$temp).getMessage().contains("has reached max possible value for the shard")) {
                                        KinesisMessageDrivenChannelAdapter.this.logger.info(() -> "The [" + this.shardOffset + "] has been closed. Skipping...");
                                    }
                                    throw ex;
                                }
                                if (this.shardIterator == null) {
                                    this.state = ConsumerState.STOP;
                                }
                                if (ConsumerState.STOP != this.state) {
                                    this.state = ConsumerState.CONSUME;
                                }
                            }
                            finally {
                                // Clearing the task lets the next execute() call pick a new one.
                                this.task = null;
                            }
                        };
                        break;
                    }
                    case CONSUME: {
                        this.task = this.processTask;
                        break;
                    }
                    case SLEEP: {
                        // Wake up once the back-off deadline has passed; no task for this round.
                        if (System.currentTimeMillis() >= this.sleepUntil) {
                            this.state = ConsumerState.CONSUME;
                        }
                        this.task = null;
                        break;
                    }
                    case STOP: {
                        if (this.shardIterator == null) {
                            KinesisMessageDrivenChannelAdapter.this.logger.info(() -> "Stopping the [" + this + "] on the checkpoint [" + this.checkpointer.getCheckpoint() + "] because the shard has been CLOSED and exhausted.");
                        } else {
                            KinesisMessageDrivenChannelAdapter.this.logger.info(() -> "Stopping the [" + this + "].");
                        }
                        this.task = null;
                    }
                }
                if (this.task != null) {
                    if (this.notifier != null) {
                        this.notifier.run();
                    }
                    // With concurrency == 0 the task is handed to the executor directly.
                    if (KinesisMessageDrivenChannelAdapter.this.concurrency == 0) {
                        KinesisMessageDrivenChannelAdapter.this.consumerExecutor.execute(this.task);
                    }
                }
            }
        }

        /**
         * Renews this consumer's lock when a lock registry is configured and the consumer is
         * in the {@code CONSUME} state.
         * <p>
         * The renewal is requested through the shard consumer manager and awaited for at
         * most {@code lockRenewalTimeout} milliseconds. If it does not succeed while the
         * consumer is still consuming, the consumer is moved to {@code STOP}, its
         * checkpointer is closed, the notifier is run and - while the adapter is active -
         * the shard is handed back to the manager for consumption.
         *
         * @return {@code false} only when the renewal failed and the consumer was stopped;
         * {@code true} in every other case
         */
        private boolean renewLockIfAny() {
            if (KinesisMessageDrivenChannelAdapter.this.lockRegistry == null || this.state != ConsumerState.CONSUME) {
                return true;
            }
            LockCompletableFuture renewalRequest = new LockCompletableFuture(this.key);
            KinesisMessageDrivenChannelAdapter.this.shardConsumerManager.renewLock(renewalRequest);
            boolean renewed = false;
            try {
                renewed = (Boolean)renewalRequest.get(KinesisMessageDrivenChannelAdapter.this.lockRenewalTimeout, TimeUnit.MILLISECONDS);
            }
            catch (Exception failure) {
                if (failure instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                KinesisMessageDrivenChannelAdapter.this.logger.info((Throwable)failure, () -> "The lock for key '" + this.key + "' was not renewed in time");
            }
            if (renewed || this.state != ConsumerState.CONSUME) {
                return true;
            }
            // The lock is lost: stop consuming and let the manager compete for the shard again.
            this.state = ConsumerState.STOP;
            this.checkpointer.close();
            if (this.notifier != null) {
                this.notifier.run();
            }
            if (KinesisMessageDrivenChannelAdapter.this.active) {
                KinesisMessageDrivenChannelAdapter.this.shardConsumerManager.addShardToConsume(this.shardOffset);
            }
            return false;
        }

        /**
         * Builds the task that performs one {@code GetRecords} round for this shard.
         * <p>
         * The task requests up to {@code recordsLimit} records with the current shard
         * iterator, passes any records to {@code processRecords} and advances the iterator.
         * Exceptions are handed to {@code rewindIteratorOnError}. Post-processing runs
         * exactly once, in the {@code finally} block, whatever the outcome:
         * <ul>
         * <li>the thread-bound attributes holder is cleared;</li>
         * <li>if a response was received and there is no next iterator, the shard is
         * treated as ended: it is removed from the manager's offsets (when locking is
         * used), its ending sequence number is check-pointed when available, a
         * {@link KinesisShardEndedEvent} is published and the consumer is stopped;</li>
         * <li>if the consumer is not stopped and the response carried no records, the
         * consumer is put to sleep for the configured back-off;</li>
         * <li>the pending task reference is cleared.</li>
         * </ul>
         * The previous form ran this post-processing twice on every non-exceptional exit
         * (once inline and once more in {@code finally}), which published the shard-ended
         * event twice and called {@code stop()} twice.
         *
         * @return the record-processing task
         */
        private Runnable processTask() {
            return () -> {
                GetRecordsRequest getRecordsRequest = (GetRecordsRequest)GetRecordsRequest.builder().shardIterator(this.shardIterator).limit(Integer.valueOf(KinesisMessageDrivenChannelAdapter.this.recordsLimit)).build();
                GetRecordsResponse result = null;
                try {
                    result = this.getRecords(getRecordsRequest);
                    if (result != null) {
                        List<Record> records = result.records();
                        if (!records.isEmpty()) {
                            this.processRecords(records);
                        }
                        this.shardIterator = result.nextShardIterator();
                    }
                }
                catch (Exception ex) {
                    this.rewindIteratorOnError(ex, result);
                }
                finally {
                    attributesHolder.remove();
                    if (result != null) {
                        if (this.shardIterator == null) {
                            // No next iterator: the shard is closed and fully read.
                            if (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) {
                                KinesisMessageDrivenChannelAdapter.this.shardConsumerManager.shardOffsetsToConsumer.remove(this.key);
                            }
                            for (Shard shard : KinesisMessageDrivenChannelAdapter.this.readShardList(this.shardOffset.getStream())) {
                                if (!shard.shardId().equals(this.shardOffset.getShard())) {
                                    continue;
                                }
                                String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
                                if (endingSequenceNumber != null) {
                                    this.checkpointSwallowingProvisioningExceptions(endingSequenceNumber);
                                }
                                break;
                            }
                            if (KinesisMessageDrivenChannelAdapter.this.applicationEventPublisher != null) {
                                KinesisMessageDrivenChannelAdapter.this.applicationEventPublisher.publishEvent((ApplicationEvent)new KinesisShardEndedEvent((Object)KinesisMessageDrivenChannelAdapter.this, this.key));
                            }
                            this.stop();
                        }
                        if (ConsumerState.STOP != this.state && result.records().isEmpty()) {
                            KinesisMessageDrivenChannelAdapter.this.logger.debug(() -> "No records for [" + this + "] on sequenceNumber [" + this.checkpointer.getLastCheckpointValue() + "]. Suspend consuming for [" + KinesisMessageDrivenChannelAdapter.this.consumerBackoff + "] milliseconds.");
                            this.prepareSleepState();
                        }
                    }
                    // Clearing the task allows execute() to schedule the next round.
                    this.task = null;
                }
            };
        }

        /**
         * Decides which shard iterator to use after a failed processing round.
         * <ul>
         * <li>No highest sequence recorded yet: keep the current iterator.</li>
         * <li>No response at all ({@code result == null}, i.e. the {@code GetRecords}
         * call itself failed): keep the current iterator. Previously this case
         * dereferenced {@code result} and raised a {@link NullPointerException} that
         * masked the original failure.</li>
         * <li>The highest sequence of the batch equals the last checkpoint: move on to
         * the response's next iterator.</li>
         * <li>{@code reRequestCurrentShardIterator} approves: keep the current iterator.</li>
         * <li>Otherwise: request a fresh iterator positioned after the last checkpoint.</li>
         * </ul>
         *
         * @param ex the exception raised while fetching or processing records
         * @param result the response of the failed round, or {@code null} when the request
         * did not return one
         */
        private void rewindIteratorOnError(Exception ex, GetRecordsResponse result) {
            String lastCheckpoint = this.checkpointer.getLastCheckpointValue();
            String highestSequence = this.checkpointer.getHighestSequence();
            if (highestSequence == null) {
                KinesisMessageDrivenChannelAdapter.this.logger.info((Throwable)ex, (CharSequence)"getRecords request has thrown exception. No checkpoints - re-request with the current shard iterator.");
            } else if (result == null) {
                // The request itself failed, so nothing from this round was processed.
                KinesisMessageDrivenChannelAdapter.this.logger.info((Throwable)ex, (CharSequence)"getRecords request has thrown exception. Re-request with the current shard iterator.");
            } else if (highestSequence.equals(lastCheckpoint)) {
                KinesisMessageDrivenChannelAdapter.this.logger.info((Throwable)ex, (CharSequence)"Record processor has thrown exception. Ignore since the highest sequence in batch was check-pointed.");
                this.shardIterator = result.nextShardIterator();
            } else if (this.reRequestCurrentShardIterator(lastCheckpoint, result)) {
                KinesisMessageDrivenChannelAdapter.this.logger.info((Throwable)ex, (CharSequence)"Record processor has thrown exception. No checkpoints - re-request with the current shard iterator.");
            } else {
                KinesisShardOffset newOffset = new KinesisShardOffset(this.shardOffset);
                newOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
                KinesisMessageDrivenChannelAdapter.this.logger.info((Throwable)ex, () -> "Record processor has thrown exception. Rewind shard iterator after sequence number: " + lastCheckpoint);
                newOffset.setSequenceNumber(lastCheckpoint);
                GetShardIteratorRequest shardIteratorRequest = newOffset.toShardIteratorRequest();
                this.shardIterator = ((GetShardIteratorResponse)KinesisMessageDrivenChannelAdapter.this.amazonKinesis.getShardIterator(shardIteratorRequest).join()).shardIterator();
            }
        }

        /**
         * Tells whether the current shard iterator should simply be requested again.
         *
         * @param lastCheckpoint the last check-pointed sequence number, possibly {@code null}
         * @param result the response whose records are inspected
         * @return {@code true} when there is no checkpoint, or when the response has records
         * and the checkpoint is numerically lower than the sequence number of the last
         * record; {@code false} otherwise
         */
        private boolean reRequestCurrentShardIterator(@Nullable String lastCheckpoint, GetRecordsResponse result) {
            if (lastCheckpoint == null) {
                return true;
            }
            List<Record> batch = result.records();
            if (batch.isEmpty()) {
                return false;
            }
            // Sequence numbers are compared numerically, not lexically.
            BigInteger checkpointed = new BigInteger(lastCheckpoint);
            BigInteger lastInBatch = new BigInteger(batch.get(batch.size() - 1).sequenceNumber());
            return checkpointed.compareTo(lastInBatch) < 0;
        }

        /**
         * Check-points the given sequence number, downgrading a
         * {@code ProvisionedThroughputExceededException} to a debug log entry instead of
         * propagating it.
         *
         * @param endingSequenceNumber the sequence number to checkpoint
         */
        private void checkpointSwallowingProvisioningExceptions(String endingSequenceNumber) {
            try {
                this.checkpointer.checkpoint(endingSequenceNumber);
            }
            catch (ProvisionedThroughputExceededException throttled) {
                // Best effort only: a throttled checkpoint must not fail the caller.
                KinesisMessageDrivenChannelAdapter.this.logger.debug((Throwable)throttled, (CharSequence)"Exception while checkpointing empty shards");
            }
        }

        /**
         * Executes the given request and waits for its response.
         * <p>
         * Two failure causes are absorbed: an expired iterator switches the consumer to
         * {@code EXPIRED}, and a throttled request puts it to sleep. Any other
         * {@link CompletionException} is rethrown.
         *
         * @param getRecordsRequest the request to execute
         * @return the response, or {@code null} when one of the absorbed failures occurred
         */
        private GetRecordsResponse getRecords(GetRecordsRequest getRecordsRequest) {
            try {
                return (GetRecordsResponse)KinesisMessageDrivenChannelAdapter.this.amazonKinesis.getRecords(getRecordsRequest).join();
            }
            catch (CompletionException completionFailure) {
                Throwable rootCause = completionFailure.getCause();
                if (rootCause instanceof ExpiredIteratorException) {
                    KinesisMessageDrivenChannelAdapter.this.logger.info(() -> "Shard iterator for [" + this + "] expired.\nA new one will be started from the check pointed sequence number.");
                    this.state = ConsumerState.EXPIRED;
                    return null;
                }
                if (rootCause instanceof ProvisionedThroughputExceededException) {
                    KinesisMessageDrivenChannelAdapter.this.logger.warn(() -> "GetRecords request throttled for [" + this + "] with the reason: " + rootCause.getMessage());
                    this.prepareSleepState();
                    return null;
                }
                throw completionFailure;
            }
        }

        /**
         * Puts the consumer into the {@code SLEEP} state until the current time plus the
         * adapter's {@code consumerBackoff} (milliseconds).
         */
        private void prepareSleepState() {
            // The deadline is written before the state so a reader seeing SLEEP finds it set.
            this.sleepUntil = System.currentTimeMillis() + (long)KinesisMessageDrivenChannelAdapter.this.consumerBackoff;
            this.state = ConsumerState.SLEEP;
        }

        /**
         * Dispatches a non-empty batch of records according to the adapter's listener mode
         * and applies the matching checkpoint strategy.
         * <p>
         * The sequence number of the last record is first stored as the checkpointer's
         * highest sequence. In {@code record} mode each record is sent on its own, followed
         * by the per-record and periodic checkpoint hooks; in {@code batch} mode the whole
         * list is sent at once, followed by the periodic hook. The batch checkpoint hook is
         * run last in either case.
         *
         * @param records the records to process; the last element is read unconditionally
         */
        private void processRecords(List<Record> records) {
            KinesisMessageDrivenChannelAdapter.this.logger.trace(() -> "Processing records: " + records + " for [" + this + "]");
            Record lastRecord = records.get(records.size() - 1);
            this.checkpointer.setHighestSequence(lastRecord.sequenceNumber());
            ListenerMode mode = KinesisMessageDrivenChannelAdapter.this.listenerMode;
            if (ListenerMode.record == mode) {
                for (Record current : records) {
                    this.processSingleRecord(current);
                    this.checkpointIfRecordMode(current);
                    this.checkpointIfPeriodicMode(current);
                }
            } else if (ListenerMode.batch == mode) {
                this.processMultipleRecords(records);
                this.checkpointIfPeriodicMode(null);
            }
            this.checkpointIfBatchMode();
        }

        /**
         * Builds a message for one record and sends it.
         *
         * @param record the record to convert and send
         */
        private void processSingleRecord(Record record) {
            AbstractIntegrationMessageBuilder<Object> recordMessage = this.prepareMessageForRecord(record);
            this.performSend(recordMessage, record);
        }

        /**
         * Sends a whole batch of records as a single message.
         * <p>
         * The payload depends on the adapter configuration: with an embedded headers mapper
         * it is a list of per-record messages; otherwise, with a converter, it is a list of
         * converted payloads accompanied by parallel lists of partition keys and sequence
         * numbers in the headers; with neither, the raw record list is used.
         *
         * @param records the records to send together
         */
        private void processMultipleRecords(List<Record> records) {
            AbstractIntegrationMessageBuilder<?> batchBuilder = KinesisMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(records);
            if (KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) {
                List<Message<?>> recordMessages = new ArrayList<>();
                for (Record current : records) {
                    recordMessages.add(this.prepareMessageForRecord(current).build());
                }
                batchBuilder = KinesisMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(recordMessages);
            } else if (KinesisMessageDrivenChannelAdapter.this.converter != null) {
                List<String> partitionKeys = new ArrayList<>();
                List<String> sequenceNumbers = new ArrayList<>();
                List<Object> convertedPayloads = new ArrayList<>();
                for (Record current : records) {
                    // Keys and sequence numbers stay index-aligned with the converted payloads.
                    partitionKeys.add(current.partitionKey());
                    sequenceNumbers.add(current.sequenceNumber());
                    convertedPayloads.add(KinesisMessageDrivenChannelAdapter.this.converter.convert(current.data().asByteArray()));
                }
                batchBuilder = KinesisMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(convertedPayloads).setHeader("aws_receivedPartitionKey", partitionKeys).setHeader("aws_receivedSequenceNumber", sequenceNumbers);
            }
            this.performSend(batchBuilder, records);
        }

        /**
         * Creates a message builder for a single record.
         * <p>
         * The record's bytes are the initial payload. With an embedded headers mapper the
         * bytes are first parsed into a message whose payload replaces them; a parsing
         * failure is logged and the bytes are kept. A payload that is still a byte array is
         * then passed through the converter when one is configured. The partition key and
         * sequence number are always added as headers, the source record only when
         * {@code bindSourceRecord} is set, and headers from the parsed message are copied
         * where absent.
         *
         * @param record the record to wrap
         * @return the builder holding the payload and headers for the record
         */
        private AbstractIntegrationMessageBuilder<Object> prepareMessageForRecord(Record record) {
            byte[] rawBytes = record.data().asByteArray();
            Object payload = rawBytes;
            Message<?> parsedMessage = null;
            if (KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) {
                try {
                    parsedMessage = KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper.toMessage(rawBytes);
                    payload = parsedMessage.getPayload();
                }
                catch (Exception ex) {
                    KinesisMessageDrivenChannelAdapter.this.logger.warn((Throwable)ex, (CharSequence)"Could not parse embedded headers. Remain payload untouched.");
                }
            }
            if (payload instanceof byte[] && KinesisMessageDrivenChannelAdapter.this.converter != null) {
                payload = KinesisMessageDrivenChannelAdapter.this.converter.convert((byte[])payload);
            }
            AbstractIntegrationMessageBuilder<Object> builder = KinesisMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(payload);
            builder.setHeader("aws_receivedPartitionKey", (Object)record.partitionKey());
            builder.setHeader("aws_receivedSequenceNumber", (Object)record.sequenceNumber());
            if (KinesisMessageDrivenChannelAdapter.this.bindSourceRecord) {
                builder.setHeader("sourceData", (Object)record);
            }
            if (parsedMessage != null) {
                builder.copyHeadersIfAbsent(parsedMessage.getHeaders());
            }
            return builder;
        }

        /**
         * Completes the given builder with the stream and shard headers - plus the
         * checkpointer header in manual checkpoint mode - builds the message, lets the
         * adapter attach attributes for the raw record(s) and sends the message.
         *
         * @param messageBuilder the builder holding payload and record-specific headers
         * @param rawRecord the record or records the message was created from
         */
        private void performSend(AbstractIntegrationMessageBuilder<?> messageBuilder, Object rawRecord) {
            messageBuilder.setHeader("aws_receivedStream", (Object)this.shardOffset.getStream());
            messageBuilder.setHeader("aws_shard", (Object)this.shardOffset.getShard());
            boolean manualCheckpoints = CheckpointMode.manual == KinesisMessageDrivenChannelAdapter.this.checkpointMode;
            if (manualCheckpoints) {
                // Downstream code check-points itself, so it needs access to the checkpointer.
                messageBuilder.setHeader("aws_checkpointer", (Object)this.checkpointer);
            }
            Message<?> outbound = messageBuilder.build();
            KinesisMessageDrivenChannelAdapter.this.setAttributesIfNecessary(rawRecord, outbound);
            KinesisMessageDrivenChannelAdapter.this.sendMessage(outbound);
        }

        /**
         * Check-points through the checkpointer's no-argument {@code checkpoint()} when the
         * adapter's checkpoint mode is {@code batch}; does nothing otherwise.
         */
        private void checkpointIfBatchMode() {
            if (CheckpointMode.batch != KinesisMessageDrivenChannelAdapter.this.checkpointMode) {
                return;
            }
            this.checkpointer.checkpoint();
        }

        /**
         * Check-points the sequence number of the given record when the adapter's
         * checkpoint mode is {@code record}; does nothing otherwise.
         *
         * @param record the record that has just been processed
         */
        private void checkpointIfRecordMode(Record record) {
            if (CheckpointMode.record != KinesisMessageDrivenChannelAdapter.this.checkpointMode) {
                return;
            }
            this.checkpointer.checkpoint(record.sequenceNumber());
        }

        /**
         * Check-points in {@code periodic} mode once the next checkpoint time has passed,
         * then schedules the following checkpoint {@code checkpointsInterval} later.
         *
         * @param record the record whose sequence number is check-pointed, or {@code null}
         * to use the checkpointer's no-argument {@code checkpoint()}
         */
        private void checkpointIfPeriodicMode(@Nullable Record record) {
            if (CheckpointMode.periodic != KinesisMessageDrivenChannelAdapter.this.checkpointMode || System.currentTimeMillis() <= this.nextCheckpointTimeInMillis) {
                return;
            }
            if (record != null) {
                this.checkpointer.checkpoint(record.sequenceNumber());
            } else {
                this.checkpointer.checkpoint();
            }
            this.nextCheckpointTimeInMillis = System.currentTimeMillis() + KinesisMessageDrivenChannelAdapter.this.checkpointsInterval;
        }

        /**
         * Describes this consumer by its shard offset and current state; this text is used
         * in the adapter's log messages.
         *
         * @return a string of the form {@code ShardConsumer{shardOffset=..., state=...}}
         */
        public String toString() {
            return "ShardConsumer{shardOffset=" + this.shardOffset + ", state=" + this.state + "}";
        }
    }

    private final class ConsumerDispatcher
    implements SchedulingAwareRunnable {
        private final Set<String> inReshardingProcess = new HashSet<String>();

        private ConsumerDispatcher() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            while (KinesisMessageDrivenChannelAdapter.this.active) {
                for (String stream : KinesisMessageDrivenChannelAdapter.this.inResharding) {
                    if (!this.inReshardingProcess.add(stream)) continue;
                    KinesisMessageDrivenChannelAdapter.this.logger.debug(() -> "Resharding has happened for stream [" + stream + "]. Rebalancing...");
                    KinesisMessageDrivenChannelAdapter.this.populateShardsForStream(stream, null);
                }
                Iterator<ShardConsumer> iterator = KinesisMessageDrivenChannelAdapter.this.shardConsumers.values().iterator();
                while (iterator.hasNext()) {
                    KinesisShardOffset shardOffset;
                    String stream;
                    ShardConsumer shardConsumer = iterator.next();
                    shardConsumer.execute();
                    if (ConsumerState.STOP != shardConsumer.state) continue;
                    iterator.remove();
                    if (KinesisMessageDrivenChannelAdapter.this.streams == null || shardConsumer.shardIterator != null || !KinesisMessageDrivenChannelAdapter.this.inResharding.add(stream = (shardOffset = shardConsumer.shardOffset).getStream())) continue;
                    this.inReshardingProcess.remove(stream);
                    Set<KinesisShardOffset> set = KinesisMessageDrivenChannelAdapter.this.shardOffsets;
                    synchronized (set) {
                        KinesisMessageDrivenChannelAdapter.this.shardOffsets.remove(shardOffset);
                    }
                }
                String errorMsg = "ConsumerDispatcher Thread [" + this + "] has been interrupted";
                KinesisMessageDrivenChannelAdapter.this.sleep(KinesisMessageDrivenChannelAdapter.this.idleBetweenPolls, new IllegalStateException(errorMsg), true);
            }
        }

        public boolean isLongLived() {
            return true;
        }
    }

    private final class ConsumerInvoker
    implements SchedulingAwareRunnable {
        private final Queue<ShardConsumer> consumers = new ConcurrentLinkedQueue<ShardConsumer>();
        private final Semaphore processBarrier = new Semaphore(0);
        private final Runnable notifier = this::notifyBarrier;

        ConsumerInvoker(Collection<ShardConsumer> shardConsumers) {
            for (ShardConsumer shardConsumer : shardConsumers) {
                this.addConsumer(shardConsumer);
            }
        }

        void addConsumer(ShardConsumer shardConsumer) {
            shardConsumer.setNotifier(this.notifier);
            this.consumers.add(shardConsumer);
        }

        void notifyBarrier() {
            this.processBarrier.release();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            List<ConsumerInvoker> list;
            while (KinesisMessageDrivenChannelAdapter.this.active) {
                try {
                    this.processBarrier.acquire();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("ConsumerInvoker thread [" + this + "] has been interrupted", e);
                }
                Iterator iterator = this.consumers.iterator();
                while (iterator.hasNext()) {
                    ShardConsumer shardConsumer = (ShardConsumer)iterator.next();
                    if (ConsumerState.STOP == shardConsumer.state) {
                        iterator.remove();
                        continue;
                    }
                    if (shardConsumer.task == null) continue;
                    try {
                        shardConsumer.task.run();
                    }
                    catch (Exception ex) {
                        KinesisMessageDrivenChannelAdapter.this.logger.info(() -> "Got an exception " + ex + " during [" + shardConsumer + "] task invocation.\nProcess will be retried on the next iteration.");
                    }
                }
                list = KinesisMessageDrivenChannelAdapter.this.consumerInvokers;
                synchronized (list) {
                    if (this.consumers.isEmpty()) {
                        KinesisMessageDrivenChannelAdapter.this.consumerInvokers.remove(this);
                        break;
                    }
                }
            }
            list = KinesisMessageDrivenChannelAdapter.this.consumerInvokers;
            synchronized (list) {
                KinesisMessageDrivenChannelAdapter.this.consumerInvokers.remove(this);
            }
        }

        public boolean isLongLived() {
            return true;
        }
    }

    /**
     * A {@code CompletableFuture<Boolean>} that additionally carries the key of the lock
     * it refers to. Shard consumers create instances for unlock and renewal requests and
     * wait on them for the outcome.
     */
    private static final class LockCompletableFuture
    extends CompletableFuture<Boolean> {
        /** Key of the lock this request is about. */
        private final String lockKey;

        /**
         * @param lockKey the key of the lock this future refers to
         */
        LockCompletableFuture(String lockKey) {
            this.lockKey = lockKey;
        }
    }

    /**
     * States a {@code ShardConsumer} moves through; {@code ShardConsumer.execute()}
     * selects the next task from the current state.
     */
    private static enum ConsumerState {
        // Initial state of a shard consumer; a shard iterator still has to be requested.
        NEW,
        // Set when GetRecords reports an expired iterator; handled like NEW by execute().
        EXPIRED,
        // An iterator is available; execute() schedules the record-processing task.
        CONSUME,
        // Consumption is suspended until the consumer's sleepUntil deadline.
        SLEEP,
        // The consumer has finished; dispatcher and invoker drop consumers in this state.
        STOP;

    }
}

