/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.reactivestreams.client.internal.gridfs;

import com.mongodb.MongoGridFSException;
import com.mongodb.MongoOperationTimeoutException;
import com.mongodb.ReadPreference;
import com.mongodb.assertions.Assertions;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.time.Timeout;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.gridfs.GridFSUploadPublisher;
import com.mongodb.reactivestreams.client.internal.TimeoutHelper;
import com.mongodb.reactivestreams.client.internal.gridfs.ResizingByteBufferFlux;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.Binary;
import org.bson.types.ObjectId;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public final class GridFSUploadPublisherImpl
implements GridFSUploadPublisher<Void> {
    private static final String TIMEOUT_ERROR_MESSAGE = "Saving chunks exceeded the timeout limit.";
    private static final Document PROJECTION = new Document("_id", (Object)1);
    private static final Document FILES_INDEX = new Document("filename", (Object)1).append("uploadDate", (Object)1);
    private static final Document CHUNKS_INDEX = new Document("files_id", (Object)1).append("n", (Object)1);
    private final ClientSession clientSession;
    private final MongoCollection<GridFSFile> filesCollection;
    private final MongoCollection<Document> chunksCollection;
    private final BsonValue fileId;
    private final String filename;
    private final int chunkSizeBytes;
    private final Document metadata;
    private final Publisher<ByteBuffer> source;
    @Nullable
    private final Long timeoutMs;

    public GridFSUploadPublisherImpl(@Nullable ClientSession clientSession, MongoCollection<GridFSFile> filesCollection, MongoCollection<Document> chunksCollection, BsonValue fileId, String filename, int chunkSizeBytes, @Nullable Document metadata, Publisher<ByteBuffer> source) {
        this.clientSession = clientSession;
        this.filesCollection = (MongoCollection)Assertions.notNull((String)"files collection", filesCollection);
        this.chunksCollection = (MongoCollection)Assertions.notNull((String)"chunks collection", chunksCollection);
        this.fileId = (BsonValue)Assertions.notNull((String)"File Id", (Object)fileId);
        this.filename = (String)Assertions.notNull((String)"filename", (Object)filename);
        this.chunkSizeBytes = chunkSizeBytes;
        this.metadata = metadata;
        this.source = source;
        this.timeoutMs = filesCollection.getTimeout(TimeUnit.MILLISECONDS);
    }

    @Override
    public ObjectId getObjectId() {
        if (!this.fileId.isObjectId()) {
            throw new MongoGridFSException("Custom id type used for this GridFS upload stream");
        }
        return this.fileId.asObjectId().getValue();
    }

    @Override
    public BsonValue getId() {
        return this.fileId;
    }

    public void subscribe(Subscriber<? super Void> s) {
        Mono.deferContextual(ctx -> {
            AtomicBoolean terminated = new AtomicBoolean(false);
            Timeout timeout = TimeoutContext.startTimeout((Long)this.timeoutMs);
            return this.createCheckAndCreateIndexesMono(timeout).then(this.createSaveChunksMono(terminated, timeout)).flatMap(lengthInBytes -> this.createSaveFileDataMono(terminated, (long)lengthInBytes, timeout)).onErrorResume(originalError -> this.createCancellationMono(terminated, timeout).onErrorMap(cancellationError -> {
                originalError.addSuppressed((Throwable)cancellationError);
                return originalError;
            }).then(Mono.error((Throwable)originalError))).doOnCancel(() -> this.createCancellationMono(terminated, timeout).contextWrite(ctx).subscribe()).then();
        }).subscribe(s);
    }

    public GridFSUploadPublisher<ObjectId> withObjectId() {
        final GridFSUploadPublisherImpl wrapped = this;
        return new GridFSUploadPublisher<ObjectId>(){

            @Override
            public ObjectId getObjectId() {
                return wrapped.getObjectId();
            }

            @Override
            public BsonValue getId() {
                return wrapped.getId();
            }

            public void subscribe(Subscriber<? super ObjectId> subscriber) {
                Mono.from((Publisher)wrapped).thenReturn((Object)this.getObjectId()).subscribe(subscriber);
            }
        };
    }

    private Mono<Void> createCheckAndCreateIndexesMono(@Nullable Timeout timeout) {
        return TimeoutHelper.collectionWithTimeoutDeferred(this.filesCollection.withDocumentClass(Document.class).withReadPreference(ReadPreference.primary()), timeout).map(collection -> this.clientSession != null ? collection.find(this.clientSession) : collection.find()).flatMap(findPublisher -> Mono.from(findPublisher.projection((Bson)PROJECTION).first())).switchIfEmpty(Mono.defer(() -> this.checkAndCreateIndex(this.filesCollection.withReadPreference(ReadPreference.primary()), FILES_INDEX, timeout).then(this.checkAndCreateIndex(this.chunksCollection.withReadPreference(ReadPreference.primary()), CHUNKS_INDEX, timeout)).then(Mono.empty()))).then();
    }

    private <T> Mono<Boolean> hasIndex(MongoCollection<T> collection, Document index, @Nullable Timeout timeout) {
        return TimeoutHelper.collectionWithTimeoutDeferred(collection, timeout).map(wrappedCollection -> {
            if (this.clientSession != null) {
                return wrappedCollection.listIndexes(this.clientSession);
            }
            return wrappedCollection.listIndexes();
        }).flatMapMany(Flux::from).collectList().map(indexes -> {
            boolean hasIndex = false;
            for (Document result : indexes) {
                Document indexDoc = (Document)result.get((Object)"key", (Object)new Document());
                for (Map.Entry entry : indexDoc.entrySet()) {
                    if (!(entry.getValue() instanceof Number)) continue;
                    entry.setValue(((Number)entry.getValue()).intValue());
                }
                if (!indexDoc.equals((Object)index)) continue;
                hasIndex = true;
                break;
            }
            return hasIndex;
        });
    }

    private <T> Mono<Void> checkAndCreateIndex(MongoCollection<T> collection, Document index, @Nullable Timeout timeout) {
        return this.hasIndex(collection, index, timeout).flatMap(hasIndex -> {
            if (!hasIndex.booleanValue()) {
                return this.createIndexMono(collection, index, timeout).flatMap(s -> Mono.empty());
            }
            return Mono.empty();
        });
    }

    private <T> Mono<String> createIndexMono(MongoCollection<T> collection, Document index, @Nullable Timeout timeout) {
        return TimeoutHelper.collectionWithTimeoutDeferred(collection, timeout).flatMap(wrappedCollection -> Mono.from(this.clientSession == null ? wrappedCollection.createIndex((Bson)index) : wrappedCollection.createIndex(this.clientSession, (Bson)index)));
    }

    private Mono<Long> createSaveChunksMono(AtomicBoolean terminated, @Nullable Timeout timeout) {
        return new ResizingByteBufferFlux(this.source, this.chunkSizeBytes).takeUntilOther((Publisher)GridFSUploadPublisherImpl.createMonoTimer(timeout)).index().flatMap(indexAndBuffer -> {
            if (terminated.get()) {
                return Mono.empty();
            }
            Long index = (Long)indexAndBuffer.getT1();
            ByteBuffer byteBuffer = (ByteBuffer)indexAndBuffer.getT2();
            byte[] byteArray = new byte[byteBuffer.remaining()];
            if (byteBuffer.hasArray()) {
                System.arraycopy(byteBuffer.array(), byteBuffer.position(), byteArray, 0, byteBuffer.remaining());
            } else {
                byteBuffer.mark();
                byteBuffer.get(byteArray);
                byteBuffer.reset();
            }
            Binary data = new Binary(byteArray);
            Document chunkDocument = new Document("files_id", (Object)this.fileId).append("n", (Object)index.intValue()).append("data", (Object)data);
            Publisher<InsertOneResult> insertOnePublisher = this.clientSession == null ? TimeoutHelper.collectionWithTimeout(this.chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).insertOne(chunkDocument) : TimeoutHelper.collectionWithTimeout(this.chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).insertOne(this.clientSession, chunkDocument);
            return Mono.from(insertOnePublisher).thenReturn((Object)data.length());
        }).reduce((Object)0L, Long::sum);
    }

    private static Mono<MongoOperationTimeoutException> createMonoTimer(@Nullable Timeout timeout) {
        return (Mono)Timeout.nullAsInfinite((Timeout)timeout).call(TimeUnit.MILLISECONDS, () -> Mono.never(), ms -> Mono.delay((Duration)Duration.ofMillis(ms)).then(GridFSUploadPublisherImpl.createTimeoutMonoError()), () -> GridFSUploadPublisherImpl.createTimeoutMonoError());
    }

    private static Mono<MongoOperationTimeoutException> createTimeoutMonoError() {
        return Mono.error((Throwable)TimeoutContext.createMongoTimeoutException((String)"GridFS waiting for data from the source Publisher exceeded the timeout limit."));
    }

    private Mono<InsertOneResult> createSaveFileDataMono(AtomicBoolean terminated, long lengthInBytes, @Nullable Timeout timeout) {
        Mono<MongoCollection<GridFSFile>> filesCollectionMono = TimeoutHelper.collectionWithTimeoutDeferred(this.filesCollection, timeout);
        if (terminated.compareAndSet(false, true)) {
            GridFSFile gridFSFile = new GridFSFile(this.fileId, this.filename, lengthInBytes, this.chunkSizeBytes, new Date(), this.metadata);
            if (this.clientSession != null) {
                return filesCollectionMono.flatMap(collection -> Mono.from(collection.insertOne(this.clientSession, gridFSFile)));
            }
            return filesCollectionMono.flatMap(collection -> Mono.from(collection.insertOne(gridFSFile)));
        }
        return Mono.empty();
    }

    private Mono<DeleteResult> createCancellationMono(AtomicBoolean terminated, @Nullable Timeout timeout) {
        Mono<MongoCollection<Document>> chunksCollectionMono = TimeoutHelper.collectionWithTimeoutDeferred(this.chunksCollection, timeout);
        if (terminated.compareAndSet(false, true)) {
            if (this.clientSession != null) {
                return chunksCollectionMono.flatMap(collection -> Mono.from(collection.deleteMany(this.clientSession, (Bson)new Document("files_id", (Object)this.fileId))));
            }
            return chunksCollectionMono.flatMap(collection -> Mono.from(collection.deleteMany((Bson)new Document("files_id", (Object)this.fileId))));
        }
        return Mono.empty();
    }
}

