package io.confluent.ksql.execution.streams.materialization.ks;

import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Function;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.CompositeReadOnlySessionStore;
import org.apache.kafka.streams.state.internals.MeteredSessionStore;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

/* loaded from: input_file:io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.class */
public final class SessionStoreCacheBypass {
    private static final Field PROVIDER_FIELD;
    private static final Field STORE_NAME_FIELD;
    private static final Field STORE_TYPE_FIELD;
    static final Field SERDES_FIELD;
    private static final String STORE_UNAVAILABLE_MESSAGE = "State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass$DeserializingIterator.class */
    public static final class DeserializingIterator implements KeyValueIterator<Windowed<GenericKey>, GenericRow> {
        private final KeyValueIterator<Windowed<Bytes>, byte[]> fetch;
        private final StateSerdes<GenericKey, GenericRow> serdes;

        private DeserializingIterator(KeyValueIterator<Windowed<Bytes>, byte[]> keyValueIterator, StateSerdes<GenericKey, GenericRow> stateSerdes) {
            this.fetch = keyValueIterator;
            this.serdes = stateSerdes;
        }

        public void close() {
            this.fetch.close();
        }

        /* renamed from: peekNextKey, reason: merged with bridge method [inline-methods] */
        public Windowed<GenericKey> m15peekNextKey() {
            Windowed windowed = (Windowed) this.fetch.peekNextKey();
            return new Windowed<>(this.serdes.keyFrom(((Bytes) windowed.key()).get()), windowed.window());
        }

        public boolean hasNext() {
            return this.fetch.hasNext();
        }

        /* renamed from: next, reason: merged with bridge method [inline-methods] */
        public KeyValue<Windowed<GenericKey>, GenericRow> m16next() {
            return KeyValue.pair(m15peekNextKey(), this.serdes.valueFrom((byte[]) ((KeyValue) this.fetch.next()).value));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass$EmptyKeyValueIterator.class */
    public static class EmptyKeyValueIterator implements KeyValueIterator<Windowed<GenericKey>, GenericRow> {
        private EmptyKeyValueIterator() {
        }

        public void close() {
        }

        /* renamed from: peekNextKey, reason: merged with bridge method [inline-methods] */
        public Windowed<GenericKey> m17peekNextKey() {
            throw new NoSuchElementException();
        }

        public boolean hasNext() {
            return false;
        }

        /* renamed from: next, reason: merged with bridge method [inline-methods] */
        public KeyValue<Windowed<GenericKey>, GenericRow> m18next() {
            throw new NoSuchElementException();
        }
    }

    /* loaded from: input_file:io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass$SessionStoreCacheBypassFetcher.class */
    interface SessionStoreCacheBypassFetcher {
        KeyValueIterator<Windowed<GenericKey>, GenericRow> fetch(ReadOnlySessionStore<GenericKey, GenericRow> readOnlySessionStore, GenericKey genericKey);
    }

    /* loaded from: input_file:io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass$SessionStoreCacheBypassFetcherRange.class */
    interface SessionStoreCacheBypassFetcherRange {
        KeyValueIterator<Windowed<GenericKey>, GenericRow> fetchRange(ReadOnlySessionStore<GenericKey, GenericRow> readOnlySessionStore, GenericKey genericKey, GenericKey genericKey2);
    }

    private SessionStoreCacheBypass() {
    }

    public static KeyValueIterator<Windowed<GenericKey>, GenericRow> fetch(ReadOnlySessionStore<GenericKey, GenericRow> readOnlySessionStore, GenericKey genericKey) {
        Objects.requireNonNull(genericKey, "key can't be null");
        return findFirstNonEmptyIterator(getStores(readOnlySessionStore), readOnlySessionStore2 -> {
            return fetchUncached(readOnlySessionStore2, genericKey);
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static KeyValueIterator<Windowed<GenericKey>, GenericRow> fetchUncached(ReadOnlySessionStore<GenericKey, GenericRow> readOnlySessionStore, GenericKey genericKey) {
        if (!(readOnlySessionStore instanceof MeteredSessionStore)) {
            throw new IllegalStateException("Expecting a MeteredSessionStore");
        }
        StateSerdes<GenericKey, GenericRow> serdes = getSerdes(readOnlySessionStore);
        return new DeserializingIterator(getInnermostStore(readOnlySessionStore).fetch(Bytes.wrap(serdes.rawKey(genericKey))), serdes);
    }

    public static KeyValueIterator<Windowed<GenericKey>, GenericRow> fetchRange(ReadOnlySessionStore<GenericKey, GenericRow> readOnlySessionStore, GenericKey genericKey, GenericKey genericKey2) {
        Objects.requireNonNull(genericKey, "lower key can't be null");
        Objects.requireNonNull(genericKey2, "upper key can't be null");
        return findFirstNonEmptyIterator(getStores(readOnlySessionStore), readOnlySessionStore2 -> {
            return fetchRangeUncached(readOnlySessionStore2, genericKey, genericKey2);
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static KeyValueIterator<Windowed<GenericKey>, GenericRow> fetchRangeUncached(ReadOnlySessionStore<GenericKey, GenericRow> readOnlySessionStore, GenericKey genericKey, GenericKey genericKey2) {
        if (!(readOnlySessionStore instanceof MeteredSessionStore)) {
            throw new IllegalStateException("Expecting a MeteredSessionStore");
        }
        StateSerdes<GenericKey, GenericRow> serdes = getSerdes(readOnlySessionStore);
        return new DeserializingIterator(getInnermostStore(readOnlySessionStore).fetch(Bytes.wrap(serdes.rawKey(genericKey)), Bytes.wrap(serdes.rawKey(genericKey2))), serdes);
    }

    private static KeyValueIterator<Windowed<GenericKey>, GenericRow> findFirstNonEmptyIterator(List<ReadOnlySessionStore<GenericKey, GenericRow>> list, Function<ReadOnlySessionStore<GenericKey, GenericRow>, KeyValueIterator<Windowed<GenericKey>, GenericRow>> function) {
        Iterator<ReadOnlySessionStore<GenericKey, GenericRow>> it = list.iterator();
        while (it.hasNext()) {
            try {
                KeyValueIterator<Windowed<GenericKey>, GenericRow> apply = function.apply(it.next());
                if (apply.hasNext()) {
                    return apply;
                }
                apply.close();
            } catch (InvalidStateStoreException e) {
                throw new InvalidStateStoreException(STORE_UNAVAILABLE_MESSAGE, e);
            }
        }
        return new EmptyKeyValueIterator();
    }

    private static StateSerdes<GenericKey, GenericRow> getSerdes(ReadOnlySessionStore<GenericKey, GenericRow> readOnlySessionStore) throws RuntimeException {
        try {
            return (StateSerdes) SERDES_FIELD.get(readOnlySessionStore);
        } catch (IllegalAccessException e) {
            throw new RuntimeException("Stream internals changed unexpectedly!", e);
        }
    }

    private static SessionStore<Bytes, byte[]> getInnermostStore(ReadOnlySessionStore<GenericKey, GenericRow> readOnlySessionStore) {
        WrappedStateStore wrappedStateStore;
        StateStore wrapped = ((MeteredSessionStore) readOnlySessionStore).wrapped();
        while (true) {
            wrappedStateStore = (SessionStore) wrapped;
            if (!(wrappedStateStore instanceof WrappedStateStore)) {
                break;
            }
            StateStore wrapped2 = wrappedStateStore.wrapped();
            if (!(wrapped2 instanceof SessionStore)) {
                break;
            }
            wrapped = wrapped2;
        }
        return wrappedStateStore;
    }

    private static List<ReadOnlySessionStore<GenericKey, GenericRow>> getStores(ReadOnlySessionStore<GenericKey, GenericRow> readOnlySessionStore) {
        try {
            return ((StateStoreProvider) PROVIDER_FIELD.get(readOnlySessionStore)).stores((String) STORE_NAME_FIELD.get(readOnlySessionStore), (QueryableStoreType) STORE_TYPE_FIELD.get(readOnlySessionStore));
        } catch (IllegalAccessException e) {
            throw new RuntimeException("Stream internals changed unexpectedly!", e);
        }
    }

    static {
        try {
            PROVIDER_FIELD = CompositeReadOnlySessionStore.class.getDeclaredField("storeProvider");
            PROVIDER_FIELD.setAccessible(true);
            STORE_NAME_FIELD = CompositeReadOnlySessionStore.class.getDeclaredField("storeName");
            STORE_NAME_FIELD.setAccessible(true);
            STORE_TYPE_FIELD = CompositeReadOnlySessionStore.class.getDeclaredField("queryableStoreType");
            STORE_TYPE_FIELD.setAccessible(true);
            SERDES_FIELD = MeteredSessionStore.class.getDeclaredField("serdes");
            SERDES_FIELD.setAccessible(true);
        } catch (NoSuchFieldException e) {
            throw new RuntimeException("Stream internals changed unexpectedly!", e);
        }
    }
}
