/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.endpoint.dcp;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.annotations.InterfaceAudience;
import com.couchbase.client.core.annotations.InterfaceStability;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.dcp.DCPMessage;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.FailoverLogEntry;
import com.couchbase.client.core.message.dcp.GetFailoverLogRequest;
import com.couchbase.client.core.message.dcp.GetFailoverLogResponse;
import com.couchbase.client.core.message.dcp.StreamCloseRequest;
import com.couchbase.client.core.message.dcp.StreamCloseResponse;
import com.couchbase.client.core.message.dcp.StreamEndMessage;
import com.couchbase.client.core.message.dcp.StreamRequestRequest;
import com.couchbase.client.core.message.dcp.StreamRequestResponse;
import com.couchbase.client.core.message.kv.GetAllMutationTokensRequest;
import com.couchbase.client.core.message.kv.GetAllMutationTokensResponse;
import com.couchbase.client.core.message.kv.MutationToken;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.utils.UnicastAutoReleaseSubject;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest;
import com.couchbase.client.deps.io.netty.util.Attribute;
import com.couchbase.client.deps.io.netty.util.AttributeKey;
import com.couchbase.client.deps.io.netty.util.internal.ConcurrentSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.functions.Action2;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
 * Tracks the DCP streams opened for a single bucket and exposes the subject
 * on which the corresponding {@link DCPRequest}s are published.
 *
 * <p>The set of open partitions and the partition-to-channel-context map are
 * both backed by concurrent collections. Per-channel consumed-byte accounting
 * is guarded by synchronizing on the channel handler context.
 */
@Deprecated
@InterfaceStability.Experimental
@InterfaceAudience.Public
public class DCPConnection {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(DCPConnection.class);

    /** Channel attribute holding the number of bytes consumed since the last acknowledgement. */
    private static final AttributeKey<Integer> CONSUMED_BYTES = AttributeKey.newInstance("CONSUMED_BYTES");

    /**
     * Fixed per-message overhead, in bytes, added on top of the body length
     * when accounting consumed data (presumably the binary memcache header
     * size -- confirm against the protocol).
     */
    private static final int MINIMUM_HEADER_SIZE = 24;

    /** Opcode set on the buffer acknowledgement request (decimal 93). */
    private static final byte BUFFER_ACK_OPCODE = (byte) 0x5d;

    private final SerializedSubject<DCPRequest, DCPRequest> subject;
    private final Set<Short> streams;
    private final ClusterFacade core;
    private final String bucket;
    private final String password;
    private final CoreEnvironment env;
    private final ConcurrentMap<Short, ChannelHandlerContext> contexts;

    /**
     * Creates a connection backed by a serialized {@link UnicastAutoReleaseSubject}
     * configured from the environment (auto-release delay, scheduler and
     * connection name used as trace identifier).
     *
     * @param env      the core environment supplying the DCP settings.
     * @param core     the facade used to dispatch requests.
     * @param bucket   the name of the bucket.
     * @param password the password of the bucket.
     */
    public DCPConnection(final CoreEnvironment env, final ClusterFacade core, final String bucket,
                         final String password) {
        // The explicit type witness keeps the whole chain typed as DCPRequest,
        // so no cast of the resulting subject is needed.
        this(env, core, bucket, password,
            UnicastAutoReleaseSubject.<DCPRequest>create(env.autoreleaseAfter(), TimeUnit.MILLISECONDS, env.scheduler())
                .withTraceIdentifier("DCPConnection." + env.dcpConnectionName())
                .toSerialized());
    }

    /**
     * Creates a connection which publishes through the given subject.
     *
     * @param env      the core environment supplying the DCP settings.
     * @param core     the facade used to dispatch requests.
     * @param bucket   the name of the bucket.
     * @param password the password of the bucket.
     * @param subject  the subject exposed through {@link #subject()}.
     */
    public DCPConnection(final CoreEnvironment env, final ClusterFacade core, final String bucket,
                         final String password, final SerializedSubject<DCPRequest, DCPRequest> subject) {
        this.env = env;
        this.core = core;
        this.subject = subject;
        this.bucket = bucket;
        this.password = password;
        this.streams = new ConcurrentSet<Short>();
        this.contexts = new ConcurrentHashMap<Short, ChannelHandlerContext>();
    }

    /**
     * @return the name of the bucket this connection was created for.
     */
    public String bucket() {
        return bucket;
    }

    /**
     * @return the subject carrying the {@link DCPRequest}s of this connection.
     */
    public Subject<DCPRequest, DCPRequest> subject() {
        return subject;
    }

    /**
     * Opens a stream for the partition using default stream parameters:
     * vbucket UUID 0, start sequence 0, end sequence -1 and a 0..0 snapshot
     * range (the -1 end is presumably "no end" -- confirm against the protocol).
     *
     * @param partition the partition (vbucket) to stream.
     * @return the status of the stream request, or {@link ResponseStatus#EXISTS}
     *         if the partition is already tracked by this connection.
     */
    public Observable<ResponseStatus> addStream(final short partition) {
        return addStream(partition, 0L, 0L, -1L, 0L, 0L);
    }

    /**
     * Opens a stream for the partition with explicit stream parameters.
     *
     * <p>If the first response is {@code RANGE_ERROR} the request is re-sent
     * once starting at sequence number 0; if it is {@code ROLLBACK} it is
     * re-sent once starting at the rollback sequence number reported in the
     * response. In both cases the snapshot start is set to the same value as
     * the new start. The partition is recorded as open only when the final
     * response is {@code SUCCESS}.
     *
     * @param partition                   the partition (vbucket) to stream.
     * @param vbucketUUID                 the vbucket UUID passed to the stream request.
     * @param startSequenceNumber         the first sequence number to request.
     * @param endSequenceNumber           the last sequence number to request.
     * @param snapshotStartSequenceNumber the start of the snapshot range.
     * @param snapshotEndSequenceNumber   the end of the snapshot range.
     * @return the status of the (possibly retried) stream request, or
     *         {@link ResponseStatus#EXISTS} if the partition is already tracked.
     */
    public Observable<ResponseStatus> addStream(final short partition, final long vbucketUUID,
                                                final long startSequenceNumber, final long endSequenceNumber,
                                                final long snapshotStartSequenceNumber,
                                                final long snapshotEndSequenceNumber) {
        // Checked eagerly at call time, not at subscription time.
        if (streams.contains(partition)) {
            return Observable.just(ResponseStatus.EXISTS);
        }
        final DCPConnection connection = this;
        return Observable
            .defer(new Func0<Observable<StreamRequestResponse>>() {
                @Override
                public Observable<StreamRequestResponse> call() {
                    return core.<StreamRequestResponse>send(new StreamRequestRequest(partition, vbucketUUID,
                        startSequenceNumber, endSequenceNumber,
                        snapshotStartSequenceNumber, snapshotEndSequenceNumber,
                        bucket, password, connection));
                }
            })
            .flatMap(new Func1<StreamRequestResponse, Observable<StreamRequestResponse>>() {
                @Override
                public Observable<StreamRequestResponse> call(final StreamRequestResponse response) {
                    final long rollbackSequenceNumber;
                    switch (response.status()) {
                        case RANGE_ERROR:
                            rollbackSequenceNumber = 0L;
                            break;
                        case ROLLBACK:
                            rollbackSequenceNumber = response.rollbackToSequenceNumber();
                            break;
                        default:
                            // Anything else is final: pass the response through unchanged.
                            return Observable.just(response);
                    }
                    // Single retry from the rollback point.
                    return core.<StreamRequestResponse>send(new StreamRequestRequest(partition, vbucketUUID,
                        rollbackSequenceNumber, endSequenceNumber,
                        rollbackSequenceNumber, snapshotEndSequenceNumber,
                        bucket, password, connection));
                }
            })
            .map(new Func1<StreamRequestResponse, ResponseStatus>() {
                @Override
                public ResponseStatus call(final StreamRequestResponse response) {
                    if (response.status() == ResponseStatus.SUCCESS) {
                        streams.add(partition);
                    }
                    return response.status();
                }
            });
    }

    /**
     * Closes the stream of the partition.
     *
     * @param partition the partition (vbucket) whose stream should be closed.
     * @return the status of the close request, or {@link ResponseStatus#NOT_EXISTS}
     *         if the partition is not tracked by this connection. The partition
     *         is dropped from the tracked set only on {@code SUCCESS}.
     */
    public Observable<ResponseStatus> removeStream(final short partition) {
        if (!streams.contains(partition)) {
            return Observable.just(ResponseStatus.NOT_EXISTS);
        }
        return Observable
            .defer(new Func0<Observable<StreamCloseResponse>>() {
                @Override
                public Observable<StreamCloseResponse> call() {
                    return core.<StreamCloseResponse>send(new StreamCloseRequest(partition, bucket, password));
                }
            })
            .map(new Func1<StreamCloseResponse, ResponseStatus>() {
                @Override
                public ResponseStatus call(final StreamCloseResponse response) {
                    if (response.status() == ResponseStatus.SUCCESS) {
                        streams.remove(partition);
                    }
                    return response.status();
                }
            });
    }

    /**
     * Collects the current mutation token of every partition of the bucket.
     *
     * <p>All nodes exposing the DCP service (plain or SSL) are asked for their
     * mutation tokens. When several tokens are reported for the same vbucket,
     * the one with the highest sequence number is kept. Each selected token is
     * then rebuilt with the vbucket UUID taken from the first entry of that
     * partition's failover log.
     *
     * @return one {@link MutationToken} per vbucket reported by the nodes.
     */
    public Observable<MutationToken> getCurrentState() {
        return core.<GetClusterConfigResponse>send(new GetClusterConfigRequest())
            .flatMap(new Func1<GetClusterConfigResponse, Observable<NodeInfo>>() {
                @Override
                public Observable<NodeInfo> call(final GetClusterConfigResponse response) {
                    CouchbaseBucketConfig cfg = (CouchbaseBucketConfig) response.config().bucketConfig(bucket);
                    return Observable.from(cfg.nodes());
                }
            })
            .filter(new Func1<NodeInfo, Boolean>() {
                @Override
                public Boolean call(final NodeInfo node) {
                    return node.services().containsKey(ServiceType.DCP)
                        || node.sslServices().containsKey(ServiceType.DCP);
                }
            })
            .flatMap(new Func1<NodeInfo, Observable<GetAllMutationTokensResponse>>() {
                @Override
                public Observable<GetAllMutationTokensResponse> call(final NodeInfo node) {
                    return core.<GetAllMutationTokensResponse>send(
                        new GetAllMutationTokensRequest(node.hostname(), bucket));
                }
            })
            .collect(new Func0<Map<Integer, MutationToken>>() {
                @Override
                public Map<Integer, MutationToken> call() {
                    return new HashMap<Integer, MutationToken>(1024);
                }
            }, new Action2<Map<Integer, MutationToken>, GetAllMutationTokensResponse>() {
                @Override
                public void call(final Map<Integer, MutationToken> collector,
                                 final GetAllMutationTokensResponse response) {
                    for (MutationToken token : response.mutationTokens()) {
                        int key = (int) token.vbucketID();
                        MutationToken prev = collector.get(key);
                        MutationToken current = token;
                        if (prev != null && prev.sequenceNumber() != token.sequenceNumber()) {
                            // Nodes disagree: keep whichever token has the higher sequence number.
                            if (current.sequenceNumber() < prev.sequenceNumber()) {
                                current = prev;
                            }
                            LOGGER.debug("nodes are not agree on sequence number for vbucket {}: old={}, new={}, selected={}",
                                token.vbucketID(), prev.sequenceNumber(), token.sequenceNumber(),
                                current.sequenceNumber());
                        }
                        collector.put(key, current);
                    }
                }
            })
            .flatMap(new Func1<Map<Integer, MutationToken>, Observable<MutationToken>>() {
                @Override
                public Observable<MutationToken> call(final Map<Integer, MutationToken> sequenceNumbers) {
                    return Observable.from(sequenceNumbers.values());
                }
            })
            .flatMap(new Func1<MutationToken, Observable<MutationToken>>() {
                @Override
                public Observable<MutationToken> call(final MutationToken token) {
                    return core.<GetFailoverLogResponse>send(
                            new GetFailoverLogRequest((short) token.vbucketID(), bucket))
                        .map(new Func1<GetFailoverLogResponse, MutationToken>() {
                            @Override
                            public MutationToken call(final GetFailoverLogResponse failoverLogsResponse) {
                                // NOTE(review): assumes the failover log is never empty; an empty
                                // list would surface as an error on the returned observable.
                                FailoverLogEntry entry = failoverLogsResponse.failoverLog().get(0);
                                return new MutationToken(failoverLogsResponse.partition(), entry.vbucketUUID(),
                                    token.sequenceNumber(), bucket);
                            }
                        });
                }
            });
    }

    /**
     * Reports that the given message has been processed, using its partition
     * and total body length for the consumed-byte accounting.
     *
     * @param event the message that has been consumed.
     */
    public void consumed(final DCPMessage event) {
        consumed(event.partition(), event.totalBodyLength());
    }

    /**
     * Adds {@code delta} plus {@link #MINIMUM_HEADER_SIZE} to the consumed-byte
     * counter stored on the channel registered for the partition. Once the
     * counter reaches {@code dcpConnectionBufferSize * dcpConnectionBufferAckThreshold},
     * a buffer acknowledgement carrying the counter is written to the channel
     * and the counter is reset to zero.
     *
     * <p>Does nothing when the configured buffer size is not positive or when
     * no context has been registered for the partition.
     *
     * @param partition the partition the consumed data belongs to.
     * @param delta     the number of body bytes consumed.
     */
    /*package*/ void consumed(final short partition, final int delta) {
        if (env.dcpConnectionBufferSize() <= 0) {
            return;
        }
        final ChannelHandlerContext ctx = contexts.get(partition);
        if (ctx == null) {
            return;
        }
        // The read-modify-write of the channel attribute must be atomic per channel.
        synchronized (ctx) {
            Attribute<Integer> attr = ctx.attr(CONSUMED_BYTES);
            Integer stored = attr.get();
            int consumedBytes = (stored == null ? 0 : stored) + MINIMUM_HEADER_SIZE + delta;
            if (consumedBytes >= env.dcpConnectionBufferSize() * env.dcpConnectionBufferAckThreshold()) {
                ctx.writeAndFlush(createBufferAcknowledgmentRequest(ctx, consumedBytes));
                consumedBytes = 0;
            }
            attr.set(consumedBytes);
        }
    }

    /**
     * Drops the partition from the set of open streams.
     *
     * @param partition the partition whose stream has ended.
     * @param reason    the reason reported for the stream end (currently unused).
     */
    /*package*/ void streamClosed(final short partition, final StreamEndMessage.Reason reason) {
        streams.remove(partition);
    }

    /**
     * Looks up the number of partitions of the bucket from the current cluster
     * configuration.
     *
     * @return the partition count of the bucket.
     */
    private Observable<Integer> partitionSize() {
        return core.<GetClusterConfigResponse>send(new GetClusterConfigRequest())
            .map(new Func1<GetClusterConfigResponse, Integer>() {
                @Override
                public Integer call(final GetClusterConfigResponse response) {
                    CouchbaseBucketConfig config = (CouchbaseBucketConfig) response.config().bucketConfig(bucket);
                    return config.numberOfPartitions();
                }
            });
    }

    /**
     * Associates a channel handler context with the partition so that
     * {@link #consumed(short, int)} can account and acknowledge bytes on it.
     * A previously registered context for the same partition is replaced.
     *
     * @param partition the partition served by the channel.
     * @param ctx       the context of the channel.
     */
    /*package*/ void registerContext(final short partition, final ChannelHandlerContext ctx) {
        contexts.put(partition, ctx);
    }

    /**
     * Builds the buffer acknowledgement request: an empty key and a 4-byte
     * extras section containing the number of bytes being acknowledged.
     *
     * @param ctx         the context whose allocator provides the extras buffer.
     * @param bufferBytes the number of bytes to acknowledge.
     * @return the request, ready to be written to the channel.
     */
    private BinaryMemcacheRequest createBufferAcknowledgmentRequest(final ChannelHandlerContext ctx,
                                                                    final int bufferBytes) {
        ByteBuf extras = ctx.alloc().buffer(4).writeInt(bufferBytes);
        DefaultBinaryMemcacheRequest request = new DefaultBinaryMemcacheRequest(new byte[0], extras);
        request.setOpcode(BUFFER_ACK_OPCODE);
        request.setExtrasLength((byte) extras.readableBytes());
        request.setTotalBodyLength(extras.readableBytes());
        return request;
    }
}

