package org.apache.kafka.streams.state.internals;

import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.metrics.Sensors;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/MeteredSessionStore.class */
/**
 * A {@link SessionStore} decorator that adds serialization and latency metrics on top of a
 * byte-oriented inner store.
 *
 * <p>Keys and values are converted to and from raw bytes with a {@link StateSerdes} built in
 * {@link #init(ProcessorContext, StateStore)}; every operation is delegated to the wrapped
 * {@code SessionStore<Bytes, byte[]>}. Latency of put, fetch, flush, remove and restore
 * (the time spent in the wrapped store's {@code init}) is recorded on DEBUG-level sensors.
 *
 * <p>The sensors and serdes are only created in {@code init}, so no other method may be called
 * before it.
 *
 * @param <K> type of the session keys exposed to callers
 * @param <V> type of the session values exposed to callers
 */
public class MeteredSessionStore<K, V> extends WrappedStateStore<SessionStore<Bytes, byte[]>> implements SessionStore<K, V> {
    private final String metricScope;
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private final Time time;
    private StateSerdes<K, V> serdes;
    private StreamsMetricsImpl metrics;
    private Sensor putTime;
    private Sensor fetchTime;
    private Sensor flushTime;
    private Sensor removeTime;
    private String taskName;

    /**
     * Creates the metered wrapper. Serdes and sensors are not set up here; that happens in
     * {@link #init(ProcessorContext, StateStore)}.
     *
     * @param inner       the byte-oriented store that all operations are delegated to
     * @param metricScope scope string used to build the metric group name and the tag key
     * @param keySerde    serde for keys, or {@code null} to fall back to the context's key serde
     * @param valueSerde  serde for values, or {@code null} to fall back to the context's value serde
     * @param time        clock used to measure operation latency
     */
    public MeteredSessionStore(SessionStore<Bytes, byte[]> inner, String metricScope, Serde<K> keySerde, Serde<V> valueSerde, Time time) {
        super(inner);
        this.metricScope = metricScope;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.time = time;
    }

    /**
     * Builds the serdes and sensors for this store, then initializes the wrapped store while
     * recording how long that takes on the "restore" sensor.
     *
     * @param context the processor context supplying application id, task id, metrics and
     *                default serdes
     * @param root    the root store, forwarded unchanged to the wrapped store
     */
    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public void init(ProcessorContext context, StateStore root) {
        // Explicitly configured serdes win; otherwise use the defaults carried by the context.
        // The casts are unchecked because the context does not know this store's K and V.
        @SuppressWarnings("unchecked")
        final Serde<K> resolvedKeySerde = this.keySerde == null ? (Serde<K>) context.keySerde() : this.keySerde;
        @SuppressWarnings("unchecked")
        final Serde<V> resolvedValueSerde = this.valueSerde == null ? (Serde<V>) context.valueSerde() : this.valueSerde;
        final String changelogTopic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
        this.serdes = new StateSerdes<>(changelogTopic, resolvedKeySerde, resolvedValueSerde);

        this.metrics = (StreamsMetricsImpl) context.metrics();
        this.taskName = context.taskId().toString();

        final String metricsGroup = "stream-" + this.metricScope + "-metrics";
        final String storeTagKey = this.metricScope + "-id";
        // NOTE(review): StreamsConfig.OPTIMIZE is used here only for its string value as the
        // task-level (all stores) tag; this looks like a decompiler constant substitution for a
        // plain literal - confirm against the upstream source before changing it.
        final Map<String, String> taskTags = this.metrics.tagMap("task-id", this.taskName, storeTagKey, StreamsConfig.OPTIMIZE);
        final Map<String, String> storeTags = this.metrics.tagMap("task-id", this.taskName, storeTagKey, name());

        this.putTime = Sensors.createTaskAndStoreLatencyAndThroughputSensors(Sensor.RecordingLevel.DEBUG, "put", this.metrics, metricsGroup, this.taskName, name(), taskTags, storeTags);
        this.fetchTime = Sensors.createTaskAndStoreLatencyAndThroughputSensors(Sensor.RecordingLevel.DEBUG, "fetch", this.metrics, metricsGroup, this.taskName, name(), taskTags, storeTags);
        this.flushTime = Sensors.createTaskAndStoreLatencyAndThroughputSensors(Sensor.RecordingLevel.DEBUG, "flush", this.metrics, metricsGroup, this.taskName, name(), taskTags, storeTags);
        this.removeTime = Sensors.createTaskAndStoreLatencyAndThroughputSensors(Sensor.RecordingLevel.DEBUG, "remove", this.metrics, metricsGroup, this.taskName, name(), taskTags, storeTags);
        final Sensor restoreTime = Sensors.createTaskAndStoreLatencyAndThroughputSensors(Sensor.RecordingLevel.DEBUG, "restore", this.metrics, metricsGroup, this.taskName, name(), taskTags, storeTags);

        final long startNs = this.time.nanoseconds();
        try {
            super.init(context, root);
        } finally {
            // Record the elapsed time whether or not initialization succeeded.
            this.metrics.recordLatency(restoreTime, startNs, this.time.nanoseconds());
        }
    }

    /**
     * Closes the wrapped store and removes this store's sensors from the metrics registry.
     * The sensors are removed even if closing the wrapped store throws.
     */
    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public void close() {
        try {
            super.close();
        } finally {
            this.metrics.removeAllStoreLevelSensors(this.taskName, name());
        }
    }

    /**
     * Finds sessions for a single key by delegating to the wrapped store with the serialized key.
     *
     * @param key                     the key to look up; must not be {@code null}
     * @param earliestSessionEndTime  time bound forwarded unchanged to the wrapped store
     * @param latestSessionStartTime  time bound forwarded unchanged to the wrapped store
     * @return an iterator over the wrapped store's result; it is handed the fetch sensor, so it
     *         presumably records fetch latency itself (not visible from this class)
     * @throws NullPointerException if {@code key} is {@code null}
     */
    @Override // org.apache.kafka.streams.state.SessionStore
    public KeyValueIterator<Windowed<K>, V> findSessions(K key, long earliestSessionEndTime, long latestSessionStartTime) {
        Objects.requireNonNull(key, "key cannot be null");
        final KeyValueIterator<Windowed<Bytes>, byte[]> rawSessions =
            wrapped().findSessions(keyBytes(key), earliestSessionEndTime, latestSessionStartTime);
        return new MeteredWindowedKeyValueIterator<>(rawSessions, this.fetchTime, this.metrics, this.serdes, this.time);
    }

    /**
     * Finds sessions for a range of keys by delegating to the wrapped store with both keys
     * serialized.
     *
     * @param keyFrom                 first key of the range; must not be {@code null}
     * @param keyTo                   last key of the range; must not be {@code null}
     * @param earliestSessionEndTime  time bound forwarded unchanged to the wrapped store
     * @param latestSessionStartTime  time bound forwarded unchanged to the wrapped store
     * @return an iterator over the wrapped store's result, wired to the fetch sensor
     * @throws NullPointerException if either key is {@code null}
     */
    @Override // org.apache.kafka.streams.state.SessionStore
    public KeyValueIterator<Windowed<K>, V> findSessions(K keyFrom, K keyTo, long earliestSessionEndTime, long latestSessionStartTime) {
        Objects.requireNonNull(keyFrom, "keyFrom cannot be null");
        Objects.requireNonNull(keyTo, "keyTo cannot be null");
        final KeyValueIterator<Windowed<Bytes>, byte[]> rawSessions =
            wrapped().findSessions(keyBytes(keyFrom), keyBytes(keyTo), earliestSessionEndTime, latestSessionStartTime);
        return new MeteredWindowedKeyValueIterator<>(rawSessions, this.fetchTime, this.metrics, this.serdes, this.time);
    }

    /**
     * Removes a session from the wrapped store and records the latency on the remove sensor.
     *
     * @param sessionKey the windowed key of the session to remove; must not be {@code null}
     * @throws NullPointerException    if {@code sessionKey} is {@code null}
     * @throws ProcessorStateException if the wrapped store fails; the wrapped store's message is
     *                                 used as a format template and filled in with the key
     */
    @Override // org.apache.kafka.streams.state.SessionStore
    public void remove(Windowed<K> sessionKey) {
        Objects.requireNonNull(sessionKey, "sessionKey can't be null");
        final long startNs = this.time.nanoseconds();
        try {
            wrapped().remove(new Windowed<>(keyBytes(sessionKey.key()), sessionKey.window()));
        } catch (ProcessorStateException e) {
            // The inner message is treated as a format string; the typed key fills its placeholder.
            throw new ProcessorStateException(String.format(e.getMessage(), sessionKey.key()), e);
        } finally {
            this.metrics.recordLatency(this.removeTime, startNs, this.time.nanoseconds());
        }
    }

    /**
     * Writes a session aggregate to the wrapped store and records the latency on the put sensor.
     *
     * @param sessionKey the windowed key of the session; must not be {@code null}
     * @param aggregate  the value to store, serialized with the value serde
     * @throws NullPointerException    if {@code sessionKey} is {@code null}
     * @throws ProcessorStateException if the wrapped store fails; the wrapped store's message is
     *                                 used as a format template and filled in with key and value
     */
    @Override // org.apache.kafka.streams.state.SessionStore
    public void put(Windowed<K> sessionKey, V aggregate) {
        Objects.requireNonNull(sessionKey, "sessionKey can't be null");
        final long startNs = this.time.nanoseconds();
        try {
            final Windowed<Bytes> rawKey = new Windowed<>(keyBytes(sessionKey.key()), sessionKey.window());
            wrapped().put(rawKey, this.serdes.rawValue(aggregate));
        } catch (ProcessorStateException e) {
            // The inner message is treated as a format string; key and value fill its placeholders.
            throw new ProcessorStateException(String.format(e.getMessage(), sessionKey.key(), aggregate), e);
        } finally {
            this.metrics.recordLatency(this.putTime, startNs, this.time.nanoseconds());
        }
    }

    /** Serializes a typed key with the key serde and wraps the bytes for the inner store. */
    private Bytes keyBytes(K key) {
        return Bytes.wrap(this.serdes.rawKey(key));
    }

    /**
     * Looks up a single session in the wrapped store and deserializes its value. Latency is
     * recorded on the fetch sensor.
     *
     * @param key       the session key; must not be {@code null}
     * @param startTime forwarded unchanged to the wrapped store
     * @param endTime   forwarded unchanged to the wrapped store
     * @return the deserialized value of whatever the wrapped store returned
     * @throws NullPointerException if {@code key} is {@code null}
     */
    @Override // org.apache.kafka.streams.state.SessionStore
    public V fetchSession(K key, long startTime, long endTime) {
        Objects.requireNonNull(key, "key cannot be null");
        final Bytes rawKey = keyBytes(key);
        final long startNs = this.time.nanoseconds();
        try {
            return this.serdes.valueFrom(wrapped().fetchSession(rawKey, startTime, endTime));
        } finally {
            // This is a read, so it belongs on the fetch sensor (not the flush sensor).
            this.metrics.recordLatency(this.fetchTime, startNs, this.time.nanoseconds());
        }
    }

    /**
     * Returns all sessions for a key; equivalent to {@code findSessions(key, 0, Long.MAX_VALUE)}.
     *
     * @param key the key to look up; must not be {@code null}
     * @throws NullPointerException if {@code key} is {@code null}
     */
    @Override // org.apache.kafka.streams.state.ReadOnlySessionStore
    public KeyValueIterator<Windowed<K>, V> fetch(K key) {
        Objects.requireNonNull(key, "key cannot be null");
        return findSessions(key, 0L, Long.MAX_VALUE);
    }

    /**
     * Returns all sessions for a key range; equivalent to
     * {@code findSessions(from, to, 0, Long.MAX_VALUE)}.
     *
     * @param from first key of the range; must not be {@code null}
     * @param to   last key of the range; must not be {@code null}
     * @throws NullPointerException if either key is {@code null}
     */
    @Override // org.apache.kafka.streams.state.ReadOnlySessionStore
    public KeyValueIterator<Windowed<K>, V> fetch(K from, K to) {
        Objects.requireNonNull(from, "from cannot be null");
        Objects.requireNonNull(to, "to cannot be null");
        return findSessions(from, to, 0L, Long.MAX_VALUE);
    }

    /** Flushes the wrapped store and records the latency on the flush sensor. */
    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public void flush() {
        final long startNs = this.time.nanoseconds();
        try {
            super.flush();
        } finally {
            this.metrics.recordLatency(this.flushTime, startNs, this.time.nanoseconds());
        }
    }
}
