/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.streams.alert;

import io.confluent.controlcenter.alert.record.Alert;
import io.confluent.controlcenter.streams.internals.KeyValueStoreFacade;
import io.confluent.controlcenter.util.KvQuery;
import io.confluent.serializers.OrderedKeyUberSerde;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AlertHistoryProcessorSupplier
implements ProcessorSupplier<Alert.AlertInfo, Alert.AlertInfo> {
    private static final Logger log = LoggerFactory.getLogger(AlertHistoryProcessorSupplier.class);
    private static final long DEFAULT_RETAIN_MS = TimeUnit.DAYS.toMillis(7L);
    private static final long DEFAULT_PUNCTUATE_INTERVAL_MS = TimeUnit.MINUTES.toMillis(10L);
    private static final long MAX_PUNCTUATE_ITEMS = 1000L;
    private final String storeName;
    private final OrderedKeyUberSerde<Alert.AlertInfo> alertKeySerde;
    private final long storeRetainMs;
    private final long punctuateIntervalMs;
    private final long maxPunctuateItems;
    private final OrderedKeyUberSerde<Alert.AlertInfo> alertRangeSerde;

    public AlertHistoryProcessorSupplier(String storeName, long storeRetainMs, long punctuateIntervalMs, long maxPunctuateItems, OrderedKeyUberSerde<Alert.AlertInfo> alertKeySerde, OrderedKeyUberSerde<Alert.AlertInfo> alertRangeSerde) {
        this.storeName = storeName;
        this.storeRetainMs = storeRetainMs;
        this.punctuateIntervalMs = punctuateIntervalMs;
        this.maxPunctuateItems = maxPunctuateItems;
        this.alertKeySerde = alertKeySerde;
        this.alertRangeSerde = alertRangeSerde;
    }

    public AlertHistoryProcessorSupplier(String storeName, OrderedKeyUberSerde<Alert.AlertInfo> alertKeySerde, OrderedKeyUberSerde<Alert.AlertInfo> alertRangeSerde) {
        this(storeName, DEFAULT_RETAIN_MS, DEFAULT_PUNCTUATE_INTERVAL_MS, 1000L, alertKeySerde, alertRangeSerde);
    }

    public Processor<Alert.AlertInfo, Alert.AlertInfo> get() {
        return new AbstractProcessor<Alert.AlertInfo, Alert.AlertInfo>(){
            private KeyValueStore<Bytes, Alert.AlertInfo> kvStore;
            private long lastPunctuateTimeMs = -1L;

            public void init(ProcessorContext context) {
                super.init(context);
                StateStore rawStore = context.getStateStore(AlertHistoryProcessorSupplier.this.storeName);
                this.kvStore = rawStore instanceof TimestampedKeyValueStore ? new KeyValueStoreFacade<Bytes, Alert.AlertInfo>((TimestampedKeyValueStore)rawStore) : (KeyValueStore)rawStore;
            }

            public void process(Alert.AlertInfo key, Alert.AlertInfo value) {
                log.trace("Store alert key={}  value={}", (Object)key, (Object)value);
                this.kvStore.put((Object)AlertHistoryProcessorSupplier.this.alertKeySerde.key((Object)key), (Object)value);
                long timestamp = value.getTimestamp();
                if (this.lastPunctuateTimeMs < 0L) {
                    this.lastPunctuateTimeMs = timestamp;
                }
                if (timestamp - this.lastPunctuateTimeMs < AlertHistoryProcessorSupplier.this.punctuateIntervalMs) {
                    return;
                }
                this.lastPunctuateTimeMs = timestamp;
                Alert.AlertInfo alertInfoStopKey = Alert.AlertInfo.newBuilder().setTimestamp(timestamp - AlertHistoryProcessorSupplier.this.storeRetainMs).build();
                Bytes stop = AlertHistoryProcessorSupplier.this.alertRangeSerde.key((Object)alertInfoStopKey);
                try (KeyValueIterator<Bytes, Alert.AlertInfo> iter = KvQuery.rangeFrom(this.kvStore, stop);){
                    int count = 0;
                    while (iter.hasNext()) {
                        KeyValue entry = (KeyValue)iter.next();
                        log.debug("Removing alert info key={}, timestamp={}, guid={} from store={}", new Object[]{entry.key, ((Alert.AlertInfo)entry.value).getTimestamp(), ((Alert.AlertInfo)entry.value).getGuid(), AlertHistoryProcessorSupplier.this.storeName});
                        this.kvStore.delete(entry.key);
                        if ((long)(++count) >= AlertHistoryProcessorSupplier.this.maxPunctuateItems) {
                            log.debug("stopping punctuate (after removing max items) at key={} store={}", entry.key, (Object)AlertHistoryProcessorSupplier.this.storeName);
                            break;
                        }
                        if (iter.hasNext()) continue;
                        log.debug("finished all punctuation for store={}", (Object)AlertHistoryProcessorSupplier.this.storeName);
                    }
                }
                catch (Exception e) {
                    log.error("Range query failed for removing items from store " + AlertHistoryProcessorSupplier.this.storeName, (Throwable)e);
                }
            }
        };
    }
}

