package io.debezium.connector.mongodb;

import com.mongodb.CursorType;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import io.debezium.connector.mongodb.ConnectionContext;
import io.debezium.data.Envelope;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Threads;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.errors.ConnectException;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.class */
public class MongoDbStreamingChangeEventSource implements StreamingChangeEventSource {
    private static final Logger LOGGER;
    private static final String AUTHORIZATION_FAILURE_MESSAGE = "Command failed with error 13";
    private static final String OPERATION_FIELD = "op";
    private static final String OBJECT_FIELD = "o";
    private static final String OPERATION_CONTROL = "c";
    private static final String TX_OPS = "applyOps";
    private final EventDispatcher<CollectionId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final MongoDbOffsetContext offsetContext;
    private final ConnectionContext connectionContext;
    private final ReplicaSets replicaSets;
    private final MongoDbTaskContext taskContext;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource$ReplicaSetOplogContext.class */
    public class ReplicaSetOplogContext {
        private final ReplicaSetOffsetContext offset;
        private final ConnectionContext.MongoPrimary primary;
        private final ReplicaSet replicaSet;
        private BsonTimestamp incompleteEventTimestamp;
        private long incompleteTxOrder = 0;

        ReplicaSetOplogContext(ReplicaSetOffsetContext replicaSetOffsetContext, ConnectionContext.MongoPrimary mongoPrimary, ReplicaSet replicaSet) {
            this.offset = replicaSetOffsetContext;
            this.primary = mongoPrimary;
            this.replicaSet = replicaSet;
        }

        ReplicaSetOffsetContext getOffset() {
            return this.offset;
        }

        ConnectionContext.MongoPrimary getPrimary() {
            return this.primary;
        }

        String getReplicaSetName() {
            return this.replicaSet.replicaSetName();
        }

        BsonTimestamp getIncompleteEventTimestamp() {
            return this.incompleteEventTimestamp;
        }

        public void setIncompleteEventTimestamp(BsonTimestamp bsonTimestamp) {
            this.incompleteEventTimestamp = bsonTimestamp;
        }

        public long getIncompleteTxOrder() {
            return this.incompleteTxOrder;
        }

        public void setIncompleteTxOrder(long j) {
            this.incompleteTxOrder = j;
        }
    }

    public MongoDbStreamingChangeEventSource(MongoDbConnectorConfig mongoDbConnectorConfig, MongoDbTaskContext mongoDbTaskContext, ReplicaSets replicaSets, MongoDbOffsetContext mongoDbOffsetContext, EventDispatcher<CollectionId> eventDispatcher, ErrorHandler errorHandler, Clock clock) {
        this.connectionContext = mongoDbTaskContext.getConnectionContext();
        this.dispatcher = eventDispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.replicaSets = replicaSets;
        this.taskContext = mongoDbTaskContext;
        this.offsetContext = mongoDbOffsetContext != null ? mongoDbOffsetContext : initializeOffsets(mongoDbConnectorConfig, replicaSets);
    }

    public void execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext) throws InterruptedException {
        int replicaSetCount = this.replicaSets.replicaSetCount();
        ExecutorService newFixedThreadPool = Threads.newFixedThreadPool(MongoDbConnector.class, this.taskContext.serverName(), "replicator-streaming", replicaSetCount);
        CountDownLatch countDownLatch = new CountDownLatch(replicaSetCount);
        LOGGER.info("Starting {} thread(s) to stream changes for replica sets: {}", Integer.valueOf(replicaSetCount), this.replicaSets);
        this.replicaSets.validReplicaSets().forEach(replicaSet -> {
            newFixedThreadPool.submit(() -> {
                ConnectionContext.MongoPrimary mongoPrimary = null;
                try {
                    try {
                        mongoPrimary = establishConnectionToPrimary(replicaSet);
                        if (mongoPrimary != null) {
                            AtomicReference atomicReference = new AtomicReference(mongoPrimary);
                            mongoPrimary.execute("read from oplog on '" + replicaSet + "'", mongoClient -> {
                                readOplog(mongoClient, (ConnectionContext.MongoPrimary) atomicReference.get(), replicaSet, changeEventSourceContext);
                            });
                        }
                        if (mongoPrimary != null) {
                            mongoPrimary.stop();
                        }
                        countDownLatch.countDown();
                    } catch (Throwable th) {
                        LOGGER.error("Streaming for replica set {} failed", replicaSet.replicaSetName(), th);
                        this.errorHandler.setProducerThrowable(th);
                        if (mongoPrimary != null) {
                            mongoPrimary.stop();
                        }
                        countDownLatch.countDown();
                    }
                } catch (Throwable th2) {
                    if (mongoPrimary != null) {
                        mongoPrimary.stop();
                    }
                    countDownLatch.countDown();
                    throw th2;
                }
            });
        });
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        try {
            newFixedThreadPool.shutdown();
            this.taskContext.getConnectionContext().shutdown();
        } catch (Throwable th) {
            this.taskContext.getConnectionContext().shutdown();
            throw th;
        }
    }

    private ConnectionContext.MongoPrimary establishConnectionToPrimary(ReplicaSet replicaSet) {
        return this.connectionContext.primaryFor(replicaSet, this.taskContext.filters(), (str, th) -> {
            if (th.getMessage() != null && th.getMessage().startsWith(AUTHORIZATION_FAILURE_MESSAGE)) {
                throw new ConnectException("Error while attempting to " + str, th);
            }
            LOGGER.error("Error while attempting to {}: {}", new Object[]{str, th.getMessage(), th});
            throw new ConnectException("Error while attempting to " + str, th);
        });
    }

    private void readOplog(MongoClient mongoClient, ConnectionContext.MongoPrimary mongoPrimary, ReplicaSet replicaSet, ChangeEventSource.ChangeEventSourceContext changeEventSourceContext) {
        Bson and;
        ReplicaSetOffsetContext replicaSetOffsetContext = this.offsetContext.getReplicaSetOffsetContext(replicaSet);
        BsonTimestamp lastOffsetTimestamp = replicaSetOffsetContext.lastOffsetTimestamp();
        OptionalLong lastOffsetTxOrder = replicaSetOffsetContext.lastOffsetTxOrder();
        ServerAddress address = mongoClient.getAddress();
        LOGGER.info("Reading oplog for '{}' primary {} starting at {}", new Object[]{replicaSet, address, lastOffsetTimestamp});
        MongoCollection collection = mongoClient.getDatabase("local").getCollection("oplog.rs");
        ReplicaSetOplogContext replicaSetOplogContext = new ReplicaSetOplogContext(replicaSetOffsetContext, mongoPrimary, replicaSet);
        if (lastOffsetTxOrder.isPresent()) {
            LOGGER.info("The last event processed was transactional, resuming at the oplog event '{}', expecting to skip '{}' events", lastOffsetTimestamp, Long.valueOf(lastOffsetTxOrder.getAsLong()));
            and = com.mongodb.client.model.Filters.and(new Bson[]{com.mongodb.client.model.Filters.gte("ts", lastOffsetTimestamp), com.mongodb.client.model.Filters.exists("fromMigrate", false)});
            replicaSetOplogContext.setIncompleteEventTimestamp(lastOffsetTimestamp);
            replicaSetOplogContext.setIncompleteTxOrder(lastOffsetTxOrder.getAsLong());
        } else {
            LOGGER.info("The last event processed was not transactional, resuming at the oplog event after '{}'", lastOffsetTimestamp);
            and = com.mongodb.client.model.Filters.and(new Bson[]{com.mongodb.client.model.Filters.gt("ts", lastOffsetTimestamp), com.mongodb.client.model.Filters.exists("fromMigrate", false)});
        }
        Bson skippedOperationsFilter = getSkippedOperationsFilter();
        if (skippedOperationsFilter != null) {
            and = com.mongodb.client.model.Filters.and(new Bson[]{and, skippedOperationsFilter});
        }
        MongoCursor it = collection.find(and).sort(new Document("$natural", 1)).oplogReplay(true).cursorType(CursorType.TailableAwait).iterator();
        Throwable th = null;
        try {
            try {
                Metronome sleeper = Metronome.sleeper(Duration.ofMillis(500L), this.clock);
                while (changeEventSourceContext.isRunning()) {
                    Document document = (Document) it.tryNext();
                    if (document == null) {
                        try {
                            sleeper.pause();
                        } catch (InterruptedException e) {
                        }
                    } else {
                        if (!handleOplogEvent(address, document, document, 0L, replicaSetOplogContext)) {
                            if (it != null) {
                                if (0 == 0) {
                                    it.close();
                                    return;
                                }
                                try {
                                    it.close();
                                    return;
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                    return;
                                }
                            }
                            return;
                        }
                        try {
                            this.dispatcher.dispatchHeartbeatEvent(replicaSetOplogContext.getOffset());
                        } catch (InterruptedException e2) {
                            LOGGER.info("Replicator thread is interrupted");
                            Thread.currentThread().interrupt();
                            if (it != null) {
                                if (0 == 0) {
                                    it.close();
                                    return;
                                }
                                try {
                                    it.close();
                                    return;
                                } catch (Throwable th3) {
                                    th.addSuppressed(th3);
                                    return;
                                }
                            }
                            return;
                        }
                    }
                }
                if (it != null) {
                    if (0 == 0) {
                        it.close();
                        return;
                    }
                    try {
                        it.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                }
            } catch (Throwable th5) {
                th = th5;
                throw th5;
            }
        } catch (Throwable th6) {
            if (it != null) {
                if (th != null) {
                    try {
                        it.close();
                    } catch (Throwable th7) {
                        th.addSuppressed(th7);
                    }
                } else {
                    it.close();
                }
            }
            throw th6;
        }
    }

    private Bson getSkippedOperationsFilter() {
        Set skippedOps = this.taskContext.getConnectorConfig().getSkippedOps();
        if (skippedOps.isEmpty()) {
            return null;
        }
        Bson bson = null;
        Iterator it = skippedOps.iterator();
        while (it.hasNext()) {
            Bson ne = com.mongodb.client.model.Filters.ne(OPERATION_FIELD, ((Envelope.Operation) it.next()).code());
            bson = bson == null ? ne : com.mongodb.client.model.Filters.or(new Bson[]{bson, ne});
        }
        return bson;
    }

    /* JADX WARN: Multi-variable type inference failed */
    private boolean handleOplogEvent(ServerAddress serverAddress, Document document, Document document2, long j, ReplicaSetOplogContext replicaSetOplogContext) {
        String string = document.getString(SourceInfo.NAMESPACE);
        Document document3 = (Document) document.get(OBJECT_FIELD, Document.class);
        if (Objects.isNull(document3)) {
            if (!LOGGER.isWarnEnabled()) {
                return true;
            }
            LOGGER.warn("Missing 'o' field in event, so skipping {}", document.toJson());
            return true;
        }
        if (Objects.isNull(string) || string.isEmpty()) {
            if ("new primary".equals(document3.getString("msg"))) {
                AtomicReference atomicReference = new AtomicReference();
                try {
                    replicaSetOplogContext.getPrimary().executeBlocking("conn", mongoClient -> {
                        atomicReference.set(mongoClient.getAddress());
                    });
                } catch (InterruptedException e) {
                    LOGGER.error("Get current primary executeBlocking", e);
                }
                ServerAddress serverAddress2 = (ServerAddress) atomicReference.get();
                if (!Objects.nonNull(serverAddress2) || serverAddress2.equals(serverAddress)) {
                    LOGGER.info("Found new primary event in oplog, current {} is new primary. Continue to process oplog event.", serverAddress);
                } else {
                    LOGGER.info("Found new primary event in oplog, so stopping use of {} to continue with new primary {}", serverAddress, serverAddress2);
                }
            }
            if (!LOGGER.isDebugEnabled()) {
                return true;
            }
            LOGGER.debug("Skipping event with no namespace: {}", document.toJson());
            return true;
        }
        List<Document> transactionChanges = transactionChanges(document);
        if (!transactionChanges.isEmpty()) {
            if (Objects.nonNull(replicaSetOplogContext.getIncompleteEventTimestamp())) {
                if (replicaSetOplogContext.getIncompleteEventTimestamp().equals(SourceInfo.extractEventTimestamp(document))) {
                    for (Document document4 : transactionChanges) {
                        j++;
                        if (j <= replicaSetOplogContext.getIncompleteTxOrder()) {
                            LOGGER.debug("Skipping record as it is expected to be already processed: {}", document4);
                        } else if (!handleOplogEvent(serverAddress, document4, document, j, replicaSetOplogContext)) {
                            return false;
                        }
                    }
                }
                replicaSetOplogContext.setIncompleteEventTimestamp(null);
                return true;
            }
            try {
                this.dispatcher.dispatchTransactionStartedEvent(Long.toString(document.getLong(SourceInfo.OPERATION_ID).longValue()), replicaSetOplogContext.getOffset());
                Iterator<Document> it = transactionChanges.iterator();
                while (it.hasNext()) {
                    long j2 = j + 1;
                    j = document;
                    if (!handleOplogEvent(serverAddress, it.next(), document, j2, replicaSetOplogContext)) {
                        return false;
                    }
                }
                this.dispatcher.dispatchTransactionCommittedEvent(replicaSetOplogContext.getOffset());
                return true;
            } catch (InterruptedException e2) {
                LOGGER.error("Streaming transaction changes for replica set '{}' was interrupted", replicaSetOplogContext.getReplicaSetName());
                throw new ConnectException("Streaming of transaction changes was interrupted for replica set " + replicaSetOplogContext.getReplicaSetName(), e2);
            }
        }
        String string2 = document.getString(OPERATION_FIELD);
        if (!MongoDbChangeRecordEmitter.isValidOperation(string2)) {
            LOGGER.debug("Skipping event with \"op={}\"", string2);
            return true;
        }
        int indexOf = string.indexOf(46);
        if (indexOf <= 0) {
            return true;
        }
        if (!$assertionsDisabled && indexOf + 1 >= string.length()) {
            throw new AssertionError();
        }
        String substring = string.substring(0, indexOf);
        String substring2 = string.substring(indexOf + 1);
        if ("$cmd".equals(substring2)) {
            LOGGER.debug("Skipping database command event: {}", document.toJson());
            return true;
        }
        if (!this.taskContext.filters().databaseFilter().test(substring)) {
            LOGGER.debug("Skipping the event for database {} based on database.whitelist", substring);
            return true;
        }
        replicaSetOplogContext.getOffset().oplogEvent(document, document2, Long.valueOf(j));
        replicaSetOplogContext.getOffset().getOffset();
        CollectionId collectionId = new CollectionId(replicaSetOplogContext.getReplicaSetName(), substring, substring2);
        if (!this.taskContext.filters().collectionFilter().test(collectionId)) {
            return true;
        }
        try {
            return this.dispatcher.dispatchDataChangeEvent(collectionId, new MongoDbChangeRecordEmitter(replicaSetOplogContext.getOffset(), this.clock, document));
        } catch (Exception e3) {
            this.errorHandler.setProducerThrowable(e3);
            return false;
        }
    }

    private List<Document> transactionChanges(Document document) {
        String string = document.getString(OPERATION_FIELD);
        Document document2 = (Document) document.get(OBJECT_FIELD, Document.class);
        return (OPERATION_CONTROL.equals(string) && Objects.nonNull(document2) && document2.containsKey(TX_OPS)) ? (List) document2.get(TX_OPS, List.class) : Collections.emptyList();
    }

    protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig mongoDbConnectorConfig, ReplicaSets replicaSets) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        replicaSets.onEachReplicaSet(replicaSet -> {
            LOGGER.info("Determine Snapshot Offset for replica-set {}", replicaSet.replicaSetName());
            ConnectionContext.MongoPrimary establishConnectionToPrimary = establishConnectionToPrimary(replicaSet);
            if (establishConnectionToPrimary != null) {
                try {
                    establishConnectionToPrimary.execute("get oplog position", mongoClient -> {
                        linkedHashMap.put(replicaSet, (Document) mongoClient.getDatabase("local").getCollection("oplog.rs").find().sort(new Document("$natural", -1)).limit(1).first());
                    });
                    LOGGER.info("Stopping primary client");
                    establishConnectionToPrimary.stop();
                } catch (Throwable th) {
                    LOGGER.info("Stopping primary client");
                    establishConnectionToPrimary.stop();
                    throw th;
                }
            }
        });
        return new MongoDbOffsetContext(new SourceInfo(mongoDbConnectorConfig), new TransactionContext(), linkedHashMap);
    }

    static {
        $assertionsDisabled = !MongoDbStreamingChangeEventSource.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger(MongoDbStreamingChangeEventSource.class);
    }
}
