/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.reactivestreams.client.internal.gridfs;

import com.mongodb.MongoGridFSException;
import com.mongodb.ReadPreference;
import com.mongodb.assertions.Assertions;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.FindPublisher;
import com.mongodb.reactivestreams.client.ListIndexesPublisher;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.gridfs.GridFSUploadPublisher;
import com.mongodb.reactivestreams.client.internal.gridfs.ResizingByteBufferFlux;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.Binary;
import org.bson.types.ObjectId;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Publisher that uploads the content of a {@code Publisher<ByteBuffer>} into GridFS.
 *
 * <p>On subscription the pipeline (1) checks for, and if necessary creates, the GridFS indexes,
 * (2) re-chunks the source into {@code chunkSizeBytes}-sized buffers and inserts one chunk
 * document per buffer, and (3) inserts the files-collection document describing the upload.
 * If the pipeline fails or is cancelled, the chunks already written for this file id are deleted.
 */
public final class GridFSUploadPublisherImpl
implements GridFSUploadPublisher<Void> {
    /** Projection used when probing the files collection for any existing document. */
    private static final Document PROJECTION = new Document("_id", 1);
    /** Key specification of the index expected on the files collection. */
    private static final Document FILES_INDEX = new Document("filename", 1).append("uploadDate", 1);
    /** Key specification of the index expected on the chunks collection. */
    private static final Document CHUNKS_INDEX = new Document("files_id", 1).append("n", 1);
    private final ClientSession clientSession;
    private final MongoCollection<GridFSFile> filesCollection;
    private final MongoCollection<Document> chunksCollection;
    private final BsonValue fileId;
    private final String filename;
    private final int chunkSizeBytes;
    private final Document metadata;
    private final Publisher<ByteBuffer> source;

    /**
     * Creates the upload publisher.
     *
     * @param clientSession    the session to run every operation with, or {@code null} for none
     * @param filesCollection  the GridFS files collection; must not be {@code null}
     * @param chunksCollection the GridFS chunks collection; must not be {@code null}
     * @param fileId           the id of the file being uploaded; must not be {@code null}
     * @param filename         the name of the file being uploaded; must not be {@code null}
     * @param chunkSizeBytes   the size, in bytes, the source is re-chunked to
     * @param metadata         optional user metadata stored on the files document, may be {@code null}
     * @param source           the publisher supplying the bytes to upload (not validated here)
     */
    public GridFSUploadPublisherImpl(@Nullable ClientSession clientSession, MongoCollection<GridFSFile> filesCollection, MongoCollection<Document> chunksCollection, BsonValue fileId, String filename, int chunkSizeBytes, @Nullable Document metadata, Publisher<ByteBuffer> source) {
        this.clientSession = clientSession;
        this.filesCollection = Assertions.notNull("files collection", filesCollection);
        this.chunksCollection = Assertions.notNull("chunks collection", chunksCollection);
        this.fileId = Assertions.notNull("File Id", fileId);
        this.filename = Assertions.notNull("filename", filename);
        this.chunkSizeBytes = chunkSizeBytes;
        this.metadata = metadata;
        this.source = source;
    }

    /**
     * Returns the file id as an {@link ObjectId}.
     *
     * @return the {@code ObjectId} of the file being uploaded
     * @throws MongoGridFSException if the file id is not an {@code ObjectId}
     */
    @Override
    public ObjectId getObjectId() {
        if (!this.fileId.isObjectId()) {
            throw new MongoGridFSException("Custom id type used for this GridFS upload stream");
        }
        return this.fileId.asObjectId().getValue();
    }

    /**
     * Returns the file id exactly as supplied at construction time.
     *
     * @return the id of the file being uploaded
     */
    @Override
    public BsonValue getId() {
        return this.fileId;
    }

    /**
     * Runs the upload for the given subscriber.
     *
     * <p>A fresh {@code terminated} flag is created per subscription. Saving the files document
     * and running the cleanup both flip that flag with {@code compareAndSet}, so at most one of
     * them takes effect for a given subscription.
     *
     * @param s the subscriber signalled on completion or failure of the upload
     */
    @Override
    public void subscribe(Subscriber<? super Void> s) {
        Mono.deferContextual(ctx -> {
            AtomicBoolean terminated = new AtomicBoolean(false);
            return this.createCheckAndCreateIndexesMono()
                    .then(this.createSaveChunksMono(terminated))
                    .flatMap(lengthInBytes -> this.createSaveFileDataMono(terminated, lengthInBytes))
                    .onErrorResume(originalError -> this.createCancellationMono(terminated)
                            .onErrorMap(cancellationError -> {
                                // A failure during cleanup must not hide the error that triggered it.
                                originalError.addSuppressed(cancellationError);
                                return originalError;
                            })
                            .then(Mono.error(originalError)))
                    // The cleanup is a detached subscription, so the subscriber context is passed on explicitly.
                    .doOnCancel(() -> this.createCancellationMono(terminated).contextWrite(ctx).subscribe())
                    .then();
        }).subscribe(s);
    }

    /**
     * Wraps this publisher so that it emits the file's {@link ObjectId} once the upload completes.
     *
     * <p>{@code getObjectId()} is evaluated when the returned publisher is subscribed to, so a
     * non-{@code ObjectId} file id makes that {@code subscribe} call throw
     * {@link MongoGridFSException}.
     *
     * @return a publisher view of this upload that emits the file's {@code ObjectId}
     */
    public GridFSUploadPublisher<ObjectId> withObjectId() {
        final GridFSUploadPublisherImpl wrapped = this;
        return new GridFSUploadPublisher<ObjectId>() {

            @Override
            public ObjectId getObjectId() {
                return wrapped.getObjectId();
            }

            @Override
            public BsonValue getId() {
                return wrapped.getId();
            }

            @Override
            public void subscribe(Subscriber<? super ObjectId> subscriber) {
                Mono.from(wrapped).thenReturn(getObjectId()).subscribe(subscriber);
            }
        };
    }

    /**
     * Checks for the GridFS indexes, but only when the files collection yields no document.
     *
     * <p>The probe and the index checks all use the primary read preference. When the probe
     * finds a document, no index work is done.
     */
    private Mono<Void> createCheckAndCreateIndexesMono() {
        MongoCollection<Document> collection = this.filesCollection
                .withDocumentClass(Document.class)
                .withReadPreference(ReadPreference.primary());
        FindPublisher<Document> findPublisher = this.clientSession != null
                ? collection.find(this.clientSession)
                : collection.find();
        return Mono.from(findPublisher.projection(PROJECTION).first())
                .switchIfEmpty(Mono.defer(() ->
                        this.checkAndCreateIndex(this.filesCollection.withReadPreference(ReadPreference.primary()), FILES_INDEX)
                                .then(this.checkAndCreateIndex(this.chunksCollection.withReadPreference(ReadPreference.primary()), CHUNKS_INDEX))
                                .then(Mono.fromCallable(Document::new))))
                .then();
    }

    /**
     * Emits {@code true} when {@code collection} lists an index whose key document equals {@code index}.
     *
     * <p>Numeric key values are converted to {@code int} before the comparison, so a listed
     * key such as {@code 1.0} or {@code 1L} compares equal to the {@code Integer} value {@code 1}.
     */
    private <T> Mono<Boolean> hasIndex(MongoCollection<T> collection, Document index) {
        ListIndexesPublisher<Document> listIndexesPublisher = this.clientSession != null
                ? collection.listIndexes(this.clientSession)
                : collection.listIndexes();
        return Flux.from(listIndexesPublisher).filter(result -> {
            Document indexDoc = result.get("key", new Document());
            for (Map.Entry<String, Object> entry : indexDoc.entrySet()) {
                if (entry.getValue() instanceof Number) {
                    entry.setValue(((Number) entry.getValue()).intValue());
                }
            }
            return indexDoc.equals(index);
        }).take(1L).hasElements();
    }

    /** Creates {@code index} on {@code collection} unless {@link #hasIndex} reports it as present. */
    private <T> Mono<Void> checkAndCreateIndex(MongoCollection<T> collection, Document index) {
        return this.hasIndex(collection, index).flatMap(hasIndex -> {
            if (!hasIndex) {
                return this.createIndexMono(collection, index).then();
            }
            return Mono.empty();
        });
    }

    /** Issues the create-index command, using the client session when one was supplied. */
    private <T> Mono<String> createIndexMono(MongoCollection<T> collection, Document index) {
        return Mono.from(this.clientSession == null
                ? collection.createIndex(index)
                : collection.createIndex(this.clientSession, index));
    }

    /**
     * Re-chunks the source and inserts one chunk document per buffer.
     *
     * <p>Buffers that arrive after {@code terminated} has been set are skipped.
     *
     * @return a mono emitting the total number of bytes written across all chunks
     */
    private Mono<Long> createSaveChunksMono(AtomicBoolean terminated) {
        return new ResizingByteBufferFlux(this.source, this.chunkSizeBytes).index().flatMap(indexAndBuffer -> {
            if (terminated.get()) {
                return Mono.empty();
            }
            Long index = indexAndBuffer.getT1();
            Binary data = new Binary(copyRemaining(indexAndBuffer.getT2()));
            Document chunkDocument = new Document("files_id", this.fileId)
                    .append("n", index.intValue())
                    .append("data", data);
            Publisher<InsertOneResult> insertOnePublisher = this.clientSession == null
                    ? this.chunksCollection.insertOne(chunkDocument)
                    : this.chunksCollection.insertOne(this.clientSession, chunkDocument);
            return Mono.from(insertOnePublisher).thenReturn(data.length());
        }).reduce(0L, Long::sum);
    }

    /**
     * Copies the remaining bytes of {@code byteBuffer} into a new array.
     *
     * <p>The copy reads through a duplicate, so the buffer's own position, limit and mark are
     * left untouched. Unlike indexing {@code array()} with {@code position()} alone, this is
     * also correct for heap buffers whose {@code arrayOffset()} is non-zero, such as slices.
     */
    private static byte[] copyRemaining(ByteBuffer byteBuffer) {
        byte[] bytes = new byte[byteBuffer.remaining()];
        byteBuffer.duplicate().get(bytes);
        return bytes;
    }

    /**
     * Inserts the files-collection document for this upload.
     *
     * <p>Does nothing (returns an empty mono) when {@code terminated} has already been set.
     */
    private Mono<InsertOneResult> createSaveFileDataMono(AtomicBoolean terminated, long lengthInBytes) {
        if (terminated.compareAndSet(false, true)) {
            GridFSFile gridFSFile = new GridFSFile(this.fileId, this.filename, lengthInBytes, this.chunkSizeBytes, new Date(), this.metadata);
            if (this.clientSession != null) {
                return Mono.from(this.filesCollection.insertOne(this.clientSession, gridFSFile));
            }
            return Mono.from(this.filesCollection.insertOne(gridFSFile));
        }
        return Mono.empty();
    }

    /**
     * Deletes every chunk stored under this file id.
     *
     * <p>Does nothing (returns an empty mono) when {@code terminated} has already been set.
     */
    private Mono<DeleteResult> createCancellationMono(AtomicBoolean terminated) {
        if (terminated.compareAndSet(false, true)) {
            Document chunksOfThisFile = new Document("files_id", this.fileId);
            if (this.clientSession != null) {
                return Mono.from(this.chunksCollection.deleteMany(this.clientSession, chunksOfThisFile));
            }
            return Mono.from(this.chunksCollection.deleteMany(chunksOfThisFile));
        }
        return Mono.empty();
    }
}

