/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.microsoft.azure.cosmosdb.BridgeInternal;
import com.microsoft.azure.cosmosdb.DocumentClientException;
import com.microsoft.azure.cosmosdb.Error;
import com.microsoft.azure.cosmosdb.internal.InternalServerErrorException;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.ConflictException;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.ForbiddenException;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.GoneException;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.LockedException;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.MethodNotAllowedException;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.PartitionKeyRangeGoneException;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.PreconditionFailedException;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.RequestEntityTooLargeException;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.RequestRateTooLargeException;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.RequestTimeoutException;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.RetryWithException;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.ServiceUnavailableException;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.StoreResponse;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.UnauthorizedException;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdConstants;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdContext;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdContextNegotiator;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdContextRequest;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdObjectMapper;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdReporter;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdRequestArgs;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdRequestRecord;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdResponse;
import com.microsoft.azure.cosmosdb.rx.internal.BadRequestException;
import com.microsoft.azure.cosmosdb.rx.internal.InvalidPartitionException;
import com.microsoft.azure.cosmosdb.rx.internal.NotFoundException;
import com.microsoft.azure.cosmosdb.rx.internal.PartitionIsMigratingException;
import com.microsoft.azure.cosmosdb.rx.internal.PartitionKeyRangeIsSplittingException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelOutboundInvoker;
import io.netty.channel.ChannelPromise;
import io.netty.channel.CoalescingBufferQueue;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.concurrent.EventExecutor;
import java.math.BigDecimal;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RntbdRequestManager
implements ChannelHandler,
ChannelInboundHandler,
ChannelOutboundHandler {
    private static final Logger logger = LoggerFactory.getLogger(RntbdRequestManager.class);
    private final CompletableFuture<RntbdContext> contextFuture = new CompletableFuture();
    private final CompletableFuture<RntbdContextRequest> contextRequestFuture = new CompletableFuture();
    private final ConcurrentHashMap<Long, RntbdRequestRecord> pendingRequests;
    private boolean closingExceptionally = false;
    private ChannelHandlerContext context;
    private RntbdRequestRecord pendingRequest;
    private CoalescingBufferQueue pendingWrites;

    public RntbdRequestManager(int capacity) {
        Preconditions.checkState((capacity > 0 ? 1 : 0) != 0);
        this.pendingRequests = new ConcurrentHashMap(capacity);
    }

    public void handlerAdded(ChannelHandlerContext context) {
        this.traceOperation(context, "handlerAdded", new Object[0]);
    }

    public void handlerRemoved(ChannelHandlerContext context) {
        this.traceOperation(context, "handlerRemoved", new Object[0]);
    }

    public void channelActive(ChannelHandlerContext context) {
        this.traceOperation(this.context, "channelActive", new Object[0]);
        context.fireChannelActive();
    }

    public void channelInactive(ChannelHandlerContext context) {
        this.traceOperation(this.context, "channelInactive", new Object[0]);
        context.fireChannelInactive();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void channelRead(ChannelHandlerContext context, Object message) {
        this.traceOperation(context, "channelRead", new Object[0]);
        if (message instanceof RntbdResponse) {
            try {
                this.messageReceived(context, (RntbdResponse)message);
            }
            catch (Throwable throwable) {
                RntbdReporter.reportIssue(logger, context, "{} ", message, throwable);
                this.exceptionCaught(context, throwable);
            }
            finally {
                ReferenceCountUtil.release((Object)message);
            }
            this.traceOperation(context, "channelReadComplete", new Object[0]);
            return;
        }
        String reason = Strings.lenientFormat((String)"expected message of type %s, not %s: %s", (Object[])new Object[]{RntbdResponse.class, message.getClass(), message});
        IllegalStateException error = new IllegalStateException(reason);
        RntbdReporter.reportIssue(logger, context, "", error);
        this.exceptionCaught(context, error);
    }

    public void channelReadComplete(ChannelHandlerContext context) {
        this.traceOperation(context, "channelReadComplete", new Object[0]);
        context.fireChannelReadComplete();
    }

    public void channelRegistered(ChannelHandlerContext context) {
        this.traceOperation(context, "channelRegistered", new Object[0]);
        if (this.context != null || this.pendingWrites != null) {
            throw new IllegalStateException();
        }
        this.pendingWrites = new CoalescingBufferQueue(context.channel());
        this.context = context;
        context.fireChannelRegistered();
    }

    public void channelUnregistered(ChannelHandlerContext context) {
        this.traceOperation(context, "channelUnregistered", new Object[0]);
        if (this.context == null || this.pendingWrites == null) {
            throw new IllegalStateException();
        }
        this.completeAllPendingRequestsExceptionally(context, ClosedWithPendingRequestsException.INSTANCE);
        this.pendingWrites = null;
        this.context = null;
        context.fireChannelUnregistered();
    }

    public void channelWritabilityChanged(ChannelHandlerContext context) {
        this.traceOperation(context, "channelWritabilityChanged", new Object[0]);
        context.fireChannelWritabilityChanged();
    }

    public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
        this.traceOperation(context, "exceptionCaught", cause);
        if (!this.closingExceptionally) {
            RntbdReporter.reportIssueUnless(cause != ClosedWithPendingRequestsException.INSTANCE, logger, context, "expected an exception other than ", ClosedWithPendingRequestsException.INSTANCE);
            this.completeAllPendingRequestsExceptionally(context, cause);
            context.pipeline().flush().close();
        }
    }

    public void userEventTriggered(ChannelHandlerContext context, Object event) {
        this.traceOperation(context, "userEventTriggered", event);
        if (event instanceof RntbdContext) {
            try {
                this.completeRntbdContextFuture(context, (RntbdContext)event);
            }
            catch (Throwable error) {
                RntbdReporter.reportIssue(logger, context, "{}: ", event, error);
                this.exceptionCaught(context, error);
            }
            return;
        }
        context.fireUserEventTriggered(event);
    }

    public void bind(ChannelHandlerContext context, SocketAddress localAddress, ChannelPromise promise) {
        this.traceOperation(context, "bind", new Object[0]);
        context.bind(localAddress, promise);
    }

    public void close(ChannelHandlerContext context, ChannelPromise promise) {
        this.traceOperation(context, "close", new Object[0]);
        this.completeAllPendingRequestsExceptionally(context, ClosedWithPendingRequestsException.INSTANCE);
        SslHandler sslHandler = (SslHandler)context.pipeline().get(SslHandler.class);
        if (sslHandler != null) {
            sslHandler.closeOutbound();
        }
        context.close(promise);
    }

    public void connect(ChannelHandlerContext context, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        this.traceOperation(context, "connect", new Object[0]);
        context.connect(remoteAddress, localAddress, promise);
    }

    public void deregister(ChannelHandlerContext context, ChannelPromise promise) {
        this.traceOperation(context, "deregister", new Object[0]);
        context.deregister(promise);
    }

    public void disconnect(ChannelHandlerContext context, ChannelPromise promise) {
        this.traceOperation(context, "disconnect", new Object[0]);
        context.disconnect(promise);
    }

    public void flush(ChannelHandlerContext context) {
        this.traceOperation(context, "flush", new Object[0]);
        context.flush();
    }

    public void read(ChannelHandlerContext context) {
        this.traceOperation(context, "read", new Object[0]);
        context.read();
    }

    public void write(ChannelHandlerContext context, Object message, ChannelPromise promise) {
        this.traceOperation(context, "write", message);
        if (message instanceof RntbdRequestRecord) {
            context.write((Object)this.addPendingRequestRecord(context, (RntbdRequestRecord)message), promise);
        } else {
            String reason = Strings.lenientFormat((String)"Expected message of type %s, not %s", (Object[])new Object[]{RntbdRequestArgs.class, message.getClass()});
            this.exceptionCaught(context, new IllegalStateException(reason));
        }
    }

    CompletableFuture<RntbdContextRequest> getRntbdContextRequestFuture() {
        return this.contextRequestFuture;
    }

    boolean hasRntbdContext() {
        return this.contextFuture.getNow(null) != null;
    }

    boolean isServiceable(int capacity) {
        int pendingRequestLimit = this.hasRntbdContext() ? capacity : Math.min(capacity, 30);
        return this.pendingRequests.size() < pendingRequestLimit;
    }

    void pendWrite(ByteBuf out, ChannelPromise promise) {
        this.pendingWrites.add(out, promise);
    }

    private RntbdRequestArgs addPendingRequestRecord(ChannelHandlerContext context, RntbdRequestRecord record) {
        this.pendingRequest = this.pendingRequests.compute(record.getTransportRequestId(), (id, current) -> {
            RntbdReporter.reportIssueUnless(current == null, logger, context, "id: {}, current: {}, request: {}", id, current, record);
            Timeout pendingRequestTimeout = record.newTimeout(timeout -> {
                EventExecutor executor = context.executor();
                if (executor.inEventLoop()) {
                    record.expire();
                } else {
                    executor.next().execute(record::expire);
                }
            });
            record.whenComplete((response, error) -> {
                this.pendingRequests.remove(id);
                pendingRequestTimeout.cancel();
            });
            return record;
        });
        return this.pendingRequest.getArgs();
    }

    private Optional<RntbdContext> getRntbdContext() {
        return Optional.of(this.contextFuture.getNow(null));
    }

    private void completeAllPendingRequestsExceptionally(ChannelHandlerContext context, Throwable throwable) {
        if (this.closingExceptionally) {
            RntbdReporter.reportIssueUnless(throwable == ClosedWithPendingRequestsException.INSTANCE, logger, context, "throwable: ", throwable);
            RntbdReporter.reportIssueUnless(this.pendingRequests.isEmpty() && this.pendingWrites.isEmpty(), logger, context, "pendingRequests: {}, pendingWrites: {}", this.pendingRequests.isEmpty(), this.pendingWrites.isEmpty());
            return;
        }
        this.closingExceptionally = true;
        if (!this.pendingWrites.isEmpty()) {
            this.pendingWrites.releaseAndFailAll((ChannelOutboundInvoker)context, (Throwable)ClosedWithPendingRequestsException.INSTANCE);
        }
        if (!this.pendingRequests.isEmpty()) {
            if (!this.contextRequestFuture.isDone()) {
                this.contextRequestFuture.completeExceptionally(throwable);
            }
            if (!this.contextFuture.isDone()) {
                this.contextFuture.completeExceptionally(throwable);
            }
            int count = this.pendingRequests.size();
            Exception contextRequestException = null;
            String phrase = null;
            if (this.contextRequestFuture.isCompletedExceptionally()) {
                try {
                    this.contextRequestFuture.get();
                }
                catch (CancellationException error) {
                    phrase = "RNTBD context request write cancelled";
                    contextRequestException = error;
                }
                catch (Exception error) {
                    phrase = "RNTBD context request write failed";
                    contextRequestException = error;
                }
                catch (Throwable error) {
                    phrase = "RNTBD context request write failed";
                    contextRequestException = new ChannelException(error);
                }
            } else if (this.contextFuture.isCompletedExceptionally()) {
                try {
                    this.contextFuture.get();
                }
                catch (CancellationException error) {
                    phrase = "RNTBD context request read cancelled";
                    contextRequestException = error;
                }
                catch (Exception error) {
                    phrase = "RNTBD context request read failed";
                    contextRequestException = error;
                }
                catch (Throwable error) {
                    phrase = "RNTBD context request read failed";
                    contextRequestException = new ChannelException(error);
                }
            } else {
                phrase = "closed exceptionally";
            }
            String message = Strings.lenientFormat((String)"%s %s with %s pending requests", (Object[])new Object[]{context, phrase, count});
            Exception cause = throwable == ClosedWithPendingRequestsException.INSTANCE ? (contextRequestException == null ? ClosedWithPendingRequestsException.INSTANCE : contextRequestException) : (throwable instanceof Exception ? (Exception)throwable : new ChannelException(throwable));
            for (RntbdRequestRecord record : this.pendingRequests.values()) {
                Map requestHeaders = record.getArgs().getServiceRequest().getHeaders();
                String requestUri = record.getArgs().getPhysicalAddress().toString();
                GoneException error = new GoneException(message, cause, (Map)null, requestUri);
                BridgeInternal.setRequestHeaders((DocumentClientException)((Object)error), (Map)requestHeaders);
                record.completeExceptionally(error);
            }
        }
    }

    private void completeRntbdContextFuture(ChannelHandlerContext context, RntbdContext value) {
        this.contextFuture.complete(value);
        RntbdContextNegotiator negotiator = (RntbdContextNegotiator)context.channel().pipeline().get(RntbdContextNegotiator.class);
        negotiator.removeInboundHandler();
        negotiator.removeOutboundHandler();
        if (!this.pendingWrites.isEmpty()) {
            this.pendingWrites.writeAndRemoveAll(context);
        }
    }

    private void messageReceived(ChannelHandlerContext context, RntbdResponse response) {
        Long transportRequestId = response.getTransportRequestId();
        if (transportRequestId == null) {
            RntbdReporter.reportIssue(logger, context, "{} ignored because there is no transport request identifier, response", new Object[0]);
            return;
        }
        RntbdRequestRecord pendingRequest = this.pendingRequests.get(transportRequestId);
        if (pendingRequest == null) {
            RntbdReporter.reportIssue(logger, context, "{} ignored because there is no matching pending request", response);
            return;
        }
        HttpResponseStatus status = response.getStatus();
        UUID activityId = response.getActivityId();
        if (HttpResponseStatus.OK.code() <= status.code() && status.code() < HttpResponseStatus.MULTIPLE_CHOICES.code()) {
            StoreResponse storeResponse = response.toStoreResponse(this.contextFuture.getNow(null));
            pendingRequest.complete(storeResponse);
        } else {
            Object cause;
            long lsn = (Long)response.getHeader(RntbdConstants.RntbdResponseHeader.LSN);
            String partitionKeyRangeId = (String)response.getHeader(RntbdConstants.RntbdResponseHeader.PartitionKeyRangeId);
            Error error = response.hasPayload() ? BridgeInternal.createError((ObjectNode)RntbdObjectMapper.readTree(response)) : new Error(Integer.toString(status.code()), status.reasonPhrase(), status.codeClass().name());
            Map<String, String> responseHeaders = response.getHeaders().asMap(this.getRntbdContext().orElseThrow(IllegalStateException::new), activityId);
            block0 : switch (status.code()) {
                case 400: {
                    cause = new BadRequestException(error, lsn, partitionKeyRangeId, responseHeaders);
                    break;
                }
                case 409: {
                    cause = new ConflictException(error, lsn, partitionKeyRangeId, responseHeaders);
                    break;
                }
                case 403: {
                    cause = new ForbiddenException(error, lsn, partitionKeyRangeId, responseHeaders);
                    break;
                }
                case 410: {
                    int subStatusCode = Math.toIntExact((Long)response.getHeader(RntbdConstants.RntbdResponseHeader.SubStatus));
                    switch (subStatusCode) {
                        case 1007: {
                            cause = new PartitionKeyRangeIsSplittingException(error, lsn, partitionKeyRangeId, responseHeaders);
                            break block0;
                        }
                        case 1008: {
                            cause = new PartitionIsMigratingException(error, lsn, partitionKeyRangeId, responseHeaders);
                            break block0;
                        }
                        case 1000: {
                            cause = new InvalidPartitionException(error, lsn, partitionKeyRangeId, responseHeaders);
                            break block0;
                        }
                        case 1002: {
                            cause = new PartitionKeyRangeGoneException(error, lsn, partitionKeyRangeId, responseHeaders);
                            break block0;
                        }
                    }
                    cause = new GoneException(error, lsn, partitionKeyRangeId, responseHeaders);
                    break;
                }
                case 500: {
                    cause = new InternalServerErrorException(error, lsn, partitionKeyRangeId, responseHeaders);
                    break;
                }
                case 423: {
                    cause = new LockedException(error, lsn, partitionKeyRangeId, responseHeaders);
                    break;
                }
                case 405: {
                    cause = new MethodNotAllowedException(error, lsn, partitionKeyRangeId, responseHeaders);
                    break;
                }
                case 404: {
                    cause = new NotFoundException(error, lsn, partitionKeyRangeId, responseHeaders);
                    break;
                }
                case 412: {
                    cause = new PreconditionFailedException(error, lsn, partitionKeyRangeId, responseHeaders);
                    break;
                }
                case 413: {
                    cause = new RequestEntityTooLargeException(error, lsn, partitionKeyRangeId, responseHeaders);
                    break;
                }
                case 408: {
                    cause = new RequestTimeoutException(error, lsn, partitionKeyRangeId, responseHeaders);
                    break;
                }
                case 449: {
                    cause = new RetryWithException(error, lsn, partitionKeyRangeId, responseHeaders);
                    break;
                }
                case 503: {
                    cause = new ServiceUnavailableException(error, lsn, partitionKeyRangeId, responseHeaders);
                    break;
                }
                case 429: {
                    cause = new RequestRateTooLargeException(error, lsn, partitionKeyRangeId, responseHeaders);
                    break;
                }
                case 401: {
                    cause = new UnauthorizedException(error, lsn, partitionKeyRangeId, responseHeaders);
                    break;
                }
                default: {
                    cause = new DocumentClientException(status.code(), error, responseHeaders);
                }
            }
            logger.trace("\n  {}\n  [transportRequestId: {}, activityId: {}, statusCode: {}, subStatusCode: {}]\n  ", new Object[]{context.channel(), transportRequestId, cause.getActivityId(), cause.getStatusCode(), cause.getSubStatusCode(), cause});
            pendingRequest.completeExceptionally((Throwable)cause);
        }
    }

    private void traceOperation(ChannelHandlerContext context, String operationName, Object ... args) {
        if (logger.isTraceEnabled()) {
            BigDecimal lifetime;
            long birthTime;
            if (this.pendingRequest == null) {
                birthTime = System.nanoTime();
                lifetime = BigDecimal.ZERO;
            } else {
                birthTime = this.pendingRequest.getBirthTime();
                lifetime = BigDecimal.valueOf(this.pendingRequest.getLifetime().toNanos(), 6);
            }
            logger.trace("{},{},\"{}({})\",\"{}\",\"{}\"", new Object[]{birthTime, lifetime, operationName, Stream.of(args).map(arg -> arg == null ? "null" : arg.toString()).collect(Collectors.joining(",")), this.pendingRequest, context});
        }
    }

    private static class ClosedWithPendingRequestsException
    extends RuntimeException {
        static ClosedWithPendingRequestsException INSTANCE = new ClosedWithPendingRequestsException();

        private ClosedWithPendingRequestsException() {
            super(null, null, false, false);
        }
    }
}

