/*
 * Decompiled with CFR 0.152.
 */
package io.activej.csp.binary;

import io.activej.async.function.AsyncSupplier;
import io.activej.async.process.AbstractAsyncCloseable;
import io.activej.async.process.AsyncCloseable;
import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufs;
import io.activej.common.exception.TruncatedDataException;
import io.activej.common.exception.UnexpectedDataException;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.binary.BinaryChannelInput;
import io.activej.csp.binary.ByteBufsDecoder;
import io.activej.promise.Promise;
import java.util.Iterator;
import java.util.List;
import org.jetbrains.annotations.NotNull;

/**
 * A closeable source of binary data backed by a {@link ByteBufs} queue.
 *
 * <p>Incoming data is accumulated in {@link #bufs}; {@link ByteBufsDecoder}s are then
 * run against that queue by {@link #decode}, {@link #decodeRemaining} and
 * {@link #decodeStream}. Concrete suppliers define how more data is requested
 * ({@link #needMoreData()}) and how the end of the stream is confirmed
 * ({@link #endOfStream()}).
 */
public abstract class BinaryChannelSupplier extends AbstractAsyncCloseable {
    /** Message used whenever data is found where the end of the stream was expected. */
    private static final String UNEXPECTED_DATA_MESSAGE = "Unexpected data after end-of-stream";

    /** Queue of received-but-not-yet-decoded buffers. */
    protected final ByteBufs bufs;

    /**
     * Creates a supplier that works on the given queue.
     *
     * @param bufs queue to use; the reference is stored as is, not copied
     */
    protected BinaryChannelSupplier(ByteBufs bufs) {
        this.bufs = bufs;
    }

    /** Creates a supplier with its own, initially empty queue. */
    protected BinaryChannelSupplier() {
        this.bufs = new ByteBufs();
    }

    /**
     * @return the queue this supplier accumulates data in (the live instance, not a copy)
     */
    public ByteBufs getBufs() {
        return bufs;
    }

    /**
     * Requests more data for the queue.
     *
     * @return a promise for the outcome of the request
     */
    public abstract Promise<Void> needMoreData();

    /**
     * Confirms that the underlying source has no more data.
     *
     * @return a promise for the outcome of the confirmation
     */
    public abstract Promise<Void> endOfStream();

    /**
     * Creates a supplier fed from the buffers of a list.
     *
     * @param iterable buffers to supply
     * @return supplier wrapping {@code ChannelSupplier.ofList(iterable)}
     */
    public static BinaryChannelSupplier ofList(List<ByteBuf> iterable) {
        ChannelSupplier<ByteBuf> source = ChannelSupplier.ofList(iterable);
        return of(source);
    }

    /**
     * Creates a supplier fed from the buffers of an iterator.
     *
     * @param iterator buffers to supply
     * @return supplier wrapping {@code ChannelSupplier.ofIterator(iterator)}
     */
    public static BinaryChannelSupplier ofIterator(Iterator<ByteBuf> iterator) {
        ChannelSupplier<ByteBuf> source = ChannelSupplier.ofIterator(iterator);
        return of(source);
    }

    /**
     * Creates a supplier that pulls buffers from a {@link ChannelSupplier}.
     *
     * <p>The returned supplier treats a {@code null} item from {@code input} as the end
     * of the stream, and forwards its own closing to {@code input}.
     *
     * @param input channel to read buffers from
     * @return supplier with its own queue, fed from {@code input}
     */
    public static BinaryChannelSupplier of(final ChannelSupplier<ByteBuf> input) {
        return new BinaryChannelSupplier() {

            /**
             * Reads one item from {@code input} and appends it to the queue; a
             * {@code null} item fails the promise with {@link TruncatedDataException}.
             */
            @Override
            public Promise<Void> needMoreData() {
                return input.get().then(buf -> {
                    if (buf == null) {
                        return Promise.ofException(new TruncatedDataException("Unexpected end-of-stream"));
                    }
                    bufs.add(buf);
                    return Promise.complete();
                });
            }

            /**
             * Succeeds only if the queue is empty and the next item of {@code input}
             * is {@code null}; any leftover or further data is recycled and reported
             * as {@link UnexpectedDataException}.
             */
            @Override
            public Promise<Void> endOfStream() {
                if (bufs.isEmpty()) {
                    return input.get().then(buf -> {
                        if (buf == null) {
                            return Promise.complete();
                        }
                        buf.recycle();
                        return rejectTrailingData();
                    });
                }
                bufs.recycle();
                return rejectTrailingData();
            }

            /** Closes {@code input} with a fresh {@link UnexpectedDataException} and returns a promise failed with it. */
            private Promise<Void> rejectTrailingData() {
                UnexpectedDataException exception = new UnexpectedDataException(UNEXPECTED_DATA_MESSAGE);
                input.closeEx(exception);
                return Promise.ofException(exception);
            }

            // NOTE(review): presumably the close hook of AbstractAsyncCloseable - confirm.
            /** Propagates the closing cause to {@code input}. */
            protected void onClosed(@NotNull Throwable e) {
                input.closeEx(e);
            }
        };
    }

    /**
     * Creates a supplier over an externally owned queue, delegating every operation.
     *
     * @param bufs      queue to decode from
     * @param get       invoked by {@link #needMoreData()}
     * @param complete  invoked by {@link #endOfStream()}
     * @param closeable receives the closing cause when the supplier is closed
     * @return supplier delegating to the given callbacks
     */
    public static BinaryChannelSupplier ofProvidedBufs(ByteBufs bufs, final AsyncSupplier<Void> get, final AsyncSupplier<Void> complete, final AsyncCloseable closeable) {
        return new BinaryChannelSupplier(bufs) {

            /** Delegates to {@code get}. */
            @Override
            public Promise<Void> needMoreData() {
                return get.get();
            }

            /** Delegates to {@code complete}. */
            @Override
            public Promise<Void> endOfStream() {
                return complete.get();
            }

            // NOTE(review): presumably the close hook of AbstractAsyncCloseable - confirm.
            /** Propagates the closing cause to {@code closeable}. */
            protected void onClosed(@NotNull Throwable e) {
                closeable.closeEx(e);
            }
        };
    }

    /**
     * Decodes a single value, requesting more data until the decoder yields a result.
     * This supplier is closed if the decoder throws or if requesting data fails.
     *
     * @param decoder decoder to run against the queue
     * @param <T>     type of the decoded value
     * @return promise of the decoded value
     */
    public final <T> Promise<T> decode(ByteBufsDecoder<T> decoder) {
        return doDecode(decoder, this);
    }

    /**
     * Decoding loop shared by {@link #decode} and {@link #decodeStream}.
     *
     * <p>The decoder is tried whenever the queue is non-empty; a {@code null} result
     * means "not enough data yet". Requests for more data that are already complete
     * successfully are handled in the same loop; otherwise decoding resumes once the
     * pending promise completes.
     *
     * @param decoder   decoder to run against the queue
     * @param closeable notified with the cause when a request for more data fails
     * @return promise of the decoded value
     */
    @NotNull
    private <T> Promise<T> doDecode(ByteBufsDecoder<T> decoder, AsyncCloseable closeable) {
        for (;;) {
            if (!bufs.isEmpty()) {
                T decoded;
                try {
                    decoded = decoder.tryDecode(bufs);
                } catch (Exception e) {
                    // A decoder failure always closes this supplier, regardless of 'closeable'.
                    closeEx(e);
                    return Promise.ofException(e);
                }
                if (decoded != null) {
                    return Promise.of(decoded);
                }
            }
            Promise<Void> pending = needMoreData();
            if (!pending.isResult()) {
                return pending
                        .whenException(cause -> closeable.closeEx(cause))
                        .then(() -> doDecode(decoder, closeable));
            }
        }
    }

    /**
     * Decodes a single value and then requires the stream to be finished: the queue
     * must be empty after decoding and {@link #endOfStream()} must succeed.
     *
     * @param decoder decoder to run against the queue
     * @param <T>     type of the decoded value
     * @return promise of the decoded value; fails with {@link UnexpectedDataException}
     *         (closing this supplier) if data is left in the queue after decoding
     */
    public final <T> Promise<T> decodeRemaining(ByteBufsDecoder<T> decoder) {
        return decode(decoder).then(result -> {
            if (bufs.isEmpty()) {
                return endOfStream().map(ignored -> result);
            }
            UnexpectedDataException exception = new UnexpectedDataException(UNEXPECTED_DATA_MESSAGE);
            closeEx(exception);
            return Promise.ofException(exception);
        });
    }

    /**
     * Exposes this supplier as a channel of decoded values.
     *
     * <p>A {@link TruncatedDataException} raised while the queue is empty is treated as
     * a regular end of the stream: the supplier is not closed with it and the channel
     * yields {@code null}. Any other failure is propagated.
     *
     * @param decoder decoder applied repeatedly to the queue
     * @param <T>     type of the decoded values
     * @return channel of decoded values, closing this supplier when closed
     */
    public final <T> ChannelSupplier<T> decodeStream(ByteBufsDecoder<T> decoder) {
        return ChannelSupplier.of(
                () -> doDecode(decoder, e -> {
                    if (!isEndOfStreamOnEmptyQueue(e)) {
                        closeEx(e);
                    }
                }).thenEx((value, e) -> {
                    if (e == null) {
                        return Promise.of(value);
                    }
                    if (isEndOfStreamOnEmptyQueue(e)) {
                        return Promise.of(null);
                    }
                    return Promise.ofException(e);
                }),
                this);
    }

    /** Tells whether {@code e} is a {@link TruncatedDataException} raised while the queue is empty. */
    private boolean isEndOfStreamOnEmptyQueue(Throwable e) {
        return e instanceof TruncatedDataException && bufs.isEmpty();
    }

    /**
     * Hands this supplier over to the given input.
     *
     * @param input consumer of this supplier
     * @return whatever {@code input.set(this)} returns
     */
    public Promise<Void> bindTo(BinaryChannelInput input) {
        return input.set(this);
    }

    // NOTE(review): presumably the cleanup hook of AbstractAsyncCloseable - confirm.
    /** Recycles every buffer still held in the queue. */
    protected void onCleanup() {
        bufs.recycle();
    }
}

