/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
import io.netty.channel.EventLoop;
import io.netty.util.internal.PlatformDependent;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ratpack.exec.ExecController;
import ratpack.exec.ExecInitializer;
import ratpack.exec.ExecInterceptor;
import ratpack.exec.Execution;
import ratpack.exec.ExecutionException;
import ratpack.exec.OverlappingExecutionException;
import ratpack.exec.UnmanagedThreadException;
import ratpack.exec.Upstream;
import ratpack.exec.internal.AsyncDownstream;
import ratpack.exec.internal.Continuation;
import ratpack.exec.internal.ContinuationStream;
import ratpack.exec.internal.ExecControllerInternal;
import ratpack.exec.internal.ExecThreadBinding;
import ratpack.exec.internal.ExecutionBoundPublisher;
import ratpack.func.Action;
import ratpack.func.Block;
import ratpack.registry.MutableRegistry;
import ratpack.registry.NotInRegistryException;
import ratpack.registry.RegistrySpec;
import ratpack.registry.internal.SimpleMutableRegistry;
import ratpack.stream.TransformablePublisher;

public class DefaultExecution
implements Execution {
    public static final Logger LOGGER = LoggerFactory.getLogger(Execution.class);
    private ExecStream execStream;
    private final ExecControllerInternal controller;
    private final EventLoop eventLoop;
    private final Action<? super Throwable> onError;
    private final Action<? super Execution> onComplete;
    private List<AutoCloseable> closeables;
    private final MutableRegistry registry = new SimpleMutableRegistry();
    private List<ExecInterceptor> adhocInterceptors;
    private Iterable<? extends ExecInterceptor> interceptors;
    private Thread thread;

    public DefaultExecution(ExecControllerInternal controller, EventLoop eventLoop, Action<? super RegistrySpec> registryInit, Action<? super Execution> action, Action<? super Throwable> onError, Action<? super Execution> onStart, Action<? super Execution> onComplete) throws Exception {
        this.controller = controller;
        this.eventLoop = eventLoop;
        this.onError = onError;
        this.onComplete = onComplete;
        registryInit.execute(this.registry);
        onStart.execute(this);
        this.execStream = new InitialExecStream(() -> action.execute(this));
        this.interceptors = Iterables.concat(controller.getInterceptors(), (Iterable)ImmutableList.copyOf(this.registry.getAll(ExecInterceptor.class)));
        for (ExecInitializer initializer : controller.getInitializers()) {
            initializer.init(this);
        }
        for (ExecInitializer initializer : this.registry.getAll(ExecInitializer.class)) {
            initializer.init(this);
        }
        this.drain();
    }

    public static DefaultExecution get() {
        ExecThreadBinding execThreadBinding = ExecThreadBinding.get();
        if (execThreadBinding == null) {
            return null;
        }
        return execThreadBinding.getExecution();
    }

    public static DefaultExecution require() throws UnmanagedThreadException {
        DefaultExecution execution = ExecThreadBinding.require().getExecution();
        if (execution == null) {
            throw new IllegalStateException("No execution bound for thread " + Thread.currentThread().getName());
        }
        return execution;
    }

    public static <T> TransformablePublisher<T> stream(Publisher<T> publisher, Action<? super T> disposer) {
        return publisher instanceof ExecutionBoundPublisher ? (TransformablePublisher<Object>)publisher : new ExecutionBoundPublisher<T>(publisher, disposer);
    }

    public static <T> Upstream<T> upstream(Upstream<T> upstream) {
        return downstream -> DefaultExecution.require().delimit(downstream::error, continuation -> {
            AsyncDownstream asyncDownstream = new AsyncDownstream((Continuation)continuation, downstream);
            try {
                upstream.connect(asyncDownstream);
            }
            catch (Throwable throwable) {
                if (asyncDownstream.fire()) {
                    continuation.resume(() -> downstream.error(throwable));
                }
                LOGGER.error("", (Throwable)new OverlappingExecutionException("promise already fulfilled", throwable));
            }
        });
    }

    @Override
    public EventLoop getEventLoop() {
        return this.eventLoop;
    }

    public void delimit(Action<? super Throwable> onError, Action<? super Continuation> segment) {
        this.execStream.delimit(onError, segment);
        this.drain();
    }

    public void delimitStream(Action<? super Throwable> onError, Action<? super ContinuationStream> segment) {
        this.execStream.delimitStream(onError, segment);
        this.drain();
    }

    public void eventLoopDrain() {
        this.eventLoop.execute(this::drain);
    }

    public boolean isBound() {
        return this.thread == Thread.currentThread();
    }

    private void drain() {
        if (this.isBound()) {
            return;
        }
        if (this.execStream == TerminalExecStream.INSTANCE) {
            return;
        }
        if (!this.eventLoop.inEventLoop()) {
            this.eventLoopDrain();
            return;
        }
        if (DefaultExecution.get() != null) {
            this.eventLoopDrain();
            return;
        }
        try {
            this.bindToThread();
            this.exec(this.interceptors.iterator());
        }
        catch (Throwable e) {
            DefaultExecution.interceptorError(e);
        }
        finally {
            this.unbindFromThread();
        }
    }

    public void unbindFromThread() {
        this.thread = null;
        ExecThreadBinding.require().setExecution(null);
    }

    public void bindToThread() {
        this.thread = Thread.currentThread();
        ExecThreadBinding.require().setExecution(this);
    }

    public static void interceptorError(Throwable e) {
        LOGGER.warn("exception was thrown by an execution interceptor (which will be ignored):", e);
    }

    public Iterable<? extends ExecInterceptor> getAllInterceptors() {
        return this.interceptors;
    }

    private void exec(Iterator<? extends ExecInterceptor> interceptors) throws Exception {
        if (interceptors.hasNext()) {
            interceptors.next().intercept(this, ExecInterceptor.ExecType.COMPUTE, () -> this.exec(interceptors));
        } else {
            this.exec();
        }
    }

    private void exec() {
        while (true) {
            try {
                while (this.execStream.exec()) {
                }
            }
            catch (Throwable segmentError) {
                this.execStream.error(segmentError);
                continue;
            }
            break;
        }
        if (this.execStream == TerminalExecStream.INSTANCE) {
            try {
                this.onComplete.execute(this);
            }
            catch (Throwable e) {
                LOGGER.warn("exception raised during onComplete action", e);
            }
            if (this.closeables != null) {
                for (AutoCloseable closeable : this.closeables) {
                    try {
                        closeable.close();
                    }
                    catch (Throwable e) {
                        LOGGER.warn("exception raised by execution closeable " + closeable, e);
                    }
                }
            }
        }
    }

    @Override
    public ExecController getController() {
        return this.controller;
    }

    @Override
    public void onComplete(AutoCloseable closeable) {
        if (this.closeables == null) {
            this.closeables = Lists.newArrayList();
        }
        this.closeables.add(closeable);
    }

    @Override
    public boolean isComplete() {
        return this.execStream == TerminalExecStream.INSTANCE;
    }

    @Override
    public <O> Execution addLazy(TypeToken<O> type, Supplier<? extends O> supplier) {
        this.registry.addLazy(type, supplier);
        return this;
    }

    @Override
    public <O> Execution add(TypeToken<O> type, O object) {
        this.registry.add(type, object);
        return this;
    }

    @Override
    public void addInterceptor(ExecInterceptor execInterceptor, Block continuation) throws Exception {
        if (this.adhocInterceptors == null) {
            this.adhocInterceptors = Lists.newArrayList();
            this.interceptors = Iterables.concat(this.interceptors, this.adhocInterceptors);
        }
        this.adhocInterceptors.add(execInterceptor);
        execInterceptor.intercept(this, ExecInterceptor.ExecType.COMPUTE, continuation);
    }

    @Override
    public <T> void remove(TypeToken<T> type) throws NotInRegistryException {
        this.registry.remove(type);
    }

    @Override
    public <O> Optional<O> maybeGet(TypeToken<O> type) {
        return this.registry.maybeGet(type);
    }

    @Override
    public <O> Iterable<? extends O> getAll(TypeToken<O> type) {
        return this.registry.getAll(type);
    }

    private class MultiEventExecStream
    extends BaseExecStream
    implements ContinuationStream {
        final ExecStream parent;
        private final Action<? super Throwable> onError;
        final Queue<Queue<Block>> events;
        private final AtomicReference<Block> complete;

        MultiEventExecStream(ExecStream parent, Action<? super Throwable> onError, Action<? super ContinuationStream> initial) {
            this.events = PlatformDependent.newMpscQueue();
            this.complete = new AtomicReference();
            this.parent = parent;
            this.onError = onError;
            this.event(() -> initial.execute(this));
        }

        @Override
        public boolean event(Block action) {
            if (this.complete.get() == null) {
                ArrayDeque<Block> event = new ArrayDeque<Block>();
                event.add(action);
                this.events.add(event);
                DefaultExecution.this.drain();
                return true;
            }
            return false;
        }

        @Override
        public boolean complete(Block action) {
            if (this.complete.compareAndSet(null, action)) {
                DefaultExecution.this.drain();
                return true;
            }
            return false;
        }

        @Override
        boolean exec() throws Exception {
            Block nextSegment = this.events.peek().poll();
            if (nextSegment == null) {
                if (this.events.size() == 1) {
                    if (this.complete.get() == null) {
                        return false;
                    }
                    DefaultExecution.this.execStream = this.parent;
                    this.complete.get().execute();
                    return true;
                }
                this.events.poll();
                return true;
            }
            nextSegment.execute();
            return true;
        }

        @Override
        void enqueue(Block segment) {
            this.events.peek().add(segment);
        }

        @Override
        void error(Throwable throwable) {
            DefaultExecution.this.execStream = this.parent;
            try {
                this.onError.execute(throwable);
            }
            catch (Exception e) {
                DefaultExecution.this.execStream.error(e);
            }
        }
    }

    private class SingleEventExecStream
    extends BaseExecStream
    implements Continuation {
        final ExecStream parent;
        Action<? super Continuation> initial;
        Action<? super Throwable> onError;
        Action<? super Continuation> next;
        Action<? super Throwable> nextOnError;
        Block resumer;
        boolean resumed;
        Queue<Block> segments;

        SingleEventExecStream(ExecStream parent, Action<? super Throwable> onError, Action<? super Continuation> initial) {
            this.parent = parent;
            this.onError = onError;
            this.initial = initial;
        }

        @Override
        boolean exec() throws Exception {
            if (this.initial == null) {
                if (this.next != null) {
                    DefaultExecution.this.execStream = new SingleEventExecStream(this.asParent(), this.nextOnError, this.next);
                    this.next = null;
                    return true;
                }
                if (this.segments == null) {
                    if (this.resumer == null) {
                        if (this.resumed) {
                            DefaultExecution.this.execStream = this.parent;
                            return true;
                        }
                        return false;
                    }
                    this.resumer.execute();
                    this.resumer = null;
                    if (this.next != null) {
                        this.initial = this.next;
                        this.onError = this.nextOnError;
                        this.next = null;
                        this.nextOnError = null;
                        this.resumed = false;
                    }
                    return true;
                }
                Block segment = this.segments.poll();
                if (this.segments.isEmpty()) {
                    this.segments = null;
                }
                segment.execute();
                return true;
            }
            this.initial.execute(this);
            this.initial = null;
            return true;
        }

        @Override
        ExecStream asParent() {
            return this.resumed && this.resumer == null && this.segments == null ? this.parent.asParent() : this;
        }

        @Override
        void delimit(Action<? super Throwable> onError, Action<? super Continuation> segment) {
            if (this.next == null && this.segments == null) {
                this.next = segment;
                this.nextOnError = onError;
            } else {
                if (this.next != null) {
                    super.delimit(this.nextOnError, this.next);
                    this.next = null;
                    this.nextOnError = null;
                }
                super.delimit(onError, segment);
            }
        }

        @Override
        void enqueue(Block segment) {
            if (this.segments == null) {
                this.segments = new ArrayDeque<Block>();
            }
            this.segments.add(segment);
        }

        @Override
        public void resume(Block action) {
            if (DefaultExecution.this.isBound()) {
                this.doResume(action);
            } else {
                DefaultExecution.this.eventLoop.execute(() -> this.doResume(action));
            }
        }

        private void doResume(Block action) {
            this.resumed = true;
            this.resumer = action;
            DefaultExecution.this.drain();
        }

        @Override
        void error(Throwable throwable) {
            DefaultExecution.this.execStream = this.parent;
            if (this.resumed && this.resumer == null) {
                this.parent.error(throwable);
            } else {
                try {
                    this.onError.execute(throwable);
                }
                catch (Throwable e) {
                    DefaultExecution.this.execStream.error(e);
                }
            }
        }
    }

    private class InitialExecStream
    extends BaseExecStream {
        Block initial;
        Queue<Block> segments;

        InitialExecStream(Block initial) {
            this.initial = initial;
        }

        @Override
        boolean exec() throws Exception {
            if (this.initial == null) {
                if (this.segments == null) {
                    DefaultExecution.this.execStream = TerminalExecStream.INSTANCE;
                    return false;
                }
                Block segment = this.segments.poll();
                if (segment == null) {
                    DefaultExecution.this.execStream = TerminalExecStream.INSTANCE;
                    return false;
                }
                segment.execute();
                return true;
            }
            Block initial = this.initial;
            this.initial = null;
            initial.execute();
            return true;
        }

        @Override
        void enqueue(Block segment) {
            if (this.initial == null) {
                this.initial = segment;
            } else {
                if (this.segments == null) {
                    this.segments = new ArrayDeque<Block>(1);
                }
                this.segments.add(segment);
            }
        }

        @Override
        void error(Throwable throwable) {
            this.initial = null;
            if (this.segments != null) {
                this.segments.clear();
            }
            try {
                DefaultExecution.this.onError.execute(throwable);
            }
            catch (Throwable errorHandlerError) {
                LOGGER.error("error handler " + DefaultExecution.this.onError + " threw error (this execution will terminate):", errorHandlerError);
                DefaultExecution.this.execStream = TerminalExecStream.INSTANCE;
            }
        }
    }

    private static class TerminalExecStream
    extends ExecStream {
        private static final ExecStream INSTANCE = new TerminalExecStream();

        private TerminalExecStream() {
        }

        @Override
        boolean exec() {
            return false;
        }

        @Override
        void delimit(Action<? super Throwable> onError, Action<? super Continuation> segment) {
            throw new ExecutionException("this execution has completed (you may be trying to use a promise in a cleanup method)");
        }

        @Override
        void delimitStream(Action<? super Throwable> onError, Action<? super ContinuationStream> segment) {
            throw new ExecutionException("this execution has completed (you may be trying to use a promise in a cleanup method)");
        }

        @Override
        void enqueue(Block segment) {
            throw new ExecutionException("this execution has completed (you may be trying to use a promise in a cleanup method)");
        }

        @Override
        void error(Throwable throwable) {
            throw new ExecutionException("this execution has completed (you may be trying to use a promise in a cleanup method)");
        }

        @Override
        ExecStream asParent() {
            return this;
        }
    }

    private abstract class BaseExecStream
    extends ExecStream {
        private BaseExecStream() {
        }

        @Override
        void delimit(Action<? super Throwable> onError, Action<? super Continuation> segment) {
            this.enqueue(() -> DefaultExecution.this.execStream = new SingleEventExecStream(DefaultExecution.this.execStream.asParent(), onError, segment));
        }

        @Override
        void delimitStream(Action<? super Throwable> onError, Action<? super ContinuationStream> segment) {
            this.enqueue(() -> DefaultExecution.this.execStream = new MultiEventExecStream(DefaultExecution.this.execStream.asParent(), onError, segment));
        }

        @Override
        ExecStream asParent() {
            return this;
        }
    }

    private static abstract class ExecStream {
        private ExecStream() {
        }

        abstract boolean exec() throws Exception;

        abstract void delimit(Action<? super Throwable> var1, Action<? super Continuation> var2);

        abstract void delimitStream(Action<? super Throwable> var1, Action<? super ContinuationStream> var2);

        abstract void enqueue(Block var1);

        abstract void error(Throwable var1);

        abstract ExecStream asParent();
    }
}

