/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.dcp;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.annotations.InterfaceAudience;
import com.couchbase.client.core.annotations.InterfaceStability;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.dcp.BucketStreamAggregatorState;
import com.couchbase.client.core.dcp.BucketStreamState;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
import com.couchbase.client.core.message.dcp.StreamRequestRequest;
import com.couchbase.client.core.message.dcp.StreamRequestResponse;
import rx.Observable;
import rx.functions.Func1;

@InterfaceStability.Experimental
@InterfaceAudience.Public
public class BucketStreamAggregator {
    private final ClusterFacade core;
    private final String bucket;

    public BucketStreamAggregator(ClusterFacade core, String bucket) {
        this.core = core;
        this.bucket = bucket;
    }

    public Observable<DCPRequest> feed() {
        return this.feed(new BucketStreamAggregatorState("jvmCore"));
    }

    public Observable<DCPRequest> feed(BucketStreamAggregatorState aggregatorState) {
        return this.open(aggregatorState).flatMap((Func1)new Func1<StreamRequestResponse, Observable<DCPRequest>>(){

            public Observable<DCPRequest> call(StreamRequestResponse response) {
                return response.stream();
            }
        });
    }

    public Observable<StreamRequestResponse> open(final BucketStreamAggregatorState aggregatorState) {
        return this.core.send(new OpenConnectionRequest(aggregatorState.name(), this.bucket)).flatMap((Func1)new Func1<OpenConnectionResponse, Observable<StreamRequestResponse>>(){

            public Observable<StreamRequestResponse> call(OpenConnectionResponse reponse) {
                return Observable.from((Iterable)aggregatorState).flatMap((Func1)new Func1<BucketStreamState, Observable<StreamRequestResponse>>(){

                    public Observable<StreamRequestResponse> call(final BucketStreamState feed) {
                        Observable res = BucketStreamAggregator.this.core.send(new StreamRequestRequest(feed.partition(), feed.vbucketUUID(), feed.startSequenceNumber(), feed.endSequenceNumber(), feed.snapshotStartSequenceNumber(), feed.snapshotEndSequenceNumber(), BucketStreamAggregator.this.bucket));
                        return res.flatMap((Func1)new Func1<StreamRequestResponse, Observable<StreamRequestResponse>>(){

                            public Observable<StreamRequestResponse> call(StreamRequestResponse response) {
                                long rollbackSequenceNumber;
                                switch (response.status()) {
                                    case RANGE_ERROR: {
                                        rollbackSequenceNumber = 0L;
                                        break;
                                    }
                                    case ROLLBACK: {
                                        rollbackSequenceNumber = response.getRollbackToSequenceNumber();
                                        break;
                                    }
                                    default: {
                                        return Observable.just((Object)response);
                                    }
                                }
                                return BucketStreamAggregator.this.core.send(new StreamRequestRequest(feed.partition(), feed.vbucketUUID(), rollbackSequenceNumber, feed.endSequenceNumber(), feed.snapshotStartSequenceNumber(), feed.snapshotEndSequenceNumber(), BucketStreamAggregator.this.bucket));
                            }
                        });
                    }
                });
            }
        });
    }

    private Observable<Integer> partitionSize() {
        return this.core.send(new GetClusterConfigRequest()).map((Func1)new Func1<GetClusterConfigResponse, Integer>(){

            public Integer call(GetClusterConfigResponse response) {
                CouchbaseBucketConfig config = (CouchbaseBucketConfig)response.config().bucketConfig(BucketStreamAggregator.this.bucket);
                return config.numberOfPartitions();
            }
        });
    }
}

