/*
 * Decompiled with CFR 0.152.
 */
package org.janusgraph.graphdb.database.management;

import com.google.common.base.Preconditions;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.janusgraph.core.JanusGraphTransaction;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.diskstorage.ReadBuffer;
import org.janusgraph.diskstorage.ResourceUnavailableException;
import org.janusgraph.diskstorage.log.Log;
import org.janusgraph.diskstorage.log.Message;
import org.janusgraph.diskstorage.log.MessageReader;
import org.janusgraph.diskstorage.util.time.Timer;
import org.janusgraph.diskstorage.util.time.TimestampProvider;
import org.janusgraph.graphdb.database.StandardJanusGraph;
import org.janusgraph.graphdb.database.cache.SchemaCache;
import org.janusgraph.graphdb.database.idhandling.VariableLong;
import org.janusgraph.graphdb.database.management.ManagementSystem;
import org.janusgraph.graphdb.database.management.MgmtLogType;
import org.janusgraph.graphdb.database.serialize.DataOutput;
import org.janusgraph.graphdb.database.serialize.Serializer;
import org.janusgraph.graphdb.types.vertices.JanusGraphSchemaVertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link MessageReader} attached to the system log that handles management messages
 * related to schema-cache eviction.
 *
 * <p>Three message types are recognised in {@link #read(Message)}:
 * <ul>
 *   <li>{@link MgmtLogType#CACHED_TYPE_EVICTION}: every listed type id is expired from the
 *       local {@link SchemaCache} and a daemon thread is started that sends an
 *       acknowledgement once the transactions that were open at that moment have closed.</li>
 *   <li>{@link MgmtLogType#CACHED_TYPE_EVICTION_ACK}: if the acknowledgement is addressed to
 *       this graph instance, it is forwarded to the matching {@link EvictionTrigger}.</li>
 *   <li>{@link MgmtLogType#CONFIG_MUTATION}: no action is taken here.</li>
 * </ul>
 */
public class ManagementLogger
implements MessageReader {
    private static final Logger log = LoggerFactory.getLogger(ManagementLogger.class);
    /** Pause between two successive checks for still-open transactions. */
    private static final Duration SLEEP_INTERVAL = Duration.ofMillis(100L);
    /** Upper bound on how long the ack thread waits for open transactions to close. */
    private static final Duration MAX_WAIT_TIME = Duration.ofSeconds(60L);
    private final StandardJanusGraph graph;
    private final SchemaCache schemaCache;
    private final Log sysLog;
    private final TimestampProvider times;
    /** Source of eviction ids handed out by {@link #sendCacheEviction}. */
    private final AtomicInteger evictionTriggerCounter = new AtomicInteger(0);
    /** Pending eviction triggers keyed by eviction id; entries are removed once their triggers have run. */
    private final ConcurrentMap<Long, EvictionTrigger> evictionTriggerMap = new ConcurrentHashMap<>();

    /**
     * Creates a logger bound to the given graph and system log.
     *
     * @param graph       graph whose serializer, configuration and open transactions are consulted
     * @param sysLog      log to which eviction and acknowledgement messages are written
     * @param schemaCache cache from which schema elements are expired on eviction messages
     * @param times       time source used for timers and sleeping; must not be {@code null}
     */
    public ManagementLogger(StandardJanusGraph graph, Log sysLog, SchemaCache schemaCache, TimestampProvider times) {
        this.graph = graph;
        this.schemaCache = schemaCache;
        this.sysLog = sysLog;
        this.times = times;
        Preconditions.checkNotNull(times);
    }

    /**
     * Decodes one management message and reacts to it according to its {@link MgmtLogType}.
     *
     * @param message message read from the system log
     */
    @Override
    public void read(Message message) {
        ReadBuffer in = message.getContent().asReadBuffer();
        String senderId = message.getSenderId();
        Serializer serializer = this.graph.getDataSerializer();
        MgmtLogType logType = serializer.readObjectNotNull(in, MgmtLogType.class);
        Preconditions.checkNotNull(logType);
        if (logType == MgmtLogType.CACHED_TYPE_EVICTION) {
            long evictionId = VariableLong.readPositive(in);
            long numEvictions = VariableLong.readPositive(in);
            // The count is read as a long, so the index is a long too (an int index could overflow).
            for (long i = 0; i < numEvictions; i++) {
                long typeId = VariableLong.readPositive(in);
                this.schemaCache.expireSchemaElement(typeId);
            }
            // Acknowledge asynchronously: the ack is only sent after the currently open transactions close.
            Thread ack = new Thread(new SendAckOnTxClose(evictionId, senderId, this.graph.getOpenTransactions()));
            ack.setDaemon(true);
            ack.start();
        } else if (logType == MgmtLogType.CACHED_TYPE_EVICTION_ACK) {
            String receiverId = serializer.readObjectNotNull(in, String.class);
            long evictionId = VariableLong.readPositive(in);
            // Acks addressed to other instances are ignored.
            if (receiverId.equals(this.graph.getConfiguration().getUniqueGraphId())) {
                EvictionTrigger evictTrigger = this.evictionTriggerMap.get(evictionId);
                if (evictTrigger != null) {
                    evictTrigger.receivedAcknowledgement(senderId);
                } else {
                    log.error("Could not find eviction trigger for {} from {}", evictionId, senderId);
                }
            }
        } else {
            assert logType == MgmtLogType.CONFIG_MUTATION;
        }
    }

    /**
     * Registers an {@link EvictionTrigger} for the given callbacks and writes a
     * {@link MgmtLogType#CACHED_TYPE_EVICTION} message listing the ids of {@code updatedTypes}
     * to the system log.
     *
     * @param updatedTypes        schema vertices whose ids are written into the eviction message
     * @param updatedTypeTriggers callbacks executed once no acknowledgement is outstanding
     * @param openInstances       must be non-empty; it is only validated here, the set of instances to
     *                            wait for is re-read by the trigger itself
     * @throws IllegalArgumentException if {@code openInstances} is empty
     */
    public void sendCacheEviction(Set<JanusGraphSchemaVertex> updatedTypes, List<Callable<Boolean>> updatedTypeTriggers, Set<String> openInstances) {
        Preconditions.checkArgument(!openInstances.isEmpty());
        long evictionId = this.evictionTriggerCounter.incrementAndGet();
        this.evictionTriggerMap.put(evictionId, new EvictionTrigger(evictionId, updatedTypeTriggers, this.graph));
        DataOutput out = this.graph.getDataSerializer().getDataOutput(128);
        out.writeObjectNotNull(MgmtLogType.CACHED_TYPE_EVICTION);
        VariableLong.writePositive(out, evictionId);
        VariableLong.writePositive(out, updatedTypes.size());
        for (JanusGraphSchemaVertex type : updatedTypes) {
            assert type.hasId();
            VariableLong.writePositive(out, type.longId());
        }
        this.sysLog.add(out.getStaticBuffer());
    }

    /**
     * Drops instances that are no longer open from every pending trigger and runs the
     * triggers of those evictions that have no acknowledgement outstanding any more.
     */
    @Override
    public void updateState() {
        this.evictionTriggerMap.forEach((k, v) -> {
            int ackCounter = v.removeDroppedInstances();
            if (ackCounter == 0) {
                v.runTriggers();
            }
        });
    }

    /**
     * Task that polls a set of transactions until all of them are closed and then writes a
     * {@link MgmtLogType#CACHED_TYPE_EVICTION_ACK} message to the system log. If the
     * transactions do not close within {@link #MAX_WAIT_TIME}, an error is logged and no
     * acknowledgement is sent.
     */
    private class SendAckOnTxClose
    implements Runnable {
        private final long evictionId;
        private final Set<? extends JanusGraphTransaction> openTx;
        private final String originId;

        private SendAckOnTxClose(long evictionId, String originId, Set<? extends JanusGraphTransaction> openTx) {
            this.evictionId = evictionId;
            this.openTx = openTx;
            this.originId = originId;
        }

        @Override
        public void run() {
            Timer t = ManagementLogger.this.times.getTimer().start();
            while (true) {
                boolean txStillOpen = false;
                // Discard closed transactions so later iterations only re-check the open ones.
                Iterator<? extends JanusGraphTransaction> iter = this.openTx.iterator();
                while (iter.hasNext()) {
                    if (iter.next().isClosed()) {
                        iter.remove();
                        continue;
                    }
                    txStillOpen = true;
                }
                if (!txStillOpen) {
                    this.sendAck();
                    break;
                }
                if (MAX_WAIT_TIME.compareTo(t.elapsed()) < 0) {
                    log.error("Evicted [{}] from cache but waiting too long for transactions to close. Stale transaction alert on: {}", this.getId(), this.openTx);
                    break;
                }
                try {
                    ManagementLogger.this.times.sleepPast(ManagementLogger.this.times.getTime().plus(SLEEP_INTERVAL));
                }
                catch (InterruptedException e) {
                    log.error("Interrupted eviction ack thread for " + this.getId(), e);
                    // Restore the interrupt flag so the interruption stays observable after this catch.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

        /** Writes the acknowledgement message; a shut-down system log is logged as a warning, not rethrown. */
        private void sendAck() {
            DataOutput out = ManagementLogger.this.graph.getDataSerializer().getDataOutput(64);
            out.writeObjectNotNull(MgmtLogType.CACHED_TYPE_EVICTION_ACK);
            out.writeObjectNotNull(this.originId);
            VariableLong.writePositive(out, this.evictionId);
            try {
                ManagementLogger.this.sysLog.add(out.getStaticBuffer());
                log.debug("Sent {}: evictionID={} originID={}", MgmtLogType.CACHED_TYPE_EVICTION_ACK, this.evictionId, this.originId);
            }
            catch (ResourceUnavailableException e) {
                log.warn("System log has already shut down. Did not sent {}: evictionID={} originID={}", MgmtLogType.CACHED_TYPE_EVICTION_ACK, this.evictionId, this.originId);
            }
        }

        /**
         * @return identifier of the form {@code <evictionId>@<originId>} used in log messages
         */
        public String getId() {
            return this.evictionId + "@" + this.originId;
        }
    }

    /**
     * Tracks which instances still have to acknowledge one eviction and runs the registered
     * callbacks once that set is empty.
     *
     * <p>NOTE(review): {@link #runTriggers()} is reachable from both
     * {@link #receivedAcknowledgement(String)} and {@link ManagementLogger#updateState()};
     * nothing in this class prevents the callbacks from running twice if both paths observe
     * an empty set at the same time. Confirm whether callers serialize these invocations.
     */
    private class EvictionTrigger {
        final long evictionId;
        final List<Callable<Boolean>> updatedTypeTriggers;
        final StandardJanusGraph graph;
        final Set<String> instancesToBeAcknowledged;

        private EvictionTrigger(long evictionId, List<Callable<Boolean>> updatedTypeTriggers, StandardJanusGraph graph) {
            this.graph = graph;
            this.evictionId = evictionId;
            this.updatedTypeTriggers = updatedTypeTriggers;
            this.instancesToBeAcknowledged = ConcurrentHashMap.newKeySet();
            JanusGraphManagement mgmt = graph.openManagement();
            try {
                // Snapshot of the instances open right now; each of them has to acknowledge.
                ((ManagementSystem)mgmt).getOpenInstancesInternal().forEach(this.instancesToBeAcknowledged::add);
            } finally {
                // Always release the management transaction, even if reading the instances fails.
                mgmt.rollback();
            }
        }

        /**
         * Records an acknowledgement from {@code senderId}; runs the triggers when it was the
         * last one outstanding. Acknowledgements from instances that are not (or no longer)
         * in the outstanding set are ignored.
         */
        void receivedAcknowledgement(String senderId) {
            if (this.instancesToBeAcknowledged.remove(senderId)) {
                int ackCounter = this.instancesToBeAcknowledged.size();
                log.debug("Received acknowledgement for eviction [{}] from senderID={} ({} more acks still outstanding)", this.evictionId, senderId, ackCounter);
                if (ackCounter == 0) {
                    this.runTriggers();
                }
            }
        }

        /**
         * Invokes every callback, logging (not propagating) any failure, and then removes
         * this trigger from the pending map.
         */
        void runTriggers() {
            for (Callable<Boolean> trigger : this.updatedTypeTriggers) {
                try {
                    boolean success = trigger.call();
                    assert success;
                }
                catch (Throwable e) {
                    log.error("Could not execute trigger [" + trigger.toString() + "] for eviction [" + this.evictionId + "]", e);
                }
            }
            log.info("Received all acknowledgements for eviction [{}]", this.evictionId);
            ManagementLogger.this.evictionTriggerMap.remove(this.evictionId, this);
        }

        /**
         * Removes from the outstanding set every instance that is no longer reported as open.
         *
         * @return number of instances that still have to acknowledge
         */
        int removeDroppedInstances() {
            JanusGraphManagement mgmt = this.graph.openManagement();
            try {
                Set<String> updatedInstances = ((ManagementSystem)mgmt).getOpenInstancesInternal();
                String instanceRemovedMsg = "Instance [{}] was removed list of open instances and therefore dropped from list of instances to be acknowledged.";
                // The second filter performs the removal; only instances actually removed by this call are logged.
                this.instancesToBeAcknowledged.stream()
                        .filter(it -> !updatedInstances.contains(it))
                        .filter(this.instancesToBeAcknowledged::remove)
                        .forEach(it -> log.debug(instanceRemovedMsg, it));
            } finally {
                // Always release the management transaction, even if the lookup above fails.
                mgmt.rollback();
            }
            return this.instancesToBeAcknowledged.size();
        }
    }
}

