/*
 * Decompiled with CFR 0.152.
 */
package io.micrometer.shaded.reactor.netty.resources;

import io.micrometer.shaded.io.netty.bootstrap.Bootstrap;
import io.micrometer.shaded.io.netty.channel.Channel;
import io.micrometer.shaded.io.netty.channel.ChannelFuture;
import io.micrometer.shaded.io.netty.channel.ChannelFutureListener;
import io.micrometer.shaded.io.netty.channel.ChannelHandler;
import io.micrometer.shaded.io.netty.channel.ChannelHandlerContext;
import io.micrometer.shaded.io.netty.util.AttributeKey;
import io.micrometer.shaded.io.netty.util.internal.PlatformDependent;
import io.micrometer.shaded.org.reactorstreams.Publisher;
import io.micrometer.shaded.org.reactorstreams.Subscription;
import io.micrometer.shaded.reactor.core.CoreSubscriber;
import io.micrometer.shaded.reactor.core.Disposable;
import io.micrometer.shaded.reactor.core.publisher.Mono;
import io.micrometer.shaded.reactor.core.publisher.MonoProcessor;
import io.micrometer.shaded.reactor.core.publisher.MonoSink;
import io.micrometer.shaded.reactor.core.publisher.Operators;
import io.micrometer.shaded.reactor.netty.Connection;
import io.micrometer.shaded.reactor.netty.ConnectionObserver;
import io.micrometer.shaded.reactor.netty.FutureMono;
import io.micrometer.shaded.reactor.netty.Metrics;
import io.micrometer.shaded.reactor.netty.ReactorNetty;
import io.micrometer.shaded.reactor.netty.channel.BootstrapHandlers;
import io.micrometer.shaded.reactor.netty.channel.ChannelOperations;
import io.micrometer.shaded.reactor.netty.internal.shaded.reactor.pool.InstrumentedPool;
import io.micrometer.shaded.reactor.netty.internal.shaded.reactor.pool.PooledRef;
import io.micrometer.shaded.reactor.netty.internal.shaded.reactor.pool.PooledRefMetadata;
import io.micrometer.shaded.reactor.netty.resources.ConnectionProvider;
import io.micrometer.shaded.reactor.netty.resources.NewConnectionProvider;
import io.micrometer.shaded.reactor.netty.resources.PooledConnectionProviderMetrics;
import io.micrometer.shaded.reactor.util.Logger;
import io.micrometer.shaded.reactor.util.Loggers;
import io.micrometer.shaded.reactor.util.annotation.NonNull;
import io.micrometer.shaded.reactor.util.concurrent.Queues;
import io.micrometer.shaded.reactor.util.context.Context;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

final class PooledConnectionProvider
implements ConnectionProvider {
    final ConcurrentMap<PoolKey, InstrumentedPool<PooledConnection>> channelPools = PlatformDependent.newConcurrentHashMap();
    final String name;
    final PoolFactory poolFactory;
    final long acquireTimeout;
    final int maxConnections;
    static final Logger log = Loggers.getLogger(PooledConnectionProvider.class);
    static final AttributeKey<ConnectionObserver> OWNER = AttributeKey.valueOf("connectionOwner");
    static final BiPredicate<PooledConnection, PooledRefMetadata> EVICTION_PREDICATE = (pooledConnection, metadata) -> !pooledConnection.channel.isActive() || !pooledConnection.isPersistent();
    static final Function<PooledConnection, Publisher<Void>> DESTROY_HANDLER = pooledConnection -> {
        if (!pooledConnection.channel.isActive()) {
            return Mono.empty();
        }
        return FutureMono.from(pooledConnection.channel.close());
    };

    PooledConnectionProvider(String name, PoolFactory poolFactory) {
        this(name, poolFactory, 0L, -1);
    }

    PooledConnectionProvider(String name, PoolFactory poolFactory, long acquireTimeout, int maxConnections) {
        this.name = name;
        this.poolFactory = poolFactory;
        this.acquireTimeout = acquireTimeout;
        this.maxConnections = maxConnections;
    }

    @Override
    public void disposeWhen(@NonNull SocketAddress address) {
        List<Map.Entry> toDispose = this.channelPools.entrySet().stream().filter(p -> this.compareAddresses(((PoolKey)p.getKey()).holder, address)).collect(Collectors.toList());
        toDispose.forEach(e -> {
            if (this.channelPools.remove(e.getKey(), e.getValue())) {
                if (log.isDebugEnabled()) {
                    log.debug("Disposing pool for {}", ((PoolKey)e.getKey()).fqdn);
                }
                ((InstrumentedPool)e.getValue()).dispose();
            }
        });
    }

    private boolean compareAddresses(SocketAddress origin, SocketAddress target) {
        if (origin.equals(target)) {
            return true;
        }
        if (origin instanceof InetSocketAddress && target instanceof InetSocketAddress) {
            InetSocketAddress isaOrigin = (InetSocketAddress)origin;
            InetSocketAddress isaTarget = (InetSocketAddress)target;
            InetAddress iaTarget = isaTarget.getAddress();
            return iaTarget != null && iaTarget.isAnyLocalAddress() && isaOrigin.getPort() == isaTarget.getPort();
        }
        return false;
    }

    public Mono<Connection> acquire(Bootstrap b) {
        return Mono.create(sink -> {
            Bootstrap bootstrap = b.clone();
            ChannelOperations.OnSetup opsFactory = BootstrapHandlers.channelOperationFactory(bootstrap);
            ConnectionObserver obs = BootstrapHandlers.connectionObserver(bootstrap);
            NewConnectionProvider.convertLazyRemoteAddress(bootstrap);
            ChannelHandler handler = bootstrap.config().handler();
            PoolKey holder = new PoolKey(bootstrap.config().remoteAddress(), handler != null ? handler.hashCode() : -1);
            InstrumentedPool pool = this.channelPools.computeIfAbsent(holder, poolKey -> {
                if (log.isDebugEnabled()) {
                    log.debug("Creating new client pool [{}] for {}", this.name, bootstrap.config().remoteAddress());
                }
                InstrumentedPool<PooledConnection> newPool = new PooledConnectionAllocator((Bootstrap)bootstrap, (PoolFactory)this.poolFactory, (ChannelOperations.OnSetup)opsFactory).pool;
                if (BootstrapHandlers.findMetricsSupport(bootstrap) != null) {
                    PooledConnectionProviderMetrics.registerMetrics(this.name, poolKey.hashCode() + "", Metrics.formatSocketAddress(bootstrap.config().remoteAddress()), newPool.metrics());
                }
                return newPool;
            });
            PooledConnectionProvider.disposableAcquire(sink, obs, pool, opsFactory, this.acquireTimeout);
        });
    }

    @Override
    public Mono<Void> disposeLater() {
        return Mono.defer(() -> {
            ArrayList<Mono<Void>> pools = new ArrayList<Mono<Void>>();
            for (PoolKey key : this.channelPools.keySet()) {
                pools.add(((InstrumentedPool)this.channelPools.remove(key)).disposeLater());
            }
            if (pools.isEmpty()) {
                return Mono.empty();
            }
            return Mono.when(pools);
        });
    }

    @Override
    public boolean isDisposed() {
        return this.channelPools.isEmpty() || this.channelPools.values().stream().allMatch(Disposable::isDisposed);
    }

    @Override
    public int maxConnections() {
        return this.maxConnections;
    }

    public String toString() {
        return "PooledConnectionProvider {name=" + this.name + ", poolFactory=" + this.poolFactory + '}';
    }

    static void disposableAcquire(MonoSink<Connection> sink, ConnectionObserver obs, InstrumentedPool<PooledConnection> pool, ChannelOperations.OnSetup opsFactory, long acquireTimeout) {
        DisposableAcquire disposableAcquire = new DisposableAcquire(sink, pool, obs, opsFactory, acquireTimeout);
        Mono mono = pool.acquire(Duration.ofMillis(acquireTimeout));
        mono.subscribe(disposableAcquire);
    }

    static final class PoolKey {
        final SocketAddress holder;
        final int pipelineKey;
        final String fqdn;

        PoolKey(SocketAddress holder, int pipelineKey) {
            this.holder = holder;
            this.fqdn = holder instanceof InetSocketAddress ? holder.toString() : "null";
            this.pipelineKey = pipelineKey;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            PoolKey poolKey = (PoolKey)o;
            return this.pipelineKey == poolKey.pipelineKey && Objects.equals(this.holder, poolKey.holder) && Objects.equals(this.fqdn, poolKey.fqdn);
        }

        public int hashCode() {
            return Objects.hash(this.holder, this.pipelineKey, this.fqdn);
        }
    }

    static final class DisposableAcquire
    implements ConnectionObserver,
    Runnable,
    CoreSubscriber<PooledRef<PooledConnection>>,
    Disposable {
        final MonoSink<Connection> sink;
        final InstrumentedPool<PooledConnection> pool;
        final ConnectionObserver obs;
        final ChannelOperations.OnSetup opsFactory;
        final long acquireTimeout;
        PooledRef<PooledConnection> pooledRef;
        Subscription subscription;

        DisposableAcquire(MonoSink<Connection> sink, InstrumentedPool<PooledConnection> pool, ConnectionObserver obs, ChannelOperations.OnSetup opsFactory, long acquireTimeout) {
            this.pool = pool;
            this.sink = sink;
            this.obs = obs;
            this.opsFactory = opsFactory;
            this.acquireTimeout = acquireTimeout;
        }

        @Override
        public void onNext(PooledRef<PooledConnection> value) {
            this.pooledRef = value;
            PooledConnection pooledConnection = value.poolable();
            pooledConnection.pooledRef = this.pooledRef;
            Channel c = pooledConnection.channel;
            if (c.eventLoop().inEventLoop()) {
                this.run();
            } else {
                c.eventLoop().execute(this);
            }
        }

        @Override
        public void dispose() {
            this.subscription.cancel();
        }

        @Override
        public void onError(Throwable throwable) {
            this.sink.error(throwable);
        }

        @Override
        public void onSubscribe(Subscription s) {
            if (Operators.validate(this.subscription, s)) {
                this.subscription = s;
                this.sink.onCancel(this);
                s.request(Long.MAX_VALUE);
            }
        }

        @Override
        public void onComplete() {
        }

        @Override
        public Context currentContext() {
            return this.sink.currentContext();
        }

        @Override
        public void onUncaughtException(Connection connection, Throwable error) {
            this.sink.error(error);
            this.obs.onUncaughtException(connection, error);
        }

        @Override
        public void onStateChange(Connection connection, ConnectionObserver.State newState) {
            if (newState == ConnectionObserver.State.CONFIGURED) {
                this.sink.success(connection);
            }
            this.obs.onStateChange(connection, newState);
        }

        @Override
        public void run() {
            PooledConnection pooledConnection = this.pooledRef.poolable();
            Channel c = pooledConnection.channel;
            ConnectionObserver current = c.attr(OWNER).getAndSet(this);
            if (current instanceof PendingConnectionObserver) {
                PendingConnectionObserver.Pending p;
                PendingConnectionObserver pending = (PendingConnectionObserver)current;
                current = null;
                while ((p = pending.pendingQueue.poll()) != null) {
                    if (p.error != null) {
                        this.onUncaughtException(p.connection, p.error);
                        continue;
                    }
                    if (p.state == null) continue;
                    this.onStateChange(p.connection, p.state);
                }
            }
            if (current != null) {
                if (log.isDebugEnabled()) {
                    log.debug(ReactorNetty.format(c, "Channel acquired, now {} active connections and {} inactive connections"), this.pool.metrics().acquiredSize(), this.pool.metrics().idleSize());
                }
                this.obs.onStateChange(pooledConnection, ConnectionObserver.State.ACQUIRED);
                ChannelOperations<?, ?> ops = this.opsFactory.create(pooledConnection, pooledConnection, null);
                if (ops != null) {
                    ops.bind();
                    this.obs.onStateChange(ops, ConnectionObserver.State.CONFIGURED);
                    this.sink.success(ops);
                } else {
                    this.sink.success(pooledConnection);
                }
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(c, "Channel connected, now {} active connections and {} inactive connections"), this.pool.metrics().acquiredSize(), this.pool.metrics().idleSize());
            }
            if (this.opsFactory == ChannelOperations.OnSetup.empty()) {
                this.sink.success(Connection.from(c));
            }
        }
    }

    static final class PooledConnection
    implements Connection,
    ConnectionObserver {
        final Channel channel;
        final InstrumentedPool<PooledConnection> pool;
        final MonoProcessor<Void> onTerminate;
        PooledRef<PooledConnection> pooledRef;

        PooledConnection(Channel channel, InstrumentedPool<PooledConnection> pool) {
            this.channel = channel;
            this.pool = pool;
            this.onTerminate = MonoProcessor.create();
        }

        ConnectionObserver owner() {
            ConnectionObserver obs;
            do {
                if ((obs = this.channel.attr(OWNER).get()) != null) {
                    return obs;
                }
                obs = new PendingConnectionObserver();
            } while (!this.channel.attr(OWNER).compareAndSet(null, obs));
            return obs;
        }

        @Override
        public Mono<Void> onTerminate() {
            return this.onTerminate.or(this.onDispose());
        }

        @Override
        public Channel channel() {
            return this.channel;
        }

        @Override
        public Context currentContext() {
            return this.owner().currentContext();
        }

        @Override
        public void onUncaughtException(Connection connection, Throwable error) {
            this.owner().onUncaughtException(connection, error);
        }

        @Override
        public void onStateChange(Connection connection, ConnectionObserver.State newState) {
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(connection.channel(), "onStateChange({}, {})"), connection, newState);
            }
            if (newState == ConnectionObserver.State.DISCONNECTING) {
                if (log.isDebugEnabled()) {
                    log.debug(ReactorNetty.format(connection.channel(), "Releasing channel"));
                }
                ConnectionObserver obs = this.channel.attr(OWNER).getAndSet(ConnectionObserver.emptyListener());
                if (this.pooledRef == null) {
                    return;
                }
                this.pooledRef.release().subscribe(null, t -> {
                    if (log.isDebugEnabled()) {
                        log.debug("Failed cleaning the channel from pool, now {} active connections and {} inactive connections", this.pool.metrics().acquiredSize(), this.pool.metrics().idleSize(), t);
                    }
                    this.onTerminate.onComplete();
                    obs.onStateChange(connection, ConnectionObserver.State.RELEASED);
                }, () -> {
                    if (log.isDebugEnabled()) {
                        log.debug(ReactorNetty.format(this.pooledRef.poolable().channel, "Channel cleaned, now {} active connections and {} inactive connections"), this.pool.metrics().acquiredSize(), this.pool.metrics().idleSize());
                    }
                    this.onTerminate.onComplete();
                    obs.onStateChange(connection, ConnectionObserver.State.RELEASED);
                });
                return;
            }
            this.owner().onStateChange(connection, newState);
        }

        public String toString() {
            return "PooledConnection{channel=" + this.channel + '}';
        }
    }

    static final class PendingConnectionObserver
    implements ConnectionObserver {
        final Queue<Pending> pendingQueue = Queues.unbounded(4).get();

        PendingConnectionObserver() {
        }

        @Override
        public void onUncaughtException(Connection connection, Throwable error) {
            this.pendingQueue.add(new Pending(connection, error, null));
        }

        @Override
        public void onStateChange(Connection connection, ConnectionObserver.State newState) {
            this.pendingQueue.add(new Pending(connection, null, newState));
        }

        static class Pending {
            final Connection connection;
            final Throwable error;
            final ConnectionObserver.State state;

            Pending(Connection connection, @Nullable Throwable error, @Nullable ConnectionObserver.State state) {
                this.connection = connection;
                this.error = error;
                this.state = state;
            }
        }
    }

    static final class PooledConnectionAllocator {
        final InstrumentedPool<PooledConnection> pool;
        final Bootstrap bootstrap;
        final ChannelOperations.OnSetup opsFactory;

        PooledConnectionAllocator(Bootstrap b, PoolFactory provider, ChannelOperations.OnSetup opsFactory) {
            this.bootstrap = b.clone();
            this.opsFactory = opsFactory;
            this.pool = provider.newPool(this.connectChannel(), DESTROY_HANDLER, EVICTION_PREDICATE);
        }

        Publisher<PooledConnection> connectChannel() {
            return Mono.create(sink -> {
                Bootstrap b = this.bootstrap.clone();
                PooledConnectionInitializer initializer = new PooledConnectionInitializer((MonoSink<PooledConnection>)sink);
                b.handler(initializer);
                ChannelFuture f = b.connect();
                if (f.isDone()) {
                    initializer.operationComplete(f);
                } else {
                    f.addListener(initializer);
                }
            });
        }

        final class PooledConnectionInitializer
        implements ChannelHandler,
        ChannelFutureListener {
            final MonoSink<PooledConnection> sink;
            PooledConnection pooledConnection;

            PooledConnectionInitializer(MonoSink<PooledConnection> sink) {
                this.sink = sink;
            }

            @Override
            public void handlerAdded(ChannelHandlerContext ctx) {
                PooledConnection pooledConnection;
                ctx.pipeline().remove(this);
                Channel ch = ctx.channel();
                if (log.isDebugEnabled()) {
                    log.debug(ReactorNetty.format(ch, "Created new pooled channel, now {} active connections and {} inactive connections"), PooledConnectionAllocator.this.pool.metrics().acquiredSize(), PooledConnectionAllocator.this.pool.metrics().idleSize());
                }
                this.pooledConnection = pooledConnection = new PooledConnection(ch, PooledConnectionAllocator.this.pool);
                pooledConnection.bind();
                Bootstrap b = PooledConnectionAllocator.this.bootstrap.clone();
                BootstrapHandlers.finalizeHandler(b, PooledConnectionAllocator.this.opsFactory, (ConnectionObserver)pooledConnection);
                ch.pipeline().addFirst(b.config().handler());
            }

            @Override
            public void handlerRemoved(ChannelHandlerContext ctx) {
            }

            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                ctx.pipeline().remove(this);
            }

            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    this.sink.success(this.pooledConnection);
                } else {
                    this.sink.error(future.cause());
                }
            }
        }
    }

    static interface PoolFactory {
        public InstrumentedPool<PooledConnection> newPool(Publisher<PooledConnection> var1, Function<PooledConnection, Publisher<Void>> var2, BiPredicate<PooledConnection, PooledRefMetadata> var3);
    }
}

