/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.streams.verify;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import io.confluent.controlcenter.Rollup;
import io.confluent.controlcenter.errors.AlreadyShutdownException;
import io.confluent.controlcenter.errors.DuplicateSequenceNumberException;
import io.confluent.controlcenter.errors.SequenceAfterShutdownException;
import io.confluent.controlcenter.errors.ShutdownSequenceNotHighestException;
import io.confluent.controlcenter.streams.internals.KeyValueStoreFacade;
import io.confluent.controlcenter.streams.verify.MonitoringHeartbeatSender;
import io.confluent.controlcenter.streams.verify.MonitoringVerifier;
import io.confluent.controlcenter.util.KvQuery;
import io.confluent.monitoring.common.Clock;
import io.confluent.monitoring.common.TimeBucket;
import io.confluent.monitoring.record.Monitoring;
import io.confluent.serializers.OrderedKeyUberSerde;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VerifyTransformerSupplier
implements TransformerSupplier<Void, Monitoring.MonitoringMessage, KeyValue<Void, Iterable<KeyValue<Void, Monitoring.MonitoringMessage>>>> {
    private static final Logger log = LoggerFactory.getLogger(VerifyTransformerSupplier.class);
    private static final long DEFAULT_RETAIN_MS = TimeUnit.HOURS.toMillis(4L);
    private static final long PUNCTUATE_INTERVAL_MS = 12L * TimeBucket.SIZE;
    private static final long DEFAULT_MAX_PUNCTUATE_MS = TimeUnit.SECONDS.toMillis(1L);
    private static final long DEFAULT_MAX_PUNCTUATE_ITEMS = 94210L;
    private final long sequenceTimeout;
    private final String storeName;
    private final long storeRetainMs;
    private final long maxPunctuateMs;
    private final long maxPunctuateItems;
    private final long punctuateIntervalMs;
    private final Clock clock;
    private final OrderedKeyUberSerde<Monitoring.MonitoringMessage> keySerde;
    private final String sourceTopic;
    private final MonitoringHeartbeatSender heartbeatSender;

    /**
     * Creates a supplier with every tuning parameter given explicitly.
     *
     * @param sequenceTimeout     timeout passed to each newly created {@link MonitoringVerifier}; also
     *                            bounds how far a partition clock may advance in a single update
     * @param storeName           name of the state store looked up by each transformer
     * @param storeRetainMs       how long an inactive verifier is kept in the store before removal
     * @param maxPunctuateMs      wall-clock budget for a single punctuate pass
     * @param maxPunctuateItems   cap on the number of messages collected by a single punctuate pass
     * @param punctuateIntervalMs minimum timestamp distance between two punctuate passes
     * @param sourceTopic         topic whose partition count sizes the per-partition clock table
     * @param heartbeatSender     queried for the partition count of {@code sourceTopic}
     * @param clock               time source used to enforce the punctuate budget
     * @param keySerde            turns a monitoring message into its state-store key
     */
    public VerifyTransformerSupplier(long sequenceTimeout, String storeName, long storeRetainMs, long maxPunctuateMs, long maxPunctuateItems, long punctuateIntervalMs, String sourceTopic, MonitoringHeartbeatSender heartbeatSender, Clock clock, OrderedKeyUberSerde<Monitoring.MonitoringMessage> keySerde) {
        // State store settings.
        this.storeName = storeName;
        this.storeRetainMs = storeRetainMs;
        this.keySerde = keySerde;

        // Punctuation limits.
        this.punctuateIntervalMs = punctuateIntervalMs;
        this.maxPunctuateMs = maxPunctuateMs;
        this.maxPunctuateItems = maxPunctuateItems;

        // Sequence tracking and time sources.
        this.sequenceTimeout = sequenceTimeout;
        this.clock = clock;

        // Source topic metadata.
        this.sourceTopic = sourceTopic;
        this.heartbeatSender = heartbeatSender;
    }

    /**
     * Creates a supplier that uses the class defaults for the sequence timeout, store retention,
     * punctuate time budget, punctuate item cap and punctuate interval.
     *
     * @param storeName       name of the state store looked up by each transformer
     * @param sourceTopic     topic whose partition count sizes the per-partition clock table
     * @param heartbeatSender queried for the partition count of {@code sourceTopic}
     * @param clock           time source used to enforce the punctuate budget
     * @param keySerde        turns a monitoring message into its state-store key
     */
    public VerifyTransformerSupplier(String storeName, String sourceTopic, MonitoringHeartbeatSender heartbeatSender, Clock clock, OrderedKeyUberSerde<Monitoring.MonitoringMessage> keySerde) {
        // Use the named constant rather than repeating its literal value, so the default lives in one place.
        this(TimeBucket.SEQUENCE_TIMEOUT, storeName, DEFAULT_RETAIN_MS, DEFAULT_MAX_PUNCTUATE_MS, DEFAULT_MAX_PUNCTUATE_ITEMS, PUNCTUATE_INTERVAL_MS, sourceTopic, heartbeatSender, clock, keySerde);
    }

    /**
     * Builds a new transformer configured from this supplier's settings.
     *
     * <p>The number of partitions of the source topic is fetched from the heartbeat sender on every
     * call and determines the size of the transformer's per-partition clock table.
     *
     * @return a fresh {@link MonitoringVerifyTransformer}
     */
    @Override
    public Transformer<Void, Monitoring.MonitoringMessage, KeyValue<Void, Iterable<KeyValue<Void, Monitoring.MonitoringMessage>>>> get() {
        int numSourcePartitions = this.heartbeatSender.numberOfTopicPartitions(this.sourceTopic);
        return new MonitoringVerifyTransformer(this.punctuateIntervalMs, this.maxPunctuateMs, this.maxPunctuateItems, this.storeName, this.storeRetainMs, numSourcePartitions);
    }

    /**
     * Builds the ERROR messages that report missing monitoring data.
     *
     * <p>Rollups are visited in {@code Rollup.values()} order. For each rollup, the missing ranges are
     * clipped to that rollup's retention interval around {@code now}, and one ERROR message is
     * produced per rollup window in the clipped ranges. A window is skipped when it is already covered
     * by a message written earlier in this call, either at the same rollup or at a previous rollup
     * after rescaling to the current window size.
     *
     * @param windowRanges missing window ranges; {@code null} or empty yields an empty result
     * @param baseMessage  template copied into every produced message
     * @param now          timestamp used for the retention interval and as each message's arrival time
     * @return the produced ERROR messages, in generation order; never {@code null}
     */
    static Collection<Monitoring.MonitoringMessage> errorMessagesInRange(RangeSet<Long> windowRanges, Monitoring.MonitoringMessage baseMessage, long now) {
        if (windowRanges == null || windowRanges.isEmpty()) {
            return Collections.emptySet();
        }
        ImmutableList.Builder<Monitoring.MonitoringMessage> out = ImmutableList.builder();
        RangeSet<Long> writtenRanges = TreeRangeSet.create();
        for (Rollup rollup : Rollup.values()) {
            // Re-express what has already been written in units of the current rollup's windows.
            RangeSet<Long> rescaledRanges = TreeRangeSet.create();
            for (Range<Long> written : writtenRanges.asRanges()) {
                long lower = rollup.toWindow(written.lowerEndpoint());
                // NOTE(review): upperEndpoint() is the exclusive bound of a closed-open range. If
                // toWindow() floors to the window start, an aligned upper bound widens the range by
                // one extra window - confirm this is intended.
                long upper = rollup.toWindow(written.upperEndpoint()) + rollup.getMillis();
                rescaledRanges.add(Range.closedOpen(lower, upper));
            }
            writtenRanges = rescaledRanges;

            // Only windows inside this rollup's retention interval around 'now' are reported.
            long retainFrom = rollup.toWindow(now - rollup.getRetainMillis());
            long retainTo = rollup.toWindow(now + rollup.getRetainMillis());
            Range<Long> rollupRange = Range.closedOpen(retainFrom, retainTo);

            for (Range<Long> missing : windowRanges.subRangeSet(rollupRange).asRanges()) {
                for (long win = rollup.toWindow(missing.lowerEndpoint()); win < missing.upperEndpoint(); win += rollup.getMillis()) {
                    if (writtenRanges.contains(win)) {
                        continue;
                    }
                    Monitoring.MonitoringMessage.Builder msg = Monitoring.MonitoringMessage.newBuilder(baseMessage);
                    msg.setType(Monitoring.MessageType.ERROR);
                    msg.setWindow(win);
                    msg.setArrivalTime(now);
                    out.add(msg.build());
                    writtenRanges.add(Range.closedOpen(win, win + rollup.getMillis()));
                }
            }
        }
        return out.build();
    }

    /**
     * Renders the identifying fields of a message (client id, topic, partition, session) as a single
     * space-separated string for use in log lines.
     *
     * @param message message whose identifying fields are rendered
     * @return the label/value tokens joined by single spaces
     */
    private static String logKey(Monitoring.MonitoringMessage message) {
        Joiner spaceJoiner = Joiner.on(' ');
        return spaceJoiner.join(
                "clientId=", message.getClientId(),
                "topic=", message.getTopic(),
                "partition=", message.getPartition(),
                "session=", message.getSession());
    }

    class MonitoringVerifyTransformer
    implements Transformer<Void, Monitoring.MonitoringMessage, KeyValue<Void, Iterable<KeyValue<Void, Monitoring.MonitoringMessage>>>> {
        private final long storeRetainMs;
        private final long maxPunctuateMs;
        private final long maxPunctuateItems;
        private final long punctuateIntervalMs;
        private final String storeName;
        private long lastPunctuateTimeMs = -1L;
        private Bytes lastPunctuateKey = null;
        private ProcessorContext context;
        private KeyValueStore<Bytes, MonitoringVerifier> kvStore;
        private int slowestMonitoringPartition;
        private ArrayList<Long> partitionTimestamps;

        /**
         * Creates a transformer with one clock slot per source-topic partition.
         *
         * <p>Every partition clock starts at {@code -1}, and partition 0 is initially treated as the
         * slowest partition.
         *
         * @param punctuateIntervalMs minimum timestamp distance between two punctuate passes
         * @param maxPunctuateMs      wall-clock budget for a single punctuate pass
         * @param maxPunctuateItems   cap on the number of messages collected by a single punctuate pass
         * @param storeName           name of the state store to look up in {@code init}
         * @param storeRetainMs       how long an inactive verifier is kept in the store before removal
         * @param numSourcePartitions number of partition clocks to allocate
         * @throws IllegalArgumentException if {@code numSourcePartitions} is negative
         */
        public MonitoringVerifyTransformer(long punctuateIntervalMs, long maxPunctuateMs, long maxPunctuateItems, String storeName, long storeRetainMs, int numSourcePartitions) {
            this.storeName = storeName;
            this.storeRetainMs = storeRetainMs;
            this.maxPunctuateMs = maxPunctuateMs;
            this.maxPunctuateItems = maxPunctuateItems;
            this.punctuateIntervalMs = punctuateIntervalMs;
            // Typed, pre-filled list: -1 marks a partition from which no timestamp has been recorded yet.
            this.partitionTimestamps = new ArrayList<>(Collections.nCopies(numSourcePartitions, -1L));
            this.slowestMonitoringPartition = 0;
        }

        /**
         * Stores the processor context and resolves the verifier state store by name.
         *
         * <p>If the registered store is a {@link TimestampedKeyValueStore} it is wrapped in a
         * {@link KeyValueStoreFacade}; otherwise it is used directly as a {@link KeyValueStore}.
         *
         * @param context processor context supplying the state store
         */
        @Override
        @SuppressWarnings("unchecked") // the store registered under storeName is expected to hold Bytes -> MonitoringVerifier
        public void init(ProcessorContext context) {
            this.context = context;
            StateStore rawStore = this.context.getStateStore(this.storeName);
            if (rawStore instanceof TimestampedKeyValueStore) {
                this.kvStore = new KeyValueStoreFacade<Bytes, MonitoringVerifier>((TimestampedKeyValueStore<Bytes, MonitoringVerifier>) rawStore);
            } else {
                this.kvStore = (KeyValueStore<Bytes, MonitoringVerifier>) rawStore;
            }
        }

        /**
         * Processes one monitoring message.
         *
         * <ul>
         *   <li>A {@code null} message is ignored.</li>
         *   <li>A message with client type {@code CONTROLCENTER} only drives the partition clocks and,
         *       when the slowest clock moves, a punctuate pass; the message itself is not forwarded.</li>
         *   <li>Any other message has its sequence number recorded in the verifier stored under the
         *       message's key (a new verifier is created when none exists). If the slowest partition
         *       clock is positive, missing-data ERROR messages for that verifier are collected. A
         *       message of type {@code NORMAL} is then forwarded with its arrival time set to the
         *       clock of its monitoring partition (or 0 when that clock is still unset).</li>
         * </ul>
         *
         * <p>Duplicate, unexpected or shutdown-violating sequence numbers are logged and the message is
         * dropped. Any other failure while updating the verifier is logged and the message is still
         * forwarded.
         *
         * @param key               unused
         * @param monitoringMessage message to process; may be {@code null}
         * @return the messages to emit wrapped in a single pair, or {@code null} when there are none
         */
        @Override
        public KeyValue<Void, Iterable<KeyValue<Void, Monitoring.MonitoringMessage>>> transform(Void key, Monitoring.MonitoringMessage monitoringMessage) {
            if (monitoringMessage == null) {
                return null;
            }
            log.trace("transform store={} message={}", this.storeName, monitoringMessage);
            long arrivalTime = this.partitionTimestamps.get(monitoringMessage.getMonitoringTopicPartition());
            List<KeyValue<Void, Monitoring.MonitoringMessage>> ret = new ArrayList<>();
            if (monitoringMessage.getClientType() == Monitoring.ClientType.CONTROLCENTER) {
                this.maybePunctuate(monitoringMessage, ret);
                if (ret.isEmpty()) {
                    return null;
                }
                return KeyValue.pair(null, ret);
            }
            String messageKey = VerifyTransformerSupplier.logKey(monitoringMessage);
            Bytes storeKey = VerifyTransformerSupplier.this.keySerde.key(monitoringMessage);
            try {
                MonitoringVerifier verifier = this.kvStore.get(storeKey);
                long clockTime = this.partitionTimestamps.get(this.slowestMonitoringPartition);
                if (verifier == null) {
                    verifier = new MonitoringVerifier(monitoringMessage, VerifyTransformerSupplier.this.sequenceTimeout);
                }
                log.trace("store={} {} monitoringVerifier={}", this.storeName, messageKey, verifier);
                if (monitoringMessage.getShutdown()) {
                    log.info("closing session={} sequence={} store={}", monitoringMessage.getSession(), monitoringMessage.getSequence(), this.storeName);
                }
                verifier.addSequence(monitoringMessage, arrivalTime);
                // Only verify once the slowest partition clock holds a positive timestamp.
                if (clockTime > 0L) {
                    this.verifyMonitoringData(verifier, clockTime, ret);
                }
                this.kvStore.put(storeKey, verifier);
            }
            catch (DuplicateSequenceNumberException dse) {
                log.info("ignoring duplicate sequence={} {} store={}", monitoringMessage.getSequence(), messageKey, this.storeName);
                return null;
            }
            catch (IllegalStateException ise) {
                log.info("unexpected arrival of sequence={} {} store={} ise={}", monitoringMessage.getSequence(), messageKey, this.storeName, ise.getMessage());
                return null;
            }
            catch (AlreadyShutdownException ase) {
                // The exception message previously had no placeholder and was silently dropped.
                log.error("shutdown already received sequence={} {} store={} error={}", monitoringMessage.getSequence(), messageKey, this.storeName, ase.getMessage());
                return null;
            }
            catch (SequenceAfterShutdownException se) {
                log.error("sequence number higher than shutdown message sequence={} {} store={} error={}", monitoringMessage.getSequence(), messageKey, this.storeName, se.getMessage());
                return null;
            }
            catch (ShutdownSequenceNotHighestException sse) {
                log.error("sequence number of shutdown message not highest sequence={} {} store={} error={}", monitoringMessage.getSequence(), messageKey, this.storeName, sse.getMessage());
                return null;
            }
            catch (Exception e) {
                // Best effort: the message is still forwarded below even though the verifier was not updated.
                log.warn("Failed to update verifier with this message's sequence number. This sequence number maybe declared missing later even thought it is not. Sequence={}.", monitoringMessage.getSequence(), e);
            }
            if (monitoringMessage.getType() == Monitoring.MessageType.NORMAL) {
                // An unset partition clock (-1) is reported as arrival time 0.
                Monitoring.MonitoringMessage stamped = Monitoring.MonitoringMessage.newBuilder(monitoringMessage).setArrivalTime(Math.max(arrivalTime, 0L)).build();
                ret.add(KeyValue.<Void, Monitoring.MonitoringMessage>pair(null, stamped));
            }
            if (ret.isEmpty()) {
                return null;
            }
            return KeyValue.pair(null, ret);
        }

        /**
         * Scans the verifier store for missing monitoring data, at most once per punctuate interval.
         *
         * <p>The scan resumes from the key where the previous pass stopped. For each stored verifier it
         * collects missing-data ERROR messages, then either deletes the verifier (session not active and
         * idle for longer than the store retention) or writes it back. Entries with a {@code null}
         * value are deleted. The pass stops early once more than {@code maxPunctuateItems} messages
         * have been collected or the wall-clock budget {@code maxPunctuateMs} is exceeded; the next pass
         * then resumes from the last processed key.
         *
         * <p>When the iterator is exhausted without an early stop, the resume cursor is cleared so the
         * next pass starts from the beginning of the store. This also covers the case where the resumed
         * range yields no entries at all, which would otherwise leave the cursor set indefinitely.
         *
         * <p>Failures during the scan are logged; messages collected before the failure are still
         * returned.
         *
         * @param timestamp time against which the punctuate interval, missing data and store retention
         *                  are evaluated
         * @return the collected ERROR messages wrapped in a single pair, or {@code null} when the
         *         interval has not elapsed or nothing was collected
         */
        KeyValue<Void, Iterable<KeyValue<Void, Monitoring.MonitoringMessage>>> punctuate(long timestamp) {
            if (this.lastPunctuateTimeMs < 0L) {
                // First call: start measuring the interval from here.
                this.lastPunctuateTimeMs = timestamp;
            }
            if (timestamp - this.lastPunctuateTimeMs < this.punctuateIntervalMs) {
                return null;
            }
            long now = VerifyTransformerSupplier.this.clock.currentTimeMillis();
            this.lastPunctuateTimeMs = timestamp;
            long punctuateTimeout = now + this.maxPunctuateMs;
            List<KeyValue<Void, Monitoring.MonitoringMessage>> missingWindowMsgList = new ArrayList<>();
            try (KeyValueIterator<Bytes, MonitoringVerifier> iter = KvQuery.rangeFrom(this.kvStore, this.lastPunctuateKey)) {
                long count = 0L;
                boolean stoppedEarly = false;
                while (iter.hasNext()) {
                    KeyValue<Bytes, MonitoringVerifier> entry = iter.next();
                    if (entry == null) {
                        continue;
                    }
                    if (entry.value == null) {
                        log.debug("removing null value key={} from store={}", entry.key, this.storeName);
                        this.kvStore.delete(entry.key);
                    } else {
                        log.trace("checking key={} verifier={} store={}", entry.key, entry.value, this.storeName);
                        this.verifyMonitoringData(entry.value, timestamp, missingWindowMsgList);
                        boolean expired = entry.value.isSessionNotActive() && timestamp - entry.value.lastActivityTimeMs() > this.storeRetainMs;
                        if (expired) {
                            log.debug("removing verifier key={} lastActivity={} from store={}", entry.key, entry.value.lastActivityTimeMs(), this.storeName);
                            this.kvStore.delete(entry.key);
                        } else {
                            // Write the verifier back after verification.
                            this.kvStore.put(entry.key, entry.value);
                        }
                    }
                    this.lastPunctuateKey = entry.key;
                    ++count;
                    if ((long)missingWindowMsgList.size() > this.maxPunctuateItems || VerifyTransformerSupplier.this.clock.currentTimeMillis() > punctuateTimeout) {
                        log.debug("stopping punctuate at key={} store={} count={}", this.lastPunctuateKey, this.storeName, count);
                        stoppedEarly = true;
                        break;
                    }
                    log.debug("punctuated on count={} records", count);
                }
                if (!stoppedEarly) {
                    // Iterator exhausted (possibly without yielding anything): restart from the
                    // beginning next time instead of keeping a stale resume key.
                    log.debug("finished all punctuation for store={}", this.storeName);
                    this.lastPunctuateKey = null;
                }
            }
            catch (Exception e) {
                // Pass the exception as the trailing argument so the stack trace is logged too.
                log.error("failure during punctuate for store={} e={}", this.storeName, e.getMessage(), e);
            }
            if (missingWindowMsgList.isEmpty()) {
                return null;
            }
            return KeyValue.pair(null, missingWindowMsgList);
        }

        /**
         * Logs that this transformer is closing. No other work is done here; in particular the state
         * store is not closed by this method.
         */
        @Override
        public void close() {
            log.info("closing store={}", this.storeName);
        }

        /**
         * Advances the clock of one monitoring-topic partition and reports whether the overall
         * (slowest) clock moved as a result.
         *
         * <p>A timestamp that is not newer than the partition's current clock is ignored. Otherwise the
         * partition clock is set to the new timestamp, except that a clock which already holds a value
         * advances by at most {@code sequenceTimeout - 1} per call. The slowest partition is only
         * re-evaluated when the updated partition was the slowest one.
         *
         * @param monitoringTopicPartition  index of the partition whose clock is updated
         * @param monitoringMessageTimestamp timestamp carried by the message
         * @return {@code true} only if the updated partition was the slowest and the minimum clock
         *         value changed
         */
        private boolean updateMonitoringTopicClock(int monitoringTopicPartition, long monitoringMessageTimestamp) {
            final long currentClock = this.partitionTimestamps.get(monitoringTopicPartition);
            if (monitoringMessageTimestamp <= currentClock) {
                return false;
            }

            final long minBefore = this.partitionTimestamps.get(this.slowestMonitoringPartition);

            // An unset clock (-1) jumps straight to the message timestamp; otherwise the step is capped.
            final long ceiling;
            if (currentClock == -1L) {
                ceiling = monitoringMessageTimestamp;
            } else {
                ceiling = currentClock + VerifyTransformerSupplier.this.sequenceTimeout - 1L;
            }
            this.partitionTimestamps.set(monitoringTopicPartition, Math.min(ceiling, monitoringMessageTimestamp));

            if (monitoringTopicPartition != this.slowestMonitoringPartition) {
                return false;
            }

            final long minAfter = Collections.min(this.partitionTimestamps);
            final long slowestClock = this.partitionTimestamps.get(this.slowestMonitoringPartition);
            if (minAfter != slowestClock) {
                this.slowestMonitoringPartition = this.partitionTimestamps.indexOf(minAfter);
            }
            if (minBefore == minAfter) {
                return false;
            }
            log.trace("Moved slowest clock: partition={} timestamp={}", (Object)this.slowestMonitoringPartition, (Object)minAfter);
            return true;
        }

        /**
         * Feeds the message's timestamp into the partition clocks and, if the slowest clock moved,
         * runs a punctuate pass at the new slowest-clock time and appends its output to {@code msgList}.
         *
         * @param monitoringMessage message supplying the monitoring partition and timestamp
         * @param msgList           receives any messages produced by the punctuate pass
         */
        private void maybePunctuate(Monitoring.MonitoringMessage monitoringMessage, List<KeyValue<Void, Monitoring.MonitoringMessage>> msgList) {
            boolean slowestClockMoved = this.updateMonitoringTopicClock(monitoringMessage.getMonitoringTopicPartition(), monitoringMessage.getTimestamp());
            if (!slowestClockMoved) {
                return;
            }
            long slowestClock = this.partitionTimestamps.get(this.slowestMonitoringPartition);
            KeyValue<Void, Iterable<KeyValue<Void, Monitoring.MonitoringMessage>>> punctuated = this.punctuate(slowestClock);
            if (punctuated == null) {
                return;
            }
            Iterables.addAll(msgList, punctuated.value);
        }

        /**
         * Asks the verifier which windows are missing as of {@code now} and, if there are any, logs a
         * warning and appends one ERROR message per missing window to {@code msgList}.
         *
         * @param verifier verifier to query
         * @param now      time at which missing data is evaluated; also stamped on the ERROR messages
         * @param msgList  receives the generated ERROR messages, each paired with a {@code null} key
         */
        private void verifyMonitoringData(MonitoringVerifier verifier, long now, List<KeyValue<Void, Monitoring.MonitoringMessage>> msgList) {
            RangeSet<Long> missingRanges = verifier.getMissingMonitoringData(now);
            if (missingRanges == null || missingRanges.isEmpty()) {
                return;
            }
            log.warn("missing monitoring data for client={} topic={} partition={} session={} window ranges={} {}", new Object[]{verifier.getClientId(), verifier.getTopic(), verifier.getPartition(), verifier.getSession(), missingRanges, verifier});
            Collection<Monitoring.MonitoringMessage> errorMessages = VerifyTransformerSupplier.errorMessagesInRange(missingRanges, verifier.getCopyOfBaseMonitoringMessage(), now);
            for (Monitoring.MonitoringMessage errorMessage : errorMessages) {
                msgList.add(KeyValue.<Void, Monitoring.MonitoringMessage>pair(null, errorMessage));
            }
        }
    }
}

