package com.couchbase.client.dcp.conductor;

import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.state.AbstractStateMachine;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.dcp.buffer.DcpBucketConfig;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.config.HostAndPort;
import com.couchbase.client.dcp.metrics.MetricsContext;
import com.couchbase.client.dcp.transport.netty.ChannelUtils;
import com.couchbase.client.dcp.transport.netty.ConfigPipeline;
import com.couchbase.client.dcp.util.retry.RetryBuilder;
import com.couchbase.client.deps.io.netty.bootstrap.Bootstrap;
import com.couchbase.client.deps.io.netty.buffer.PooledByteBufAllocator;
import com.couchbase.client.deps.io.netty.buffer.UnpooledByteBufAllocator;
import com.couchbase.client.deps.io.netty.channel.Channel;
import com.couchbase.client.deps.io.netty.channel.ChannelFuture;
import com.couchbase.client.deps.io.netty.channel.ChannelOption;
import com.couchbase.client.deps.io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import rx.Completable;
import rx.CompletableSubscriber;
import rx.Observable;
import rx.Subscriber;
import rx.subjects.BehaviorSubject;
import rx.subjects.Subject;

/* loaded from: input_file:com/couchbase/client/dcp/conductor/HttpStreamingConfigProvider.class */
public class HttpStreamingConfigProvider extends AbstractStateMachine<LifecycleState> implements ConfigProvider {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(HttpStreamingConfigProvider.class);
    private final AtomicReference<List<HostAndPort>> remoteHosts;
    private final Subject<DcpBucketConfig, DcpBucketConfig> configStream;
    private final AtomicLong currentBucketConfigRev;
    private volatile boolean stopped;
    private volatile Channel channel;
    private final ClientEnvironment env;
    private final MetricsContext metrics;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.couchbase.client.dcp.conductor.HttpStreamingConfigProvider$3, reason: invalid class name */
    /* loaded from: input_file:com/couchbase/client/dcp/conductor/HttpStreamingConfigProvider$3.class */
    public class AnonymousClass3 implements Completable.OnSubscribe {
        final /* synthetic */ Bootstrap val$bootstrap;
        final /* synthetic */ String val$remote;

        AnonymousClass3(Bootstrap bootstrap, String str) {
            this.val$bootstrap = bootstrap;
            this.val$remote = str;
        }

        public void call(final CompletableSubscriber completableSubscriber) {
            this.val$bootstrap.connect().addListener(new GenericFutureListener<ChannelFuture>() { // from class: com.couchbase.client.dcp.conductor.HttpStreamingConfigProvider.3.1
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    HttpStreamingConfigProvider.this.metrics.newActionCounter("connect").tag("remote", AnonymousClass3.this.val$remote).build().track(channelFuture);
                    if (!channelFuture.isSuccess()) {
                        completableSubscriber.onError(channelFuture.cause());
                        return;
                    }
                    HttpStreamingConfigProvider.this.channel = channelFuture.channel();
                    HttpStreamingConfigProvider.this.channel.closeFuture().addListener(new GenericFutureListener<ChannelFuture>() { // from class: com.couchbase.client.dcp.conductor.HttpStreamingConfigProvider.3.1.1
                        public void operationComplete(ChannelFuture channelFuture2) throws Exception {
                            HttpStreamingConfigProvider.this.metrics.newEventCounter("channel.closed").tag("remote", AnonymousClass3.this.val$remote).build().increment();
                            HttpStreamingConfigProvider.this.transitionState(LifecycleState.DISCONNECTED);
                            HttpStreamingConfigProvider.this.channel = null;
                            HttpStreamingConfigProvider.this.triggerReconnect();
                        }
                    });
                    HttpStreamingConfigProvider.LOGGER.debug("Successfully established config connection to Socket {}", HttpStreamingConfigProvider.this.channel.remoteAddress());
                    HttpStreamingConfigProvider.this.transitionState(LifecycleState.CONNECTED);
                    completableSubscriber.onCompleted();
                }
            });
        }
    }

    public HttpStreamingConfigProvider(final ClientEnvironment clientEnvironment) {
        super(LifecycleState.DISCONNECTED);
        this.currentBucketConfigRev = new AtomicLong(-1L);
        this.stopped = false;
        this.metrics = new MetricsContext("dcp.config");
        this.env = clientEnvironment;
        this.remoteHosts = new AtomicReference<>(clientEnvironment.clusterAt());
        this.configStream = BehaviorSubject.create().toSerialized();
        this.configStream.subscribe(new Subscriber<DcpBucketConfig>() { // from class: com.couchbase.client.dcp.conductor.HttpStreamingConfigProvider.1
            public void onCompleted() {
                HttpStreamingConfigProvider.LOGGER.debug("Config stream completed.");
            }

            public void onError(Throwable th) {
                HttpStreamingConfigProvider.LOGGER.warn("Error on config stream!", th);
            }

            public void onNext(DcpBucketConfig dcpBucketConfig) {
                ArrayList arrayList = new ArrayList();
                for (NodeInfo nodeInfo : dcpBucketConfig.nodes()) {
                    arrayList.add(new HostAndPort(nodeInfo.hostname(), ((Integer) (clientEnvironment.sslEnabled() ? nodeInfo.sslServices() : nodeInfo.services()).get(ServiceType.CONFIG)).intValue()));
                }
                HttpStreamingConfigProvider.LOGGER.trace("Updated config stream node list to {}.", arrayList);
                HttpStreamingConfigProvider.this.remoteHosts.set(arrayList);
            }
        });
    }

    @Override // com.couchbase.client.dcp.conductor.ConfigProvider
    public Completable start() {
        return tryConnectHosts();
    }

    @Override // com.couchbase.client.dcp.conductor.ConfigProvider
    public Completable stop() {
        this.stopped = true;
        return Completable.create(new Completable.OnSubscribe() { // from class: com.couchbase.client.dcp.conductor.HttpStreamingConfigProvider.2
            public void call(final CompletableSubscriber completableSubscriber) {
                HttpStreamingConfigProvider.LOGGER.debug("Initiating streaming config provider shutdown on channel.");
                HttpStreamingConfigProvider.this.transitionState(LifecycleState.DISCONNECTING);
                if (HttpStreamingConfigProvider.this.channel == null) {
                    completableSubscriber.onCompleted();
                    return;
                }
                Channel channel = HttpStreamingConfigProvider.this.channel;
                HttpStreamingConfigProvider.this.channel = null;
                channel.close().addListener(new GenericFutureListener<ChannelFuture>() { // from class: com.couchbase.client.dcp.conductor.HttpStreamingConfigProvider.2.1
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        HttpStreamingConfigProvider.this.transitionState(LifecycleState.DISCONNECTED);
                        if (channelFuture.isSuccess()) {
                            HttpStreamingConfigProvider.LOGGER.debug("Streaming config provider channel shutdown completed.");
                            completableSubscriber.onCompleted();
                        } else {
                            HttpStreamingConfigProvider.LOGGER.warn("Error during streaming config provider shutdown!", channelFuture.cause());
                            completableSubscriber.onError(channelFuture.cause());
                        }
                    }
                });
            }
        });
    }

    @Override // com.couchbase.client.dcp.conductor.ConfigProvider
    public Observable<DcpBucketConfig> configs() {
        return this.configStream;
    }

    private Completable tryConnectHosts() {
        if (this.stopped) {
            LOGGER.debug("Not trying to connect to hosts, already stopped.");
            return Completable.complete();
        }
        transitionState(LifecycleState.CONNECTING);
        List<HostAndPort> list = this.remoteHosts.get();
        Completable tryConnectHost = tryConnectHost(list.get(0));
        for (int i = 1; i < list.size(); i++) {
            HostAndPort hostAndPort = list.get(i);
            tryConnectHost = tryConnectHost.onErrorResumeNext(th -> {
                LOGGER.warn("Could not get config from Node, trying next in list.", th);
                return tryConnectHost(hostAndPort);
            });
        }
        return tryConnectHost;
    }

    private Completable tryConnectHost(HostAndPort hostAndPort) {
        PooledByteBufAllocator pooledByteBufAllocator = this.env.poolBuffers() ? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT;
        InetSocketAddress address = hostAndPort.toAddress();
        return Completable.create(new AnonymousClass3(new Bootstrap().remoteAddress(address).option(ChannelOption.ALLOCATOR, pooledByteBufAllocator).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf((int) this.env.socketConnectTimeout())).channel(ChannelUtils.channelForEventLoopGroup(this.env.eventLoopGroup())).handler(new ConfigPipeline(this.env, address, this.configStream, this.currentBucketConfigRev)).group(this.env.eventLoopGroup()), hostAndPort.host() + ":" + hostAndPort.port()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void triggerReconnect() {
        transitionState(LifecycleState.CONNECTING);
        if (this.stopped) {
            return;
        }
        tryConnectHosts().retryWhen(RetryBuilder.any().delay(this.env.configProviderReconnectDelay()).max(this.env.configProviderReconnectMaxAttempts()).doOnRetry((num, th, l, timeUnit) -> {
            LOGGER.info("No host usable to fetch a config from, waiting and retrying (remote hosts: {}).", RedactableArgument.system(this.remoteHosts.get()));
        }).build()).subscribe();
    }
}
