package org.springframework.integration.debezium.inbound;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.Header;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.debezium.support.DebeziumHeaders;
import org.springframework.integration.debezium.support.DefaultDebeziumHeaderMapper;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.HeaderMapper;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/integration/debezium/inbound/DebeziumMessageProducer.class */
public class DebeziumMessageProducer extends MessageProducerSupport {
    private final DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder;
    private DebeziumEngine<ChangeEvent<byte[], byte[]>> debeziumEngine;
    private TaskExecutor taskExecutor;
    private String contentType = "application/json";
    private HeaderMapper<List<Header<Object>>> headerMapper = new DefaultDebeziumHeaderMapper();
    private boolean enableEmptyPayload = false;
    private boolean enableBatch = false;
    private volatile CountDownLatch lifecycleLatch = new CountDownLatch(0);

    /* loaded from: input_file:org/springframework/integration/debezium/inbound/DebeziumMessageProducer$BatchChangeEventConsumer.class */
    final class BatchChangeEventConsumer<T> implements DebeziumEngine.ChangeConsumer<ChangeEvent<T, T>> {
        BatchChangeEventConsumer() {
        }

        public void handleBatch(List<ChangeEvent<T, T>> list, DebeziumEngine.RecordCommitter<ChangeEvent<T, T>> recordCommitter) throws InterruptedException {
            DebeziumMessageProducer.this.sendMessage(DebeziumMessageProducer.this.getMessageBuilderFactory().withPayload(list).build());
            Iterator<ChangeEvent<T, T>> it = list.iterator();
            while (it.hasNext()) {
                recordCommitter.markProcessed(it.next());
            }
            recordCommitter.markBatchFinished();
        }
    }

    /* loaded from: input_file:org/springframework/integration/debezium/inbound/DebeziumMessageProducer$StreamChangeEventConsumer.class */
    final class StreamChangeEventConsumer<T> implements Consumer<ChangeEvent<T, T>> {
        StreamChangeEventConsumer() {
        }

        @Override // java.util.function.Consumer
        public void accept(ChangeEvent<T, T> changeEvent) {
            Message<?> message = DebeziumMessageProducer.this.toMessage(changeEvent);
            if (message != null) {
                DebeziumMessageProducer.this.sendMessage(message);
            }
        }
    }

    public DebeziumMessageProducer(DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> builder) {
        Assert.notNull(builder, "'debeziumBuilder' must not be null");
        this.debeziumEngineBuilder = builder;
    }

    public void setEnableBatch(boolean z) {
        this.enableBatch = z;
    }

    public void setEnableEmptyPayload(boolean z) {
        this.enableEmptyPayload = z;
    }

    public void setTaskExecutor(TaskExecutor taskExecutor) {
        Assert.notNull(taskExecutor, "'taskExecutor' must not be null");
        this.taskExecutor = taskExecutor;
    }

    public void setContentType(String str) {
        Assert.hasText(str, "'contentType' must not be empty");
        this.contentType = str;
    }

    public void setHeaderMapper(HeaderMapper<List<Header<Object>>> headerMapper) {
        Assert.notNull(headerMapper, "'headerMapper' must not be null.");
        this.headerMapper = headerMapper;
    }

    public String getComponentType() {
        return "debezium:inbound-channel-adapter";
    }

    protected void onInit() {
        super.onInit();
        if (this.taskExecutor == null) {
            this.taskExecutor = new SimpleAsyncTaskExecutor(getComponentName() + "-thread-");
        }
        if (this.enableBatch) {
            this.debeziumEngineBuilder.notifying(new BatchChangeEventConsumer());
        } else {
            this.debeziumEngineBuilder.notifying(new StreamChangeEventConsumer());
        }
        this.debeziumEngine = this.debeziumEngineBuilder.build();
    }

    protected void doStart() {
        if (this.lifecycleLatch.getCount() > 0) {
            return;
        }
        this.lifecycleLatch = new CountDownLatch(1);
        this.taskExecutor.execute(() -> {
            try {
                this.debeziumEngine.run();
            } finally {
                this.lifecycleLatch.countDown();
            }
        });
    }

    protected void doStop() {
        try {
            this.debeziumEngine.close();
        } catch (IOException e) {
            this.logger.warn(e, "Debezium failed to close!");
        }
        try {
            if (this.lifecycleLatch.await(5L, TimeUnit.SECONDS)) {
            } else {
                throw new IllegalStateException("Failed to stop " + this);
            }
        } catch (InterruptedException e2) {
        }
    }

    @Nullable
    private <T> Message<?> toMessage(ChangeEvent<T, T> changeEvent) {
        Object key = changeEvent.key();
        Object value = changeEvent.value();
        String destination = changeEvent.destination();
        if (value == null && this.enableEmptyPayload) {
            value = Optional.empty();
        }
        if (value != null) {
            return getMessageBuilderFactory().withPayload(value).setHeader(DebeziumHeaders.KEY, key).setHeader(DebeziumHeaders.DESTINATION, destination).setHeader("contentType", this.contentType).copyHeaders(this.headerMapper.toHeaders(changeEvent.headers())).build();
        }
        this.logger.info(() -> {
            return "Dropped null payload message for Change Event key: " + key;
        });
        return null;
    }
}
