package com.azure.data.cosmos.internal.directconnectivity.rntbd;

import com.azure.data.cosmos.BridgeInternal;
import com.azure.data.cosmos.GoneException;
import com.azure.data.cosmos.internal.HttpConstants;
import com.azure.data.cosmos.internal.directconnectivity.RntbdTransportClient;
import com.azure.data.cosmos.internal.directconnectivity.rntbd.RntbdEndpoint;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.URI;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@JsonSerialize(using = JsonSerializer.class)
/* loaded from: input_file:com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdServiceEndpoint.class */
public final class RntbdServiceEndpoint implements RntbdEndpoint {
    private static final AtomicLong instanceCount = new AtomicLong();
    private static final Logger logger = LoggerFactory.getLogger(RntbdServiceEndpoint.class);
    private static final String namePrefix = RntbdServiceEndpoint.class.getSimpleName() + '-';
    private final RntbdClientChannelPool channelPool;
    private final AtomicBoolean closed;
    private final RntbdMetrics metrics;
    private final String name;
    private final SocketAddress remoteAddress;
    private final RntbdRequestTimer requestTimer;

    /* loaded from: input_file:com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdServiceEndpoint$JsonSerializer.class */
    static final class JsonSerializer extends StdSerializer<RntbdServiceEndpoint> {
        public JsonSerializer() {
            this(null);
        }

        public JsonSerializer(Class<RntbdServiceEndpoint> cls) {
            super(cls);
        }

        public void serialize(RntbdServiceEndpoint rntbdServiceEndpoint, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException {
            jsonGenerator.writeStartObject();
            jsonGenerator.writeStringField(rntbdServiceEndpoint.name, rntbdServiceEndpoint.remoteAddress.toString());
            jsonGenerator.writeObjectField("channelPool", rntbdServiceEndpoint.channelPool);
            jsonGenerator.writeEndObject();
        }
    }

    /* loaded from: input_file:com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdServiceEndpoint$Provider.class */
    public static final class Provider implements RntbdEndpoint.Provider {
        private static final Logger logger = LoggerFactory.getLogger(Provider.class);
        private final RntbdEndpoint.Config config;
        private final NioEventLoopGroup eventLoopGroup;
        private final RntbdRequestTimer requestTimer;
        private final AtomicBoolean closed = new AtomicBoolean();
        private final ConcurrentHashMap<String, RntbdEndpoint> endpoints = new ConcurrentHashMap<>();

        public Provider(RntbdTransportClient.Options options, SslContext sslContext) {
            Preconditions.checkNotNull(options, "options");
            Preconditions.checkNotNull(sslContext, "sslContext");
            DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory("CosmosEventLoop", true);
            int availableProcessors = Runtime.getRuntime().availableProcessors();
            this.config = new RntbdEndpoint.Config(options, sslContext, logger.isTraceEnabled() ? LogLevel.TRACE : logger.isDebugEnabled() ? LogLevel.DEBUG : null);
            this.requestTimer = new RntbdRequestTimer(this.config.getRequestTimeout());
            this.eventLoopGroup = new NioEventLoopGroup(availableProcessors, defaultThreadFactory);
        }

        @Override // com.azure.data.cosmos.internal.directconnectivity.rntbd.RntbdEndpoint.Provider, java.lang.AutoCloseable
        public void close() throws RuntimeException {
            if (!this.closed.compareAndSet(false, true)) {
                logger.debug("\n  [{}]\n  already closed", this);
                return;
            }
            this.requestTimer.close();
            Iterator<RntbdEndpoint> it = this.endpoints.values().iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            this.eventLoopGroup.shutdownGracefully().addListener(future -> {
                if (future.isSuccess()) {
                    logger.debug("\n  [{}]\n  closed endpoints", this);
                } else {
                    logger.error("\n  [{}]\n  failed to close endpoints due to ", this, future.cause());
                }
            });
        }

        @Override // com.azure.data.cosmos.internal.directconnectivity.rntbd.RntbdEndpoint.Provider
        public RntbdEndpoint.Config config() {
            return this.config;
        }

        @Override // com.azure.data.cosmos.internal.directconnectivity.rntbd.RntbdEndpoint.Provider
        public int count() {
            return this.endpoints.size();
        }

        @Override // com.azure.data.cosmos.internal.directconnectivity.rntbd.RntbdEndpoint.Provider
        public RntbdEndpoint get(URI uri) {
            return this.endpoints.computeIfAbsent(uri.getAuthority(), str -> {
                return new RntbdServiceEndpoint(this.config, this.eventLoopGroup, this.requestTimer, uri);
            });
        }

        @Override // com.azure.data.cosmos.internal.directconnectivity.rntbd.RntbdEndpoint.Provider
        public Stream<RntbdEndpoint> list() {
            return this.endpoints.values().stream();
        }

        private void deleteEndpoint(URI uri) {
            Preconditions.checkNotNull(uri, "physicalAddress: %s", uri);
            RntbdEndpoint remove = this.endpoints.remove(uri.getAuthority());
            if (remove != null) {
                remove.close();
            }
        }
    }

    private RntbdServiceEndpoint(RntbdEndpoint.Config config, NioEventLoopGroup nioEventLoopGroup, RntbdRequestTimer rntbdRequestTimer, URI uri) {
        Bootstrap remoteAddress = new Bootstrap().channel(NioSocketChannel.class).group(nioEventLoopGroup).option(ChannelOption.AUTO_READ, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(config.getConnectionTimeout())).option(ChannelOption.SO_KEEPALIVE, true).remoteAddress(uri.getHost(), uri.getPort());
        this.name = namePrefix + instanceCount.incrementAndGet();
        this.channelPool = new RntbdClientChannelPool(remoteAddress, config);
        this.remoteAddress = remoteAddress.config().remoteAddress();
        this.metrics = new RntbdMetrics(this.name);
        this.closed = new AtomicBoolean();
        this.requestTimer = rntbdRequestTimer;
    }

    @Override // com.azure.data.cosmos.internal.directconnectivity.rntbd.RntbdEndpoint
    public String getName() {
        return this.name;
    }

    @Override // com.azure.data.cosmos.internal.directconnectivity.rntbd.RntbdEndpoint, java.lang.AutoCloseable
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            this.channelPool.close();
            this.metrics.close();
        }
    }

    @Override // com.azure.data.cosmos.internal.directconnectivity.rntbd.RntbdEndpoint
    public RntbdRequestRecord request(RntbdRequestArgs rntbdRequestArgs) {
        throwIfClosed();
        if (logger.isDebugEnabled()) {
            rntbdRequestArgs.traceOperation(logger, null, "request", new Object[0]);
            logger.debug("\n  {}\n  {}\n  REQUEST", this, rntbdRequestArgs);
        }
        RntbdRequestRecord write = write(rntbdRequestArgs);
        this.metrics.incrementRequestCount();
        write.whenComplete((storeResponse, th) -> {
            rntbdRequestArgs.traceOperation(logger, null, "requestComplete", storeResponse, th);
            this.metrics.incrementResponseCount();
            if (th != null) {
                this.metrics.incrementErrorResponseCount();
            }
            if (logger.isDebugEnabled()) {
                if (th != null) {
                    logger.debug("\n  [{}]\n  {}\n  request failed due to ", new Object[]{this, rntbdRequestArgs, th});
                } else {
                    logger.debug("\n  [{}]\n  {}\n  request succeeded with response status: {}", new Object[]{this, rntbdRequestArgs, Integer.valueOf(storeResponse.getStatus())});
                }
            }
        });
        return write;
    }

    public String toString() {
        return RntbdObjectMapper.toJson(this);
    }

    private void releaseToPool(Channel channel) {
        logger.debug("\n  [{}]\n  {}\n  RELEASE", this, channel);
        this.channelPool.release(channel).addListener(future -> {
            if (logger.isDebugEnabled()) {
                if (future.isSuccess()) {
                    logger.debug("\n  [{}]\n  {}\n  release succeeded", this, channel);
                } else {
                    logger.debug("\n  [{}]\n  {}\n  release failed due to {}", new Object[]{this, channel, future.cause()});
                }
            }
        });
    }

    private void throwIfClosed() {
        Preconditions.checkState(!this.closed.get(), "%s is closed", this);
    }

    private RntbdRequestRecord write(RntbdRequestArgs rntbdRequestArgs) {
        RntbdRequestRecord rntbdRequestRecord = new RntbdRequestRecord(rntbdRequestArgs, this.requestTimer);
        logger.debug("\n  [{}]\n  {}\n  WRITE", this, rntbdRequestArgs);
        this.channelPool.acquire().addListener(future -> {
            if (future.isSuccess()) {
                rntbdRequestArgs.traceOperation(logger, null, "write", new Object[0]);
                Channel channel = (Channel) future.get();
                releaseToPool(channel);
                channel.write(rntbdRequestRecord).addListener(channelFuture -> {
                    rntbdRequestArgs.traceOperation(logger, null, "writeComplete", channel);
                    if (channelFuture.isSuccess()) {
                        return;
                    }
                    this.metrics.incrementErrorResponseCount();
                });
                return;
            }
            UUID activityId = rntbdRequestArgs.getActivityId();
            Throwable cause = future.cause();
            if (future.isCancelled()) {
                logger.debug("\n  [{}]\n  {}\n  write cancelled: {}", new Object[]{this, rntbdRequestArgs, cause});
                rntbdRequestRecord.cancel(true);
                return;
            }
            logger.debug("\n  [{}]\n  {}\n  write failed due to {} ", new Object[]{this, rntbdRequestArgs, cause});
            String message = cause.getMessage();
            GoneException goneException = new GoneException(String.format("failed to establish connection to %s: %s", this.remoteAddress, message), cause instanceof Exception ? (Exception) cause : new IOException(message, cause), (Map<String, String>) ImmutableMap.of(HttpConstants.HttpHeaders.ACTIVITY_ID, activityId.toString()), rntbdRequestArgs.getReplicaPath());
            BridgeInternal.setRequestHeaders(goneException, rntbdRequestArgs.getServiceRequest().getHeaders());
            rntbdRequestRecord.completeExceptionally(goneException);
        });
        return rntbdRequestRecord;
    }
}
