package io.micronaut.http.client.netty;

import io.micronaut.core.annotation.Internal;
import io.micronaut.http.netty.EventLoopFlow;
import io.micronaut.http.netty.body.BufferConsumer;
import io.micronaut.http.netty.body.StreamingNettyByteBody;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import java.util.function.Consumer;

/* JADX INFO: Access modifiers changed from: package-private */
@Internal
/* loaded from: input_file:io/micronaut/http/client/netty/StreamWriter.class */
public final class StreamWriter extends ChannelInboundHandlerAdapter implements BufferConsumer {
    private final Consumer<Throwable> errorHandler;
    private ChannelHandlerContext ctx;
    private EventLoopFlow flow;
    private final BufferConsumer.Upstream upstream;
    private long unwritten = 0;
    private boolean completed = false;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamWriter(StreamingNettyByteBody streamingNettyByteBody, Consumer<Throwable> consumer) {
        this.errorHandler = consumer;
        this.upstream = streamingNettyByteBody.primary(this);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startWriting() {
        if (this.ctx == null) {
            throw new IllegalStateException("Not added to a channel yet");
        }
        try {
            this.upstream.start();
        } catch (Exception e) {
            this.errorHandler.accept(e);
        }
    }

    void cancel() {
        this.upstream.allowDiscard();
        this.upstream.disregardBackpressure();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isCompleted() {
        return this.completed;
    }

    public void handlerAdded(ChannelHandlerContext channelHandlerContext) {
        this.ctx = channelHandlerContext;
        this.flow = new EventLoopFlow(channelHandlerContext.channel().eventLoop());
    }

    public void add(ByteBuf byteBuf) {
        if (this.flow.executeNow(() -> {
            add0(byteBuf);
        })) {
            add0(byteBuf);
        }
    }

    private void add0(ByteBuf byteBuf) {
        if (this.ctx == null) {
            byteBuf.release();
        } else {
            int readableBytes = byteBuf.readableBytes();
            this.ctx.writeAndFlush(new DefaultHttpContent(byteBuf)).addListener(channelFuture -> {
                if (!$assertionsDisabled && !this.ctx.executor().inEventLoop()) {
                    throw new AssertionError();
                }
                if (!channelFuture.isSuccess()) {
                    error(channelFuture.cause());
                } else if (this.ctx.channel().isWritable()) {
                    this.upstream.onBytesConsumed(readableBytes);
                } else {
                    this.unwritten += readableBytes;
                }
            });
        }
    }

    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
        long j = this.unwritten;
        if (channelHandlerContext.channel().isWritable() && j != 0) {
            this.unwritten = 0L;
            this.upstream.onBytesConsumed(j);
        }
        super.channelWritabilityChanged(channelHandlerContext);
    }

    public void complete() {
        if (this.flow.executeNow(this::complete0)) {
            complete0();
        }
    }

    private void complete0() {
        if (this.ctx == null) {
            return;
        }
        this.ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, this.ctx.voidPromise());
        this.completed = true;
    }

    public void discard() {
    }

    public void error(Throwable th) {
        if (this.ctx == null) {
            return;
        }
        this.errorHandler.accept(th);
    }

    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
        cancel();
    }

    static {
        $assertionsDisabled = !StreamWriter.class.desiredAssertionStatus();
    }
}
