/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.mongo.eventsourcing.eventstore;

import com.mongodb.BasicDBObject;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.FindOneAndReplaceOptions;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Sorts;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAccessor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.axonframework.common.Assert;
import org.axonframework.common.DateTimeUtils;
import org.axonframework.common.ObjectUtils;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.eventstore.DomainEventData;
import org.axonframework.eventsourcing.eventstore.TrackedEventData;
import org.axonframework.eventsourcing.eventstore.TrackingToken;
import org.axonframework.mongo.eventsourcing.eventstore.MongoTrackingToken;
import org.axonframework.mongo.eventsourcing.eventstore.StorageStrategy;
import org.axonframework.mongo.eventsourcing.eventstore.documentperevent.EventEntryConfiguration;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.Serializer;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;

public abstract class AbstractMongoEventStorageStrategy
implements StorageStrategy {
    protected static final int ORDER_ASC = 1;
    protected static final int ORDER_DESC = -1;
    private final EventEntryConfiguration eventConfiguration;
    private final Duration lookBackTime;

    public AbstractMongoEventStorageStrategy(EventEntryConfiguration eventConfiguration, Duration lookBackTime) {
        this.eventConfiguration = (EventEntryConfiguration)ObjectUtils.getOrDefault((Object)eventConfiguration, (Object)EventEntryConfiguration.getDefault());
        this.lookBackTime = (Duration)ObjectUtils.getOrDefault((Object)lookBackTime, (Object)Duration.ofMillis(1000L));
    }

    @Override
    public void appendEvents(MongoCollection<Document> eventCollection, List<? extends EventMessage<?>> events, Serializer serializer) {
        eventCollection.insertMany(this.createEventDocuments(events, serializer).collect(Collectors.toList()));
    }

    protected abstract Stream<Document> createEventDocuments(List<? extends EventMessage<?>> var1, Serializer var2);

    @Override
    public void appendSnapshot(MongoCollection<Document> snapshotCollection, DomainEventMessage<?> snapshot, Serializer serializer) {
        snapshotCollection.findOneAndReplace((Bson)new BsonDocument(this.eventConfiguration.aggregateIdentifierProperty(), (BsonValue)new BsonString(snapshot.getAggregateIdentifier())), (Object)this.createSnapshotDocument(snapshot, serializer), new FindOneAndReplaceOptions().upsert(true));
    }

    protected abstract Document createSnapshotDocument(DomainEventMessage<?> var1, Serializer var2);

    @Override
    public void deleteSnapshots(MongoCollection<Document> snapshotCollection, String aggregateIdentifier, long sequenceNumber) {
        snapshotCollection.deleteMany(Filters.and((Bson[])new Bson[]{Filters.eq((String)this.eventConfiguration.aggregateIdentifierProperty(), (Object)aggregateIdentifier), Filters.lt((String)this.eventConfiguration.sequenceNumberProperty(), (Object)sequenceNumber)}));
    }

    @Override
    public List<? extends DomainEventData<?>> findDomainEvents(MongoCollection<Document> collection, String aggregateIdentifier, long firstSequenceNumber, int batchSize) {
        FindIterable cursor = collection.find(Filters.and((Bson[])new Bson[]{Filters.eq((String)this.eventConfiguration.aggregateIdentifierProperty(), (Object)aggregateIdentifier), Filters.gte((String)this.eventConfiguration.sequenceNumberProperty(), (Object)firstSequenceNumber)})).sort((Bson)new BasicDBObject(this.eventConfiguration().sequenceNumberProperty(), (Object)1));
        cursor = cursor.batchSize(batchSize);
        return StreamSupport.stream(cursor.spliterator(), false).flatMap(this::extractEvents).filter(event -> event.getSequenceNumber() >= firstSequenceNumber).collect(Collectors.toList());
    }

    protected abstract Stream<? extends DomainEventData<?>> extractEvents(Document var1);

    @Override
    public List<? extends TrackedEventData<?>> findTrackedEvents(MongoCollection<Document> eventCollection, TrackingToken lastToken, int batchSize) {
        FindIterable cursor;
        if (lastToken == null) {
            cursor = eventCollection.find();
        } else {
            Assert.isTrue((boolean)(lastToken instanceof MongoTrackingToken), () -> String.format("Token %s is of the wrong type", lastToken));
            MongoTrackingToken trackingToken = (MongoTrackingToken)lastToken;
            cursor = eventCollection.find(Filters.and((Bson[])new Bson[]{Filters.gte((String)this.eventConfiguration.timestampProperty(), (Object)DateTimeUtils.formatInstant((TemporalAccessor)trackingToken.getTimestamp().minus(this.lookBackTime))), Filters.nin((String)this.eventConfiguration.eventIdentifierProperty(), trackingToken.getKnownEventIds())}));
        }
        cursor = cursor.sort((Bson)new BasicDBObject(this.eventConfiguration().timestampProperty(), (Object)1).append(this.eventConfiguration().sequenceNumberProperty(), (Object)1));
        cursor = cursor.batchSize(batchSize);
        AtomicReference<MongoTrackingToken> previousToken = new AtomicReference<MongoTrackingToken>((MongoTrackingToken)lastToken);
        ArrayList results = new ArrayList();
        MongoCursor iterator = cursor.iterator();
        while (results.size() < batchSize && iterator.hasNext()) {
            Document document = (Document)iterator.next();
            this.extractEvents(document).filter(ed -> previousToken.get() == null || !((MongoTrackingToken)previousToken.get()).getKnownEventIds().contains(ed.getEventIdentifier())).map(event -> new TrackedMongoEventEntry(event, (TrackingToken)previousToken.updateAndGet(token -> token == null ? MongoTrackingToken.of(event.getTimestamp(), event.getEventIdentifier()) : token.advanceTo(event.getTimestamp(), event.getEventIdentifier(), this.lookBackTime)))).forEach(results::add);
        }
        return results;
    }

    @Override
    public Stream<? extends DomainEventData<?>> findSnapshots(MongoCollection<Document> snapshotCollection, String aggregateIdentifier) {
        FindIterable cursor = snapshotCollection.find(Filters.eq((String)this.eventConfiguration.aggregateIdentifierProperty(), (Object)aggregateIdentifier)).sort(Sorts.orderBy((Bson[])new Bson[]{Sorts.descending((String[])new String[]{this.eventConfiguration.sequenceNumberProperty()})}));
        return StreamSupport.stream(cursor.spliterator(), false).map(this::extractSnapshot);
    }

    @Override
    public Optional<Long> lastSequenceNumberFor(MongoCollection<Document> eventsCollection, String aggregateIdentifier) {
        Document lastDocument = (Document)eventsCollection.find(Filters.eq((String)this.eventConfiguration.aggregateIdentifierProperty(), (Object)aggregateIdentifier)).sort(Sorts.descending((String[])new String[]{this.eventConfiguration.sequenceNumberProperty()})).first();
        return Optional.ofNullable(lastDocument).map(this::extractHighestSequenceNumber);
    }

    @Override
    public TrackingToken createTailToken(MongoCollection<Document> eventsCollection) {
        Document first = (Document)eventsCollection.find().sort(Sorts.ascending((String[])new String[]{this.eventConfiguration.timestampProperty()})).first();
        return Optional.ofNullable(first).map(d -> d.get((Object)this.eventConfiguration.timestampProperty())).map(t -> DateTimeUtils.parseInstant((CharSequence)((String)t))).map(t -> MongoTrackingToken.of(t, Collections.emptyMap())).orElse(null);
    }

    protected abstract DomainEventData<?> extractSnapshot(Document var1);

    protected Long extractHighestSequenceNumber(Document document) {
        return (Long)document.get((Object)this.eventConfiguration.sequenceNumberProperty());
    }

    @Override
    public void ensureIndexes(MongoCollection<Document> eventsCollection, MongoCollection<Document> snapshotsCollection) {
        eventsCollection.createIndex((Bson)new BasicDBObject(this.eventConfiguration.aggregateIdentifierProperty(), (Object)1).append(this.eventConfiguration.sequenceNumberProperty(), (Object)1), new IndexOptions().unique(true).name("uniqueAggregateIndex"));
        eventsCollection.createIndex((Bson)new BasicDBObject(this.eventConfiguration.timestampProperty(), (Object)1).append(this.eventConfiguration.sequenceNumberProperty(), (Object)1), new IndexOptions().unique(false).name("orderedEventStreamIndex"));
        snapshotsCollection.createIndex((Bson)new BasicDBObject(this.eventConfiguration.aggregateIdentifierProperty(), (Object)1).append(this.eventConfiguration.sequenceNumberProperty(), (Object)1), new IndexOptions().unique(true).name("uniqueAggregateIndex"));
    }

    protected EventEntryConfiguration eventConfiguration() {
        return this.eventConfiguration;
    }

    private static class TrackedMongoEventEntry<T>
    implements DomainEventData<T>,
    TrackedEventData<T> {
        private final DomainEventData<T> delegate;
        private final TrackingToken trackingToken;

        public TrackedMongoEventEntry(DomainEventData<T> delegate, TrackingToken trackingToken) {
            this.delegate = delegate;
            this.trackingToken = trackingToken;
        }

        public String getType() {
            return this.delegate.getType();
        }

        public String getAggregateIdentifier() {
            return this.delegate.getAggregateIdentifier();
        }

        public long getSequenceNumber() {
            return this.delegate.getSequenceNumber();
        }

        public TrackingToken trackingToken() {
            return this.trackingToken;
        }

        public String getEventIdentifier() {
            return this.delegate.getEventIdentifier();
        }

        public Instant getTimestamp() {
            return this.delegate.getTimestamp();
        }

        public SerializedObject<T> getMetaData() {
            return this.delegate.getMetaData();
        }

        public SerializedObject<T> getPayload() {
            return this.delegate.getPayload();
        }
    }
}

