/*
 * Decompiled with CFR 0.152.
 */
package ratpack.http.client.internal;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.exec.Execution;
import ratpack.exec.Fulfiller;
import ratpack.func.Action;
import ratpack.http.Headers;
import ratpack.http.MutableHeaders;
import ratpack.http.Response;
import ratpack.http.Status;
import ratpack.http.client.RequestSpec;
import ratpack.http.client.StreamedResponse;
import ratpack.http.client.internal.RequestActionSupport;
import ratpack.http.internal.DefaultStatus;
import ratpack.http.internal.HttpHeaderConstants;
import ratpack.http.internal.NettyHeadersBackedHeaders;
import ratpack.stream.Streams;
import ratpack.stream.TransformablePublisher;
import ratpack.util.Exceptions;

/**
 * Request action that yields a {@link StreamedResponse}.
 *
 * <p>When the response head ({@link HttpResponse}) is read, auto-read is switched off on the channel and a
 * {@link StreamedResponse} carrying the status and headers is passed to {@code success(...)}. The body is
 * exposed through {@link StreamedResponse#getBody()} as a {@link Publisher} of {@link ByteBuf}s whose
 * subscription drives reads on the channel.
 *
 * <p>The body can be subscribed to at most once per action; later subscriptions are rejected via
 * {@code onError}.
 */
class ContentStreamingRequestAction extends RequestActionSupport<StreamedResponse> {
    /** Pipeline name of the handler that waits for the response head. */
    private static final String RESPONSE_HANDLER_NAME = "httpResponseHandler";
    /** Pipeline name of the handler that forwards body chunks to the subscriber. */
    private static final String CONTENT_HANDLER_NAME = "httpContentHandler";

    /** Flipped to {@code true} by the first body subscription; read by the execution cleanup hook. */
    private final AtomicBoolean subscribedTo = new AtomicBoolean();

    /**
     * Creates the action; all arguments are forwarded unchanged to {@link RequestActionSupport}.
     *
     * @param requestConfigurer configures the outgoing request
     * @param uri the request target
     * @param execution the execution this request belongs to
     * @param byteBufAllocator allocator handed to the superclass
     */
    public ContentStreamingRequestAction(Action<? super RequestSpec> requestConfigurer, URI uri, Execution execution, ByteBufAllocator byteBufAllocator) {
        super(requestConfigurer, uri, execution, byteBufAllocator);
    }

    /**
     * Builds the action used to follow a redirect: a fresh streaming action for {@code locationUrl} that
     * reuses this action's execution and allocator.
     */
    @Override
    protected RequestActionSupport<StreamedResponse> buildRedirectRequestAction(Action<? super RequestSpec> redirectRequestConfig, URI locationUrl) {
        return new ContentStreamingRequestAction(redirectRequestConfig, locationUrl, this.execution, this.byteBufAllocator);
    }

    /**
     * Installs the handler that turns the response head into a {@link StreamedResponse}.
     *
     * @param p the pipeline of the channel carrying the response
     * @param fulfiller receives the streamed response, or the failure
     */
    @Override
    protected void addResponseHandlers(final ChannelPipeline p, final Fulfiller<? super StreamedResponse> fulfiller) {
        // autoRelease = false: this handler does not release the messages it receives.
        p.addLast(RESPONSE_HANDLER_NAME, new SimpleChannelInboundHandler<HttpResponse>(false) {

            @Override
            public void channelRead0(ChannelHandlerContext ctx, HttpResponse msg) throws Exception {
                // From here on, reads are issued explicitly by the body subscription.
                p.channel().config().setAutoRead(false);

                // If nobody subscribed to the body by the time the execution is cleaned up,
                // close the still-open channel.
                ContentStreamingRequestAction.this.execution.onCleanup(() -> {
                    boolean neverSubscribed = !ContentStreamingRequestAction.this.subscribedTo.get();
                    if (neverSubscribed && ctx.channel().isOpen()) {
                        ctx.close();
                    }
                });

                Headers headers = new NettyHeadersBackedHeaders(msg.headers());
                Status status = new DefaultStatus(msg.status());
                ContentStreamingRequestAction.this.success(fulfiller, new DefaultStreamedResponse(p, status, headers));
            }

            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                ctx.close();
                ContentStreamingRequestAction.this.error(fulfiller, cause);
            }
        });
    }

    /**
     * Publishes the response body chunks read from the channel.
     *
     * <p>On subscription the head handler is replaced by a content handler that forwards each
     * {@link HttpContent}'s buffer to the subscriber. The handler does not release buffers it forwards,
     * so the subscriber is presumably responsible for releasing them (verify against consumers).
     */
    private class HttpContentPublisher
    implements Publisher<ByteBuf> {
        private Subscriber<? super ByteBuf> subscriber;
        private final ChannelPipeline channelPipeline;
        /** Set once the stream is finished: completed, failed, or cancelled. */
        private final AtomicBoolean stopped = new AtomicBoolean();

        public HttpContentPublisher(ChannelPipeline p) {
            this.channelPipeline = p;
        }

        /**
         * Attaches {@code s} to the response body.
         *
         * <p>Only the first subscription across all publishers of the enclosing action is served. Any
         * further subscriber receives {@code onSubscribe} with an inert subscription followed by
         * {@code onError(IllegalStateException)}, and the pipeline is left untouched.
         */
        @Override
        public void subscribe(Subscriber<? super ByteBuf> s) {
            if (!ContentStreamingRequestAction.this.subscribedTo.compareAndSet(false, true)) {
                // The head handler has already been removed by the first subscription; touching the
                // pipeline again would throw, so reject through the subscriber instead.
                s.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        // inert: this subscription never emits
                    }

                    @Override
                    public void cancel() {
                        // inert: nothing to release
                    }
                });
                s.onError(new IllegalStateException("The response body has already been subscribed to and can only be consumed once"));
                return;
            }

            this.subscriber = s;
            this.channelPipeline.remove(RESPONSE_HANDLER_NAME);
            this.channelPipeline.addLast(CONTENT_HANDLER_NAME, new SimpleChannelInboundHandler<HttpContent>(false) {

                @Override
                public void channelRead0(ChannelHandlerContext ctx, HttpContent msg) throws Exception {
                    if (HttpContentPublisher.this.stopped.get()) {
                        // Stream already finished or cancelled: do not signal the subscriber, and
                        // release the chunk since nobody else will (autoRelease is off).
                        msg.release();
                        return;
                    }
                    HttpContentPublisher.this.subscriber.onNext(msg.content());
                    // compareAndSet guarantees a single terminal signal.
                    if (msg instanceof LastHttpContent && HttpContentPublisher.this.stopped.compareAndSet(false, true)) {
                        HttpContentPublisher.this.subscriber.onComplete();
                    }
                }

                @Override
                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                    if (HttpContentPublisher.this.stopped.compareAndSet(false, true)) {
                        HttpContentPublisher.this.subscriber.onError(cause);
                    }
                    if (ctx.channel().isOpen()) {
                        ctx.close();
                    }
                }
            });

            s.onSubscribe(new Subscription() {

                /**
                 * Translates demand into channel reads: {@code Long.MAX_VALUE} re-enables auto-read,
                 * any other positive {@code n} issues {@code n} explicit reads.
                 *
                 * @throws IllegalArgumentException if {@code n} is less than 1
                 */
                @Override
                public void request(long n) {
                    if (n < 1L) {
                        throw new IllegalArgumentException("3.9 While the Subscription is not cancelled, Subscription.request(long n) MUST throw a java.lang.IllegalArgumentException if the argument is <= 0.");
                    }
                    if (HttpContentPublisher.this.stopped.get()) {
                        return;
                    }
                    if (n == Long.MAX_VALUE) {
                        HttpContentPublisher.this.channelPipeline.channel().config().setAutoRead(true);
                        return;
                    }
                    // long counter: an int counter would overflow and never reach n > Integer.MAX_VALUE.
                    // Stop early once the stream has finished; further reads would be pointless.
                    for (long issued = 0L; issued < n && !HttpContentPublisher.this.stopped.get(); issued++) {
                        HttpContentPublisher.this.channelPipeline.channel().read();
                    }
                }

                /** Marks the stream as stopped and closes the channel. */
                @Override
                public void cancel() {
                    HttpContentPublisher.this.stopped.set(true);
                    HttpContentPublisher.this.channelPipeline.channel().close();
                }
            });
        }
    }

    /**
     * {@link StreamedResponse} backed by the status and headers of the received response head and by
     * the channel pipeline the body will be read from.
     */
    private class DefaultStreamedResponse
    implements StreamedResponse {
        private final ChannelPipeline channelPipeline;
        private final Status status;
        private final Headers headers;

        public DefaultStreamedResponse(ChannelPipeline p, Status status, Headers headers) {
            this.channelPipeline = p;
            this.status = status;
            this.headers = headers;
        }

        @Override
        public Status getStatus() {
            return this.status;
        }

        @Override
        public int getStatusCode() {
            return this.status.getCode();
        }

        @Override
        public Headers getHeaders() {
            return this.headers;
        }

        /**
         * Returns a new publisher over the response body. Each call creates a new publisher, but only
         * the first subscription overall is served.
         */
        @Override
        public TransformablePublisher<ByteBuf> getBody() {
            return Streams.transformable(new HttpContentPublisher(this.channelPipeline));
        }

        /** Equivalent to {@link #send(Response, Action)} with a no-op header mutator. */
        @Override
        public void send(Response response) {
            this.send(response, Action.noop());
        }

        /**
         * Relays this streamed response through {@code response}.
         *
         * <p>Copies this response's headers onto {@code response}, removes {@code Content-Length},
         * applies {@code headerMutator}, then sets chunked transfer encoding (after the mutator, so the
         * mutator cannot override it), copies the status and streams the body.
         *
         * @param response the response to write to
         * @param headerMutator customises the outgoing headers; a checked exception it throws is
         *                      rethrown via {@link Exceptions#uncheck(Throwable)}
         */
        @Override
        public void send(Response response, Action<? super MutableHeaders> headerMutator) {
            MutableHeaders outgoing = response.getHeaders();
            outgoing.copy(this.headers);
            outgoing.remove(HttpHeaderConstants.CONTENT_LENGTH);
            try {
                headerMutator.execute(response.getHeaders());
            }
            catch (Exception e) {
                throw Exceptions.uncheck(e);
            }
            response.getHeaders().set(HttpHeaderConstants.TRANSFER_ENCODING, HttpHeaderConstants.CHUNKED);
            response.status(this.status);
            response.sendStream(this.getBody());
        }
    }
}

