/*
 * Decompiled with CFR 0.152.
 */
package org.janusgraph.diskstorage.log.kcvs;

import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.janusgraph.core.JanusGraphException;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.ReadBuffer;
import org.janusgraph.diskstorage.ResourceUnavailableException;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.configuration.ConfigOption;
import org.janusgraph.diskstorage.configuration.Configuration;
import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation;
import org.janusgraph.diskstorage.keycolumnvalue.KCVSUtil;
import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore;
import org.janusgraph.diskstorage.keycolumnvalue.KeySliceQuery;
import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction;
import org.janusgraph.diskstorage.log.Log;
import org.janusgraph.diskstorage.log.Message;
import org.janusgraph.diskstorage.log.MessageReader;
import org.janusgraph.diskstorage.log.ReadMarker;
import org.janusgraph.diskstorage.log.kcvs.ExternalPersistor;
import org.janusgraph.diskstorage.log.kcvs.KCVSLogManager;
import org.janusgraph.diskstorage.log.kcvs.KCVSMessage;
import org.janusgraph.diskstorage.log.util.FutureMessage;
import org.janusgraph.diskstorage.log.util.ProcessMessageJob;
import org.janusgraph.diskstorage.util.BackendOperation;
import org.janusgraph.diskstorage.util.BufferUtil;
import org.janusgraph.diskstorage.util.StandardBaseTransactionConfig;
import org.janusgraph.diskstorage.util.StaticArrayEntry;
import org.janusgraph.diskstorage.util.WriteByteBuffer;
import org.janusgraph.diskstorage.util.time.TimestampProvider;
import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
import org.janusgraph.graphdb.configuration.PreInitializeConfigOptions;
import org.janusgraph.graphdb.database.serialize.DataOutput;
import org.janusgraph.util.system.BackgroundThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Log} implementation that persists its messages in a {@link KeyColumnValueStore}.
 *
 * <p>Row keys are built from a partition id, a bucket id and a time slice (see
 * {@link #getLogKey(int, int, int)}); the column of each message starts with the message timestamp,
 * followed by the sender id and a per-sender message counter (see {@link #writeMessage(KCVSMessage)}).
 *
 * <p>Outgoing messages are either written synchronously or handed to a background {@link SendThread}
 * which batches them, depending on the configured send delay. Registered {@link MessageReader}s are fed
 * by one {@link MessagePuller} per (partition, bucket) pair running on a scheduled executor.
 */
@PreInitializeConfigOptions
public class KCVSLog implements Log, BackendOperation.TransactionalProvider {
    private static final Logger log = LoggerFactory.getLogger(KCVSLog.class);

    public static final ConfigOption<Duration> LOG_MAX_WRITE_TIME = new ConfigOption<Duration>(GraphDatabaseConfiguration.LOG_NS, "max-write-time", "Maximum time in ms to try persisting log messages against the backend before failing.", ConfigOption.Type.MASKABLE, Duration.ofMillis(10000L));
    public static final ConfigOption<Duration> LOG_MAX_READ_TIME = new ConfigOption<Duration>(GraphDatabaseConfiguration.LOG_NS, "max-read-time", "Maximum time in ms to try reading log messages from the backend before failing.", ConfigOption.Type.MASKABLE, Duration.ofMillis(4000L));
    public static final ConfigOption<Duration> LOG_READ_LAG_TIME = new ConfigOption<Duration>(GraphDatabaseConfiguration.LOG_NS, "read-lag-time", "Maximum time in ms that it may take for reads to appear in the backend. If a write does not become visible in the storage backend in this amount of time, a log reader might miss the message.", ConfigOption.Type.MASKABLE, Duration.ofMillis(500L));
    public static final ConfigOption<Boolean> LOG_KEY_CONSISTENT = new ConfigOption<Boolean>(GraphDatabaseConfiguration.LOG_NS, "key-consistent", "Whether to require consistency for log reading and writing messages to the storage backend", ConfigOption.Type.MASKABLE, false);

    /** Width of one time slice, expressed in the unit returned by {@code TimestampProvider.getTime(Instant)}. */
    public static final long TIMESLICE_INTERVAL = 100000000L;
    /** Send delays below this threshold disable the background send thread (messages are written inline). */
    private static final Duration MIN_DELIVERY_DELAY = Duration.ofMillis(10L);
    /** The outgoing queue holds this many send batches before {@code add} blocks. */
    private static final int BATCH_SIZE_MULTIPLIER = 10;
    private static final Duration CLOSE_DOWN_WAIT = Duration.ofSeconds(10L);
    private static final Duration INITIAL_READER_DELAY = Duration.ofMillis(100L);
    private static final Duration FOREVER = Duration.ofNanos(Long.MAX_VALUE);
    /** Leading int of the row keys under which log settings (counters, read markers) are stored. */
    private static final int SYSTEM_PARTITION_ID = -1;
    private static final byte MESSAGE_COUNTER = 1;
    private static final byte MARKER_PREFIX = 2;
    private static final StaticBuffer MESSAGE_COUNTER_COLUMN = new WriteByteBuffer(1).putByte(MESSAGE_COUNTER).getStaticBuffer();
    private static final Random random = new Random();
    private static final Duration TWO_MICROSECONDS = Duration.of(2L, ChronoUnit.MICROS);

    private final KCVSLogManager manager;
    private final String name;
    private final KeyColumnValueStore store;
    private ReadMarker readMarker;
    private final int numBuckets;
    private final boolean keyConsistentOperations;
    private final int sendBatchSize;
    private final Duration maxSendDelay;
    private final Duration maxWriteTime;
    /** Queue feeding the send thread; {@code null} when messages are written inline. */
    private final ArrayBlockingQueue<MessageEnvelope> outgoingMsg;
    /** Background sender; {@code null} when messages are written inline. */
    private final SendThread sendThread;
    private final int numReadThreads;
    private final int maxReadMsg;
    private final Duration readPollingInterval;
    private final Duration readLagTime;
    private final Duration maxReadTime;
    private final boolean allowReadMarkerRecovery = true;
    /** Created lazily by the first successful reader registration. */
    private ScheduledExecutorService readExecutor;
    private MessagePuller[] msgPullers;
    private final AtomicLong numBucketCounter;
    private final AtomicLong numMsgCounter;
    private final List<MessageReader> readers;
    private volatile boolean isOpen;
    private final TimestampProvider times;

    /**
     * Opens the log and, if the configured send delay is at least {@code MIN_DELIVERY_DELAY},
     * starts the background send thread.
     *
     * @param name    name of this log
     * @param manager owning log manager; supplies sender id, serializer, store manager and partition layout
     * @param store   store in which messages and settings are kept
     * @param config  configuration the log options are read from
     * @throws IllegalArgumentException if any argument is {@code null} or fewer than one bucket is configured
     */
    public KCVSLog(String name, KCVSLogManager manager, KeyColumnValueStore store, Configuration config) {
        Preconditions.checkArgument(manager != null && name != null && store != null && config != null);
        this.name = name;
        this.manager = manager;
        this.store = store;
        this.times = config.get(GraphDatabaseConfiguration.TIMESTAMP_PROVIDER);
        this.keyConsistentOperations = config.get(LOG_KEY_CONSISTENT);
        this.numBuckets = config.get(GraphDatabaseConfiguration.LOG_NUM_BUCKETS);
        Preconditions.checkArgument(this.numBuckets >= 1);
        this.sendBatchSize = config.get(GraphDatabaseConfiguration.LOG_SEND_BATCH_SIZE);
        this.maxSendDelay = config.get(GraphDatabaseConfiguration.LOG_SEND_DELAY);
        this.maxWriteTime = config.get(LOG_MAX_WRITE_TIME);
        this.numReadThreads = config.get(GraphDatabaseConfiguration.LOG_READ_THREADS);
        this.maxReadMsg = config.get(GraphDatabaseConfiguration.LOG_READ_BATCH_SIZE);
        this.readPollingInterval = config.get(GraphDatabaseConfiguration.LOG_READ_INTERVAL);
        // A message may sit in the send queue for up to maxSendDelay before it is written,
        // so that delay is added on top of the configured backend read lag.
        this.readLagTime = config.get(LOG_READ_LAG_TIME).plus(this.maxSendDelay);
        this.maxReadTime = config.get(LOG_MAX_READ_TIME);
        if (MIN_DELIVERY_DELAY.compareTo(this.maxSendDelay) <= 0) {
            this.outgoingMsg = new ArrayBlockingQueue<MessageEnvelope>(this.sendBatchSize * BATCH_SIZE_MULTIPLIER);
            this.sendThread = new SendThread();
            this.sendThread.start();
        } else {
            this.outgoingMsg = null;
            this.sendThread = null;
        }
        this.readExecutor = null;
        this.msgPullers = null;
        // Resume the per-sender message counter from the value persisted by a previous close().
        this.numMsgCounter = new AtomicLong(this.readSetting(manager.senderId, MESSAGE_COUNTER_COLUMN, 0L));
        this.numBucketCounter = new AtomicLong(0L);
        this.readers = new ArrayList<MessageReader>();
        this.isOpen = true;
    }

    @Override
    public String getName() {
        return this.name;
    }

    /**
     * Closes the log: stops the reader pool and the send thread, persists read markers (if the reader
     * pool terminated in time) and the message counter, closes the store and notifies the manager.
     * Calling this on an already closed log is a no-op.
     *
     * <p>If the calling thread is interrupted while waiting for the reader pool, the interrupt status
     * is restored once the remaining shutdown steps have run.
     */
    @Override
    public synchronized void close() throws BackendException {
        if (!this.isOpen) {
            return;
        }
        this.isOpen = false;
        boolean interrupted = false;
        try {
            if (this.readExecutor != null) {
                this.readExecutor.shutdown();
            }
            if (this.sendThread != null) {
                this.sendThread.close(CLOSE_DOWN_WAIT);
            }
            if (this.readExecutor != null) {
                try {
                    this.readExecutor.awaitTermination(1L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    // Remember the interrupt; it is restored in the finally block so that the
                    // backend writes below are still attempted with a clear interrupt flag.
                    interrupted = true;
                    log.error("Could not terminate reader thread pool for KCVSLog " + this.name + " due to interruption");
                }
                if (!this.readExecutor.isTerminated()) {
                    this.readExecutor.shutdownNow();
                    log.error("Reader thread pool for KCVSLog " + this.name + " did not shut down in time - could not clean up or set read markers");
                } else {
                    for (MessagePuller puller : this.msgPullers) {
                        puller.close();
                    }
                }
            }
            this.writeSetting(this.manager.senderId, MESSAGE_COUNTER_COLUMN, this.numMsgCounter.get());
            this.store.close();
            this.manager.closedLog(this);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Begins a transaction against the manager's store manager, using the key-consistent transaction
     * configuration when {@code LOG_KEY_CONSISTENT} is enabled.
     */
    @Override
    public StoreTransaction openTx() throws BackendException {
        StandardBaseTransactionConfig config;
        if (this.keyConsistentOperations) {
            config = StandardBaseTransactionConfig.of(this.times, this.manager.storeManager.getFeatures().getKeyConsistentTxConfig());
        } else {
            config = StandardBaseTransactionConfig.of(this.times);
        }
        return this.manager.storeManager.beginTransaction(config);
    }

    /**
     * Maps a timestamp onto the index of the time slice that contains it.
     *
     * @throws IllegalArgumentException if the slice index is negative or does not fit into an int
     */
    private int getTimeSlice(Instant timestamp) {
        long value = this.times.getTime(timestamp) / TIMESLICE_INTERVAL;
        if (value > Integer.MAX_VALUE || value < 0L) {
            throw new IllegalArgumentException("Timestamp overflow detected: " + timestamp);
        }
        return (int) value;
    }

    /**
     * Builds the 12-byte row key: partition id shifted into the high bits of the first int,
     * followed by the bucket id and the time slice.
     */
    private StaticBuffer getLogKey(int partitionId, int bucketId, int timeslice) {
        Preconditions.checkArgument(partitionId >= 0 && partitionId < (1 << this.manager.partitionBitWidth));
        Preconditions.checkArgument(bucketId >= 0 && bucketId < this.numBuckets);
        DataOutput o = this.manager.serializer.getDataOutput(4 + 4 + 4);
        o.putInt(partitionId << (32 - this.manager.partitionBitWidth));
        o.putInt(bucketId);
        o.putInt(timeslice);
        return o.getStaticBuffer();
    }

    /**
     * Serializes a message into an entry whose column is (timestamp, sender id, message counter)
     * and whose value is the message content. Increments the message counter.
     */
    private Entry writeMessage(KCVSMessage msg) {
        StaticBuffer content = msg.getContent();
        DataOutput out = this.manager.serializer.getDataOutput(8 + 8 + this.manager.senderId.length() + 2 + content.length());
        Instant rawTimestamp = msg.getTimestamp();
        Preconditions.checkArgument(rawTimestamp.isAfter(Instant.EPOCH));
        out.putLong(this.times.getTime(rawTimestamp));
        out.writeObjectNotNull(this.manager.senderId);
        out.putLong(this.numMsgCounter.incrementAndGet());
        // Everything written so far forms the column; the content that follows is the value.
        int valuePos = out.getPosition();
        out.putBytes(content);
        return new StaticArrayEntry(out.getStaticBuffer(), valuePos);
    }

    /** Reconstructs a message from a stored entry by reading the timestamp and sender id from its column. */
    private KCVSMessage parseMessage(Entry msg) {
        ReadBuffer r = msg.asReadBuffer();
        Instant timestamp = this.times.getTime(r.getLong());
        String senderId = this.manager.serializer.readObjectNotNull(r, String.class);
        return new KCVSMessage(msg.getValue(), timestamp, senderId);
    }

    /** Adds a message to a randomly chosen partition out of the manager's default write partitions. */
    @Override
    public Future<Message> add(StaticBuffer content) {
        int[] partitions = this.manager.defaultWritePartitionIds;
        return this.add(content, partitions[random.nextInt(partitions.length)]);
    }

    /** Adds a message to the partition derived from the leading bytes of {@code key}. */
    @Override
    public Future<Message> add(StaticBuffer content, StaticBuffer key) {
        return this.add(content, key, null);
    }

    /**
     * Adds a message to the partition derived from {@code key}.
     *
     * @param content   message payload; must be non-empty
     * @param key       non-empty key; its first (up to) four bytes determine the partition
     * @param persistor if non-null, the message is handed to this persistor instead of being written by the log
     * @return a future for the message
     */
    public Future<Message> add(StaticBuffer content, StaticBuffer key, ExternalPersistor persistor) {
        Preconditions.checkArgument(key != null && key.length() > 0, "Invalid key provided: %s", key);
        // Interpret the first four key bytes (zero-padded) as a big-endian int ...
        int partitionId = 0;
        for (int i = 0; i < 4; ++i) {
            int b = key.length() > i ? key.getByte(i) & 0xFF : 0;
            partitionId = (partitionId << 8) + b;
        }
        assert (this.manager.partitionBitWidth >= 0 && this.manager.partitionBitWidth <= 32);
        // ... and keep only its top partitionBitWidth bits. A shift by 32 would be a no-op in Java,
        // hence the explicit zero-width case.
        partitionId = this.manager.partitionBitWidth == 0 ? 0 : partitionId >>> (32 - this.manager.partitionBitWidth);
        return this.add(content, partitionId, persistor);
    }

    private Future<Message> add(StaticBuffer content, int partitionId) {
        return this.add(content, partitionId, null);
    }

    /**
     * Core add routine. The message is delivered through exactly one of three paths: the external
     * persistor (if given), an inline write (if no send thread is running), or the outgoing queue.
     *
     * @throws JanusGraphException if persisting fails or the thread is interrupted while enqueueing
     */
    private Future<Message> add(StaticBuffer content, int partitionId, ExternalPersistor persistor) {
        ResourceUnavailableException.verifyOpen(this.isOpen, "Log", this.name);
        Preconditions.checkArgument(content != null && content.length() > 0, "Content is empty");
        Preconditions.checkArgument(partitionId >= 0 && partitionId < (1 << this.manager.partitionBitWidth), "Invalid partition id: %s", partitionId);
        Instant timestamp = this.times.getTime();
        KCVSMessage msg = new KCVSMessage(content, timestamp, this.manager.senderId);
        FutureMessage<KCVSMessage> fmsg = new FutureMessage<KCVSMessage>(msg);
        // Buckets are assigned round-robin.
        int bucketId = (int) (this.numBucketCounter.incrementAndGet() % (long) this.numBuckets);
        StaticBuffer key = this.getLogKey(partitionId, bucketId, this.getTimeSlice(timestamp));
        MessageEnvelope envelope = new MessageEnvelope(fmsg, key, this.writeMessage(msg));
        if (persistor != null) {
            // The external persistor owns the write; the log must not write the message again.
            try {
                persistor.add(envelope.key, envelope.entry);
                envelope.message.delivered();
            } catch (JanusGraphException e) {
                envelope.message.failed(e);
                throw e;
            }
        } else if (this.outgoingMsg == null) {
            this.sendMessages(ImmutableList.of(envelope));
        } else {
            try {
                // Blocks while the queue is full.
                this.outgoingMsg.put(envelope);
                log.debug("Enqueued {} for partition {}", envelope, partitionId);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new JanusGraphException("Got interrupted waiting to send message", e);
            }
        }
        return fmsg;
    }

    /**
     * Writes the given envelopes to the store in a single {@code mutateMany} call and marks each
     * message as delivered. If the write fails with any runtime exception, every message in the
     * batch is marked as failed and the exception is rethrown.
     */
    private void sendMessages(final List<MessageEnvelope> msgEnvelopes) {
        try {
            boolean success = BackendOperation.execute(new BackendOperation.Transactional<Boolean>() {

                @Override
                public Boolean call(StoreTransaction txh) throws BackendException {
                    // Group the entries by row key.
                    ArrayListMultimap<StaticBuffer, Entry> mutations = ArrayListMultimap.create();
                    for (MessageEnvelope env : msgEnvelopes) {
                        mutations.put(env.key, env.entry);
                        long ts = env.entry.getColumn().getLong(0);
                        log.debug("Preparing to write {} to storage with column/timestamp {}", env, KCVSLog.this.times.getTime(ts));
                    }
                    Map<StaticBuffer, KCVMutation> muts = new HashMap<StaticBuffer, KCVMutation>(mutations.keySet().size());
                    for (StaticBuffer key : mutations.keySet()) {
                        List<Entry> additions = mutations.get(key);
                        muts.put(key, new KCVMutation(additions, KeyColumnValueStore.NO_DELETIONS));
                        log.debug("Built mutation on key {} with {} additions", key, additions.size());
                    }
                    KCVSLog.this.manager.storeManager.mutateMany(ImmutableMap.of(KCVSLog.this.store.getName(), muts), txh);
                    log.debug("Wrote {} total envelopes with operation timestamp {}", msgEnvelopes.size(), txh.getConfiguration().getCommitTime());
                    return Boolean.TRUE;
                }

                @Override
                public String toString() {
                    return "messageSending";
                }
            }, this, this.times, this.maxWriteTime);
            Preconditions.checkState(success);
            log.debug("Wrote {} messages to backend", msgEnvelopes.size());
            for (MessageEnvelope msgEnvelope : msgEnvelopes) {
                msgEnvelope.message.delivered();
            }
        } catch (RuntimeException e) {
            // Fail the futures on any runtime failure so that callers waiting on them are released.
            for (MessageEnvelope msgEnvelope : msgEnvelopes) {
                msgEnvelope.message.failed(e);
            }
            throw e;
        }
    }

    @Override
    public synchronized void registerReader(ReadMarker readMarker, MessageReader ... reader) {
        Preconditions.checkArgument(reader != null && reader.length > 0, "Must specify at least one reader");
        this.registerReaders(readMarker, Arrays.asList(reader));
    }

    /**
     * Registers the given readers. The first successful registration fixes the read marker and starts
     * the reader executor with one {@link MessagePuller} per (read partition, bucket) pair plus a
     * {@link MessageReaderStateUpdater}; later registrations only add readers.
     *
     * @throws IllegalArgumentException if no reader is given, the marker is {@code null}, or the marker
     *                                  is incompatible with the one already in use
     */
    @Override
    public synchronized void registerReaders(ReadMarker readMarker, Iterable<MessageReader> readers) {
        ResourceUnavailableException.verifyOpen(this.isOpen, "Log", this.name);
        Preconditions.checkArgument(!Iterables.isEmpty(readers), "Must specify at least one reader");
        Preconditions.checkArgument(readMarker != null, "Read marker cannot be null");
        Preconditions.checkArgument(this.readMarker == null || this.readMarker.isCompatible(readMarker), "Provided read marker is not compatible with existing read marker for previously registered readers");
        if (this.readMarker == null) {
            this.readMarker = readMarker;
        }
        for (MessageReader reader : readers) {
            Preconditions.checkNotNull(reader);
            if (!this.readers.contains(reader)) {
                this.readers.add(reader);
            }
        }
        // Start the pulling machinery only once. Keying this on the executor rather than on the reader
        // list being empty avoids creating a second executor (and a duplicate set of pullers) when all
        // readers were unregistered and new ones are registered afterwards.
        if (this.readExecutor == null && !this.readers.isEmpty()) {
            this.readExecutor = new ScheduledThreadPoolExecutor(this.numReadThreads, new RejectedExecutionHandler() {

                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    // Run rejected tasks in the submitting thread.
                    r.run();
                }
            });
            this.msgPullers = new MessagePuller[this.manager.readPartitionIds.length * this.numBuckets];
            long initialDelayNanos = INITIAL_READER_DELAY.toNanos();
            long pollingDelayNanos = this.readPollingInterval.toNanos();
            int pos = 0;
            for (int partitionId : this.manager.readPartitionIds) {
                for (int bucketId = 0; bucketId < this.numBuckets; ++bucketId) {
                    this.msgPullers[pos] = new MessagePuller(partitionId, bucketId);
                    log.debug("Creating log read executor: initialDelay={} delay={} unit={}", new Object[]{initialDelayNanos, pollingDelayNanos, TimeUnit.NANOSECONDS});
                    this.readExecutor.scheduleWithFixedDelay(this.msgPullers[pos], initialDelayNanos, pollingDelayNanos, TimeUnit.NANOSECONDS);
                    ++pos;
                }
            }
            this.readExecutor.scheduleWithFixedDelay(new MessageReaderStateUpdater(), initialDelayNanos, pollingDelayNanos, TimeUnit.NANOSECONDS);
        }
    }

    @Override
    public synchronized boolean unregisterReader(MessageReader reader) {
        ResourceUnavailableException.verifyOpen(this.isOpen, "Log", this.name);
        return this.readers.remove(reader);
    }

    /** Column under which the read marker of one (partition, bucket) pair is stored. */
    private StaticBuffer getMarkerColumn(int partitionId, int bucketId) {
        DataOutput out = this.manager.serializer.getDataOutput(1 + 4 + 4);
        out.putByte(MARKER_PREFIX);
        out.putInt(partitionId);
        out.putInt(bucketId);
        return out.getStaticBuffer();
    }

    /** Row key under which settings for the given identifier are stored. */
    private StaticBuffer getSettingKey(String identifier) {
        DataOutput out = this.manager.serializer.getDataOutput(4 + 2 + identifier.length());
        out.putInt(SYSTEM_PARTITION_ID);
        out.writeObjectNotNull(identifier);
        return out.getStaticBuffer();
    }

    /**
     * Reads a long-valued setting from the store.
     *
     * @return the stored value, or {@code defaultValue} if no value is stored
     * @throws IllegalArgumentException if a stored value is not exactly 8 bytes long
     */
    private long readSetting(String identifier, final StaticBuffer column, long defaultValue) {
        final StaticBuffer key = this.getSettingKey(identifier);
        StaticBuffer value = BackendOperation.execute(new BackendOperation.Transactional<StaticBuffer>() {

            @Override
            public StaticBuffer call(StoreTransaction txh) throws BackendException {
                return KCVSUtil.get(KCVSLog.this.store, key, column, txh);
            }

            @Override
            public String toString() {
                return "readingLogSetting";
            }
        }, this, this.times, this.maxReadTime);
        if (value == null) {
            return defaultValue;
        }
        Preconditions.checkArgument(value.length() == 8);
        return value.getLong(0);
    }

    /** Writes a long-valued setting to the store. */
    private void writeSetting(String identifier, StaticBuffer column, long value) {
        final StaticBuffer key = this.getSettingKey(identifier);
        final Entry add = StaticArrayEntry.of(column, BufferUtil.getLongBuffer(value));
        Boolean status = BackendOperation.execute(new BackendOperation.Transactional<Boolean>() {

            @Override
            public Boolean call(StoreTransaction txh) throws BackendException {
                KCVSLog.this.store.mutate(key, ImmutableList.of(add), KeyColumnValueStore.NO_DELETIONS, txh);
                return Boolean.TRUE;
            }

            @Override
            public String toString() {
                return "writingLogSetting";
            }
        }, this, this.times, this.maxWriteTime);
        Preconditions.checkState(status);
    }

    /**
     * Periodically reads new messages for one (partition, bucket) pair and submits them to all
     * registered readers. Tracks the start of the next read window in {@code messageTimeStart}.
     */
    private class MessagePuller implements Runnable {
        private final int bucketId;
        private final int partitionId;
        private Instant messageTimeStart;

        private MessagePuller(int partitionId, int bucketId) {
            this.bucketId = bucketId;
            this.partitionId = partitionId;
            this.initializeTimepoint();
        }

        /**
         * Reads the messages in the window [messageTimeStart, messageTimeEnd), where the end is the
         * earlier of "now minus read lag" and the end of the current time slice, and then advances
         * the window. Any failure is logged and the same window is read again on the next run.
         */
        @Override
        public void run() {
            try {
                this.setReadMarker();
                int timeslice = KCVSLog.this.getTimeSlice(this.messageTimeStart);
                Instant currentTime = KCVSLog.this.times.getTime();
                Instant maxSafeMessageTime = currentTime.minus(KCVSLog.this.readLagTime);
                // Widen before adding so the slice arithmetic is done in long.
                Instant timesliceEnd = KCVSLog.this.times.getTime((timeslice + 1L) * TIMESLICE_INTERVAL);
                Instant messageTimeEnd = maxSafeMessageTime.isBefore(timesliceEnd) ? maxSafeMessageTime : timesliceEnd;
                if (!this.messageTimeStart.isBefore(messageTimeEnd)) {
                    // The window is empty: the start lies at or beyond the latest safe read time.
                    Duration delta = Duration.between(messageTimeEnd, this.messageTimeStart);
                    if (delta.toNanos() / 3L > KCVSLog.this.readLagTime.toNanos()) {
                        log.warn("MessagePuller configured with ReadMarker timestamp in the improbably distant future: {} (current time is {})", this.messageTimeStart, currentTime);
                    } else {
                        log.debug("MessagePuller configured with ReadMarker timestamp slightly ahead of read lag time; waiting for the clock to catch up");
                    }
                    return;
                }
                log.trace("MessagePuller time window: [{}, {})", this.messageTimeStart, messageTimeEnd);
                Preconditions.checkState(this.messageTimeStart.compareTo(messageTimeEnd) < 0);
                Preconditions.checkState(messageTimeEnd.compareTo(currentTime) <= 0, "Attempting to read messages from the future: messageTimeEnd=%s vs currentTime=%s", messageTimeEnd, currentTime);
                StaticBuffer logKey = KCVSLog.this.getLogKey(this.partitionId, this.bucketId, timeslice);
                KeySliceQuery query = new KeySliceQuery(logKey, BufferUtil.getLongBuffer(KCVSLog.this.times.getTime(this.messageTimeStart)), BufferUtil.getLongBuffer(KCVSLog.this.times.getTime(messageTimeEnd)));
                query.setLimit(KCVSLog.this.maxReadMsg);
                log.trace("Converted MessagePuller time window to {}", query);
                List<Entry> entries = BackendOperation.execute(this.getOperation(query), KCVSLog.this, KCVSLog.this.times, KCVSLog.this.maxReadTime);
                this.prepareMessageProcessing(entries);
                if (entries.size() >= KCVSLog.this.maxReadMsg) {
                    // The limit was hit, so the window may hold more messages than were returned.
                    // Issue a second, unlimited query starting just after the last returned column
                    // and ending at the slightly extended window end.
                    Entry lastEntry = entries.get(entries.size() - 1);
                    messageTimeEnd = messageTimeEnd.plus(TWO_MICROSECONDS);
                    log.debug("Extended time window to {}", messageTimeEnd);
                    query = new KeySliceQuery(logKey, BufferUtil.nextBiggerBuffer(lastEntry.getColumn()), BufferUtil.getLongBuffer(KCVSLog.this.times.getTime(messageTimeEnd)));
                    log.debug("Converted extended MessagePuller time window to {}", query);
                    List<Entry> extraEntries = BackendOperation.execute(this.getOperation(query), KCVSLog.this, KCVSLog.this.times, KCVSLog.this.maxReadTime);
                    this.prepareMessageProcessing(extraEntries);
                }
                this.messageTimeStart = messageTimeEnd;
            } catch (Throwable e) {
                // Deliberately broad: the failure is logged instead of propagated and
                // messageTimeStart is left untouched so the same window is retried.
                log.warn("Could not read messages for timestamp [" + this.messageTimeStart + "] (this read will be retried)", e);
            }
        }

        /**
         * Sets the initial read position: the marker's start time for unidentified markers, otherwise
         * the position persisted for this (partition, bucket) with the marker's start time as default.
         */
        private void initializeTimepoint() {
            Preconditions.checkState(null == this.messageTimeStart);
            if (!KCVSLog.this.readMarker.hasIdentifier()) {
                this.messageTimeStart = KCVSLog.this.readMarker.getStartTime(KCVSLog.this.times);
                log.info("Loaded unidentified ReadMarker start time {} into {}", this.messageTimeStart, this);
            } else {
                long defaultTimestamp = KCVSLog.this.times.getTime(KCVSLog.this.readMarker.getStartTime(KCVSLog.this.times));
                long savedTimestamp = KCVSLog.this.readSetting(KCVSLog.this.readMarker.getIdentifier(), KCVSLog.this.getMarkerColumn(this.partitionId, this.bucketId), defaultTimestamp);
                this.messageTimeStart = KCVSLog.this.times.getTime(savedTimestamp);
                log.info("Loaded indentified ReadMarker start time {} into {}", this.messageTimeStart, this);
            }
        }

        /** Parses each entry and submits one processing job per (message, registered reader) pair. */
        private void prepareMessageProcessing(List<Entry> entries) {
            for (Entry entry : entries) {
                KCVSMessage message = KCVSLog.this.parseMessage(entry);
                log.debug("Parsed message {}, about to submit this message to the reader executor", message);
                for (MessageReader reader : KCVSLog.this.readers) {
                    KCVSLog.this.readExecutor.submit(new ProcessMessageJob(message, reader));
                }
            }
        }

        /** Persists the current read position if the read marker has an identifier; failures are only logged. */
        private void setReadMarker() {
            if (!KCVSLog.this.readMarker.hasIdentifier()) {
                return;
            }
            try {
                log.debug("Attempting to persist read marker with identifier {}", KCVSLog.this.readMarker.getIdentifier());
                KCVSLog.this.writeSetting(KCVSLog.this.readMarker.getIdentifier(), KCVSLog.this.getMarkerColumn(this.partitionId, this.bucketId), KCVSLog.this.times.getTime(this.messageTimeStart));
                log.debug("Persisted read marker: identifier={} partitionId={} buckedId={} nextTimepoint={}", new Object[]{KCVSLog.this.readMarker.getIdentifier(), this.partitionId, this.bucketId, this.messageTimeStart});
            } catch (Throwable e) {
                log.error("Could not persist read marker [" + KCVSLog.this.readMarker.getIdentifier() + "] on bucket [" + this.bucketId + "] + partition [" + this.partitionId + "]", e);
            }
        }

        private void close() {
            this.setReadMarker();
        }

        /** Wraps a slice query in a transactional operation; {@code toString} names it for diagnostics. */
        private BackendOperation.Transactional<List<Entry>> getOperation(final KeySliceQuery query) {
            return new BackendOperation.Transactional<List<Entry>>() {

                @Override
                public List<Entry> call(StoreTransaction txh) throws BackendException {
                    return KCVSLog.this.store.getSlice(query, txh);
                }

                @Override
                public String toString() {
                    return "messageReading@" + MessagePuller.this.partitionId + ":" + MessagePuller.this.bucketId;
                }
            };
        }
    }

    /** Scheduled task that calls {@code updateState()} on every registered reader. */
    private class MessageReaderStateUpdater implements Runnable {
        private MessageReaderStateUpdater() {
        }

        @Override
        public void run() {
            for (MessageReader reader : KCVSLog.this.readers) {
                reader.updateState();
            }
        }
    }

    /**
     * Background thread that collects envelopes from the outgoing queue and writes them in batches.
     * A batch is sent once it holds {@code sendBatchSize} messages or its oldest message has waited
     * for at least {@code maxSendDelay}.
     */
    private class SendThread extends BackgroundThread {
        private List<MessageEnvelope> toSend;

        public SendThread() {
            super("KCVSLogSend" + KCVSLog.this.name, false);
            this.toSend = new ArrayList<MessageEnvelope>(KCVSLog.this.sendBatchSize * 3 / 2);
        }

        /** Time elapsed since the timestamp of the oldest pending message; zero if none is pending. */
        private Duration timeSinceFirstMsg() {
            if (this.toSend.isEmpty()) {
                return Duration.ZERO;
            }
            Instant firstTimestamp = this.toSend.get(0).message.getMessage().getTimestamp();
            Instant nowTimestamp = KCVSLog.this.times.getTime();
            if (firstTimestamp.compareTo(nowTimestamp) < 0) {
                return Duration.between(firstTimestamp, nowTimestamp);
            }
            return Duration.ZERO;
        }

        /** How long to wait for the next message: the remaining send delay, or forever if nothing is pending. */
        private Duration maxWaitTime() {
            if (!this.toSend.isEmpty()) {
                return KCVSLog.this.maxSendDelay.minus(this.timeSinceFirstMsg());
            }
            return FOREVER;
        }

        @Override
        protected void waitCondition() throws InterruptedException {
            MessageEnvelope msg = KCVSLog.this.outgoingMsg.poll(this.maxWaitTime().toNanos(), TimeUnit.NANOSECONDS);
            if (msg != null) {
                this.toSend.add(msg);
            }
        }

        @Override
        protected void action() {
            // Drain whatever is immediately available, up to one batch.
            MessageEnvelope msg;
            while (this.toSend.size() < KCVSLog.this.sendBatchSize && (msg = KCVSLog.this.outgoingMsg.poll()) != null) {
                this.toSend.add(msg);
            }
            if (this.toSend.isEmpty()) {
                return;
            }
            boolean delayReached = KCVSLog.this.maxSendDelay.compareTo(this.timeSinceFirstMsg()) <= 0;
            boolean batchFull = this.toSend.size() >= KCVSLog.this.sendBatchSize;
            if (delayReached || batchFull) {
                try {
                    KCVSLog.this.sendMessages(this.toSend);
                } finally {
                    this.toSend.clear();
                }
            }
        }

        /**
         * Sends all messages that are still pending or queued, one batch at a time. If a batch fails,
         * the messages of all later batches are marked as failed and the exception is rethrown.
         */
        @Override
        protected void cleanup() {
            if (this.toSend.isEmpty() && KCVSLog.this.outgoingMsg.isEmpty()) {
                return;
            }
            this.toSend.addAll(KCVSLog.this.outgoingMsg);
            for (int i = 0; i < this.toSend.size(); i += KCVSLog.this.sendBatchSize) {
                List<MessageEnvelope> subset = this.toSend.subList(i, Math.min(this.toSend.size(), i + KCVSLog.this.sendBatchSize));
                try {
                    KCVSLog.this.sendMessages(subset);
                } catch (RuntimeException e) {
                    // sendMessages already failed the current batch; fail everything after it.
                    for (int j = i + KCVSLog.this.sendBatchSize; j < this.toSend.size(); ++j) {
                        this.toSend.get(j).message.failed(e);
                    }
                    throw e;
                }
            }
        }
    }

    /** Bundles a message future with the row key and entry under which the message is stored. */
    private static class MessageEnvelope {
        final FutureMessage<KCVSMessage> message;
        final StaticBuffer key;
        final Entry entry;

        private MessageEnvelope(FutureMessage<KCVSMessage> message, StaticBuffer key, Entry entry) {
            this.message = message;
            this.key = key;
            this.entry = entry;
        }

        @Override
        public String toString() {
            return "MessageEnvelope[message=" + this.message + ",key=" + this.key + ",entry=" + this.entry + "]";
        }
    }
}

