/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.reactive;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.util.VertxUtils;
import io.vertx.core.Context;
import java.util.Objects;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class BaseSubscriber<T>
implements Subscriber<T> {
    private static final Logger log = LogManager.getLogger(BaseSubscriber.class);
    protected final Context context;
    private Subscription subscription;
    private boolean complete;
    private boolean cancelled;

    @SuppressFBWarnings(value={"EI_EXPOSE_REP2"}, justification="context should be mutable")
    public BaseSubscriber(Context context) {
        this.context = Objects.requireNonNull(context);
    }

    public final void onSubscribe(Subscription s) {
        Objects.requireNonNull(s);
        this.runOnRightContext(() -> this.doOnSubscribe(s));
    }

    public final void onNext(T t) {
        Objects.requireNonNull(t);
        this.runOnRightContext(() -> this.doOnNext(t));
    }

    public final void onError(Throwable t) {
        Objects.requireNonNull(t);
        this.runOnRightContext(() -> this.doOnError(t));
    }

    public void cancel() {
        this.checkContext();
        this.cancelled = true;
        if (this.subscription != null) {
            this.subscription.cancel();
        }
    }

    public final void onComplete() {
        this.runOnRightContext(this::doOnComplete);
    }

    protected void afterSubscribe(Subscription subscription) {
    }

    protected void handleValue(T t) {
    }

    protected void handleComplete() {
    }

    protected void handleError(Throwable t) {
    }

    protected final void makeRequest(long l) {
        this.checkContext();
        try {
            this.subscription.request(l);
        }
        catch (Throwable t) {
            IllegalStateException e = new IllegalStateException("Exceptions must not be thrown from request", t);
            this.logError(e);
        }
    }

    protected final void complete() {
        this.checkContext();
        this.complete = true;
        if (this.subscription != null) {
            try {
                this.subscription.cancel();
            }
            catch (Throwable t) {
                IllegalStateException e = new IllegalStateException("Exceptions must not be thrown from cancel", t);
                this.logError(e);
            }
        }
    }

    protected final void checkContext() {
        VertxUtils.checkContext(this.context);
    }

    private void runOnRightContext(Runnable runnable) {
        if (VertxUtils.isEventLoopAndSameContext(this.context)) {
            runnable.run();
        } else {
            this.context.runOnContext(v -> runnable.run());
        }
    }

    private void logError(Throwable t) {
        log.error(t.getMessage(), t);
    }

    private void doOnSubscribe(Subscription subscription) {
        this.checkContext();
        if (this.subscription != null) {
            try {
                subscription.cancel();
            }
            catch (Throwable t) {
                IllegalStateException e = new IllegalStateException("Exceptions must not be thrown from cancel", t);
                this.logError(e);
            }
            return;
        }
        this.subscription = subscription;
        this.afterSubscribe(subscription);
    }

    private void doOnNext(T val) {
        this.checkContext();
        if (this.complete || this.cancelled) {
            return;
        }
        if (this.subscription == null) {
            IllegalStateException e = new IllegalStateException("onNext must be called without request being called");
            this.logError(e);
        }
        try {
            this.handleValue(val);
        }
        catch (Throwable t) {
            this.complete();
            this.onError(t);
        }
    }

    private void doOnError(Throwable t) {
        this.checkContext();
        if (this.cancelled) {
            return;
        }
        if (this.subscription == null) {
            this.logError(new IllegalStateException("onError must not be called before onSubscribe", t));
        } else {
            this.complete = true;
            this.handleError(t);
        }
    }

    private void doOnComplete() {
        this.checkContext();
        if (this.cancelled || this.complete) {
            return;
        }
        if (this.subscription == null) {
            this.logError(new IllegalStateException("onComplete must not be called before onSubscribe"));
        } else {
            this.complete = true;
            this.handleComplete();
        }
    }
}

