package io.lettuce.core;

import io.lettuce.core.api.reactive.RedisHashReactiveCommands;
import io.lettuce.core.api.reactive.RedisKeyReactiveCommands;
import io.lettuce.core.api.reactive.RedisSetReactiveCommands;
import io.lettuce.core.api.reactive.RedisSortedSetReactiveCommands;
import io.lettuce.core.internal.LettuceAssert;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

/* loaded from: input_file:io/lettuce/core/ScanStream.class */
public abstract class ScanStream {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/lettuce/core/ScanStream$Completable.class */
    /**
     * Callback through which a {@link ScanSubscriber} reports the outcome of the scan chunk it is consuming.
     */
    public interface Completable {
        /**
         * Invoked when the upstream delivering the current scan chunk completes.
         */
        void chunkCompleted();

        /**
         * Invoked when the upstream delivering the current scan chunk fails.
         *
         * @param th the error signalled by the upstream
         */
        void onError(Throwable th);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/lettuce/core/ScanStream$ScanSubscriber.class */
    /**
     * Subscriber that consumes the cursor of a single scan call.
     * <p>
     * The received cursor is converted into a collection of elements through the configured mapper. Elements are
     * passed to the {@link FluxSink} as far as downstream demand allows; the remainder is kept in an internal buffer
     * until {@link #emitFromBuffer()} is called. Completion and errors are forwarded to the {@link Completable}.
     *
     * @param <T> element type emitted to the sink
     * @param <C> cursor type received from upstream
     */
    public static class ScanSubscriber<T, C extends ScanCursor> extends BaseSubscriber<C> {
        private static final AtomicReferenceFieldUpdater<ScanSubscriber, ScanCursor> CURSOR = AtomicReferenceFieldUpdater.newUpdater(ScanSubscriber.class, ScanCursor.class, "cursor");
        private static final AtomicLongFieldUpdater<ScanSubscriber> EMITTED = AtomicLongFieldUpdater.newUpdater(ScanSubscriber.class, "emitted");
        private final Completable completable;
        private final FluxSink<T> sink;
        private final Queue<T> buffer = Operators.newQueue();
        private final Context context;
        private final Function<C, Collection<T>> manyMapper;
        volatile boolean canceled;
        // Updated through CURSOR; set at most once.
        private volatile C cursor;
        // Updated through EMITTED; counts elements handed to the sink.
        private volatile long emitted;
        // Number of elements the received cursor was mapped to.
        private volatile long cursorSize;

        ScanSubscriber(Completable completable, FluxSink<T> fluxSink, Context context, Function<C, Collection<T>> function) {
            this.completable = completable;
            this.sink = fluxSink;
            this.context = context;
            this.manyMapper = function;
        }

        /**
         * @return the context supplied at construction time
         */
        public Context currentContext() {
            return context;
        }

        /**
         * Stores the cursor, maps it to its elements and emits them. A second cursor is rejected by reporting an
         * {@link IllegalStateException} through {@code Operators.onOperatorError}.
         *
         * @param c the cursor received from upstream
         */
        public void hookOnNext(C c) {
            if (CURSOR.compareAndSet(this, null, c)) {
                Collection<T> elements = manyMapper.apply(c);
                cursorSize = elements.size();
                emitDirect(elements);
                return;
            }
            // A cursor was already stored: this subscriber accepts exactly one.
            Operators.onOperatorError(this, new IllegalStateException("Cannot propagate Cursor"), c, context);
        }

        /**
         * Emits elements to the sink up to the demand observed on entry and buffers every element beyond that.
         * Stops as soon as this subscriber is canceled.
         *
         * @param iterable the elements to emit or buffer
         */
        void emitDirect(Iterable<T> iterable) {
            final long demand = sink.requestedFromDownstream();
            long sent = 0;
            for (T element : iterable) {
                if (canceled) {
                    return;
                }
                if (sent < demand) {
                    sent++;
                    next(element);
                    continue;
                }
                // Demand is used up; keep the element for a later emitFromBuffer() call.
                buffer.add(element);
            }
        }

        /**
         * Drains buffered elements to the sink, limited by the demand observed on entry. Stops when the buffer is
         * empty or this subscriber is canceled.
         */
        void emitFromBuffer() {
            final long demand = sink.requestedFromDownstream();
            for (long sent = 0; sent < demand; ) {
                T element = buffer.poll();
                if (element == null || canceled) {
                    return;
                }
                sent++;
                next(element);
            }
        }

        /** Counts the element as emitted, then hands it to the sink. */
        private void next(T t) {
            EMITTED.incrementAndGet(this);
            sink.next(t);
        }

        protected void hookOnComplete() {
            completable.chunkCompleted();
        }

        protected void hookOnError(Throwable th) {
            completable.onError(th);
        }

        protected void hookOnCancel() {
            canceled = true;
        }

        /**
         * @return the cursor received so far, or {@code null} if none has arrived yet
         */
        public ScanCursor getCursor() {
            return CURSOR.get(this);
        }

        /**
         * @return {@code true} if the number of emitted elements equals the size of the mapped collection and a
         *         cursor has been received
         */
        public boolean isExhausted() {
            if (EMITTED.get(this) != cursorSize) {
                return false;
            }
            return getCursor() != null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/lettuce/core/ScanStream$SubscriptionAdapter.class */
    /**
     * Connects a {@link FluxSink} to a sequence of scan calls.
     * <p>
     * On demand from the sink, the adapter subscribes a {@link ScanSubscriber} to the initial scan and, once the
     * current chunk has been fully emitted and its cursor is not finished, to the follow-up scan obtained from the
     * scan function. The sink is completed after the finished cursor's chunk has been fully emitted, and failed when
     * a chunk signals an error. A terminal signal is sent at most once.
     *
     * @param <T> element type emitted to the sink
     * @param <C> cursor type produced by the scan calls
     */
    public static class SubscriptionAdapter<T, C extends ScanCursor> implements Completable {
        private static final AtomicReferenceFieldUpdater<SubscriptionAdapter, ScanSubscriber> SUBSCRIBER = AtomicReferenceFieldUpdater.newUpdater(SubscriptionAdapter.class, ScanSubscriber.class, "currentSubscription");
        private static final AtomicIntegerFieldUpdater<SubscriptionAdapter> STATUS = AtomicIntegerFieldUpdater.newUpdater(SubscriptionAdapter.class, "status");
        /** Initial state: no terminal signal has been sent to the sink. */
        private static final int STATUS_ACTIVE = 0;
        /** Final state. Must differ from {@link #STATUS_ACTIVE} so that {@link #terminate()} succeeds only once. */
        private static final int STATUS_TERMINATED = 1;
        // Updated through SUBSCRIBER.
        private volatile ScanSubscriber<T, C> currentSubscription;
        private volatile boolean canceled;
        // Updated through STATUS.
        private volatile int status = STATUS_ACTIVE;
        private final FluxSink<T> sink;
        private final Context context;
        private final Mono<C> initial;
        private final Function<ScanCursor, Mono<C>> scanFunction;
        private final Function<C, Collection<T>> manyMapper;

        /**
         * @param fluxSink sink receiving the scanned elements; its current context is captured here
         * @param mono the initial scan call
         * @param function produces the follow-up scan call for a given cursor
         * @param function2 maps a cursor to the elements it carries
         */
        SubscriptionAdapter(FluxSink<T> fluxSink, Mono<C> mono, Function<ScanCursor, Mono<C>> function, Function<C, Collection<T>> function2) {
            this.sink = fluxSink;
            this.context = fluxSink.currentContext();
            this.initial = mono;
            this.scanFunction = function;
            this.manyMapper = function2;
        }

        /**
         * Registers the demand and cancellation callbacks on the sink.
         */
        public void register() {
            this.sink.onRequest(this::onDemand);
            this.sink.onCancel(this::canceled);
        }

        /**
         * Reacts to demand: starts the initial scan, drains buffered elements, or starts the next scan once the
         * current chunk is exhausted.
         *
         * @param j the requested amount; not used directly, the outstanding demand is read from the sink
         */
        void onDemand(long j) {
            if (this.canceled) {
                return;
            }
            ScanSubscriber<T, C> currentSubscriber = getCurrentSubscriber();
            if (currentSubscriber == null) {
                // No scan started yet: only the caller that wins the CAS subscribes to the initial scan.
                ScanSubscriber<T, C> first = new ScanSubscriber<>(this, this.sink, this.context, this.manyMapper);
                if (SUBSCRIBER.compareAndSet(this, null, first)) {
                    this.initial.subscribe(first);
                }
                return;
            }
            ScanCursor cursor = currentSubscriber.getCursor();
            if (cursor == null) {
                // The current scan has not delivered its cursor yet.
                return;
            }
            currentSubscriber.emitFromBuffer();
            if (!currentSubscriber.isExhausted() || currentSubscriber.canceled || this.sink.requestedFromDownstream() == 0) {
                return;
            }
            if (cursor.isFinished()) {
                chunkCompleted();
                return;
            }
            Mono<C> nextScan = this.scanFunction.apply(cursor);
            ScanSubscriber<T, C> next = new ScanSubscriber<>(this, this.sink, this.context, this.manyMapper);
            // Replace the exhausted subscriber; only the caller that wins the CAS starts the next scan.
            if (SUBSCRIBER.compareAndSet(this, currentSubscriber, next)) {
                nextScan.subscribe(next);
            }
        }

        /** Marks the adapter as canceled and cancels the current subscriber, if any. */
        private void canceled() {
            this.canceled = true;
            ScanSubscriber<T, C> currentSubscriber = getCurrentSubscriber();
            if (currentSubscriber != null) {
                currentSubscriber.cancel();
            }
        }

        /**
         * Completes the sink once the finished cursor's chunk has been fully emitted; otherwise re-evaluates demand
         * through {@link #onDemand(long)}.
         */
        @Override
        public void chunkCompleted() {
            if (this.canceled) {
                return;
            }
            ScanSubscriber<T, C> currentSubscriber = getCurrentSubscriber();
            if (currentSubscriber == null) {
                return;
            }
            ScanCursor cursor = currentSubscriber.getCursor();
            if (cursor == null) {
                return;
            }
            if (!cursor.isFinished() || !currentSubscriber.isExhausted()) {
                onDemand(0L);
            } else if (terminate()) {
                this.sink.complete();
            }
        }

        /**
         * @return the subscriber of the scan call currently in progress, or {@code null} if none was started
         */
        ScanSubscriber<T, C> getCurrentSubscriber() {
            return SUBSCRIBER.get(this);
        }

        /**
         * Fails the sink with the given error unless the adapter is canceled or already terminated.
         *
         * @param th the error to propagate
         */
        @Override
        public void onError(Throwable th) {
            if (this.canceled || !terminate()) {
                return;
            }
            this.sink.error(th);
        }

        /**
         * Moves the adapter from active to terminated.
         *
         * @return {@code true} for the single caller that performed the transition, {@code false} afterwards
         */
        protected boolean terminate() {
            return STATUS.compareAndSet(this, STATUS_ACTIVE, STATUS_TERMINATED);
        }
    }

    /**
     * Private constructor: the class is abstract and exposes its operations as static methods only.
     */
    private ScanStream() {
    }

    /**
     * Creates a {@link Flux} of keys obtained by scanning with the given commands, without scan arguments.
     *
     * @param redisKeyReactiveCommands commands used to issue the scan calls; must not be {@code null}
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the scanned keys
     */
    public static <K, V> Flux<K> scan(RedisKeyReactiveCommands<K, V> redisKeyReactiveCommands) {
        final Optional<ScanArgs> noScanArgs = Optional.empty();
        return scan(redisKeyReactiveCommands, noScanArgs);
    }

    /**
     * Creates a {@link Flux} of keys obtained by scanning with the given commands and scan arguments.
     *
     * @param redisKeyReactiveCommands commands used to issue the scan calls; must not be {@code null}
     * @param scanArgs arguments passed to every scan call; must not be {@code null}
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the scanned keys
     */
    public static <K, V> Flux<K> scan(RedisKeyReactiveCommands<K, V> redisKeyReactiveCommands, ScanArgs scanArgs) {
        LettuceAssert.notNull(scanArgs, "ScanArgs must not be null");
        final Optional<ScanArgs> givenScanArgs = Optional.of(scanArgs);
        return scan(redisKeyReactiveCommands, givenScanArgs);
    }

    /**
     * Builds the key-scan {@link Flux} shared by the public {@code scan} overloads.
     *
     * @param redisKeyReactiveCommands commands used to issue the scan calls; must not be {@code null}
     * @param optional scan arguments applied to every scan call when present
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the keys carried by each scanned cursor
     */
    private static <K, V> Flux<K> scan(RedisKeyReactiveCommands<K, V> redisKeyReactiveCommands, Optional<ScanArgs> optional) {
        LettuceAssert.notNull(redisKeyReactiveCommands, "RedisKeyCommands must not be null");
        return Flux.create(fluxSink -> scan(fluxSink,
                // Initial call: no cursor yet.
                optional.map(scanArgs -> redisKeyReactiveCommands.scan(scanArgs))
                        .orElseGet(() -> redisKeyReactiveCommands.scan()),
                // Follow-up calls continue from the cursor of the previous chunk.
                scanCursor -> optional.map(scanArgs -> redisKeyReactiveCommands.scan(scanCursor, scanArgs))
                        .orElseGet(() -> redisKeyReactiveCommands.scan(scanCursor)),
                keyScanCursor -> keyScanCursor.getKeys()));
    }

    /**
     * Creates a {@link Flux} of hash entries obtained by scanning the hash at the given key, without scan arguments.
     *
     * @param redisHashReactiveCommands commands used to issue the scan calls; must not be {@code null}
     * @param k the hash key; must not be {@code null}
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the scanned entries as {@link KeyValue}
     */
    public static <K, V> Flux<KeyValue<K, V>> hscan(RedisHashReactiveCommands<K, V> redisHashReactiveCommands, K k) {
        final Optional<ScanArgs> noScanArgs = Optional.empty();
        return hscan(redisHashReactiveCommands, k, noScanArgs);
    }

    /**
     * Creates a {@link Flux} of hash entries obtained by scanning the hash at the given key with scan arguments.
     *
     * @param redisHashReactiveCommands commands used to issue the scan calls; must not be {@code null}
     * @param k the hash key; must not be {@code null}
     * @param scanArgs arguments passed to every scan call; must not be {@code null}
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the scanned entries as {@link KeyValue}
     */
    public static <K, V> Flux<KeyValue<K, V>> hscan(RedisHashReactiveCommands<K, V> redisHashReactiveCommands, K k, ScanArgs scanArgs) {
        LettuceAssert.notNull(scanArgs, "ScanArgs must not be null");
        final Optional<ScanArgs> givenScanArgs = Optional.of(scanArgs);
        return hscan(redisHashReactiveCommands, k, givenScanArgs);
    }

    /**
     * Builds the hash-scan {@link Flux} shared by the public {@code hscan} overloads.
     *
     * @param redisHashReactiveCommands commands used to issue the scan calls; must not be {@code null}
     * @param k the hash key; must not be {@code null}
     * @param optional scan arguments applied to every scan call when present
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting one {@link KeyValue} per entry of each scanned cursor's map
     */
    private static <K, V> Flux<KeyValue<K, V>> hscan(RedisHashReactiveCommands<K, V> redisHashReactiveCommands, K k, Optional<ScanArgs> optional) {
        LettuceAssert.notNull(redisHashReactiveCommands, "RedisHashReactiveCommands must not be null");
        LettuceAssert.notNull(k, "Key must not be null");
        return Flux.create(fluxSink -> scan(fluxSink,
                // Initial call: no cursor yet. The key is passed as-is; it must not be cast to the commands type.
                optional.map(scanArgs -> redisHashReactiveCommands.hscan(k, scanArgs))
                        .orElseGet(() -> redisHashReactiveCommands.hscan(k)),
                // Follow-up calls continue from the cursor of the previous chunk.
                scanCursor -> optional.map(scanArgs -> redisHashReactiveCommands.hscan(k, scanCursor, scanArgs))
                        .orElseGet(() -> redisHashReactiveCommands.hscan(k, scanCursor)),
                mapScanCursor -> {
                    Collection<KeyValue<K, V>> entries = new ArrayList<>(mapScanCursor.getMap().size());
                    for (Map.Entry<K, V> entry : mapScanCursor.getMap().entrySet()) {
                        entries.add(KeyValue.fromNullable(entry.getKey(), entry.getValue()));
                    }
                    return entries;
                }));
    }

    /**
     * Creates a {@link Flux} of set members obtained by scanning the set at the given key, without scan arguments.
     *
     * @param redisSetReactiveCommands commands used to issue the scan calls; must not be {@code null}
     * @param k the set key; must not be {@code null}
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the scanned values
     */
    public static <K, V> Flux<V> sscan(RedisSetReactiveCommands<K, V> redisSetReactiveCommands, K k) {
        final Optional<ScanArgs> noScanArgs = Optional.empty();
        return sscan(redisSetReactiveCommands, k, noScanArgs);
    }

    /**
     * Creates a {@link Flux} of set members obtained by scanning the set at the given key with scan arguments.
     *
     * @param redisSetReactiveCommands commands used to issue the scan calls; must not be {@code null}
     * @param k the set key; must not be {@code null}
     * @param scanArgs arguments passed to every scan call; must not be {@code null}
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the scanned values
     */
    public static <K, V> Flux<V> sscan(RedisSetReactiveCommands<K, V> redisSetReactiveCommands, K k, ScanArgs scanArgs) {
        LettuceAssert.notNull(scanArgs, "ScanArgs must not be null");
        final Optional<ScanArgs> givenScanArgs = Optional.of(scanArgs);
        return sscan(redisSetReactiveCommands, k, givenScanArgs);
    }

    /**
     * Builds the set-scan {@link Flux} shared by the public {@code sscan} overloads.
     *
     * @param redisSetReactiveCommands commands used to issue the scan calls; must not be {@code null}
     * @param k the set key; must not be {@code null}
     * @param optional scan arguments applied to every scan call when present
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the values carried by each scanned cursor
     */
    private static <K, V> Flux<V> sscan(RedisSetReactiveCommands<K, V> redisSetReactiveCommands, K k, Optional<ScanArgs> optional) {
        LettuceAssert.notNull(redisSetReactiveCommands, "RedisSetReactiveCommands must not be null");
        LettuceAssert.notNull(k, "Key must not be null");
        return Flux.create(fluxSink -> scan(fluxSink,
                // Initial call: no cursor yet. The key is passed as-is; it must not be cast to the commands type.
                optional.map(scanArgs -> redisSetReactiveCommands.sscan(k, scanArgs))
                        .orElseGet(() -> redisSetReactiveCommands.sscan(k)),
                // Follow-up calls continue from the cursor of the previous chunk.
                scanCursor -> optional.map(scanArgs -> redisSetReactiveCommands.sscan(k, scanCursor, scanArgs))
                        .orElseGet(() -> redisSetReactiveCommands.sscan(k, scanCursor)),
                valueScanCursor -> valueScanCursor.getValues()));
    }

    /**
     * Creates a {@link Flux} of scored values obtained by scanning the sorted set at the given key, without scan
     * arguments.
     *
     * @param redisSortedSetReactiveCommands commands used to issue the scan calls; must not be {@code null}
     * @param k the sorted set key; must not be {@code null}
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the scanned {@link ScoredValue}s
     */
    public static <K, V> Flux<ScoredValue<V>> zscan(RedisSortedSetReactiveCommands<K, V> redisSortedSetReactiveCommands, K k) {
        final Optional<ScanArgs> noScanArgs = Optional.empty();
        return zscan(redisSortedSetReactiveCommands, k, noScanArgs);
    }

    /**
     * Creates a {@link Flux} of scored values obtained by scanning the sorted set at the given key with scan
     * arguments.
     *
     * @param redisSortedSetReactiveCommands commands used to issue the scan calls; must not be {@code null}
     * @param k the sorted set key; must not be {@code null}
     * @param scanArgs arguments passed to every scan call; must not be {@code null}
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the scanned {@link ScoredValue}s
     */
    public static <K, V> Flux<ScoredValue<V>> zscan(RedisSortedSetReactiveCommands<K, V> redisSortedSetReactiveCommands, K k, ScanArgs scanArgs) {
        LettuceAssert.notNull(scanArgs, "ScanArgs must not be null");
        final Optional<ScanArgs> givenScanArgs = Optional.of(scanArgs);
        return zscan(redisSortedSetReactiveCommands, k, givenScanArgs);
    }

    /**
     * Builds the sorted-set-scan {@link Flux} shared by the public {@code zscan} overloads.
     *
     * @param redisSortedSetReactiveCommands commands used to issue the scan calls; must not be {@code null}
     * @param k the sorted set key; must not be {@code null}
     * @param optional scan arguments applied to every scan call when present
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the values carried by each scanned cursor
     */
    private static <K, V> Flux<ScoredValue<V>> zscan(RedisSortedSetReactiveCommands<K, V> redisSortedSetReactiveCommands, K k, Optional<ScanArgs> optional) {
        LettuceAssert.notNull(redisSortedSetReactiveCommands, "RedisSortedSetReactiveCommands must not be null");
        LettuceAssert.notNull(k, "Key must not be null");
        return Flux.create(fluxSink -> scan(fluxSink,
                // Initial call: no cursor yet. The key is passed as-is; it must not be cast to the commands type.
                optional.map(scanArgs -> redisSortedSetReactiveCommands.zscan(k, scanArgs))
                        .orElseGet(() -> redisSortedSetReactiveCommands.zscan(k)),
                // Follow-up calls continue from the cursor of the previous chunk.
                scanCursor -> optional.map(scanArgs -> redisSortedSetReactiveCommands.zscan(k, scanCursor, scanArgs))
                        .orElseGet(() -> redisSortedSetReactiveCommands.zscan(k, scanCursor)),
                scoredValueScanCursor -> scoredValueScanCursor.getValues()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Wires the given sink to a new {@link SubscriptionAdapter} and registers the adapter's callbacks on it.
     *
     * @param fluxSink sink receiving the scanned elements
     * @param mono the initial scan call
     * @param function produces the follow-up scan call for a given cursor
     * @param function2 maps a cursor to the elements it carries
     * @param <V> element type emitted to the sink
     * @param <C> cursor type produced by the scan calls
     */
    public static <V, C extends ScanCursor> void scan(FluxSink<V> fluxSink, Mono<C> mono, Function<ScanCursor, Mono<C>> function, Function<C, Collection<V>> function2) {
        SubscriptionAdapter<V, C> adapter = new SubscriptionAdapter<>(fluxSink, mono, function, function2);
        adapter.register();
    }
}
