/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.streams.verify;

import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.monitoring.common.Clock;
import io.confluent.monitoring.record.Monitoring;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThroughTopicVerifierTransformerSupplier
implements ValueTransformerSupplier<Controlcenter.VerifiableMonitoringMessage, Monitoring.MonitoringMessage> {
    private final String storeName;
    private final Clock clock;

    public ThroughTopicVerifierTransformerSupplier(String storeName, Clock clock) {
        this.storeName = storeName;
        this.clock = clock;
    }

    public ValueTransformer<Controlcenter.VerifiableMonitoringMessage, Monitoring.MonitoringMessage> get() {
        return new ThroughTopicVerifierTransformer(this.storeName);
    }

    private class ThroughTopicVerifierTransformer
    implements ValueTransformer<Controlcenter.VerifiableMonitoringMessage, Monitoring.MonitoringMessage> {
        private final Logger log = LoggerFactory.getLogger(ThroughTopicVerifierTransformer.class);
        private final String storeName;
        private ProcessorContext context;
        private KeyValueStore<String, Long> kvStore;

        ThroughTopicVerifierTransformer(String storeName) {
            this.storeName = storeName;
        }

        public void init(ProcessorContext context) {
            this.context = context;
            this.kvStore = (KeyValueStore)this.context.getStateStore(this.storeName);
        }

        public Monitoring.MonitoringMessage transform(Controlcenter.VerifiableMonitoringMessage verifiableMonitoringMessage) {
            if (verifiableMonitoringMessage == null) {
                return null;
            }
            String key = verifiableMonitoringMessage.getGuid();
            Monitoring.MonitoringMessage out = null;
            try {
                Long lastSeen = (Long)this.kvStore.get((Object)key);
                this.kvStore.put((Object)key, (Object)ThroughTopicVerifierTransformerSupplier.this.clock.currentTimeMillis());
                if (lastSeen == null) {
                    out = verifiableMonitoringMessage.getMonitoringMessage();
                } else {
                    this.log.warn("dropping duplicate message key={} lastSeen={} store={}", new Object[]{key, lastSeen, this.storeName});
                }
            }
            catch (Exception e) {
                this.log.warn("failed to update key={} store={} {}", new Object[]{key, this.storeName, e});
            }
            return out;
        }

        public void close() {
            this.log.info("closing store={}", (Object)this.storeName);
        }
    }
}

