package com.couchbase.client.dcp;

import com.couchbase.client.core.env.NetworkResolution;
import com.couchbase.client.core.event.EventBus;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.core.utils.ConnectionString;
import com.couchbase.client.dcp.conductor.Conductor;
import com.couchbase.client.dcp.conductor.ConfigProvider;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.config.CompressionMode;
import com.couchbase.client.dcp.config.DcpControl;
import com.couchbase.client.dcp.config.HostAndPort;
import com.couchbase.client.dcp.error.BootstrapException;
import com.couchbase.client.dcp.error.RollbackException;
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpExpirationMessage;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerRequest;
import com.couchbase.client.dcp.message.RollbackMessage;
import com.couchbase.client.dcp.metrics.DcpClientMetrics;
import com.couchbase.client.dcp.metrics.MetricsContext;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.client.dcp.state.SessionState;
import com.couchbase.client.dcp.state.StateFormat;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.dcp.util.MathUtils;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.channel.EventLoopGroup;
import com.couchbase.client.deps.io.netty.channel.nio.NioEventLoopGroup;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import rx.Completable;
import rx.CompletableSubscriber;
import rx.Observable;
import rx.Single;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;

/* loaded from: input_file:com/couchbase/client/dcp/Client.class */
public class Client {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(Client.class);
    private final Conductor conductor;
    private final ClientEnvironment env;
    private final boolean bufferAckEnabled;

    /* loaded from: input_file:com/couchbase/client/dcp/Client$Builder.class */
    /**
     * Fluent builder that collects the settings for a {@link Client}.
     *
     * <p>Every setter stores its argument and returns this builder so calls can be chained;
     * {@link #build()} hands the collected values to the {@link Client} constructor.
     */
    public static class Builder {
        // Seed nodes, network selection and bucket identity.
        private List<HostAndPort> clusterAt = Collections.singletonList(new HostAndPort("127.0.0.1", 0));
        private NetworkResolution networkResolution = NetworkResolution.AUTO;
        private String bucket = "default";
        private CredentialsProvider credentialsProvider = new StaticCredentialsProvider("", "");
        private ConnectionNameGenerator connectionNameGenerator = DefaultConnectionNameGenerator.INSTANCE;
        private ConfigProvider configProvider = null;

        // DCP control parameters and flow control.
        private DcpControl dcpControl = new DcpControl();
        private int bufferAckWatermark;
        private long persistencePollingIntervalMillis;

        // I/O resources.
        private EventLoopGroup eventLoopGroup;
        private EventBus eventBus;
        private boolean poolBuffers = true;

        // Timeouts and reconnect behaviour.
        // NOTE(review): connectTimeout defaults to DEFAULT_SOCKET_CONNECT_TIMEOUT, the same constant
        // used for socketConnectTimeout - confirm that this is intended.
        private long connectTimeout = ClientEnvironment.DEFAULT_SOCKET_CONNECT_TIMEOUT;
        private long bootstrapTimeout = ClientEnvironment.DEFAULT_BOOTSTRAP_TIMEOUT;
        private long socketConnectTimeout = ClientEnvironment.DEFAULT_SOCKET_CONNECT_TIMEOUT;
        private Delay configProviderReconnectDelay = ClientEnvironment.DEFAULT_CONFIG_PROVIDER_RECONNECT_DELAY;
        private int configProviderReconnectMaxAttempts = Integer.MAX_VALUE;
        private Delay dcpChannelsReconnectDelay = ClientEnvironment.DEFAULT_DCP_CHANNELS_RECONNECT_DELAY;
        private int dcpChannelsReconnectMaxAttempts = Integer.MAX_VALUE;

        // TLS settings.
        private boolean sslEnabled = false;
        private String sslKeystoreFile;
        private String sslKeystorePassword;
        private KeyStore sslKeystore;

        /**
         * Sets the buffer acknowledgement watermark, expressed as a percentage.
         *
         * @param i the watermark, from 0 to 100 inclusive.
         * @return this builder.
         * @throws IllegalArgumentException if the value lies outside 0..100.
         */
        public Builder bufferAckWatermark(int i) {
            if (i < 0 || i > 100) {
                throw new IllegalArgumentException("The bufferAckWatermark is percents, so it needs to be between 0 and 100");
            }
            this.bufferAckWatermark = i;
            return this;
        }

        /**
         * Replaces the seed nodes with the ones derived from the given hostnames.
         *
         * @param list the hostnames, parsed through {@link ConnectionString#fromHostnames}.
         * @return this builder.
         */
        public Builder hostnames(List<String> list) {
            ConnectionString parsed = ConnectionString.fromHostnames(list);
            this.clusterAt = getSeedNodes(parsed);
            return this;
        }

        /**
         * Varargs convenience for {@link #hostnames(List)}.
         *
         * @param strArr the hostnames.
         * @return this builder.
         */
        public Builder hostnames(String... strArr) {
            List<String> asList = Arrays.asList(strArr);
            return hostnames(asList);
        }

        /**
         * Replaces the seed nodes with the hosts of the given connection string.
         *
         * @param str the connection string, parsed through {@link ConnectionString#create}.
         * @return this builder.
         */
        public Builder connectionString(String str) {
            ConnectionString parsed = ConnectionString.create(str);
            this.clusterAt = getSeedNodes(parsed);
            return this;
        }

        /** Maps every host of the connection string to a {@link HostAndPort} with the same hostname and port. */
        private static List<HostAndPort> getSeedNodes(ConnectionString connectionString) {
            return connectionString.hosts().stream()
                    .map(host -> new HostAndPort(host.hostname(), host.port()))
                    .collect(Collectors.toList());
        }

        /**
         * Sets the network resolution strategy.
         *
         * @param networkResolution the strategy; must not be null.
         * @return this builder.
         * @throws NullPointerException if the argument is null.
         */
        public Builder networkResolution(NetworkResolution networkResolution) {
            this.networkResolution = Objects.requireNonNull(networkResolution);
            return this;
        }

        /**
         * Supplies the event loop group to use. When none is supplied, the {@link Client}
         * constructor creates a {@link NioEventLoopGroup} itself.
         *
         * @param eventLoopGroup the event loop group.
         * @return this builder.
         */
        public Builder eventLoopGroup(EventLoopGroup eventLoopGroup) {
            this.eventLoopGroup = eventLoopGroup;
            return this;
        }

        /**
         * Sets the bucket name. If the username currently reported by the credentials provider
         * (queried with a null argument) is empty, the bucket name is also applied as the username.
         *
         * @param str the bucket name.
         * @return this builder.
         */
        public Builder bucket(String str) {
            this.bucket = str;
            String currentUsername = this.credentialsProvider.get(null).getUsername();
            if (currentUsername.isEmpty()) {
                username(str);
            }
            return this;
        }

        /**
         * Replaces the credentials provider with a {@link StaticCredentialsProvider} built from the
         * given username and the password reported by the current provider.
         *
         * @param str the username.
         * @return this builder.
         */
        public Builder username(String str) {
            this.credentialsProvider =
                    new StaticCredentialsProvider(str, this.credentialsProvider.get(null).getPassword());
            return this;
        }

        /**
         * Sets the credentials provider.
         *
         * @param credentialsProvider the provider to use.
         * @return this builder.
         */
        public Builder credentialsProvider(CredentialsProvider credentialsProvider) {
            this.credentialsProvider = credentialsProvider;
            return this;
        }

        /**
         * Replaces the credentials provider with a {@link StaticCredentialsProvider} built from the
         * username reported by the current provider and the given password.
         *
         * @param str the password.
         * @return this builder.
         */
        public Builder password(String str) {
            this.credentialsProvider =
                    new StaticCredentialsProvider(this.credentialsProvider.get(null).getUsername(), str);
            return this;
        }

        /**
         * Sets the connection name generator.
         *
         * @param connectionNameGenerator the generator to use.
         * @return this builder.
         */
        public Builder connectionNameGenerator(ConnectionNameGenerator connectionNameGenerator) {
            this.connectionNameGenerator = connectionNameGenerator;
            return this;
        }

        /**
         * Stores a DCP control parameter; the value is recorded via its {@code toString()} form.
         *
         * @param names the control parameter name.
         * @param obj the value; must not be null because {@code toString()} is invoked on it.
         * @return this builder.
         */
        public Builder controlParam(DcpControl.Names names, Object obj) {
            String rendered = obj.toString();
            this.dcpControl.put(names, rendered);
            return this;
        }

        /**
         * Applies the given compression mode to the DCP control settings.
         *
         * @param compressionMode the compression mode.
         * @return this builder.
         */
        public Builder compression(CompressionMode compressionMode) {
            this.dcpControl.compression(compressionMode);
            return this;
        }

        /**
         * Sets the config provider that is handed to the conductor.
         *
         * @param configProvider the config provider (null by default).
         * @return this builder.
         */
        public Builder configProvider(ConfigProvider configProvider) {
            this.configProvider = configProvider;
            return this;
        }

        /**
         * Enables or disables buffer pooling (enabled by default).
         *
         * @param z true to pool buffers.
         * @return this builder.
         */
        public Builder poolBuffers(boolean z) {
            this.poolBuffers = z;
            return this;
        }

        /**
         * Sets the socket connect timeout.
         *
         * @param j the timeout value.
         * @return this builder.
         */
        public Builder socketConnectTimeout(long j) {
            this.socketConnectTimeout = j;
            return this;
        }

        /**
         * Sets the bootstrap timeout.
         *
         * @param j the timeout value.
         * @return this builder.
         */
        public Builder bootstrapTimeout(long j) {
            this.bootstrapTimeout = j;
            return this;
        }

        /**
         * Sets the connect timeout.
         *
         * @param j the timeout value.
         * @return this builder.
         */
        public Builder connectTimeout(long j) {
            this.connectTimeout = j;
            return this;
        }

        /**
         * Sets the delay applied between config provider reconnect attempts.
         *
         * @param delay the delay.
         * @return this builder.
         */
        public Builder configProviderReconnectDelay(Delay delay) {
            this.configProviderReconnectDelay = delay;
            return this;
        }

        /**
         * Sets the maximum number of config provider reconnect attempts
         * ({@link Integer#MAX_VALUE} by default).
         *
         * @param i the maximum number of attempts.
         * @return this builder.
         */
        public Builder configProviderReconnectMaxAttempts(int i) {
            this.configProviderReconnectMaxAttempts = i;
            return this;
        }

        /**
         * Sets the maximum number of DCP channel reconnect attempts
         * ({@link Integer#MAX_VALUE} by default).
         *
         * @param i the maximum number of attempts.
         * @return this builder.
         */
        public Builder dcpChannelsReconnectMaxAttempts(int i) {
            this.dcpChannelsReconnectMaxAttempts = i;
            return this;
        }

        /**
         * Sets the delay applied between DCP channel reconnect attempts.
         *
         * @param delay the delay.
         * @return this builder.
         */
        public Builder dcpChannelsReconnectDelay(Delay delay) {
            this.dcpChannelsReconnectDelay = delay;
            return this;
        }

        /**
         * Sets the event bus passed to the environment.
         *
         * @param eventBus the event bus.
         * @return this builder.
         */
        public Builder eventBus(EventBus eventBus) {
            this.eventBus = eventBus;
            return this;
        }

        /**
         * Enables or disables SSL (disabled by default).
         *
         * @param z true to enable SSL.
         * @return this builder.
         */
        public Builder sslEnabled(boolean z) {
            this.sslEnabled = z;
            return this;
        }

        /**
         * Sets the SSL keystore file.
         *
         * @param str the keystore file.
         * @return this builder.
         */
        public Builder sslKeystoreFile(String str) {
            this.sslKeystoreFile = str;
            return this;
        }

        /**
         * Sets the SSL keystore password.
         *
         * @param str the keystore password.
         * @return this builder.
         */
        public Builder sslKeystorePassword(String str) {
            this.sslKeystorePassword = str;
            return this;
        }

        /**
         * Sets an already loaded SSL keystore.
         *
         * @param keyStore the keystore.
         * @return this builder.
         */
        public Builder sslKeystore(KeyStore keyStore) {
            this.sslKeystore = keyStore;
            return this;
        }

        /**
         * Stores the given duration, converted to milliseconds, as the persistence polling interval.
         *
         * @param j the duration.
         * @param timeUnit the unit of the duration.
         * @return this builder.
         */
        public Builder mitigateRollbacks(long j, TimeUnit timeUnit) {
            this.persistencePollingIntervalMillis = timeUnit.toMillis(j);
            return this;
        }

        /**
         * Records the given size as the {@code CONNECTION_BUFFER_SIZE} control parameter and, when the
         * buffer ack watermark is still 0, sets the watermark to 80.
         *
         * @param i the connection buffer size.
         * @return this builder.
         */
        public Builder flowControl(int i) {
            controlParam(DcpControl.Names.CONNECTION_BUFFER_SIZE, i);
            if (this.bufferAckWatermark == 0) {
                this.bufferAckWatermark = 80;
            }
            return this;
        }

        /**
         * Creates the {@link Client} from the values collected so far.
         *
         * @return the new client.
         */
        public Client build() {
            return new Client(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/couchbase/client/dcp/Client$PartitionAndSeqno.class */
    /** Immutable pair of a partition id and a sequence number. */
    public static class PartitionAndSeqno {
        private final short vbid;
        private final long sequenceNumber;

        /**
         * @param s the partition id.
         * @param j the sequence number.
         */
        public PartitionAndSeqno(short s, long j) {
            vbid = s;
            sequenceNumber = j;
        }

        /** @return the partition id given at construction. */
        public short partition() {
            return vbid;
        }

        /** @return the sequence number given at construction. */
        public long seqno() {
            return sequenceNumber;
        }
    }

    private Client(Builder builder) {
        this.env = ClientEnvironment.builder().setClusterAt(builder.clusterAt).setNetworkResolution(builder.networkResolution).setConnectionNameGenerator(builder.connectionNameGenerator).setBucket(builder.bucket).setCredentialsProvider(builder.credentialsProvider).setDcpControl(builder.dcpControl).setEventLoopGroup(builder.eventLoopGroup == null ? new NioEventLoopGroup() : builder.eventLoopGroup, builder.eventLoopGroup == null).setBufferAckWatermark(builder.bufferAckWatermark).setBufferPooling(builder.poolBuffers).setConnectTimeout(builder.connectTimeout).setBootstrapTimeout(builder.bootstrapTimeout).setSocketConnectTimeout(builder.socketConnectTimeout).setConfigProviderReconnectDelay(builder.configProviderReconnectDelay).setConfigProviderReconnectMaxAttempts(builder.configProviderReconnectMaxAttempts).setDcpChannelsReconnectDelay(builder.dcpChannelsReconnectDelay).setDcpChannelsReconnectMaxAttempts(builder.dcpChannelsReconnectMaxAttempts).setEventBus(builder.eventBus).setSslEnabled(builder.sslEnabled).setSslKeystoreFile(builder.sslKeystoreFile).setSslKeystorePassword(builder.sslKeystorePassword).setSslKeystore(builder.sslKeystore).setPersistencePollingIntervalMillis(builder.persistencePollingIntervalMillis).build();
        this.bufferAckEnabled = this.env.dcpControl().bufferAckEnabled();
        if (this.bufferAckEnabled && this.env.bufferAckWatermark() == 0) {
            throw new IllegalArgumentException("The bufferAckWatermark needs to be set if bufferAck is enabled.");
        }
        controlEventHandler((channelFlowController, byteBuf) -> {
            try {
                if (DcpSnapshotMarkerRequest.is(byteBuf)) {
                    channelFlowController.ack(byteBuf);
                }
            } finally {
                byteBuf.release();
            }
        });
        dataEventHandler((channelFlowController2, byteBuf2) -> {
            try {
                channelFlowController2.ack(byteBuf2);
            } finally {
                byteBuf2.release();
            }
        });
        this.conductor = new Conductor(this.env, builder.configProvider, new DcpClientMetrics(new MetricsContext("dcp")));
        LOGGER.info("Environment Configuration Used: {}", RedactableArgument.system(this.env));
    }

    /**
     * Creates a new {@link Builder} for configuring a {@link Client}.
     *
     * @return a fresh builder instance.
     */
    public static Builder configure() {
        return new Builder();
    }

    /**
     * Asks the conductor for sequence numbers and decodes every returned buffer into
     * {@link PartitionAndSeqno} items.
     *
     * <p>Each buffer is read as consecutive 10-byte entries: a 2-byte partition id followed by an
     * 8-byte sequence number. The buffer is released once decoded, also when decoding fails.
     *
     * @return an observable emitting one item per decoded entry.
     */
    private Observable<PartitionAndSeqno> getSeqnos() {
        return this.conductor.getSeqnos().flatMap((ByteBuf byteBuf) -> {
            final int entryBytes = 10;
            try {
                int entryCount = byteBuf.readableBytes() / entryBytes;
                List<PartitionAndSeqno> decoded = new ArrayList<>(entryCount);
                for (int entry = 0; entry < entryCount; entry++) {
                    int offset = entryBytes * entry;
                    decoded.add(new PartitionAndSeqno(byteBuf.getShort(offset), byteBuf.getLong(offset + 2)));
                }
                return Observable.from(decoded);
            } finally {
                // Release in finally so a failure while decoding cannot leak the buffer.
                byteBuf.release();
            }
        });
    }

    public SessionState sessionState() {
        return this.conductor.sessionState();
    }

    /**
     * Registers the handler for control events.
     *
     * <p>The given handler is wrapped so that the session state is updated before it is invoked:
     * <ul>
     *   <li>snapshot markers update the snapshot start/end seqno of the affected partition and are
     *       then forwarded;</li>
     *   <li>failover log responses are stored in the session state, released and <b>not</b>
     *       forwarded;</li>
     *   <li>rollback messages are logged at warn level and then forwarded;</li>
     *   <li>everything else is forwarded unchanged.</li>
     * </ul>
     *
     * @param controlEventHandler the handler that receives the forwarded events.
     */
    public void controlEventHandler(final ControlEventHandler controlEventHandler) {
        this.env.setControlEventHandler((flowController, event) -> {
            if (DcpSnapshotMarkerRequest.is(event)) {
                short partition = DcpSnapshotMarkerRequest.partition(event);
                PartitionState partitionState = sessionState().get(partition);
                partitionState.setSnapshotStartSeqno(DcpSnapshotMarkerRequest.startSeqno(event));
                partitionState.setSnapshotEndSeqno(DcpSnapshotMarkerRequest.endSeqno(event));
                sessionState().set(partition, partitionState);
            } else if (DcpFailoverLogResponse.is(event)) {
                try {
                    handleFailoverLogResponse(event);
                } finally {
                    // This branch consumes the event itself, so the buffer must be released here
                    // even when updating the session state fails.
                    event.release();
                }
                return;
            } else if (RollbackMessage.is(event)) {
                LOGGER.warn("Received rollback for vbucket {} to seqno {}", Short.valueOf(RollbackMessage.vbucket(event)), Long.valueOf(RollbackMessage.seqno(event)));
            }
            controlEventHandler.onEvent(flowController, event);
        });
    }

    /**
     * Registers the handler for system events on the environment.
     *
     * @param systemEventHandler the handler to register.
     */
    public void systemEventHandler(SystemEventHandler systemEventHandler) {
        env.setSystemEventHandler(systemEventHandler);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Stores the failover log entries carried by the given response in the session state of the
     * partition the response belongs to. The buffer is not released here.
     *
     * @param byteBuf the failover log response.
     */
    public void handleFailoverLogResponse(ByteBuf byteBuf) {
        final short vbid = DcpFailoverLogResponse.vbucket(byteBuf);
        final PartitionState state = sessionState().get(vbid);
        state.setFailoverLog(DcpFailoverLogResponse.entries(byteBuf));
        // Write the modified state back under the same partition id.
        sessionState().set(vbid, state);
    }

    /**
     * Registers the handler for data events.
     *
     * <p>The given handler is wrapped: for mutation, deletion and expiration messages the start
     * seqno of the affected partition is first set to the message's by-seqno in the session state.
     * Every event, of any type, is then forwarded to the given handler.
     *
     * @param dataEventHandler the handler that receives the events.
     */
    public void dataEventHandler(final DataEventHandler dataEventHandler) {
        this.env.setDataEventHandler((flowController, event) -> {
            if (DcpMutationMessage.is(event)) {
                final short vbid = DcpMutationMessage.partition(event);
                final PartitionState state = sessionState().get(vbid);
                state.setStartSeqno(DcpMutationMessage.bySeqno(event));
                sessionState().set(vbid, state);
            } else if (DcpDeletionMessage.is(event)) {
                final short vbid = DcpDeletionMessage.partition(event);
                final PartitionState state = sessionState().get(vbid);
                state.setStartSeqno(DcpDeletionMessage.bySeqno(event));
                sessionState().set(vbid, state);
            } else if (DcpExpirationMessage.is(event)) {
                final short vbid = DcpExpirationMessage.partition(event);
                final PartitionState state = sessionState().get(vbid);
                state.setStartSeqno(DcpExpirationMessage.bySeqno(event));
                sessionState().set(vbid, state);
            }
            dataEventHandler.onEvent(flowController, event);
        });
    }

    /**
     * Connects the conductor, unless it is not in the disconnected state.
     *
     * <p>If the conductor fails to connect it is stopped and the returned completable fails with a
     * {@link BootstrapException} wrapping the original error.
     *
     * @return a completable for the connect attempt; already completed when the conductor is not
     *         disconnected.
     * @throws IllegalArgumentException if no data or control event handler is set on the environment.
     */
    public Completable connect() {
        final boolean alreadyActive = !conductor.disconnected();
        if (alreadyActive) {
            LOGGER.debug("Ignoring duplicate connect attempt, already connecting/connected.");
            return Completable.complete();
        }

        if (env.dataEventHandler() == null) {
            throw new IllegalArgumentException("A DataEventHandler needs to be provided!");
        }
        if (env.controlEventHandler() == null) {
            throw new IllegalArgumentException("A ControlEventHandler needs to be provided!");
        }

        LOGGER.info("Connecting to seed nodes and bootstrapping bucket {}.", RedactableArgument.meta(env.bucket()));
        return conductor.connect().onErrorResumeNext(cause ->
                // Stop the conductor first, then surface the failure to the subscriber.
                conductor.stop().andThen(Completable.error(new BootstrapException("Could not connect to Cluster/Bucket", cause))));
    }

    /**
     * Stops the conductor and then shuts down the environment.
     *
     * @return a completable chaining both steps.
     */
    public Completable disconnect() {
        return conductor.stop().andThen(env.shutdown());
    }

    /**
     * Starts streams for the given partitions, or for all partitions when none are given.
     *
     * <p>Only partitions accepted by {@code selectInitializedPartitions} are opened. Each stream is
     * started from the values currently held in that partition's session state. A
     * {@link RollbackException} raised while starting a stream is swallowed; other errors propagate.
     *
     * @param shArr the partition ids to stream; empty means all partitions.
     * @return a completable for the stream start requests; already completed when no partition
     *         qualifies.
     */
    public Completable startStreaming(Short... shArr) {
        final int clusterPartitions = numPartitions();
        final List<Short> toOpen =
                selectInitializedPartitions(clusterPartitions, partitionsForVbids(clusterPartitions, shArr));

        if (toOpen.isEmpty()) {
            LOGGER.info("The configured session state does not require any streams to be opened. Completing immediately.");
            return Completable.complete();
        }

        LOGGER.info("Starting to Stream for " + toOpen.size() + " partitions");
        LOGGER.debug("Stream start against partitions: {}", toOpen);

        return Observable.from(toOpen)
                .flatMapCompletable(vbid -> {
                    final short partition = vbid.shortValue();
                    final PartitionState state = sessionState().get(partition);
                    return conductor
                            .startStreamForPartition(partition, state.getLastUuid(), state.getStartSeqno(), state.getEndSeqno(), state.getSnapshotStartSeqno(), state.getSnapshotEndSeqno())
                            .onErrorResumeNext(failure -> {
                                if (failure instanceof RollbackException) {
                                    return Completable.complete();
                                }
                                return Completable.error(failure);
                            });
                })
                .toCompletable();
    }

    /**
     * Filters the requested partitions down to those whose session state allows streaming.
     *
     * <p>A partition is kept when it has a state and {@code MathUtils.lessThanUnsigned(startSeqno,
     * endSeqno)} holds for that state; all others are skipped with a debug log entry.
     *
     * @param i the number of partitions in the cluster.
     * @param list the candidate partition ids.
     * @return the partition ids that should be streamed, in the order given.
     * @throws IllegalStateException if more partitions were selected than the cluster has.
     */
    private List<Short> selectInitializedPartitions(int i, List<Short> list) {
        List<Short> selected = new ArrayList<>();
        SessionState sessionState = sessionState();
        for (Short candidate : list) {
            short partition = candidate.shortValue();
            PartitionState partitionState = sessionState.get(partition);
            if (partitionState == null) {
                LOGGER.debug("Skipping partition {}, because its state is null", Short.valueOf(partition));
                continue;
            }
            if (MathUtils.lessThanUnsigned(partitionState.getStartSeqno(), partitionState.getEndSeqno())) {
                selected.add(candidate);
            } else {
                LOGGER.debug("Skipping partition {}, because startSeqno({}) >= endSeqno({})",
                        Short.valueOf(partition), Long.valueOf(partitionState.getStartSeqno()), Long.valueOf(partitionState.getEndSeqno()));
            }
        }
        if (selected.size() > i) {
            // Report the count: the message compares it with the cluster's partition count.
            throw new IllegalStateException("Session State has " + selected.size() + " partitions while the cluster has " + i + "!");
        }
        return selected;
    }

    /**
     * Stops the streams of the given partitions, or of all partitions when none are given.
     *
     * @param shArr the partition ids to stop; empty means all partitions.
     * @return a completable for the stream stop requests.
     */
    public Completable stopStreaming(Short... shArr) {
        final List<Short> toClose = partitionsForVbids(numPartitions(), shArr);

        LOGGER.info("Stopping to Stream for " + toClose.size() + " partitions");
        LOGGER.debug("Stream stop against partitions: {}", toClose);

        return Observable.from(toClose)
                .flatMapCompletable(vbid -> conductor.stopStreamForPartition(vbid.shortValue()))
                .toCompletable();
    }

    /**
     * Resolves the partition ids to operate on.
     *
     * <p>When explicit ids are given they are returned in ascending order. The caller's array is
     * copied first, so it is neither reordered nor aliased by the returned list. When no ids are
     * given, the ids {@code 0 .. i-1} are returned.
     *
     * @param i the number of partitions in the cluster.
     * @param shArr the explicitly requested partition ids; may be empty.
     * @return the sorted requested ids, or all ids below {@code i} when none were requested.
     */
    private static List<Short> partitionsForVbids(int i, Short... shArr) {
        if (shArr.length > 0) {
            Short[] sorted = shArr.clone();
            Arrays.sort(sorted);
            return Arrays.asList(sorted);
        }
        List<Short> all = new ArrayList<>(Math.max(i, 0));
        // An int counter avoids the wrap-around a short counter would hit above Short.MAX_VALUE.
        for (int partition = 0; partition < i; partition++) {
            all.add(Short.valueOf((short) partition));
        }
        return all;
    }

    /**
     * Requests the failover log of the given partitions, or of all partitions when none are given.
     *
     * @param shArr the partition ids to query; empty means all partitions.
     * @return an observable emitting the failover log responses obtained from the conductor.
     */
    public Observable<ByteBuf> failoverLogs(Short... shArr) {
        final List<Short> targets = partitionsForVbids(numPartitions(), shArr);
        LOGGER.debug("Asking for failover logs on partitions {}", targets);
        return Observable.from(targets)
                .flatMapSingle(vbid -> conductor.getFailoverLog(vbid.shortValue()));
    }

    /**
     * Stops the stream of a partition, rolls its session state back to the given position and
     * starts the stream again.
     *
     * <p>The restart is deferred until the stop and the rollback have completed, so that
     * {@link #startStreaming(Short...)} selects the partition based on the rolled-back session
     * state rather than on the state present when this method was called.
     *
     * @param s the partition id.
     * @param j the sequence number to roll back to.
     * @return a completable chaining stop, rollback and restart.
     */
    public Completable rollbackAndRestartStream(final short s, final long j) {
        return stopStreaming(Short.valueOf(s))
                .andThen(Completable.create(subscriber -> {
                    sessionState().rollbackToPosition(s, j);
                    subscriber.onCompleted();
                }))
                .andThen(Completable.defer(() -> startStreaming(Short.valueOf(s))));
    }

    /**
     * @return the number of partitions as reported by the conductor.
     */
    public int numPartitions() {
        return conductor.numberOfPartitions();
    }

    /**
     * Asks the conductor whether the stream of the given partition is open.
     *
     * @param s the partition id.
     * @return the conductor's answer for that partition.
     */
    public boolean streamIsOpen(short s) {
        return conductor.streamIsOpen(s);
    }

    /**
     * Initializes the session state for one of the supported from/to combinations:
     * BEGINNING to INFINITY, BEGINNING to NOW, or NOW to INFINITY.
     *
     * @param streamFrom where streaming should start.
     * @param streamTo where streaming should end.
     * @return a completable performing the initialization.
     * @throws IllegalStateException for any other combination.
     */
    public Completable initializeState(StreamFrom streamFrom, StreamTo streamTo) {
        final boolean fromBeginning = streamFrom == StreamFrom.BEGINNING;
        final boolean toInfinity = streamTo == StreamTo.INFINITY;

        if (fromBeginning && toInfinity) {
            buzzMe();
            return initFromBeginningToInfinity();
        }
        if (fromBeginning && streamTo == StreamTo.NOW) {
            return initFromBeginningToNow();
        }
        if (streamFrom == StreamFrom.NOW && toInfinity) {
            buzzMe();
            return initFromNowToInfinity();
        }
        throw new IllegalStateException("Unsupported FROM/TO combination: " + streamFrom + " -> " + streamTo);
    }

    /**
     * Restores the session state from a previously persisted representation.
     *
     * <p>Only {@link StateFormat#JSON} is handled; any other format fails the completable with an
     * {@link IllegalStateException}. Exceptions raised while applying the state are delivered
     * through {@code onError} as well.
     *
     * @param stateFormat the format of the persisted state.
     * @param bArr the persisted state; it is decoded as UTF-8 for the debug log.
     * @return a completable performing the recovery on subscription.
     */
    public Completable recoverState(final StateFormat stateFormat, final byte[] bArr) {
        return Completable.create(subscriber -> {
            LOGGER.info("Recovering state from format {}", stateFormat);
            LOGGER.debug("PersistedState on recovery is: {}", new String(bArr, CharsetUtil.UTF_8));
            try {
                if (stateFormat != StateFormat.JSON) {
                    subscriber.onError(new IllegalStateException("Unsupported StateFormat " + stateFormat));
                } else {
                    sessionState().setFromJson(bArr);
                    subscriber.onCompleted();
                }
            } catch (Exception e) {
                subscriber.onError(e);
            }
        });
    }

    /**
     * Recovers the session state from the persisted bytes when there are any, otherwise
     * initializes it for the given from/to combination.
     *
     * @param stateFormat the format of the persisted state.
     * @param bArr the persisted state; null or empty triggers initialization.
     * @param streamFrom where streaming should start when initializing.
     * @param streamTo where streaming should end when initializing.
     * @return the completable of the chosen operation.
     */
    public Completable recoverOrInitializeState(StateFormat stateFormat, byte[] bArr, StreamFrom streamFrom, StreamTo streamTo) {
        final boolean nothingPersisted = bArr == null || bArr.length == 0;
        if (nothingPersisted) {
            return initializeState(streamFrom, streamTo);
        }
        return recoverState(stateFormat, bArr);
    }

    /**
     * Creates a completable that, on subscription, resets the session state via
     * {@code setToBeginningWithNoEnd} for the current number of partitions. Failures are logged at
     * warn level and delivered through {@code onError}.
     *
     * @return the completable performing the initialization.
     */
    private Completable initFromBeginningToInfinity() {
        return Completable.create(subscriber -> {
            LOGGER.info("Initializing state from beginning to no end.");
            try {
                sessionState().setToBeginningWithNoEnd(numPartitions());
                subscriber.onCompleted();
            } catch (Exception e) {
                LOGGER.warn("Failed to initialize state from beginning to no end.", e);
                subscriber.onError(e);
            }
        });
    }

    /**
     * Initializes the session state so that, for every partition reported by {@code getSeqnos},
     * the start seqno and both snapshot seqnos are set to that partition's reported seqno.
     *
     * @return the completable produced by {@code initWithCallback}.
     */
    private Completable initFromNowToInfinity() {
        return initWithCallback(current -> {
            final short vbid = current.partition();
            final long now = current.seqno();
            final PartitionState state = sessionState().get(vbid);
            state.setStartSeqno(now);
            state.setSnapshotStartSeqno(now);
            state.setSnapshotEndSeqno(now);
            sessionState().set(vbid, state);
        });
    }

    /**
     * Initializes the session state so that, for every partition reported by {@code getSeqnos},
     * the end seqno is set to that partition's reported seqno.
     *
     * @return the completable produced by {@code initWithCallback}.
     */
    private Completable initFromBeginningToNow() {
        return initWithCallback(current -> {
            final short vbid = current.partition();
            final long now = current.seqno();
            final PartitionState state = sessionState().get(vbid);
            state.setEndSeqno(now);
            sessionState().set(vbid, state);
        });
    }

    /**
     * Resets the session state and then refines it from the sequence numbers of the partitions.
     *
     * <p>The session state is reset via {@code setToBeginningWithNoEnd} immediately when this
     * method is called. On subscription, the callback is applied to every
     * {@link PartitionAndSeqno} from {@code getSeqnos}, the failover logs of those partitions are
     * requested and each response is stored in the session state.
     *
     * <p>The partition ids are gathered with {@code collect} and a list factory, so every
     * subscription gets its own list. A shared mutable seed passed to {@code reduce} would carry
     * over entries from a previous subscription.
     *
     * @param action1 the callback applied to every partition/seqno pair.
     * @return a completable that completes after the last failover log has been stored.
     */
    private Completable initWithCallback(Action1<PartitionAndSeqno> action1) {
        sessionState().setToBeginningWithNoEnd(numPartitions());
        return getSeqnos()
                .doOnNext(action1)
                .<List<Short>>collect(ArrayList::new,
                        (partitions, entry) -> partitions.add(Short.valueOf(entry.partition())))
                .flatMap(partitions -> failoverLogs(partitions.toArray(new Short[0])))
                .map(failoverLog -> {
                    try {
                        short vbucket = DcpFailoverLogResponse.vbucket(failoverLog);
                        handleFailoverLogResponse(failoverLog);
                        return Short.valueOf(vbucket);
                    } finally {
                        // Release in finally so a failure while storing the log cannot leak the buffer.
                        failoverLog.release();
                    }
                })
                .last()
                .toCompletable();
    }

    /** Writes a fixed debug log line; called by {@code initializeState} for the two INFINITY combinations. */
    private static void buzzMe() {
        LOGGER.debug("To Infinity... AND BEYOND!");
    }
}
