/*
 * Decompiled with CFR 0.152.
 */
package io.activej.csp;

import io.activej.bytebuf.ByteBuf;
import io.activej.common.exception.UncheckedException;
import io.activej.common.recycle.Recyclable;
import io.activej.common.recycle.Recyclers;
import io.activej.csp.AbstractChannelConsumer;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.RecyclingChannelConsumer;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.util.RunnableWithContext;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public final class ChannelConsumers {
    public static <T> Promise<Void> acceptAll(ChannelConsumer<T> output, Iterator<? extends T> it) {
        if (!it.hasNext()) {
            return Promise.complete();
        }
        return Promise.ofCallback(cb -> ChannelConsumers.acceptAllImpl(output, it, false, (SettablePromise<Void>)cb));
    }

    public static <T> Promise<Void> acceptAll(ChannelConsumer<T> output, List<? extends T> list) {
        if (list.isEmpty()) {
            return Promise.complete();
        }
        return Promise.ofCallback(cb -> ChannelConsumers.acceptAllImpl(output, list.iterator(), true, (SettablePromise<Void>)cb));
    }

    private static <T> void acceptAllImpl(ChannelConsumer<T> output, Iterator<? extends T> it, boolean ownership, SettablePromise<Void> cb) {
        while (it.hasNext()) {
            Promise<Void> accept = output.accept(it.next());
            if (accept.isResult()) continue;
            accept.whenComplete(($, e) -> {
                if (e == null) {
                    ChannelConsumers.acceptAllImpl(output, it, ownership, cb);
                } else {
                    if (ownership) {
                        it.forEachRemaining(Recyclers::recycle);
                    } else {
                        Recyclers.recycle((Object)it);
                    }
                    cb.setException(e);
                }
            });
            return;
        }
        cb.set(null);
    }

    public static <T extends Recyclable> ChannelConsumer<T> recycling() {
        return new RecyclingChannelConsumer();
    }

    public static ChannelConsumer<ByteBuf> outputStreamAsChannelConsumer(final Executor executor, final OutputStream outputStream) {
        return new AbstractChannelConsumer<ByteBuf>(){

            @Override
            protected Promise<Void> doAccept(@Nullable ByteBuf buf) {
                return Promise.ofBlockingRunnable((Executor)executor, () -> {
                    try {
                        if (buf != null) {
                            outputStream.write(buf.array(), buf.head(), buf.readRemaining());
                            buf.recycle();
                        } else {
                            outputStream.close();
                        }
                    }
                    catch (IOException e) {
                        throw new UncheckedException((Throwable)e);
                    }
                });
            }

            protected void onClosed(@NotNull Throwable e) {
                executor.execute(() -> {
                    try {
                        outputStream.close();
                    }
                    catch (IOException iOException) {
                        // empty catch block
                    }
                });
            }
        };
    }

    public static OutputStream channelConsumerAsOutputStream(final Eventloop eventloop, final ChannelConsumer<ByteBuf> channelConsumer) {
        return new OutputStream(){

            @Override
            public void write(int b) throws IOException {
                this.write(new byte[]{(byte)b}, 0, 1);
            }

            @Override
            public void write(@NotNull byte[] b, int off, int len) throws IOException {
                this.submit(ByteBuf.wrap((byte[])b, (int)off, (int)(off + len)));
            }

            @Override
            public void close() throws IOException {
                this.submit(null);
            }

            private void submit(ByteBuf buf) throws IOException {
                CompletableFuture future = eventloop.submit(() -> channelConsumer.accept(buf));
                try {
                    future.get();
                }
                catch (InterruptedException e) {
                    eventloop.execute(RunnableWithContext.wrapContext((Object)channelConsumer, () -> ((ChannelConsumer)channelConsumer).close()));
                    throw new IOException(e);
                }
                catch (ExecutionException e) {
                    Throwable cause = e.getCause();
                    if (cause instanceof IOException) {
                        throw (IOException)cause;
                    }
                    if (cause instanceof RuntimeException) {
                        throw (RuntimeException)cause;
                    }
                    if (cause instanceof Exception) {
                        throw new IOException(cause);
                    }
                    throw (Error)cause;
                }
            }
        };
    }
}

