/*
 * Decompiled with CFR 0.152.
 */
package com.github.msemys.esjc.subscription.manager;

import com.github.msemys.esjc.ConnectionClosedException;
import com.github.msemys.esjc.Settings;
import com.github.msemys.esjc.SubscriptionDropReason;
import com.github.msemys.esjc.operation.manager.OperationTimeoutException;
import com.github.msemys.esjc.operation.manager.RetriesLimitReachedException;
import com.github.msemys.esjc.subscription.manager.SubscriptionItem;
import com.github.msemys.esjc.util.Iterables;
import com.github.msemys.esjc.util.Preconditions;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bookkeeping for subscription operations.
 *
 * <p>Three collections are maintained:
 * <ul>
 *   <li>{@code activeSubscriptions} - items that have been started, keyed by their current
 *       correlation id;</li>
 *   <li>{@code waitingSubscriptions} - items enqueued via {@link #enqueueSubscription} that have
 *       not been started yet;</li>
 *   <li>{@code retryPendingSubscriptions} - items scheduled via {@link #scheduleSubscriptionRetry}
 *       that will be started again on the next {@link #checkTimeoutsAndRetry} call.</li>
 * </ul>
 */
public class SubscriptionManager {
    private static final Logger logger = LoggerFactory.getLogger(SubscriptionManager.class);

    private final Map<UUID, SubscriptionItem> activeSubscriptions = new ConcurrentHashMap<>();
    private final Queue<SubscriptionItem> waitingSubscriptions = new ConcurrentLinkedQueue<>();
    private final Queue<SubscriptionItem> retryPendingSubscriptions = new ConcurrentLinkedQueue<>();
    private final Settings settings;

    /**
     * Creates a manager that reads {@code operationTimeout} and {@code failOnNoServerResponse}
     * from the given settings.
     *
     * @param settings client settings; validated with {@code Preconditions.checkNotNull}
     */
    public SubscriptionManager(Settings settings) {
        Preconditions.checkNotNull(settings, "settings is null");
        this.settings = settings;
    }

    /**
     * Looks up an active subscription by correlation id.
     *
     * @param correlationId correlation id the subscription was started with
     * @return the matching item, or an empty {@code Optional} when none is registered
     */
    public Optional<SubscriptionItem> getActiveSubscription(UUID correlationId) {
        return Optional.ofNullable(this.activeSubscriptions.get(correlationId));
    }

    /**
     * Drops every active, waiting and retry-pending subscription with
     * {@link SubscriptionDropReason#ConnectionClosed}.
     *
     * <p>NOTE(review): {@code Iterables.consume} presumably also removes each element from the
     * collection it walks - confirm against its implementation.
     *
     * @param cause underlying reason the connection was closed; used as the cause of the
     *              {@link ConnectionClosedException} handed to each dropped operation
     */
    public void cleanUp(Throwable cause) {
        // A single exception instance is shared by all dropped operations.
        ConnectionClosedException exception = new ConnectionClosedException("Connection was closed.", cause);
        Consumer<SubscriptionItem> dropSubscription =
            item -> item.operation.drop(SubscriptionDropReason.ConnectionClosed, exception);

        Iterables.consume(this.activeSubscriptions.values(), dropSubscription);
        Iterables.consume(this.waitingSubscriptions, dropSubscription);
        Iterables.consume(this.retryPendingSubscriptions, dropSubscription);
    }

    /**
     * Notifies and unregisters every active subscription that is already subscribed on the
     * given connection.
     *
     * @param connectionId id of the connection whose subscribed items should be purged
     */
    public void purgeSubscribedAndDropped(ChannelId connectionId) {
        List<SubscriptionItem> subscriptionsToRemove = new ArrayList<>();

        // Phase 1: notify matching operations and remember them.
        for (SubscriptionItem s : this.activeSubscriptions.values()) {
            if (s.isSubscribed && s.connectionId.equals(connectionId)) {
                s.operation.connectionClosed();
                subscriptionsToRemove.add(s);
            }
        }

        // Phase 2: unregister after all notifications have been delivered.
        for (SubscriptionItem s : subscriptionsToRemove) {
            this.activeSubscriptions.remove(s.correlationId);
        }
    }

    /**
     * Re-schedules or fails unconfirmed subscriptions, then starts everything that is pending
     * on the given connection.
     *
     * <p>For each active item that is not yet subscribed:
     * <ul>
     *   <li>if it was started on a different connection, it is scheduled for retry;</li>
     *   <li>otherwise, if its {@code timeout} is non-zero and {@code settings.operationTimeout}
     *       has elapsed since its last update, an error is logged and the item is either dropped
     *       with {@link SubscriptionDropReason#SubscribingError} (when
     *       {@code settings.failOnNoServerResponse} is set) or scheduled for retry.</li>
     * </ul>
     * Afterwards all retry-pending items (with {@code retryCount} incremented) and all waiting
     * items are started on {@code connection}.
     *
     * @param connection connection to (re)start subscriptions on; must not be null
     */
    public void checkTimeoutsAndRetry(Channel connection) {
        Preconditions.checkNotNull(connection, "connection is null");

        List<SubscriptionItem> retrySubscriptions = new ArrayList<>();
        List<SubscriptionItem> removeSubscriptions = new ArrayList<>();

        // Decisions are collected first; the active map is only modified after the scan.
        for (SubscriptionItem s : this.activeSubscriptions.values()) {
            if (s.isSubscribed) {
                continue;
            }
            if (!s.connectionId.equals(connection.id())) {
                retrySubscriptions.add(s);
            } else if (!s.timeout.isZero() && s.lastUpdated.isElapsed(this.settings.operationTimeout)) {
                // A zero item timeout skips this check; the elapsed time is measured against
                // the global operation timeout from settings.
                String error = String.format("Subscription never got confirmation from server. UTC now: %s, operation: %s.", Instant.now(), s);
                logger.error(error);
                if (this.settings.failOnNoServerResponse) {
                    s.operation.drop(SubscriptionDropReason.SubscribingError, new OperationTimeoutException(error));
                    removeSubscriptions.add(s);
                } else {
                    retrySubscriptions.add(s);
                }
            }
        }

        retrySubscriptions.forEach(this::scheduleSubscriptionRetry);
        removeSubscriptions.forEach(this::removeSubscription);

        Iterables.consume(this.retryPendingSubscriptions, item -> {
            ++item.retryCount;
            this.startSubscription(item, connection);
        });
        Iterables.consume(this.waitingSubscriptions, item -> this.startSubscription(item, connection));
    }

    /**
     * Unregisters the item from the active subscriptions using its current correlation id.
     *
     * @param item subscription to remove
     * @return {@code true} if an entry for the item's correlation id was present and removed
     */
    public boolean removeSubscription(SubscriptionItem item) {
        boolean removed = this.activeSubscriptions.remove(item.correlationId) != null;
        logger.debug("RemoveSubscription {}, result {}.", item, removed);
        return removed;
    }

    /**
     * Moves an active subscription to the retry-pending queue.
     *
     * <p>Nothing is scheduled when the item is not currently active. When {@code maxRetries} is
     * non-negative and {@code retryCount} has reached it, the operation is dropped with
     * {@link SubscriptionDropReason#SubscribingError} and a
     * {@link RetriesLimitReachedException} instead of being queued.
     *
     * @param item subscription to retry
     */
    public void scheduleSubscriptionRetry(SubscriptionItem item) {
        if (!this.removeSubscription(item)) {
            logger.debug("RemoveSubscription failed when trying to retry {}.", item);
            return;
        }

        // A negative maxRetries disables the limit check.
        if (item.maxRetries >= 0 && item.retryCount >= item.maxRetries) {
            logger.debug("RETRIES LIMIT REACHED when trying to retry {}.", item);
            item.operation.drop(SubscriptionDropReason.SubscribingError, new RetriesLimitReachedException(item.toString(), item.retryCount));
            return;
        }

        logger.debug("retrying subscription {}.", item);
        this.retryPendingSubscriptions.add(item);
    }

    /**
     * Adds the item to the waiting queue; it is started by a later
     * {@link #checkTimeoutsAndRetry} call.
     *
     * @param item subscription to enqueue
     */
    public void enqueueSubscription(SubscriptionItem item) {
        this.waitingSubscriptions.offer(item);
    }

    /**
     * Registers the item as active under a fresh correlation id and asks its operation to
     * subscribe on the given connection.
     *
     * <p>An item that is already subscribed is removed from the active subscriptions and not
     * started again. If the operation's {@code subscribe} call returns {@code false}, the item
     * is unregistered again.
     *
     * @param item       subscription to start
     * @param connection connection to subscribe on; must not be null
     */
    public void startSubscription(SubscriptionItem item, Channel connection) {
        Preconditions.checkNotNull(connection, "connection is null");

        if (item.isSubscribed) {
            logger.debug("StartSubscription REMOVING due to already subscribed {}.", item);
            this.removeSubscription(item);
            return;
        }

        // Every start attempt gets a new correlation id and is bound to the current connection.
        item.correlationId = UUID.randomUUID();
        item.connectionId = connection.id();
        item.lastUpdated.update();

        // Register before subscribing so the item can be found by correlation id right away.
        this.activeSubscriptions.put(item.correlationId, item);

        if (!item.operation.subscribe(item.correlationId, connection)) {
            logger.debug("StartSubscription REMOVING AS COULD NOT SUBSCRIBE {}.", item);
            this.removeSubscription(item);
        } else {
            logger.debug("StartSubscription SUBSCRIBING {}.", item);
        }
    }
}

