package org.springframework.data.mongodb.core.messaging;

import com.mongodb.client.MongoCursor;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest;
import org.springframework.data.mongodb.core.messaging.Task;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;

/* loaded from: input_file:BOOT-INF/lib/spring-data-mongodb-3.4.2.jar:org/springframework/data/mongodb/core/messaging/CursorReadingTask.class */
abstract class CursorReadingTask<T, R> implements Task {
    private final MongoTemplate template;
    private final SubscriptionRequest<T, R, SubscriptionRequest.RequestOptions> request;
    private final Class<R> targetType;
    private final ErrorHandler errorHandler;
    private MongoCursor<T> cursor;
    private final Object lifecycleMonitor = new Object();
    private final CountDownLatch awaitStart = new CountDownLatch(1);
    private Task.State state = Task.State.CREATED;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    public CursorReadingTask(MongoTemplate mongoTemplate, SubscriptionRequest<?, ? super T, ? extends SubscriptionRequest.RequestOptions> subscriptionRequest, Class<R> cls, ErrorHandler errorHandler) {
        this.template = mongoTemplate;
        this.request = subscriptionRequest;
        this.targetType = cls;
        this.errorHandler = errorHandler;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // java.lang.Runnable
    public void run() {
        try {
            start();
            while (isRunning()) {
                try {
                    Object execute = execute(this::getNext);
                    if (execute != null) {
                        emitMessage(createMessage(execute, this.targetType, this.request.getRequestOptions2()));
                    } else {
                        Thread.sleep(10L);
                    }
                } catch (InterruptedException e) {
                    synchronized (this.lifecycleMonitor) {
                        this.state = Task.State.CANCELLED;
                        Thread.currentThread().interrupt();
                    }
                }
            }
        } catch (RuntimeException e2) {
            synchronized (this.lifecycleMonitor) {
                this.state = Task.State.CANCELLED;
                this.errorHandler.handleError(e2);
            }
        }
    }

    private void start() {
        synchronized (this.lifecycleMonitor) {
            if (!Task.State.RUNNING.equals(this.state)) {
                this.state = Task.State.STARTING;
            }
        }
        do {
            boolean z = false;
            synchronized (this.lifecycleMonitor) {
                if (Task.State.STARTING.equals(this.state)) {
                    MongoCursor<T> mongoCursor = (MongoCursor) execute(() -> {
                        return initCursor(this.template, this.request.getRequestOptions2(), this.targetType);
                    });
                    z = isValidCursor(mongoCursor);
                    if (z) {
                        this.cursor = mongoCursor;
                        this.state = Task.State.RUNNING;
                    } else if (mongoCursor != null) {
                        mongoCursor.close();
                    }
                }
            }
            if (!z) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    synchronized (this.lifecycleMonitor) {
                        this.state = Task.State.CANCELLED;
                        Thread.currentThread().interrupt();
                    }
                }
            }
        } while (Task.State.STARTING.equals(getState()));
        if (this.awaitStart.getCount() == 1) {
            this.awaitStart.countDown();
        }
    }

    protected abstract MongoCursor<T> initCursor(MongoTemplate mongoTemplate, SubscriptionRequest.RequestOptions requestOptions, Class<?> cls);

    @Override // org.springframework.data.mongodb.core.messaging.Cancelable
    public void cancel() throws DataAccessResourceFailureException {
        synchronized (this.lifecycleMonitor) {
            if (Task.State.RUNNING.equals(this.state) || Task.State.STARTING.equals(this.state)) {
                this.state = Task.State.CANCELLED;
                if (this.cursor != null) {
                    this.cursor.close();
                }
            }
        }
    }

    @Override // org.springframework.scheduling.SchedulingAwareRunnable
    public boolean isLongLived() {
        return true;
    }

    @Override // org.springframework.data.mongodb.core.messaging.Task
    public Task.State getState() {
        Task.State state;
        synchronized (this.lifecycleMonitor) {
            state = this.state;
        }
        return state;
    }

    @Override // org.springframework.data.mongodb.core.messaging.Task
    public boolean awaitStart(Duration duration) throws InterruptedException {
        Assert.notNull(duration, "Timeout must not be null!");
        Assert.isTrue(!duration.isNegative(), "Timeout must not be negative!");
        return this.awaitStart.await(duration.toNanos(), TimeUnit.NANOSECONDS);
    }

    protected Message<T, R> createMessage(T t, Class<R> cls, SubscriptionRequest.RequestOptions requestOptions) {
        return new LazyMappingDelegatingMessage(new SimpleMessage(t, t, Message.MessageProperties.builder().databaseName(this.template.getDb().getName()).collectionName(requestOptions.getCollectionName()).build()), cls, this.template.getConverter());
    }

    private boolean isRunning() {
        return Task.State.RUNNING.equals(getState());
    }

    private void emitMessage(Message<T, R> message) {
        try {
            this.request.getMessageListener().onMessage(message);
        } catch (Exception e) {
            this.errorHandler.handleError(e);
        }
    }

    @Nullable
    private T getNext() {
        synchronized (this.lifecycleMonitor) {
            if (!Task.State.RUNNING.equals(this.state)) {
                throw new IllegalStateException(String.format("Cursor %s is not longer open.", this.cursor));
            }
            return this.cursor.tryNext();
        }
    }

    private static boolean isValidCursor(@Nullable MongoCursor<?> mongoCursor) {
        return (mongoCursor == null || mongoCursor.getServerCursor() == null || mongoCursor.getServerCursor().getId() == 0) ? false : true;
    }

    @Nullable
    private <V> V execute(Supplier<V> supplier) {
        try {
            return supplier.get();
        } catch (RuntimeException e) {
            DataAccessException translateExceptionIfPossible = this.template.getExceptionTranslator().translateExceptionIfPossible(e);
            if (translateExceptionIfPossible != null) {
                throw translateExceptionIfPossible;
            }
            throw e;
        }
    }
}
