/*
 * Decompiled with CFR 0.152.
 */
package io.activej.csp;

import io.activej.async.function.AsyncConsumer;
import io.activej.async.process.AsyncCloseable;
import io.activej.async.process.AsyncExecutor;
import io.activej.bytebuf.ByteBuf;
import io.activej.common.exception.UncheckedException;
import io.activej.common.recycle.Recyclers;
import io.activej.csp.AbstractChannelConsumer;
import io.activej.csp.ChannelConsumers;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.dsl.ChannelConsumerTransformer;
import io.activej.csp.queue.ChannelQueue;
import io.activej.csp.queue.ChannelZeroBuffer;
import io.activej.eventloop.Eventloop;
import io.activej.net.socket.tcp.AsyncTcpSocket;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public interface ChannelConsumer<T>
extends AsyncCloseable {
    /**
     * Offers a single item to this consumer.
     *
     * @param var1 the item to consume; may be {@code null}, which is the value
     *             {@link #acceptEndOfStream()} passes to mark the end of the stream
     * @return a non-null promise for the outcome of the operation
     */
    @NotNull
    public Promise<Void> accept(@Nullable T var1);

    /**
     * Signals the end of the stream by handing {@code null} to {@link #accept(Object)}.
     *
     * @return whatever promise {@code accept(null)} produces
     */
    default public Promise<Void> acceptEndOfStream() {
        final T endOfStream = null;
        return accept(endOfStream);
    }

    /**
     * Feeds the given items to this consumer by viewing the array as a list
     * and delegating to {@link #acceptAll(List)}.
     *
     * @param items the items to pass on
     * @return the promise produced by the list-based overload
     */
    @NotNull
    default public Promise<Void> acceptAll(T ... items) {
        List<T> wrapped = Arrays.asList(items);
        return acceptAll(wrapped);
    }

    /**
     * Feeds the items of the given iterator to this consumer; the work is
     * delegated to {@code ChannelConsumers.acceptAll(this, it)}.
     *
     * @param it source of the items
     * @return the promise produced by the delegate
     */
    @NotNull
    default public Promise<Void> acceptAll(@NotNull Iterator<? extends T> it) {
        final ChannelConsumer<T> self = this;
        return ChannelConsumers.acceptAll(self, it);
    }

    /**
     * Feeds the items of the given list to this consumer; the work is
     * delegated to {@code ChannelConsumers.acceptAll(this, list)}.
     *
     * @param list the items to pass on
     * @return the promise produced by the delegate
     */
    default public Promise<Void> acceptAll(@NotNull List<T> list) {
        final ChannelConsumer<T> self = this;
        return ChannelConsumers.acceptAll(self, list);
    }

    /**
     * Wraps an {@link AsyncConsumer} into a {@code ChannelConsumer}, using a
     * closeable whose close handler does nothing.
     *
     * @param consumer the consumer that will receive the items
     * @return the result of {@link #of(AsyncConsumer, AsyncCloseable)} with a no-op closeable
     */
    public static <T> ChannelConsumer<T> of(@NotNull AsyncConsumer<T> consumer) {
        AsyncCloseable noOpCloseable = e -> {};
        return ChannelConsumer.of(consumer, noOpCloseable);
    }

    /**
     * Wraps an {@link AsyncConsumer} into a {@code ChannelConsumer}.
     * <p>
     * Non-null items are forwarded to {@code consumer}. A {@code null} item
     * (the end-of-stream marker) is not forwarded; a completed promise is
     * returned for it instead.
     *
     * @param consumer  the consumer that will receive every non-null item
     * @param closeable passed to the {@link AbstractChannelConsumer} constructor; may be {@code null}
     * @return a channel consumer backed by {@code consumer}
     */
    public static <T> ChannelConsumer<T> of(final @NotNull AsyncConsumer<T> consumer, @Nullable AsyncCloseable closeable) {
        return new AbstractChannelConsumer<T>(closeable){
            // Captured in a field initializer: an anonymous class body cannot
            // contain an explicit super(...) call, the superclass constructor
            // is already selected by the arguments of the "new" expression.
            final AsyncConsumer<T> thisConsumer = consumer;

            @Override
            protected Promise<Void> doAccept(T value) {
                if (value == null) {
                    return Promise.complete();
                }
                return this.thisConsumer.accept(value);
            }
        };
    }

    /**
     * Wraps a plain {@link Consumer} into a {@code ChannelConsumer} by first
     * adapting it with {@code AsyncConsumer.of} and then with {@link #of(AsyncConsumer)}.
     *
     * @param consumer the consumer to adapt
     * @return a channel consumer backed by {@code consumer}
     */
    public static <T> ChannelConsumer<T> ofConsumer(@NotNull Consumer<T> consumer) {
        AsyncConsumer<T> adapted = AsyncConsumer.of(consumer);
        return ChannelConsumer.of(adapted);
    }

    /**
     * Creates a consumer that rejects everything: each offered item is handed
     * to {@code Recyclers.recycle} and the returned promise carries {@code e}.
     *
     * @param e the exception reported for every item
     * @return a consumer whose accept operations end with {@code e}
     */
    public static <T> ChannelConsumer<T> ofException(final Throwable e) {
        return new AbstractChannelConsumer<T>(){

            @Override
            protected Promise<Void> doAccept(T value) {
                // The item will never be consumed, so recycle it before reporting the failure
                Recyclers.recycle(value);
                return Promise.ofException(e);
            }
        };
    }

    /**
     * Same as {@link #ofSupplier(Function, ChannelQueue)}, using a freshly
     * created {@link ChannelZeroBuffer} as the queue.
     *
     * @param supplier function that receives the queue's supplier side and returns an acknowledgement promise
     * @return the consumer side of the queue, as built by the two-argument overload
     */
    public static <T> ChannelConsumer<T> ofSupplier(Function<ChannelSupplier<T>, Promise<Void>> supplier) {
        // Diamond instead of a raw type so the queue is checked as ChannelQueue<T>
        return ChannelConsumer.ofSupplier(supplier, new ChannelZeroBuffer<>());
    }

    /**
     * Builds a consumer out of code that works with a {@link ChannelSupplier}.
     * <p>
     * The queue's supplier side is handed to {@code supplier}; the queue's
     * consumer side is returned. Unless the promise returned by
     * {@code supplier} is the very instance returned by {@code Promise.complete()},
     * the returned consumer's acknowledgement is combined with that promise
     * through {@code both}.
     *
     * @param supplier function that receives the queue's supplier side and returns an acknowledgement promise
     * @param queue    the queue connecting both sides
     * @return the queue's consumer, possibly wrapped with the extra acknowledgement
     */
    public static <T> ChannelConsumer<T> ofSupplier(Function<ChannelSupplier<T>, Promise<Void>> supplier, ChannelQueue<T> queue) {
        ChannelSupplier<T> queueOutput = queue.getSupplier();
        Promise<Void> extraAcknowledge = supplier.apply(queueOutput);
        ChannelConsumer<T> queueInput = queue.getConsumer();
        // Deliberate identity comparison: skips the wrapper only for that exact promise instance
        return extraAcknowledge == Promise.complete()
                ? queueInput
                : queueInput.withAcknowledgement(ack -> ack.both(extraAcknowledge));
    }

    /**
     * Unwraps a promise of a consumer into a consumer.
     * <p>
     * If {@code promise} already holds a result, that consumer is returned
     * directly. Otherwise a wrapper is returned that, for each item, waits for
     * the promise: on success the resolved consumer is remembered and receives
     * the item; on failure the item is recycled and the failure is returned.
     * Closing the wrapper closes the resolved consumer with the same exception
     * once (and if) the promise yields a result.
     *
     * @param promise promise of the actual consumer
     * @return the resolved consumer, or a wrapper that defers to it
     */
    public static <T> ChannelConsumer<T> ofPromise(final Promise<? extends ChannelConsumer<T>> promise) {
        if (promise.isResult()) {
            return promise.getResult();
        }
        return new AbstractChannelConsumer<T>(){
            // Set once the promise has resolved successfully; null until then
            ChannelConsumer<T> consumer;

            @Override
            protected Promise<Void> doAccept(T value) {
                if (this.consumer != null) {
                    return this.consumer.accept(value);
                }
                return promise.thenEx((resolved, e) -> {
                    if (e == null) {
                        this.consumer = resolved;
                        return resolved.accept(value);
                    }
                    // The consumer never materialized, so nobody will take ownership of the item
                    Recyclers.recycle(value);
                    return Promise.ofException(e);
                });
            }

            @Override
            protected void onClosed(@NotNull Throwable e) {
                promise.whenResult(supplier -> supplier.closeEx(e));
            }
        };
    }

    /**
     * Adapts a consumer that lives in another {@link Eventloop} for use from
     * the current one.
     * <p>
     * If the current eventloop is {@code anotherEventloop}, the consumer is
     * returned unchanged. Otherwise each accept and close call is submitted to
     * {@code anotherEventloop}, and the accept result is posted back through
     * the wrapper's inherited {@code eventloop} field. Every hop is bracketed
     * by {@code startExternalTask()} / {@code completeExternalTask()} on that
     * field.
     *
     * @param anotherEventloop         the eventloop the wrapped consumer must be called in
     * @param anotherEventloopConsumer the consumer to wrap
     * @return the consumer itself, or a wrapper that dispatches calls to {@code anotherEventloop}
     */
    public static <T> ChannelConsumer<T> ofAnotherEventloop(final @NotNull Eventloop anotherEventloop, final @NotNull ChannelConsumer<T> anotherEventloopConsumer) {
        if (Eventloop.getCurrentEventloop() == anotherEventloop) {
            return anotherEventloopConsumer;
        }
        return new AbstractChannelConsumer<T>(){

            @Override
            protected Promise<Void> doAccept(@Nullable T value) {
                SettablePromise<Void> promise = new SettablePromise<>();
                this.eventloop.startExternalTask();
                anotherEventloop.execute(() -> anotherEventloopConsumer.accept(value).whenComplete((v, e) -> {
                    // Settle the promise from the wrapper's own eventloop, not from the foreign one
                    this.eventloop.execute(() -> promise.accept(v, e));
                    this.eventloop.completeExternalTask();
                }));
                return promise;
            }

            @Override
            protected void onClosed(@NotNull Throwable e) {
                this.eventloop.startExternalTask();
                anotherEventloop.execute(() -> {
                    anotherEventloopConsumer.closeEx(e);
                    this.eventloop.completeExternalTask();
                });
            }
        };
    }

    /**
     * Creates a consumer whose real target is obtained from {@code provider}
     * only when the first item is offered.
     * <p>
     * The provider is called at most once per successful call; its result is
     * cached and receives this and every later item. Closing the wrapper
     * before any item was offered does not call the provider; afterwards the
     * close is forwarded to the cached consumer.
     *
     * @param provider source of the actual consumer
     * @return a lazily initialized wrapper
     */
    public static <T> ChannelConsumer<T> ofLazyProvider(final Supplier<? extends ChannelConsumer<T>> provider) {
        return new AbstractChannelConsumer<T>(){
            // Null until the first item arrives
            private ChannelConsumer<T> consumer;

            @Override
            protected Promise<Void> doAccept(@Nullable T value) {
                if (this.consumer == null) {
                    this.consumer = provider.get();
                }
                return this.consumer.accept(value);
            }

            @Override
            protected void onClosed(@NotNull Throwable e) {
                if (this.consumer != null) {
                    this.consumer.closeEx(e);
                }
            }
        };
    }

    /**
     * Creates a consumer that writes every non-null buffer to {@code socket}.
     * <p>
     * The socket is also used as the consumer's closeable. The
     * acknowledgement is extended so that, after it completes,
     * {@code socket.write(null)} is issued and its promise becomes the result.
     *
     * @param socket the socket to write to
     * @return a consumer of {@link ByteBuf}s backed by {@code socket}
     */
    public static ChannelConsumer<ByteBuf> ofSocket(AsyncTcpSocket socket) {
        // Explicit ByteBuf typing: an implicitly-typed lambda in a receiver
        // position would leave the item type inferred as Object.
        ChannelConsumer<ByteBuf> writer = ChannelConsumer.<ByteBuf>of(socket::write, socket);
        return writer.withAcknowledgement(ack -> ack.then(() -> socket.write(null)));
    }

    /**
     * Applies the given transformer to this consumer.
     *
     * @param fn the transformer to apply
     * @return whatever {@code fn.transform(this)} returns
     */
    default public <R> R transformWith(ChannelConsumerTransformer<T, R> fn) {
        final ChannelConsumer<T> self = this;
        return fn.transform(self);
    }

    /**
     * Returns a wrapper around this consumer in which the promise of every
     * accept call is passed through {@code Promise.async()}.
     * This consumer is handed to the wrapper's constructor as its closeable.
     *
     * @return the wrapping consumer
     */
    default public ChannelConsumer<T> async() {
        final ChannelConsumer<T> delegate = this;
        return new AbstractChannelConsumer<T>(delegate){

            @Override
            protected Promise<Void> doAccept(T value) {
                Promise<Void> accepted = delegate.accept(value);
                return accepted.async();
            }
        };
    }

    /**
     * Returns a wrapper around this consumer in which every accept call is
     * submitted through the given {@link AsyncExecutor}.
     * This consumer is handed to the wrapper's constructor as its closeable.
     *
     * @param asyncExecutor executor that runs the accept calls
     * @return the wrapping consumer
     */
    default public ChannelConsumer<T> withExecutor(final AsyncExecutor asyncExecutor) {
        final ChannelConsumer<T> delegate = this;
        return new AbstractChannelConsumer<T>(delegate){

            @Override
            protected Promise<Void> doAccept(T value) {
                return asyncExecutor.execute(() -> delegate.accept(value));
            }
        };
    }

    /**
     * Returns a wrapper that shows every non-null item to {@code fn} before
     * passing it on to this consumer. A {@code null} item bypasses {@code fn}
     * and is passed on as is.
     *
     * @param fn observer invoked for each non-null item
     * @return the wrapping consumer
     */
    default public ChannelConsumer<T> peek(final Consumer<? super T> fn) {
        final ChannelConsumer<T> delegate = this;
        return new AbstractChannelConsumer<T>(delegate){

            @Override
            protected Promise<Void> doAccept(T value) {
                boolean isItem = value != null;
                if (isItem) {
                    fn.accept(value);
                }
                return delegate.accept(value);
            }
        };
    }

    /**
     * Returns a consumer of {@code V} that converts every non-null item with
     * {@code fn} and passes the result to this consumer.
     * <p>
     * A {@code null} item is forwarded as end of stream. If {@code fn} throws
     * an {@link UncheckedException}, this consumer is closed with the
     * exception's cause and the returned promise carries that cause.
     *
     * @param fn mapping from the accepted type to this consumer's item type
     * @return the mapping consumer
     */
    default public <V> ChannelConsumer<V> map(final Function<? super V, ? extends T> fn) {
        return new AbstractChannelConsumer<V>(this){

            @Override
            protected Promise<Void> doAccept(V value) {
                if (value == null) {
                    return ChannelConsumer.this.acceptEndOfStream();
                }
                // Typed as T so the result can be handed to accept(T)
                T newValue;
                try {
                    newValue = fn.apply(value);
                }
                catch (UncheckedException u) {
                    Throwable cause = u.getCause();
                    ChannelConsumer.this.closeEx(cause);
                    return Promise.ofException(cause);
                }
                return ChannelConsumer.this.accept(newValue);
            }
        };
    }

    /**
     * Returns a consumer of {@code V} that converts every non-null item with
     * the promise-returning {@code fn} and, once that promise yields a value,
     * passes the value to this consumer. A {@code null} item is forwarded as
     * end of stream.
     *
     * @param fn asynchronous mapping from the accepted type to this consumer's item type
     * @return the mapping consumer
     */
    default public <V> ChannelConsumer<V> mapAsync(final Function<? super V, ? extends Promise<T>> fn) {
        return new AbstractChannelConsumer<V>(this){

            @Override
            protected Promise<Void> doAccept(V value) {
                if (value == null) {
                    return ChannelConsumer.this.acceptEndOfStream();
                }
                // Kept fully parameterized (no raw Promise) so the chain stays type-checked
                Promise<T> mapped = fn.apply(value);
                return mapped.then(ChannelConsumer.this::accept);
            }
        };
    }

    /**
     * Returns a wrapper that passes on only the items matching {@code predicate}.
     * <p>
     * Non-null items that fail the predicate are recycled and acknowledged
     * with a completed promise. A {@code null} item (end of stream) is never
     * tested; it is forwarded to this consumer, in the same way as
     * {@link #peek}, {@link #map} and {@link #mapAsync} forward it.
     *
     * @param predicate decides which items are passed on
     * @return the filtering consumer
     */
    default public ChannelConsumer<T> filter(final Predicate<? super T> predicate) {
        return new AbstractChannelConsumer<T>(this){

            @Override
            protected Promise<Void> doAccept(T value) {
                if (value == null) {
                    // End of stream must reach the wrapped consumer, otherwise it never learns the stream ended
                    return ChannelConsumer.this.acceptEndOfStream();
                }
                if (predicate.test(value)) {
                    return ChannelConsumer.this.accept(value);
                }
                // Rejected item: nobody downstream takes ownership of it
                Recyclers.recycle(value);
                return Promise.complete();
            }
        };
    }

    /**
     * Returns a wrapper whose end-of-stream acknowledgement is customized by {@code fn}.
     * <p>
     * An internal promise tracks the outcome of this consumer; {@code fn}
     * receives it once, up front, and returns the promise that callers will
     * actually see. Behaviour per call:
     * <ul>
     * <li>non-null item, accepted successfully: a completed promise is returned;</li>
     * <li>non-null item, accept failed: the internal promise is failed (unless
     *     already settled) and the customized promise is returned;</li>
     * <li>{@code null} item: it is forwarded, the internal promise is settled
     *     with the outcome (unless already settled) and the customized promise
     *     is returned;</li>
     * <li>wrapper closed: the internal promise is failed with the close
     *     exception (unless already settled).</li>
     * </ul>
     *
     * @param fn maps the internal acknowledgement promise to the one exposed to callers
     * @return the wrapping consumer
     */
    default public ChannelConsumer<T> withAcknowledgement(Function<Promise<Void>, Promise<Void>> fn) {
        final SettablePromise<Void> acknowledgement = new SettablePromise<>();
        final Promise<Void> newAcknowledgement = fn.apply(acknowledgement);
        return new AbstractChannelConsumer<T>(this){

            @Override
            protected Promise<Void> doAccept(@Nullable T value) {
                if (value != null) {
                    return ChannelConsumer.this.accept(value).thenEx(($, e) -> {
                        if (e == null) {
                            return Promise.complete();
                        }
                        acknowledgement.trySetException(e);
                        return newAcknowledgement;
                    });
                }
                ChannelConsumer.this.accept(null).whenComplete((result, e) -> acknowledgement.trySet(result, e));
                return newAcknowledgement;
            }

            @Override
            protected void onClosed(@NotNull Throwable e) {
                acknowledgement.trySetException(e);
            }
        };
    }
}

