package io.reactivex.netty.channel;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.RecyclableArrayList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;

/* loaded from: input_file:io/reactivex/netty/channel/BackpressureManagingHandler.class */
public abstract class BackpressureManagingHandler extends ChannelDuplexHandler {
    private static final Logger logger = LoggerFactory.getLogger(BackpressureManagingHandler.class);
    private RecyclableArrayList buffer;
    private int currentBufferIndex;
    private State currentState = State.Buffering;
    private boolean continueDraining;
    private final BytesWriteInterceptor bytesWriteInterceptor;

    /* loaded from: input_file:io/reactivex/netty/channel/BackpressureManagingHandler$BytesWriteInterceptor.class */
    static final class BytesWriteInterceptor extends ChannelDuplexHandler implements Runnable {
        static final String WRITE_INSPECTOR_HANDLER_NAME = "write-inspector";
        static final int MAX_PER_SUBSCRIBER_REQUEST = 64;
        private final String parentHandlerName;
        private boolean messageReceived;
        private Channel channel;
        private boolean removeTaskScheduled;
        static final /* synthetic */ boolean $assertionsDisabled;
        private final ConcurrentLinkedQueue<WriteStreamSubscriber> subscribers = new ConcurrentLinkedQueue<>();
        private int perSubscriberMaxRequest = MAX_PER_SUBSCRIBER_REQUEST;

        BytesWriteInterceptor(String str) {
            this.parentHandlerName = str;
        }

        public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
            channelHandlerContext.write(obj, channelPromise);
            this.messageReceived = true;
            requestMoreIfWritable(channelHandlerContext.channel());
        }

        public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
            this.channel = channelHandlerContext.channel();
            WriteInspector writeInspector = new WriteInspector(this);
            if (null != channelHandlerContext.pipeline().get(this.parentHandlerName)) {
                channelHandlerContext.pipeline().addBefore(this.parentHandlerName, WRITE_INSPECTOR_HANDLER_NAME, writeInspector);
            }
        }

        public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
            if (channelHandlerContext.channel().isWritable()) {
                requestMoreIfWritable(channelHandlerContext.channel());
            }
            super.channelWritabilityChanged(channelHandlerContext);
        }

        public WriteStreamSubscriber newSubscriber(final ChannelHandlerContext channelHandlerContext, ChannelPromise channelPromise) {
            int size = this.subscribers.size();
            recalculateMaxPerSubscriber(size, size + 1);
            WriteStreamSubscriber writeStreamSubscriber = new WriteStreamSubscriber(channelHandlerContext, channelPromise, this.perSubscriberMaxRequest);
            writeStreamSubscriber.add(Subscriptions.create(new Action0() { // from class: io.reactivex.netty.channel.BackpressureManagingHandler.BytesWriteInterceptor.1
                public void call() {
                    boolean z;
                    synchronized (BytesWriteInterceptor.this) {
                        z = !BytesWriteInterceptor.this.removeTaskScheduled;
                        BytesWriteInterceptor.this.removeTaskScheduled = true;
                    }
                    if (z) {
                        channelHandlerContext.channel().eventLoop().execute(BytesWriteInterceptor.this);
                    }
                }
            }));
            this.subscribers.add(writeStreamSubscriber);
            return writeStreamSubscriber;
        }

        List<WriteStreamSubscriber> getSubscribers() {
            return Collections.unmodifiableList(new ArrayList(this.subscribers));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void requestMoreIfWritable(Channel channel) {
            if (!$assertionsDisabled && !channel.eventLoop().inEventLoop()) {
                throw new AssertionError();
            }
            Iterator<WriteStreamSubscriber> it = this.subscribers.iterator();
            while (it.hasNext()) {
                WriteStreamSubscriber next = it.next();
                if (!next.isUnsubscribed() && channel.isWritable()) {
                    next.requestMoreIfNeeded(this.perSubscriberMaxRequest);
                }
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            synchronized (this) {
                this.removeTaskScheduled = false;
            }
            int size = this.subscribers.size();
            Iterator<WriteStreamSubscriber> it = this.subscribers.iterator();
            while (it.hasNext()) {
                if (it.next().isUnsubscribed()) {
                    it.remove();
                }
            }
            recalculateMaxPerSubscriber(size, this.subscribers.size());
        }

        private void recalculateMaxPerSubscriber(int i, int i2) {
            if (!$assertionsDisabled && !this.channel.eventLoop().inEventLoop()) {
                throw new AssertionError();
            }
            this.perSubscriberMaxRequest = (i2 == 0 || i == 0) ? MAX_PER_SUBSCRIBER_REQUEST : (this.perSubscriberMaxRequest * i) / i2;
            this.perSubscriberMaxRequest = Math.max(1, this.perSubscriberMaxRequest);
            if (BackpressureManagingHandler.logger.isDebugEnabled()) {
                BackpressureManagingHandler.logger.debug("Channel {}. Modifying per subscriber max request. Old subscribers count {}, new subscribers count {}. New Value {} ", new Object[]{this.channel, Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(this.perSubscriberMaxRequest)});
            }
        }

        static {
            $assertionsDisabled = !BackpressureManagingHandler.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:io/reactivex/netty/channel/BackpressureManagingHandler$RequestReadIfRequiredEvent.class */
    protected static abstract class RequestReadIfRequiredEvent {
        protected abstract boolean shouldReadMore(ChannelHandlerContext channelHandlerContext);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/reactivex/netty/channel/BackpressureManagingHandler$State.class */
    public enum State {
        ReadRequested,
        Reading,
        Buffering,
        DrainingBuffer,
        Stopped
    }

    /* loaded from: input_file:io/reactivex/netty/channel/BackpressureManagingHandler$WriteInspector.class */
    static final class WriteInspector extends ChannelDuplexHandler {
        private final BytesWriteInterceptor bytesWriteInterceptor;

        WriteInspector(BytesWriteInterceptor bytesWriteInterceptor) {
            this.bytesWriteInterceptor = bytesWriteInterceptor;
        }

        public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
            this.bytesWriteInterceptor.messageReceived = false;
            channelHandlerContext.write(obj, channelPromise);
            if (this.bytesWriteInterceptor.messageReceived) {
                return;
            }
            this.bytesWriteInterceptor.requestMoreIfWritable(channelHandlerContext.channel());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/reactivex/netty/channel/BackpressureManagingHandler$WriteStreamSubscriber.class */
    public static class WriteStreamSubscriber extends Subscriber<Object> {
        private final ChannelHandlerContext ctx;
        private final ChannelPromise overarchingWritePromise;
        private final int initialRequest;
        private long maxBufferSize;
        private long pending;
        private long lowWaterMark;
        private final Object guard = new Object();
        private boolean isDone;
        private Scheduler.Worker writeWorker;
        private boolean atleastOneWriteEnqueued;
        private boolean isPromiseCompletedOnWriteComplete;
        private int listeningTo;

        WriteStreamSubscriber(ChannelHandlerContext channelHandlerContext, ChannelPromise channelPromise, int i) {
            this.ctx = channelHandlerContext;
            this.overarchingWritePromise = channelPromise;
            this.initialRequest = i;
            channelPromise.addListener(new ChannelFutureListener() { // from class: io.reactivex.netty.channel.BackpressureManagingHandler.WriteStreamSubscriber.1
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (channelFuture.isCancelled()) {
                        WriteStreamSubscriber.this.unsubscribe();
                    }
                }
            });
        }

        public void onStart() {
            requestMoreIfNeeded(this.initialRequest);
        }

        public void onCompleted() {
            onTermination(null);
        }

        public void onError(Throwable th) {
            onTermination(th);
        }

        public void onNext(Object obj) {
            boolean z;
            boolean inEventLoop = this.ctx.channel().eventLoop().inEventLoop();
            synchronized (this.guard) {
                this.pending--;
                if (null == this.writeWorker) {
                    if (!inEventLoop) {
                        this.atleastOneWriteEnqueued = true;
                    }
                    if (this.atleastOneWriteEnqueued) {
                        this.writeWorker = Schedulers.computation().createWorker();
                    }
                }
                z = null != this.writeWorker && inEventLoop;
            }
            ChannelFuture enqueueWrite = z ? enqueueWrite(obj) : this.ctx.write(obj);
            synchronized (this.guard) {
                this.listeningTo++;
            }
            enqueueWrite.addListener(new ChannelFutureListener() { // from class: io.reactivex.netty.channel.BackpressureManagingHandler.WriteStreamSubscriber.2
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    boolean z2;
                    if (WriteStreamSubscriber.this.overarchingWritePromise.isDone()) {
                        return;
                    }
                    synchronized (WriteStreamSubscriber.this.guard) {
                        WriteStreamSubscriber.access$610(WriteStreamSubscriber.this);
                        if (0 == WriteStreamSubscriber.this.listeningTo && WriteStreamSubscriber.this.isDone) {
                            WriteStreamSubscriber.this.isPromiseCompletedOnWriteComplete = true;
                        }
                        z2 = WriteStreamSubscriber.this.isPromiseCompletedOnWriteComplete;
                    }
                    if (!channelFuture.isSuccess()) {
                        WriteStreamSubscriber.this.overarchingWritePromise.tryFailure(channelFuture.cause());
                        WriteStreamSubscriber.this.unsubscribe();
                    } else if (z2) {
                        WriteStreamSubscriber.this.overarchingWritePromise.trySuccess();
                    }
                }
            });
        }

        private ChannelFuture enqueueWrite(final Object obj) {
            final ChannelPromise newPromise = this.ctx.channel().newPromise();
            this.writeWorker.schedule(new Action0() { // from class: io.reactivex.netty.channel.BackpressureManagingHandler.WriteStreamSubscriber.3
                public void call() {
                    WriteStreamSubscriber.this.ctx.write(obj, newPromise);
                }
            });
            return newPromise;
        }

        private void onTermination(Throwable th) {
            boolean z;
            boolean z2;
            synchronized (this.guard) {
                z = this.atleastOneWriteEnqueued;
                this.isDone = true;
                z2 = 0 == this.listeningTo && !this.isPromiseCompletedOnWriteComplete;
            }
            if (z) {
                this.writeWorker.schedule(new Action0() { // from class: io.reactivex.netty.channel.BackpressureManagingHandler.WriteStreamSubscriber.4
                    public void call() {
                        WriteStreamSubscriber.this.ctx.flush();
                    }
                });
            }
            if (null != th) {
                this.overarchingWritePromise.tryFailure(th);
            } else if (z2) {
                this.overarchingWritePromise.trySuccess();
            }
        }

        void requestMoreIfNeeded(long j) {
            long j2 = 0;
            synchronized (this.guard) {
                if (j > this.maxBufferSize) {
                    j2 = j - this.maxBufferSize;
                }
                this.maxBufferSize = j;
                this.lowWaterMark = this.maxBufferSize / 2;
                if (this.pending < this.lowWaterMark) {
                    j2 = this.maxBufferSize - this.pending;
                }
                this.pending += j2;
            }
            if (j2 > 0) {
                request(j2);
            }
        }

        public void subscribeTo(Observable observable) {
            observable.subscribe(this);
        }

        static /* synthetic */ int access$610(WriteStreamSubscriber writeStreamSubscriber) {
            int i = writeStreamSubscriber.listeningTo;
            writeStreamSubscriber.listeningTo = i - 1;
            return i;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public BackpressureManagingHandler(String str) {
        this.bytesWriteInterceptor = new BytesWriteInterceptor(str);
    }

    public final void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (State.Stopped != this.currentState && !shouldReadMore(channelHandlerContext)) {
            this.currentState = State.Buffering;
        }
        switch (this.currentState) {
            case ReadRequested:
                this.currentState = State.Reading;
                break;
            case Reading:
                break;
            case Buffering:
            case DrainingBuffer:
                if (null == this.buffer) {
                    this.buffer = RecyclableArrayList.newInstance();
                }
                this.buffer.add(obj);
                return;
            case Stopped:
                logger.warn("Message read after handler removed, discarding the same. Message class: " + obj.getClass().getName());
                ReferenceCountUtil.release(obj);
                return;
            default:
                return;
        }
        newMessage(channelHandlerContext, obj);
    }

    public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
        channelHandlerContext.pipeline().addFirst(new ChannelHandler[]{this.bytesWriteInterceptor});
        this.currentState = State.Buffering;
    }

    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.currentState = State.Stopped;
        if (null != this.buffer) {
            if (!this.buffer.isEmpty()) {
                Iterator it = this.buffer.iterator();
                while (it.hasNext()) {
                    ReferenceCountUtil.release(it.next());
                }
            }
            this.buffer.recycle();
            this.buffer = null;
        }
    }

    public final void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
        switch (this.currentState) {
            case Reading:
                this.currentState = State.Buffering;
                break;
        }
        channelHandlerContext.fireChannelReadComplete();
        if (channelHandlerContext.channel().config().isAutoRead() || !shouldReadMore(channelHandlerContext)) {
            return;
        }
        read(channelHandlerContext);
    }

    public final void read(ChannelHandlerContext channelHandlerContext) throws Exception {
        switch (this.currentState) {
            case ReadRequested:
                channelHandlerContext.read();
                return;
            case Reading:
            default:
                return;
            case Buffering:
                this.currentState = State.DrainingBuffer;
                this.continueDraining = true;
                while (this.continueDraining && null != this.buffer && this.currentBufferIndex < this.buffer.size()) {
                    RecyclableArrayList recyclableArrayList = this.buffer;
                    int i = this.currentBufferIndex;
                    this.currentBufferIndex = i + 1;
                    newMessage(channelHandlerContext, recyclableArrayList.get(i));
                    this.continueDraining = false;
                    channelReadComplete(channelHandlerContext);
                }
                if (this.continueDraining) {
                    if (null != this.buffer) {
                        recycleBuffer();
                    }
                    this.currentState = State.ReadRequested;
                    channelHandlerContext.read();
                    return;
                }
                this.currentState = State.Buffering;
                if (null == this.buffer || this.currentBufferIndex < this.buffer.size()) {
                    return;
                }
                recycleBuffer();
                return;
            case DrainingBuffer:
                this.continueDraining = true;
                return;
            case Stopped:
                channelHandlerContext.read();
                return;
        }
    }

    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
        if (!(obj instanceof Observable)) {
            channelHandlerContext.write(obj, channelPromise);
        } else {
            this.bytesWriteInterceptor.newSubscriber(channelHandlerContext, channelPromise).subscribeTo((Observable) obj);
        }
    }

    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if ((obj instanceof RequestReadIfRequiredEvent) && ((RequestReadIfRequiredEvent) obj).shouldReadMore(channelHandlerContext)) {
            read(channelHandlerContext);
        }
        super.userEventTriggered(channelHandlerContext, obj);
    }

    protected abstract void newMessage(ChannelHandlerContext channelHandlerContext, Object obj);

    protected abstract boolean shouldReadMore(ChannelHandlerContext channelHandlerContext);

    RecyclableArrayList getBuffer() {
        return this.buffer;
    }

    int getCurrentBufferIndex() {
        return this.currentBufferIndex;
    }

    State getCurrentState() {
        return this.currentState;
    }

    private void recycleBuffer() {
        this.buffer.recycle();
        this.currentBufferIndex = 0;
        this.buffer = null;
    }
}
