/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.common.reactive;

import io.helidon.common.reactive.EmittingPublisher;
import io.helidon.common.reactive.Multi;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

@Deprecated(since="2.0.0", forRemoval=true)
public class MultiFromOutputStream
extends OutputStream
implements Multi<ByteBuffer> {
    private static final int BUFFER_SIZE = 4096;
    private static final byte[] FLUSH_BUFFER = new byte[0];
    private long timeout = Duration.ofMinutes(10L).toMillis();
    private final EmittingPublisher<ByteBuffer> emittingPublisher = EmittingPublisher.create();
    private volatile CompletableFuture<Void> demandUpdated = new CompletableFuture();
    private final ByteBuffer byteBuffer = ByteBuffer.allocate(4096);

    protected MultiFromOutputStream() {
        this.emittingPublisher.onCancel(() -> this.demandUpdated.cancel(true));
        this.emittingPublisher.onRequest((n, demand) -> this.demandUpdated.complete(null));
    }

    void timeout(long timeout) {
        this.timeout = timeout;
    }

    public MultiFromOutputStream onRequest(BiConsumer<Long, Long> requestCallback) {
        this.emittingPublisher.onRequest(requestCallback);
        return this;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        this.emittingPublisher.subscribe(subscriber);
    }

    @Override
    public void write(byte[] b) throws IOException {
        this.publishBufferedMaybe();
        this.publish(b, 0, b.length);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        this.publishBufferedMaybe();
        this.publish(b, off, len);
    }

    @Override
    public void write(int b) throws IOException {
        if (!this.byteBuffer.hasRemaining()) {
            this.publish();
        }
        this.byteBuffer.put((byte)b);
    }

    @Override
    public void close() throws IOException {
        this.publishBufferedMaybe();
        this.complete();
    }

    @Override
    public void flush() throws IOException {
        this.publishBufferedMaybe();
        this.publish(FLUSH_BUFFER, 0, 0);
    }

    private void publishBufferedMaybe() throws IOException {
        if (this.byteBuffer.position() > 0) {
            this.publish();
        }
    }

    private void publish(byte[] b, int off, int len) throws IOException {
        ByteBuffer emitBuffer = ByteBuffer.allocate(len - off);
        emitBuffer.put(b, off, len);
        emitBuffer.flip();
        this.doPublish(emitBuffer);
    }

    private void publish() throws IOException {
        this.byteBuffer.flip();
        ByteBuffer emitBuffer = ByteBuffer.allocate(this.byteBuffer.remaining());
        emitBuffer.put(this.byteBuffer);
        emitBuffer.flip();
        this.doPublish(emitBuffer);
        this.byteBuffer.clear();
    }

    private void doPublish(ByteBuffer emitBuffer) throws IOException {
        try {
            long start = System.currentTimeMillis();
            while (!this.emittingPublisher.emit(emitBuffer)) {
                if (this.emittingPublisher.isCancelled()) {
                    throw new IOException("Output stream already closed.");
                }
                if (this.emittingPublisher.isFailed()) {
                    Throwable throwable = this.emittingPublisher.failCause().get();
                    throw new IOException(throwable);
                }
                this.await(start, this.timeout, this.demandUpdated);
                this.demandUpdated = new CompletableFuture();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.fail(e);
            throw new IOException(e);
        }
        catch (ExecutionException e) {
            this.fail(e.getCause());
            throw new IOException(e.getCause());
        }
        catch (IllegalStateException e) {
            this.fail(e);
            throw new IOException(e);
        }
    }

    void complete() {
        this.emittingPublisher.complete();
        this.demandUpdated.complete(null);
    }

    void fail(Throwable t) {
        this.emittingPublisher.fail(t);
        this.demandUpdated.completeExceptionally(t);
    }

    private void await(long startTime, long waitTime, CompletableFuture<?> future) throws ExecutionException, InterruptedException, IOException {
        block2: {
            try {
                future.get(waitTime, TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e) {
                long diff = System.currentTimeMillis() - startTime;
                if (diff <= this.timeout) break block2;
                throw new IOException("Timed out while waiting for subscriber to read data");
            }
        }
    }
}

