package io.confluent.controlcenter.streams.verify;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import io.confluent.controlcenter.Rollup;
import io.confluent.controlcenter.errors.AlreadyShutdownException;
import io.confluent.controlcenter.errors.DuplicateSequenceNumberException;
import io.confluent.controlcenter.errors.SequenceAfterShutdownException;
import io.confluent.controlcenter.errors.ShutdownSequenceNotHighestException;
import io.confluent.controlcenter.streams.internals.KeyValueStoreFacade;
import io.confluent.controlcenter.util.KvQuery;
import io.confluent.monitoring.common.Clock;
import io.confluent.monitoring.common.TimeBucket;
import io.confluent.monitoring.record.Monitoring;
import io.confluent.serializers.OrderedKeyUberSerde;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/controlcenter/streams/verify/VerifyTransformerSupplier.class */
public class VerifyTransformerSupplier implements TransformerSupplier<Void, Monitoring.MonitoringMessage, KeyValue<Void, Iterable<KeyValue<Void, Monitoring.MonitoringMessage>>>> {
    // Logger shared by the supplier and the transformers it creates.
    private static final Logger log = LoggerFactory.getLogger(VerifyTransformerSupplier.class);
    // Default for storeRetainMs (4 hours, in milliseconds); used by the convenience constructor.
    private static final long DEFAULT_RETAIN_MS = TimeUnit.HOURS.toMillis(4);
    // Default for punctuateIntervalMs: twelve TimeBucket.SIZE units; used by the convenience constructor.
    private static final long PUNCTUATE_INTERVAL_MS = 12 * TimeBucket.SIZE;
    // Default for maxPunctuateMs (1 second, in milliseconds); used by the convenience constructor.
    private static final long DEFAULT_MAX_PUNCTUATE_MS = TimeUnit.SECONDS.toMillis(1);
    // Default for maxPunctuateItems; used by the convenience constructor.
    private static final long DEFAULT_MAX_PUNCTUATE_ITEMS = 94210;
    // Handed to every new MonitoringVerifier and used to bound how far a partition clock advances per update.
    private final long sequenceTimeout;
    // Name of the state store each transformer looks up in init().
    private final String storeName;
    // The next four values are forwarded unchanged to every transformer created by get().
    private final long storeRetainMs;
    private final long maxPunctuateMs;
    private final long maxPunctuateItems;
    private final long punctuateIntervalMs;
    // Source of "now" for the punctuate time budget.
    private final Clock clock;
    // Produces the state-store key for a monitoring message.
    private final OrderedKeyUberSerde<Monitoring.MonitoringMessage> keySerde;
    // Topic whose partition count (asked of heartbeatSender in get()) sizes the per-partition clocks.
    private final String sourceTopic;
    private final MonitoringHeartbeatSender heartbeatSender;

    /* loaded from: input_file:io/confluent/controlcenter/streams/verify/VerifyTransformerSupplier$MonitoringVerifyTransformer.class */
    class MonitoringVerifyTransformer implements Transformer<Void, Monitoring.MonitoringMessage, KeyValue<Void, Iterable<KeyValue<Void, Monitoring.MonitoringMessage>>>> {
        // An inactive verifier is deleted by punctuate() once its last activity is older than this.
        private final long storeRetainMs;
        // Budget, measured with the supplier's clock, for a single punctuate() pass.
        private final long maxPunctuateMs;
        // punctuate() stops early once it has produced more than this many output messages.
        private final long maxPunctuateItems;
        // Minimum difference between the timestamps of two punctuate() passes that do work.
        private final long punctuateIntervalMs;
        // Name of the state store resolved in init().
        private final String storeName;
        // Timestamp of the last punctuate() pass; -1 until punctuate() is first called.
        private long lastPunctuateTimeMs = -1;
        // Last key visited by punctuate(); reset to null when a pass reaches the end of the store.
        // NOTE(review): presumably a null key makes KvQuery.rangeFrom start from the first entry - confirm.
        private Bytes lastPunctuateKey = null;
        private ProcessorContext context;
        // Verifier state per key; wraps a timestamped store in a facade when necessary (see init()).
        private KeyValueStore<Bytes, MonitoringVerifier> kvStore;
        // Index into partitionTimestamps of the partition currently holding the smallest timestamp.
        private int slowestMonitoringPartition;
        // One clock value per monitoring-topic partition; -1 means nothing recorded yet.
        private ArrayList<Long> partitionTimestamps;

        /**
         * Creates a transformer whose per-partition clocks all start unset (-1) and whose
         * "slowest partition" initially points at partition 0.
         *
         * @param j   minimum timestamp gap between two punctuate passes
         * @param j2  time budget of a single punctuate pass, in milliseconds
         * @param j3  output-message count above which a punctuate pass stops early
         * @param str name of the state store to use
         * @param j4  retention for inactive verifiers, in milliseconds
         * @param i   number of monitoring-topic partitions to track
         */
        public MonitoringVerifyTransformer(long j, long j2, long j3, String str, long j4, int i) {
            this.punctuateIntervalMs = j;
            this.maxPunctuateMs = j2;
            this.maxPunctuateItems = j3;
            this.storeName = str;
            this.storeRetainMs = j4;
            this.slowestMonitoringPartition = 0;
            // One slot per partition, each initialised to the "not seen yet" marker.
            this.partitionTimestamps = new ArrayList<>(Collections.nCopies(i, -1L));
        }

        /**
         * Captures the processor context and resolves the verifier state store.
         *
         * <p>The store is held as a plain {@link StateStore} before being inspected. Declaring the
         * local as {@code TimestampedKeyValueStore} would make the fallback branch for a plain
         * {@code KeyValueStore} unreachable (the assignment itself would fail first).
         *
         * @param processorContext context supplied by Kafka Streams
         */
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public void init(ProcessorContext processorContext) {
            this.context = processorContext;
            StateStore stateStore = processorContext.getStateStore(this.storeName);
            if (stateStore instanceof TimestampedKeyValueStore) {
                // Timestamped stores are adapted so the rest of the class sees plain values.
                this.kvStore = new KeyValueStoreFacade((TimestampedKeyValueStore) stateStore);
            } else {
                this.kvStore = (KeyValueStore<Bytes, MonitoringVerifier>) stateStore;
            }
        }

        /**
         * Processes one monitoring message.
         *
         * <p>Messages from {@code CONTROLCENTER} clients only advance the per-partition clock and
         * may trigger a punctuate pass; they are never forwarded. For any other client the
         * message's sequence is recorded in the verifier stored under the message key, error
         * messages are emitted for windows the verifier reports as missing, and {@code NORMAL}
         * messages are forwarded with their arrival time set to the partition clock (never below 0).
         *
         * <p>Sequence-ordering violations and duplicates are logged and the message is dropped.
         * Any other failure while updating the verifier is logged and the message is still forwarded.
         *
         * @param r9 unused key (always {@code Void})
         * @param monitoringMessage the message to process; {@code null} is ignored
         * @return the messages to emit downstream, or {@code null} when there is nothing to emit
         */
        @Override
        public KeyValue<Void, Iterable<KeyValue<Void, Monitoring.MonitoringMessage>>> transform(Void r9, Monitoring.MonitoringMessage monitoringMessage) {
            if (monitoringMessage == null) {
                return null;
            }
            VerifyTransformerSupplier.log.trace("transform store={} message={}", this.storeName, monitoringMessage);
            // Read the partition clock before any clock update so it reflects the state on arrival.
            long arrivalTimeMs = this.partitionTimestamps.get(monitoringMessage.getMonitoringTopicPartition()).longValue();
            List<KeyValue<Void, Monitoring.MonitoringMessage>> output = new ArrayList<>();
            if (monitoringMessage.getClientType() == Monitoring.ClientType.CONTROLCENTER) {
                maybePunctuate(monitoringMessage, output);
                if (output.isEmpty()) {
                    return null;
                }
                Iterable<KeyValue<Void, Monitoring.MonitoringMessage>> punctuated = output;
                return KeyValue.pair((Void) null, punctuated);
            }
            String logKey = VerifyTransformerSupplier.logKey(monitoringMessage);
            Bytes key = VerifyTransformerSupplier.this.keySerde.key(monitoringMessage);
            try {
                MonitoringVerifier monitoringVerifier = this.kvStore.get(key);
                long slowestClockMs = this.partitionTimestamps.get(this.slowestMonitoringPartition).longValue();
                if (monitoringVerifier == null) {
                    monitoringVerifier = new MonitoringVerifier(monitoringMessage, VerifyTransformerSupplier.this.sequenceTimeout);
                }
                VerifyTransformerSupplier.log.trace("store={} {} monitoringVerifier={}", this.storeName, logKey, monitoringVerifier);
                if (monitoringMessage.getShutdown()) {
                    VerifyTransformerSupplier.log.info("closing session={} sequence={} store={}", monitoringMessage.getSession(), monitoringMessage.getSequence(), this.storeName);
                }
                monitoringVerifier.addSequence(monitoringMessage, arrivalTimeMs);
                // Missing data is only checked once the slowest partition clock has a positive value.
                if (slowestClockMs > 0) {
                    verifyMonitoringData(monitoringVerifier, slowestClockMs, output);
                }
                this.kvStore.put(key, monitoringVerifier);
            } catch (AlreadyShutdownException e) {
                // The exception is the trailing argument so SLF4J logs it; a message string there
                // would be discarded because the format has no placeholder for it.
                VerifyTransformerSupplier.log.error("shutdown already received sequence={} {} store={}", monitoringMessage.getSequence(), logKey, this.storeName, e);
                return null;
            } catch (DuplicateSequenceNumberException e) {
                VerifyTransformerSupplier.log.info("ignoring duplicate sequence={} {} store={}", monitoringMessage.getSequence(), logKey, this.storeName);
                return null;
            } catch (SequenceAfterShutdownException e) {
                VerifyTransformerSupplier.log.error("sequence number higher than shutdown message sequence={} {} store={}", monitoringMessage.getSequence(), logKey, this.storeName, e);
                return null;
            } catch (ShutdownSequenceNotHighestException e) {
                VerifyTransformerSupplier.log.error("sequence number of shutdown message not highest sequence={} {} store={}", monitoringMessage.getSequence(), logKey, this.storeName, e);
                return null;
            } catch (IllegalStateException e) {
                VerifyTransformerSupplier.log.info("unexpected arrival of sequence={} {} store={} ise={}", monitoringMessage.getSequence(), logKey, this.storeName, e.getMessage());
                return null;
            } catch (Exception e) {
                // Best effort: the verifier update failed, but the message itself is still forwarded below.
                VerifyTransformerSupplier.log.warn("Failed to update verifier with this message's sequence number. This sequence number maybe declared missing later even thought it is not. Sequence={}.", Long.valueOf(monitoringMessage.getSequence()), e);
            }
            if (monitoringMessage.getType() == Monitoring.MessageType.NORMAL) {
                output.add(KeyValue.pair((Void) null, Monitoring.MonitoringMessage.newBuilder(monitoringMessage).setArrivalTime(Math.max(arrivalTimeMs, 0L)).build()));
            }
            if (output.isEmpty()) {
                return null;
            }
            Iterable<KeyValue<Void, Monitoring.MonitoringMessage>> result = output;
            return KeyValue.pair((Void) null, result);
        }

        /**
         * Walks the verifier store, emitting error messages for missing monitoring data and
         * evicting verifiers whose session is inactive and whose last activity is older than
         * {@code storeRetainMs}.
         *
         * <p>A pass only runs when at least {@code punctuateIntervalMs} has elapsed (in terms of
         * {@code j}) since the previous pass. A pass stops early once more than
         * {@code maxPunctuateItems} messages were produced or {@code maxPunctuateMs} of clock time
         * was used; the last visited key is remembered so the next pass continues from there.
         *
         * <p>The store iterator is managed by try-with-resources so it is closed even when
         * processing an entry throws. Failures are logged and whatever was collected so far is
         * still returned.
         *
         * @param j timestamp driving this pass (the slowest partition clock when called from transform)
         * @return the messages to emit, or {@code null} when nothing was produced or the pass was skipped
         */
        KeyValue<Void, Iterable<KeyValue<Void, Monitoring.MonitoringMessage>>> punctuate(long j) {
            if (this.lastPunctuateTimeMs < 0) {
                this.lastPunctuateTimeMs = j;
            }
            if (j - this.lastPunctuateTimeMs < this.punctuateIntervalMs) {
                return null;
            }
            long deadlineMs = VerifyTransformerSupplier.this.clock.currentTimeMillis() + this.maxPunctuateMs;
            this.lastPunctuateTimeMs = j;
            List<KeyValue<Void, Monitoring.MonitoringMessage>> emitted = new ArrayList<>();
            try (KeyValueIterator<Bytes, MonitoringVerifier> entries = KvQuery.rangeFrom(this.kvStore, this.lastPunctuateKey)) {
                long processed = 0;
                while (entries.hasNext()) {
                    KeyValue<Bytes, MonitoringVerifier> entry = entries.next();
                    if (entry == null) {
                        continue;
                    }
                    MonitoringVerifier verifier = entry.value;
                    if (verifier == null) {
                        VerifyTransformerSupplier.log.debug("removing null value key={} from store={}", entry.key, this.storeName);
                        this.kvStore.delete(entry.key);
                    } else {
                        VerifyTransformerSupplier.log.trace("checking key={} verifier={} store={}", entry.key, verifier, this.storeName);
                        verifyMonitoringData(verifier, j, emitted);
                        boolean expired = verifier.isSessionNotActive() && j - verifier.lastActivityTimeMs() > this.storeRetainMs;
                        if (expired) {
                            VerifyTransformerSupplier.log.debug("removing verifier key={} lastActivity={} from store={}", entry.key, verifier.lastActivityTimeMs(), this.storeName);
                            this.kvStore.delete(entry.key);
                        } else {
                            // Write back: verifyMonitoringData may have changed the verifier's state.
                            this.kvStore.put(entry.key, verifier);
                        }
                    }
                    this.lastPunctuateKey = entry.key;
                    processed++;
                    if (emitted.size() > this.maxPunctuateItems || VerifyTransformerSupplier.this.clock.currentTimeMillis() > deadlineMs) {
                        VerifyTransformerSupplier.log.debug("stopping punctuate at key={} store={} count={}", this.lastPunctuateKey, this.storeName, processed);
                        break;
                    }
                    if (!entries.hasNext()) {
                        VerifyTransformerSupplier.log.debug("finished all punctuation for store={}", this.storeName);
                        // Whole store visited: the next pass starts over.
                        this.lastPunctuateKey = null;
                    }
                    VerifyTransformerSupplier.log.debug("punctuated on count={} records", Long.valueOf(processed));
                }
            } catch (Exception e) {
                // The trailing exception argument makes SLF4J log the stack trace as well as the message.
                VerifyTransformerSupplier.log.error("failure during punctuate for store={} e={}", this.storeName, e.getMessage(), e);
            }
            if (emitted.isEmpty()) {
                return null;
            }
            Iterable<KeyValue<Void, Monitoring.MonitoringMessage>> result = emitted;
            return KeyValue.pair((Void) null, result);
        }

        /** Logs that this transformer is closing; it performs no other work. */
        @Override
        public void close() {
            final String closingStore = this.storeName;
            VerifyTransformerSupplier.log.info("closing store={}", closingStore);
        }

        /**
         * Advances the clock of partition {@code i} towards timestamp {@code j}.
         *
         * <p>A clock never moves backwards. An unset clock (-1) jumps straight to {@code j};
         * otherwise a single update advances it by at most {@code sequenceTimeout - 1}.
         *
         * @param i partition whose clock is updated
         * @param j timestamp observed on that partition
         * @return {@code true} only when {@code i} was the slowest partition and the smallest
         *         clock value across all partitions changed as a result
         */
        private boolean updateMonitoringTopicClock(int i, long j) {
            final long current = this.partitionTimestamps.get(i).longValue();
            if (j <= current) {
                return false;
            }
            final long slowestBefore = this.partitionTimestamps.get(this.slowestMonitoringPartition).longValue();

            final long ceiling;
            if (current == -1) {
                ceiling = j;
            } else {
                ceiling = (current + VerifyTransformerSupplier.this.sequenceTimeout) - 1;
            }
            this.partitionTimestamps.set(i, Long.valueOf(Math.min(ceiling, j)));

            // Only an update to the slowest partition can move the overall minimum.
            if (this.slowestMonitoringPartition != i) {
                return false;
            }
            final long slowestAfter = Collections.min(this.partitionTimestamps).longValue();
            final long clockOfTrackedPartition = this.partitionTimestamps.get(this.slowestMonitoringPartition).longValue();
            if (clockOfTrackedPartition != slowestAfter) {
                // Another partition is now the slowest; track it instead.
                this.slowestMonitoringPartition = this.partitionTimestamps.indexOf(Long.valueOf(slowestAfter));
            }
            if (slowestAfter == slowestBefore) {
                return false;
            }
            VerifyTransformerSupplier.log.trace("Moved slowest clock: partition={} timestamp={}", Integer.valueOf(this.slowestMonitoringPartition), Long.valueOf(slowestAfter));
            return true;
        }

        /**
         * Feeds the message's partition and timestamp into the partition clocks and, when that
         * moves the slowest clock, runs a punctuate pass at the new slowest clock value.
         *
         * @param monitoringMessage message supplying the partition and timestamp
         * @param list receives every message produced by the punctuate pass, if any
         */
        private void maybePunctuate(Monitoring.MonitoringMessage monitoringMessage, List<KeyValue<Void, Monitoring.MonitoringMessage>> list) {
            boolean slowestClockMoved = updateMonitoringTopicClock(monitoringMessage.getMonitoringTopicPartition(), monitoringMessage.getTimestamp());
            if (!slowestClockMoved) {
                return;
            }
            long slowestClock = this.partitionTimestamps.get(this.slowestMonitoringPartition).longValue();
            KeyValue<Void, Iterable<KeyValue<Void, Monitoring.MonitoringMessage>>> punctuated = punctuate(slowestClock);
            if (punctuated != null) {
                Iterables.addAll(list, punctuated.value);
            }
        }

        /**
         * Asks the verifier which monitoring data is missing as of {@code j} and, if any is,
         * logs a warning and appends one error message per missing window to {@code list}.
         *
         * @param monitoringVerifier verifier to query
         * @param j timestamp at which missing data is evaluated; also used as the arrival time
         *          of the generated error messages
         * @param list receives the generated error messages
         */
        private void verifyMonitoringData(MonitoringVerifier monitoringVerifier, long j, List<KeyValue<Void, Monitoring.MonitoringMessage>> list) {
            RangeSet<Long> gaps = monitoringVerifier.getMissingMonitoringData(j);
            boolean nothingMissing = gaps == null || gaps.isEmpty();
            if (nothingMissing) {
                return;
            }
            VerifyTransformerSupplier.log.warn("missing monitoring data for client={} topic={} partition={} session={} window ranges={} {}", monitoringVerifier.getClientId(), monitoringVerifier.getTopic(), Integer.valueOf(monitoringVerifier.getPartition()), monitoringVerifier.getSession(), gaps, monitoringVerifier);
            Monitoring.MonitoringMessage template = monitoringVerifier.getCopyOfBaseMonitoringMessage();
            for (Monitoring.MonitoringMessage errorMessage : VerifyTransformerSupplier.errorMessagesInRange(gaps, template, j)) {
                list.add(KeyValue.pair((Void) null, errorMessage));
            }
        }
    }

    /**
     * Creates a supplier with every tuning value given explicitly.
     *
     * @param j   sequence timeout handed to each verifier and used to bound clock advances
     * @param str name of the verifier state store
     * @param j2  retention for inactive verifiers, in milliseconds
     * @param j3  time budget of one punctuate pass, in milliseconds
     * @param j4  output-message count above which a punctuate pass stops early
     * @param j5  minimum timestamp gap between punctuate passes
     * @param str2 source topic whose partition count sizes the per-partition clocks
     * @param monitoringHeartbeatSender used to look up the partition count of the source topic
     * @param clock clock used for the punctuate time budget
     * @param orderedKeyUberSerde produces the state-store key for a message
     */
    public VerifyTransformerSupplier(long j, String str, long j2, long j3, long j4, long j5, String str2, MonitoringHeartbeatSender monitoringHeartbeatSender, Clock clock, OrderedKeyUberSerde<Monitoring.MonitoringMessage> orderedKeyUberSerde) {
        // Store and timing configuration.
        this.storeName = str;
        this.sequenceTimeout = j;
        this.storeRetainMs = j2;
        this.punctuateIntervalMs = j5;
        this.maxPunctuateMs = j3;
        this.maxPunctuateItems = j4;

        // Collaborators.
        this.sourceTopic = str2;
        this.heartbeatSender = monitoringHeartbeatSender;
        this.keySerde = orderedKeyUberSerde;
        this.clock = clock;
    }

    /**
     * Creates a supplier using {@code TimeBucket.SEQUENCE_TIMEOUT} and this class's default
     * retention, punctuate budget, punctuate item limit and punctuate interval.
     *
     * @param str  name of the verifier state store
     * @param str2 source topic whose partition count sizes the per-partition clocks
     * @param monitoringHeartbeatSender used to look up the partition count of the source topic
     * @param clock clock used for the punctuate time budget
     * @param orderedKeyUberSerde produces the state-store key for a message
     */
    public VerifyTransformerSupplier(String str, String str2, MonitoringHeartbeatSender monitoringHeartbeatSender, Clock clock, OrderedKeyUberSerde<Monitoring.MonitoringMessage> orderedKeyUberSerde) {
        this(
            TimeBucket.SEQUENCE_TIMEOUT,
            str,
            DEFAULT_RETAIN_MS,
            DEFAULT_MAX_PUNCTUATE_MS,
            DEFAULT_MAX_PUNCTUATE_ITEMS,
            PUNCTUATE_INTERVAL_MS,
            str2,
            monitoringHeartbeatSender,
            clock,
            orderedKeyUberSerde);
    }

    /**
     * Builds a new transformer configured from this supplier, sized to the number of
     * partitions the heartbeat sender reports for the source topic.
     *
     * @return a fresh {@code MonitoringVerifyTransformer}
     */
    @Override
    public Transformer<Void, Monitoring.MonitoringMessage, KeyValue<Void, Iterable<KeyValue<Void, Monitoring.MonitoringMessage>>>> get() {
        int partitionCount = this.heartbeatSender.numberOfTopicPartitions(this.sourceTopic);
        return new MonitoringVerifyTransformer(
            this.punctuateIntervalMs,
            this.maxPunctuateMs,
            this.maxPunctuateItems,
            this.storeName,
            this.storeRetainMs,
            partitionCount);
    }

    /**
     * Builds one {@code ERROR} message per rollup window that falls inside the missing ranges.
     *
     * <p>Rollups are processed in {@code Rollup.values()} order. For each rollup, only the part
     * of {@code rangeSet} within that rollup's retention around {@code j} is considered. Windows
     * already reported for an earlier rollup are re-aligned to the current rollup's window
     * boundaries and are not reported again.
     *
     * <p>The window walk is a bounded {@code for} loop that ends at the range's upper endpoint;
     * an unconditional loop here would never terminate.
     *
     * @param rangeSet missing ranges; {@code null} or empty yields an empty result
     * @param monitoringMessage template copied into every generated message
     * @param j reference timestamp; also set as the arrival time of every generated message
     * @return the generated error messages, possibly empty, never {@code null}
     */
    static Collection<Monitoring.MonitoringMessage> errorMessagesInRange(RangeSet<Long> rangeSet, Monitoring.MonitoringMessage monitoringMessage, long j) {
        if (rangeSet == null || rangeSet.isEmpty()) {
            return Collections.emptySet();
        }
        ImmutableList.Builder<Monitoring.MonitoringMessage> errors = ImmutableList.builder();
        RangeSet<Long> reported = TreeRangeSet.create();
        for (Rollup rollup : Rollup.values()) {
            long windowMs = rollup.getMillis();

            // Re-align what earlier rollups reported to this rollup's window boundaries.
            RangeSet<Long> aligned = TreeRangeSet.create();
            for (Range<Long> range : reported.asRanges()) {
                aligned.add(Range.closedOpen(
                    Long.valueOf(rollup.toWindow(range.lowerEndpoint().longValue())),
                    Long.valueOf(rollup.toWindow(range.upperEndpoint().longValue()) + windowMs)));
            }
            reported = aligned;

            Range<Long> retained = Range.closedOpen(
                Long.valueOf(rollup.toWindow(j - rollup.getRetainMillis())),
                Long.valueOf(rollup.toWindow(j + rollup.getRetainMillis())));
            for (Range<Long> missing : rangeSet.subRangeSet(retained).asRanges()) {
                long end = missing.upperEndpoint().longValue();
                for (long window = rollup.toWindow(missing.lowerEndpoint().longValue()); window < end; window += windowMs) {
                    if (reported.contains(Long.valueOf(window))) {
                        continue;
                    }
                    errors.add(Monitoring.MonitoringMessage.newBuilder(monitoringMessage)
                        .setType(Monitoring.MessageType.ERROR)
                        .setWindow(window)
                        .setArrivalTime(j)
                        .build());
                    reported.add(Range.closedOpen(Long.valueOf(window), Long.valueOf(window + windowMs)));
                }
            }
        }
        return errors.build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Formats the identifying fields of a message for log output.
     *
     * <p>Labels and values are all joined with a single space, so the result looks like
     * {@code clientId= <id> topic= <topic> partition= <n> session= <session>}.
     *
     * @param monitoringMessage message to describe
     * @return the space-joined label/value string
     */
    public static String logKey(Monitoring.MonitoringMessage monitoringMessage) {
        Joiner spaceJoiner = Joiner.on(' ');
        return spaceJoiner.join(
            "clientId=", monitoringMessage.getClientId(),
            "topic=", monitoringMessage.getTopic(),
            "partition=", Integer.valueOf(monitoringMessage.getPartition()),
            "session=", monitoringMessage.getSession());
    }
}
