/*
 * Decompiled with CFR 0.152.
 */
package ratpack.http.client.internal;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCounted;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.exec.Downstream;
import ratpack.exec.Execution;
import ratpack.exec.Upstream;
import ratpack.func.Action;
import ratpack.http.Headers;
import ratpack.http.MutableHeaders;
import ratpack.http.Response;
import ratpack.http.Status;
import ratpack.http.client.RequestSpec;
import ratpack.http.client.StreamedResponse;
import ratpack.http.client.internal.HttpClientInternal;
import ratpack.http.client.internal.RequestActionSupport;
import ratpack.http.internal.DefaultStatus;
import ratpack.http.internal.NettyHeadersBackedHeaders;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.internal.BufferedWriteStream;
import ratpack.stream.internal.BufferingPublisher;
import ratpack.util.Exceptions;

/**
 * Request action whose result is a {@link StreamedResponse}.
 *
 * <p>The response head is handed to the downstream as soon as it is read from the channel.
 * Automatic reading is then switched off, and body chunks are read from the channel on demand
 * once the body publisher returned by {@link StreamedResponse#getBody()} is consumed.
 */
public class ContentStreamingRequestAction extends RequestActionSupport<StreamedResponse> {
    /** Name under which the streaming {@link Handler} is registered in the channel pipeline. */
    private static final String HANDLER_NAME = "streaming";

    /**
     * Creates the action; all arguments are passed straight through to {@link RequestActionSupport}.
     *
     * @param uri the request target
     * @param client the client issuing the request
     * @param redirectCount the redirect count passed to the superclass
     * @param execution the execution this request is associated with
     * @param requestConfigurer configures the outgoing request
     * @throws Exception if the superclass constructor fails
     */
    ContentStreamingRequestAction(URI uri, HttpClientInternal client, int redirectCount, Execution execution, Action<? super RequestSpec> requestConfigurer) throws Exception {
        super(uri, client, redirectCount, execution, requestConfigurer);
    }

    /**
     * Removes the streaming handler from the pipeline, then delegates to the superclass.
     */
    @Override
    protected void doDispose(ChannelPipeline channelPipeline, boolean forceClose) {
        channelPipeline.remove(HANDLER_NAME);
        super.doDispose(channelPipeline, forceClose);
    }

    /**
     * Appends the streaming {@link Handler} to the end of the pipeline under {@link #HANDLER_NAME}.
     */
    @Override
    protected void addResponseHandlers(ChannelPipeline p, Downstream<? super StreamedResponse> downstream) {
        p.addLast(HANDLER_NAME, new Handler(p, downstream));
    }

    /**
     * Builds the follow-up action for a redirect: a new streaming action for {@code locationUrl}
     * that reuses this action's client and execution.
     */
    @Override
    protected Upstream<StreamedResponse> onRedirect(URI locationUrl, int redirectCount, Action<? super RequestSpec> redirectRequestConfig) throws Exception {
        return new ContentStreamingRequestAction(locationUrl, this.client, redirectCount, this.execution, redirectRequestConfig);
    }

    /**
     * Channel handler that turns the inbound {@link HttpResponse} into a {@link StreamedResponse}
     * and routes the following {@link HttpContent} chunks either into a holding list (body not yet
     * subscribed) or into the body's write stream (body subscribed).
     */
    private class Handler extends SimpleChannelInboundHandler<HttpObject> {
        private final ChannelPipeline channelPipeline;
        private final Downstream<? super StreamedResponse> downstream;
        /** Chunks that arrived while {@link #write} was still {@code null}; created lazily. */
        private List<HttpContent> received;
        /** Sink for body chunks; assigned by the function given to the body's BufferingPublisher. */
        private BufferedWriteStream<ByteBuf> write;
        private HttpResponse response;

        public Handler(ChannelPipeline channelPipeline, Downstream<? super StreamedResponse> downstream) {
            // false: messages are not auto-released by the base class; this handler releases
            // (or hands off) every chunk explicitly.
            super(false);
            this.channelPipeline = channelPipeline;
            this.downstream = downstream;
        }

        /**
         * Handles the response head and each subsequent content chunk.
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, HttpObject httpObject) throws Exception {
            if (httpObject instanceof HttpResponse) {
                this.response = (HttpResponse) httpObject;
                int code = this.response.status().code();
                // Informational (1xx) and 204 responses: drop any Content-Length header.
                if (code >= 100 && code < 200 || code == 204) {
                    this.response.headers().remove(HttpHeaderNames.CONTENT_LENGTH);
                }
                // From here on reads are triggered explicitly (see the read() calls below and in getBody()).
                this.channelPipeline.channel().config().setAutoRead(false);
                // Cleanup hook registered with the execution: if the body was never subscribed,
                // force-dispose the pipeline, and release any chunks still held in 'received'.
                ContentStreamingRequestAction.this.execution.onComplete(() -> {
                    if (this.write == null) {
                        ContentStreamingRequestAction.this.forceDispose(this.channelPipeline);
                    }
                    if (this.received != null) {
                        this.received.forEach(ReferenceCounted::release);
                    }
                });
                ContentStreamingRequestAction.this.success(this.downstream, new DefaultStreamedResponse(this.channelPipeline));
            } else if (httpObject instanceof HttpContent) {
                HttpContent httpContent = ((HttpContent) httpObject).touch();
                boolean hasContent = httpContent.content().readableBytes() > 0;
                boolean isLast = httpObject instanceof LastHttpContent;
                if (this.write == null) {
                    // Body not subscribed yet: hold on to anything meaningful. An empty last chunk
                    // is kept too, so that the terminal marker is seen when the list is drained.
                    if (hasContent || isLast) {
                        if (this.received == null) {
                            this.received = new ArrayList<>();
                        }
                        this.received.add(httpContent.touch());
                    } else {
                        httpContent.release();
                    }
                    if (isLast) {
                        ContentStreamingRequestAction.this.dispose(ctx.pipeline(), this.response);
                    }
                } else {
                    if (hasContent) {
                        this.write.item(httpContent.content().touch("emitting to user code"));
                    } else {
                        httpContent.release();
                    }
                    if (isLast) {
                        ContentStreamingRequestAction.this.dispose(ctx.pipeline(), this.response);
                        this.write.complete();
                    } else if (this.write.getRequested() > 0L) {
                        // Still outstanding demand on the write stream: pull the next chunk.
                        ctx.read();
                    }
                }
            }
        }

        /**
         * Reports a pipeline failure to the response downstream if the body has not been
         * subscribed, otherwise to the body's write stream, then force-disposes the pipeline.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause = ContentStreamingRequestAction.this.decorateException(cause);
            if (this.write == null) {
                ContentStreamingRequestAction.this.error(this.downstream, cause);
            } else {
                this.write.error(cause);
            }
            ContentStreamingRequestAction.this.forceDispose(ctx.pipeline());
        }

        /**
         * {@link StreamedResponse} backed by the enclosing handler's response head and channel.
         * Status and headers are captured from the handler's current response at construction.
         */
        class DefaultStreamedResponse implements StreamedResponse {
            private final ChannelPipeline channelPipeline;
            private final Status status;
            private final Headers headers;

            private DefaultStreamedResponse(ChannelPipeline channelPipeline) {
                this.channelPipeline = channelPipeline;
                this.headers = new NettyHeadersBackedHeaders(Handler.this.response.headers());
                this.status = new DefaultStatus(Handler.this.response.status());
            }

            @Override
            public Status getStatus() {
                return this.status;
            }

            @Override
            public int getStatusCode() {
                return this.status.getCode();
            }

            @Override
            public Headers getHeaders() {
                return this.headers;
            }

            /**
             * Returns a publisher of the response body chunks.
             *
             * <p>When the publisher's function runs it installs the write stream on the handler,
             * replays any chunks that were received beforehand, and returns a subscription whose
             * {@code request} triggers a channel read and whose {@code cancel} force-disposes
             * the pipeline.
             */
            @Override
            public TransformablePublisher<ByteBuf> getBody() {
                return new BufferingPublisher<ByteBuf>(ReferenceCounted::release, stream -> {
                    Handler.this.write = stream;
                    if (Handler.this.received != null) {
                        for (HttpContent httpContent : Handler.this.received) {
                            if (httpContent.content().readableBytes() > 0) {
                                stream.item(httpContent.content().touch("emitting to user code"));
                            } else {
                                httpContent.release();
                            }
                            if (httpContent instanceof LastHttpContent) {
                                ContentStreamingRequestAction.this.dispose(this.channelPipeline, Handler.this.response);
                                stream.complete();
                            }
                        }
                        // Ownership of every held chunk has been handed off or released above.
                        Handler.this.received.clear();
                    }
                    return new Subscription() {
                        @Override
                        public void request(long n) {
                            // The amount is not used here; each call triggers one channel read.
                            DefaultStreamedResponse.this.channelPipeline.read();
                        }

                        @Override
                        public void cancel() {
                            ContentStreamingRequestAction.this.forceDispose(DefaultStreamedResponse.this.channelPipeline);
                        }
                    };
                });
            }

            /**
             * Forwards this response to {@code response} without modifying the copied headers.
             */
            @Override
            public void forwardTo(Response response) {
                this.forwardTo(response, Action.noop());
            }

            /**
             * Forwards this response's status, headers and body to {@code response}.
             *
             * <p>Headers are copied, the {@code Connection} header is removed, and
             * {@code headerMutator} is then applied to the outgoing headers. The body is
             * subscribed with an initial demand of one so the terminal case can be told apart:
             * an immediate completion sends an empty response, an immediate error sends a stream
             * that signals that error, and a first chunk starts a streamed response that emits
             * that chunk first.
             *
             * @param response the response to write to
             * @param headerMutator applied to the outgoing headers after copying
             */
            @Override
            public void forwardTo(final Response response, Action<? super MutableHeaders> headerMutator) {
                MutableHeaders outgoingHeaders = response.getHeaders();
                outgoingHeaders.copy(this.headers);
                outgoingHeaders.remove(HttpHeaderNames.CONNECTION);
                Exceptions.uncheck(() -> headerMutator.execute(outgoingHeaders));
                response.status(this.status);
                this.getBody().bindExec(ReferenceCounted::release).subscribe(new Subscriber<ByteBuf>() {
                    private Subscription subscription;
                    /** Subscriber supplied by {@code response.sendStream}; null until the first chunk arrives. */
                    private Subscriber<? super ByteBuf> downstream;

                    @Override
                    public void onSubscribe(Subscription s) {
                        this.subscription = s;
                        this.subscription.request(1L);
                    }

                    @Override
                    public void onNext(final ByteBuf byteBuf) {
                        if (this.downstream == null) {
                            // First chunk: start the outgoing stream and park the chunk until
                            // the outgoing subscriber signals demand.
                            response.sendStream(s -> {
                                this.downstream = s;
                                this.downstream.onSubscribe(new Subscription() {
                                    private ByteBuf initial = byteBuf;

                                    @Override
                                    public void request(long n) {
                                        if (this.initial == null) {
                                            subscription.request(n);
                                        } else {
                                            ByteBuf initialRef = this.initial;
                                            this.initial = null;
                                            downstream.onNext(initialRef);
                                            // The parked chunk satisfied one unit of demand;
                                            // pass the remainder upstream so none of it is lost.
                                            if (n > 1L) {
                                                subscription.request(n - 1L);
                                            }
                                        }
                                    }

                                    @Override
                                    public void cancel() {
                                        subscription.cancel();
                                        // Clear before releasing so a repeated cancel, or a late
                                        // request, never touches the released buffer.
                                        ByteBuf pending = this.initial;
                                        if (pending != null) {
                                            this.initial = null;
                                            pending.release();
                                        }
                                    }
                                });
                            });
                        } else {
                            this.downstream.onNext(byteBuf);
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        if (this.downstream == null) {
                            response.sendStream(s -> s.onError(t));
                        } else {
                            this.downstream.onError(t);
                        }
                    }

                    @Override
                    public void onComplete() {
                        if (this.downstream == null) {
                            // No body chunks at all: send the response without a body stream.
                            response.send();
                        } else {
                            this.downstream.onComplete();
                        }
                    }
                });
            }
        }
    }
}

