package io.vertx.rx.java.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.streams.WriteStream;
import io.vertx.rx.java.WriteStreamSubscriber;
import java.util.Objects;
import java.util.function.Function;
import rx.functions.Action0;
import rx.functions.Action1;

/* loaded from: input_file:io/vertx/rx/java/impl/WriteStreamSubscriberImpl.class */
/**
 * {@link WriteStreamSubscriber} implementation that converts every item it receives with a
 * mapping function and writes the result to a {@link WriteStream}.
 *
 * <p>Items are requested from upstream in batches of {@code BATCH_SIZE}. A new batch is requested
 * only when no item of the previous batch is still outstanding, either right after a write
 * (if the stream's write queue is not full) or when the stream's drain handler fires.
 *
 * <p>The optional callbacks and the {@code outstanding} counter are read and written while holding
 * this instance's monitor; the callbacks themselves, and the upstream {@code request} call, are
 * always invoked outside of it.
 *
 * @param <R> the type of the items received from upstream
 * @param <T> the type of the items written to the stream
 */
public class WriteStreamSubscriberImpl<R, T> extends WriteStreamSubscriber<R> {
    /** Number of items requested from upstream each time a new batch is needed. */
    private static final int BATCH_SIZE = 16;

    private final WriteStream<T> writeStream;
    private final Function<R, T> mapping;

    /** Items requested from upstream that have not been written yet; guarded by {@code this}. */
    private int outstanding;

    // Optional user callbacks; all guarded by {@code this}.
    private Action1<Throwable> observableErrorHandler;
    private Action0 observableCompleteHandler;
    private Action1<Throwable> writeStreamExceptionHandler;
    private Action0 writeStreamEndHandler;
    private Action1<Throwable> writeStreamEndErrorHandler;

    /**
     * Creates a subscriber writing to the given stream.
     *
     * @param writeStream the stream that mapped items are written to
     * @param function the mapping applied to each received item before it is written
     * @throws NullPointerException if either argument is {@code null}
     */
    public WriteStreamSubscriberImpl(WriteStream<T> writeStream, Function<R, T> function) {
        Objects.requireNonNull(writeStream, "writeStream");
        Objects.requireNonNull(function, "mapping");
        this.writeStream = writeStream;
        this.mapping = function;
    }

    /**
     * Installs the exception and drain handlers on the stream, then requests the first batch.
     *
     * <p>When the stream reports an exception this subscriber unsubscribes and then forwards the
     * failure to the callback registered with {@link #onWriteStreamError(Action1)}, if any.
     */
    public void onStart() {
        this.writeStream.exceptionHandler(failure -> {
            unsubscribe();
            Action1<Throwable> handler;
            synchronized (this) {
                handler = this.writeStreamExceptionHandler;
            }
            if (handler != null) {
                handler.call(failure);
            }
        });
        this.writeStream.drainHandler(ignored -> requestMore());
        requestMore();
    }

    /**
     * Maps the item, writes it to the stream and, unless the stream's write queue is full,
     * asks for another batch if the current one is exhausted.
     *
     * @param r the item received from upstream
     */
    public void onNext(R r) {
        this.writeStream.write(this.mapping.apply(r));
        synchronized (this) {
            this.outstanding--;
        }
        // When the queue is full, the drain handler installed in onStart() resumes requesting.
        if (!this.writeStream.writeQueueFull()) {
            requestMore();
        }
    }

    /**
     * Forwards an upstream failure to the callback registered with {@link #onError(Action1)}, if any.
     *
     * @param th the failure signalled by upstream
     */
    public void onError(Throwable th) {
        Action1<Throwable> handler;
        synchronized (this) {
            handler = this.observableErrorHandler;
        }
        if (handler != null) {
            handler.call(th);
        }
    }

    /**
     * Ends the stream and then invokes the callback registered with {@link #onComplete(Action0)},
     * if any. The outcome of ending the stream is reported separately through the callbacks
     * registered with {@link #onWriteStreamEnd(Action0)} and {@link #onWriteStreamEndError(Action1)}.
     */
    public void onCompleted() {
        Action0 handler;
        synchronized (this) {
            handler = this.observableCompleteHandler;
        }
        this.writeStream.end(this::writeStreamEnd);
        if (handler != null) {
            handler.call();
        }
    }

    /**
     * Dispatches the result of ending the stream to the matching callback, if one is registered.
     *
     * @param asyncResult the result passed to the handler given to {@code writeStream.end(...)}
     */
    private void writeStreamEnd(AsyncResult<Void> asyncResult) {
        if (asyncResult.succeeded()) {
            Action0 endHandler;
            synchronized (this) {
                endHandler = this.writeStreamEndHandler;
            }
            if (endHandler != null) {
                endHandler.call();
            }
            return;
        }
        Action1<Throwable> endErrorHandler;
        synchronized (this) {
            endErrorHandler = this.writeStreamEndErrorHandler;
        }
        if (endErrorHandler != null) {
            endErrorHandler.call(asyncResult.cause());
        }
    }

    /**
     * Requests a new batch of {@code BATCH_SIZE} items unless items from a previous request are
     * still outstanding.
     */
    private void requestMore() {
        synchronized (this) {
            if (this.outstanding > 0) {
                return;
            }
            this.outstanding = BATCH_SIZE;
        }
        // Call upstream outside the monitor: the check-and-set above already guarantees that only
        // one caller gets here per batch, and request() may re-enter onNext().
        request(BATCH_SIZE);
    }

    /**
     * Sets the callback invoked when upstream signals an error.
     *
     * @param action1 the callback, or {@code null} for none
     * @return this subscriber
     */
    @Override // io.vertx.rx.java.WriteStreamSubscriber
    public synchronized WriteStreamSubscriber<R> onError(Action1<Throwable> action1) {
        this.observableErrorHandler = action1;
        return this;
    }

    /**
     * Sets the callback invoked when upstream completes.
     *
     * @param action0 the callback, or {@code null} for none
     * @return this subscriber
     */
    @Override // io.vertx.rx.java.WriteStreamSubscriber
    public synchronized WriteStreamSubscriber<R> onComplete(Action0 action0) {
        this.observableCompleteHandler = action0;
        return this;
    }

    /**
     * Sets the callback invoked when the stream's exception handler fires.
     *
     * @param action1 the callback, or {@code null} for none
     * @return this subscriber
     */
    @Override // io.vertx.rx.java.WriteStreamSubscriber
    public synchronized WriteStreamSubscriber<R> onWriteStreamError(Action1<Throwable> action1) {
        this.writeStreamExceptionHandler = action1;
        return this;
    }

    /**
     * Sets the callback invoked when ending the stream succeeds.
     *
     * @param action0 the callback, or {@code null} for none
     * @return this subscriber
     */
    @Override // io.vertx.rx.java.WriteStreamSubscriber
    public synchronized WriteStreamSubscriber<R> onWriteStreamEnd(Action0 action0) {
        this.writeStreamEndHandler = action0;
        return this;
    }

    /**
     * Sets the callback invoked with the cause when ending the stream fails.
     *
     * @param action1 the callback, or {@code null} for none
     * @return this subscriber
     */
    @Override // io.vertx.rx.java.WriteStreamSubscriber
    public synchronized WriteStreamSubscriber<R> onWriteStreamEndError(Action1<Throwable> action1) {
        this.writeStreamEndErrorHandler = action1;
        return this;
    }
}
