/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.mongodb.inbound;

import org.bson.Document;
import org.reactivestreams.Publisher;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;

public class MongoDbChangeStreamMessageProducer
extends MessageProducerSupport {
    private final ReactiveMongoOperations mongoOperations;
    private Class<?> domainType = Document.class;
    @Nullable
    private String collection;
    private ChangeStreamOptions options = ChangeStreamOptions.empty();
    private boolean extractBody = true;

    public MongoDbChangeStreamMessageProducer(ReactiveMongoOperations mongoOperations) {
        Assert.notNull((Object)mongoOperations, (String)"'mongoOperations' must not be null");
        this.mongoOperations = mongoOperations;
    }

    public void setDomainType(Class<?> domainType) {
        Assert.notNull(domainType, (String)"'domainType' must not be null");
        this.domainType = domainType;
    }

    public void setCollection(String collection) {
        this.collection = collection;
    }

    public void setOptions(ChangeStreamOptions options) {
        Assert.notNull((Object)options, (String)"'options' must not be null");
        this.options = options;
    }

    public void setExtractBody(boolean extractBody) {
        this.extractBody = extractBody;
    }

    public String getComponentType() {
        return "mongo:change-stream-inbound-channel-adapter";
    }

    protected void doStart() {
        Flux changeStreamFlux = this.mongoOperations.changeStream(this.collection, this.options, this.domainType).map(event -> MessageBuilder.withPayload((Object)(!this.extractBody || event.getBody() == null ? event : event.getBody())).setHeader("mongo_collectionName", (Object)event.getCollectionName()).setHeader("mongo_changeStream_operationType", (Object)event.getOperationType()).setHeader("mongo_changeStream_timestamp", (Object)event.getTimestamp()).setHeader("mongo_changeStream_resumeToken", (Object)event.getResumeToken()).build());
        this.subscribeToPublisher((Publisher)changeStreamFlux);
    }
}

