/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.spark.streaming;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.MutationMessage;
import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
import com.couchbase.client.core.message.dcp.RemoveMessage;
import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
import com.couchbase.client.core.message.dcp.StreamRequestRequest;
import com.couchbase.client.core.message.dcp.StreamRequestResponse;
import com.couchbase.client.java.AsyncBucket;
import com.couchbase.spark.connection.CouchbaseConfig;
import com.couchbase.spark.connection.CouchbaseConnection$;
import com.couchbase.spark.streaming.Deletion;
import com.couchbase.spark.streaming.Mutation;
import com.couchbase.spark.streaming.Snapshot;
import com.couchbase.spark.streaming.StreamMessage;
import org.apache.spark.Logging;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import rx.lang.scala.JavaConversions$;
import rx.lang.scala.Observable;
import rx.lang.scala.Observable$;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.reflect.ScalaSignature;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001]4A!\u0001\u0002\u0001\u0017\t\t2i\\;dQ\n\f7/\u001a*fG\u0016Lg/\u001a:\u000b\u0005\r!\u0011!C:ue\u0016\fW.\u001b8h\u0015\t)a!A\u0003ta\u0006\u00148N\u0003\u0002\b\u0011\u0005I1m\\;dQ\n\f7/\u001a\u0006\u0002\u0013\u0005\u00191m\\7\u0004\u0001M\u0019\u0001\u0001\u0004\u000f\u0011\u000751\u0002$D\u0001\u000f\u0015\ty\u0001#\u0001\u0005sK\u000e,\u0017N^3s\u0015\t\u0019\u0011C\u0003\u0002\u0006%)\u00111\u0003F\u0001\u0007CB\f7\r[3\u000b\u0003U\t1a\u001c:h\u0013\t9bB\u0001\u0005SK\u000e,\u0017N^3s!\tI\"$D\u0001\u0003\u0013\tY\"AA\u0007TiJ,\u0017-\\'fgN\fw-\u001a\t\u0003;yi\u0011!E\u0005\u0003?E\u0011q\u0001T8hO&tw\r\u0003\u0005\"\u0001\t\u0005\t\u0015!\u0003#\u0003\u0019\u0019wN\u001c4jOB\u00111EJ\u0007\u0002I)\u0011Q\u0005B\u0001\u000bG>tg.Z2uS>t\u0017BA\u0014%\u0005=\u0019u.^2iE\u0006\u001cXmQ8oM&<\u0007\u0002C\u0015\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0016\u0002\u0015\t,8m[3u\u001d\u0006lW\r\u0005\u0002,c9\u0011AfL\u0007\u0002[)\ta&A\u0003tG\u0006d\u0017-\u0003\u00021[\u00051\u0001K]3eK\u001aL!AM\u001a\u0003\rM#(/\u001b8h\u0015\t\u0001T\u0006C\u00056\u0001\t\u0005\t\u0015!\u00037y\u0005a1\u000f^8sC\u001e,G*\u001a<fYB\u0011qGO\u0007\u0002q)\u0011\u0011(E\u0001\bgR|'/Y4f\u0013\tY\u0004H\u0001\u0007Ti>\u0014\u0018mZ3MKZ,G.\u0003\u00026-!)a\b\u0001C\u0001\u007f\u00051A(\u001b8jiz\"B\u0001Q!C\u0007B\u0011\u0011\u0004\u0001\u0005\u0006Cu\u0002\rA\t\u0005\u0006Su\u0002\rA\u000b\u0005\u0006ku\u0002\rA\u000e\u0005\u0006\u000b\u0002!\tER\u0001\b_:\u001cF/\u0019:u)\u00059\u0005C\u0001\u0017I\u0013\tIUF\u0001\u0003V]&$\b\"B&\u0001\t\u00032\u0015AB8o'R|\u0007\u000fC\u0003N\u0001\u0011%a*A\u0007qCJ$\u0018\u000e^5p]NK'0\u001a\u000b\u0003\u001f~\u00032\u0001\u0015,Y\u001b\u0005\t&B\u0001\u0018S\u0015\t\u0019F+\u0001\u0003mC:<'\"A+\u0002\u0005ID\u0018BA,R\u0005)y%m]3sm\u0006\u0014G.\u001a\t\u00033vk\u0011A\u0017\u0006\u0003'nS\u0011\u0001X\u0001\u0005U\u00064\u0018-\u0003\u0002_5\n9\u0011J\u001c;fO\u0016\u0014\b\"\u00021M\u0001\u0004\t\u0017\u0001B2pe\u0016\u0004\"A\u00194\u000e\u0003\rT!\u0001\u00193\u000b\u0005\u00154\u0011AB2mS\u0016tG/\u0003\u0002hG\ni1\t\\;ti\u0016\u0014h)Y2bI\u0016DQ!\u001b\u0001\u0005\n)\faB]3rk\u0016\u001cHo\u0015;sK\u0006l7\u000fF\u0002liV\u00042\u0001\u0015,m!\ti'/D\u0001o\u0015\ty\u0007/A\u0002eGBT!!]2\u0002\u000f5,7o]1hK&\u00111O\u001c\u0002\u000b\t\u000e\u0003&+Z9vKN$\b\"\u00021i\u0001\u0004\t\u0007\"\u0002<i\u0001\u0004A\u0016!\u00048v[B\u000b'\u000f^5uS>t7\u000f")
public class CouchbaseReceiver
extends Receiver<StreamMessage>
implements Logging {
    private final CouchbaseConfig config;
    public final String com$couchbase$spark$streaming$CouchbaseReceiver$$bucketName;
    private transient Logger org$apache$spark$Logging$$log_;

    public Logger org$apache$spark$Logging$$log_() {
        return this.org$apache$spark$Logging$$log_;
    }

    public void org$apache$spark$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$Logging$$log_ = x$1;
    }

    public String logName() {
        return Logging.class.logName((Logging)this);
    }

    public Logger log() {
        return Logging.class.log((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.class.logInfo((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.class.logDebug((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.class.logTrace((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.class.logWarning((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.class.logError((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.class.logInfo((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.class.logDebug((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.class.logTrace((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.class.logWarning((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.class.logError((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled((Logging)this);
    }

    public void onStart() {
        this.logInfo((Function0<String>)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ CouchbaseReceiver $outer;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Starting Couchbase (DCP) Stream against Bucket ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.$outer.com$couchbase$spark$streaming$CouchbaseReceiver$$bucketName}));
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        });
        AsyncBucket bucket = CouchbaseConnection$.MODULE$.apply().bucket(this.config, this.com$couchbase$spark$streaming$CouchbaseReceiver$$bucketName).async();
        ClusterFacade core = (ClusterFacade)bucket.core().toBlocking().single();
        JavaConversions$.MODULE$.toScalaObservable(core.send((CouchbaseRequest)new OpenConnectionRequest("sparkstream", this.com$couchbase$spark$streaming$CouchbaseReceiver$$bucketName))).flatMap((Function1)new Serializable(this, core){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ CouchbaseReceiver $outer;
            private final ClusterFacade core$1;

            public final Observable<Integer> apply(OpenConnectionResponse res) {
                ResponseStatus status = res.status();
                if (status.isSuccess()) {
                    this.$outer.logDebug((Function0<String>)new Serializable(this){
                        public static final long serialVersionUID = 0L;

                        public final String apply() {
                            return "Stream Connection Request succeeded";
                        }
                    });
                } else {
                    this.$outer.logError((Function0<String>)new Serializable(this){
                        public static final long serialVersionUID = 0L;

                        public final String apply() {
                            return "Stream Connection Request failed $status";
                        }
                    });
                }
                return this.$outer.com$couchbase$spark$streaming$CouchbaseReceiver$$partitionSize(this.core$1);
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.core$1 = core$1;
            }
        }).flatMap((Function1)new Serializable(this, core){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ CouchbaseReceiver $outer;
            private final ClusterFacade core$1;

            public final Observable<DCPRequest> apply(Integer partitions) {
                this.$outer.logDebug((Function0<String>)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final String apply() {
                        return "Found $partitions partitions to open connections against.";
                    }
                });
                return this.$outer.com$couchbase$spark$streaming$CouchbaseReceiver$$requestStreams(this.core$1, partitions);
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.core$1 = core$1;
            }
        }).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ CouchbaseReceiver $outer;

            public final StreamMessage apply(DCPRequest x0$1) {
                StreamMessage streamMessage;
                DCPRequest dCPRequest = x0$1;
                if (dCPRequest instanceof SnapshotMarkerMessage) {
                    SnapshotMarkerMessage snapshotMarkerMessage;
                    SnapshotMarkerMessage msg = snapshotMarkerMessage = (SnapshotMarkerMessage)dCPRequest;
                    streamMessage = new Snapshot(msg.startSequenceNumber(), msg.endSequenceNumber(), msg.memory(), msg.disk(), msg.checkpoint(), msg.ack());
                } else if (dCPRequest instanceof MutationMessage) {
                    MutationMessage mutationMessage;
                    MutationMessage msg = mutationMessage = (MutationMessage)dCPRequest;
                    byte[] data = new byte[msg.content().readableBytes()];
                    msg.content().readBytes(data);
                    Mutation mutation = new Mutation(msg.key(), data, Predef$.MODULE$.int2Integer(msg.expiration()), msg.cas(), msg.flags(), msg.lockTime());
                    streamMessage = mutation;
                } else if (dCPRequest instanceof RemoveMessage) {
                    RemoveMessage removeMessage;
                    RemoveMessage msg = removeMessage = (RemoveMessage)dCPRequest;
                    streamMessage = new Deletion(msg.key(), msg.cas());
                } else {
                    this.$outer.logError((Function0<String>)new Serializable(this){
                        public static final long serialVersionUID = 0L;

                        public final String apply() {
                            return "Unknown DCP Stream Message $msg";
                        }
                    });
                    streamMessage = null;
                }
                return streamMessage;
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        }).foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ CouchbaseReceiver $outer;

            public final void apply(StreamMessage dataItem) {
                this.$outer.store(dataItem);
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        });
    }

    public void onStop() {
        this.logInfo((Function0<String>)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ CouchbaseReceiver $outer;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Stopping Couchbase (DCP) Stream against Bucket ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.$outer.com$couchbase$spark$streaming$CouchbaseReceiver$$bucketName}));
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        });
    }

    public Observable<Integer> com$couchbase$spark$streaming$CouchbaseReceiver$$partitionSize(ClusterFacade core) {
        return JavaConversions$.MODULE$.toScalaObservable(core.send((CouchbaseRequest)new GetClusterConfigRequest())).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ CouchbaseReceiver $outer;

            public final Integer apply(GetClusterConfigResponse x$1) {
                return Predef$.MODULE$.int2Integer(((CouchbaseBucketConfig)x$1.config().bucketConfig(this.$outer.com$couchbase$spark$streaming$CouchbaseReceiver$$bucketName)).numberOfPartitions());
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        });
    }

    public Observable<DCPRequest> com$couchbase$spark$streaming$CouchbaseReceiver$$requestStreams(ClusterFacade core, Integer numPartitions) {
        return Observable$.MODULE$.from((Iterable)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), Predef$.MODULE$.Integer2int(numPartitions))).flatMap((Function1)new Serializable(this, core){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ CouchbaseReceiver $outer;
            private final ClusterFacade core$2;

            public final Observable<StreamRequestResponse> apply(int partition) {
                return JavaConversions$.MODULE$.toScalaObservable(this.core$2.send((CouchbaseRequest)new StreamRequestRequest((short)partition, this.$outer.com$couchbase$spark$streaming$CouchbaseReceiver$$bucketName)));
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.core$2 = core$2;
            }
        }).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Observable<DCPRequest> apply(StreamRequestResponse res) {
                return JavaConversions$.MODULE$.toScalaObservable(res.stream());
            }
        }).flatten(Predef$.MODULE$.conforms()).doOnNext((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ CouchbaseReceiver $outer;

            public final void apply(DCPRequest res) {
                this.$outer.logTrace((Function0<String>)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final String apply() {
                        return "Incoming Stream Message $res";
                    }
                });
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        });
    }

    public CouchbaseReceiver(CouchbaseConfig config, String bucketName, StorageLevel storageLevel) {
        this.config = config;
        this.com$couchbase$spark$streaming$CouchbaseReceiver$$bucketName = bucketName;
        super(storageLevel);
        Logging.class.$init$((Logging)this);
    }
}

