/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.IOException;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupsList;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.InternalTimersSnapshot;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;

/**
 * Heap-based {@link InternalTimerService} that keeps all timers in memory.
 *
 * <p>Every timer is tracked twice: in a per-key-group {@link Set} (used for de-duplication and
 * for per-key-group snapshots) and in a {@link PriorityQueue} (used to find the next timer to
 * fire). The queues rely on the natural ordering of {@link InternalTimer}; the firing loops
 * assume that ordering is by timestamp (NOTE(review): confirm against {@code InternalTimer}).
 *
 * <p>The service has two phases: timers may first be restored via
 * {@link #restoreTimersForKeyGroup(InternalTimersSnapshot, int)}, and the service becomes usable
 * once {@link #startTimerService(TypeSerializer, TypeSerializer, Triggerable)} has been called.
 *
 * @param <K> type of the keys the timers are scoped to
 * @param <N> type of the namespaces the timers are scoped to
 */
public class HeapInternalTimerService<K, N>
implements InternalTimerService<N>,
ProcessingTimeCallback {
    /** Initial capacity used for both timer priority queues. */
    private static final int INITIAL_QUEUE_CAPACITY = 100;

    private final ProcessingTimeService processingTimeService;
    private final KeyContext keyContext;

    /** Processing-time timers, one set per local key group (lazily created). */
    private final Set<InternalTimer<K, N>>[] processingTimeTimersByKeyGroup;
    /** All processing-time timers of all local key groups. */
    private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;

    /** Event-time timers, one set per local key group (lazily created). */
    private final Set<InternalTimer<K, N>>[] eventTimeTimersByKeyGroup;
    /** All event-time timers of all local key groups. */
    private final PriorityQueue<InternalTimer<K, N>> eventTimeTimersQueue;

    private final KeyGroupsList localKeyGroupRange;
    private final int totalKeyGroups;
    /** Smallest key group index found in {@link #localKeyGroupRange}. */
    private final int localKeyGroupRangeStartIdx;

    /** Last watermark passed to {@link #advanceWatermark(long)}. */
    private long currentWatermark = Long.MIN_VALUE;
    /** Handle of the callback currently registered with the processing time service, if any. */
    private ScheduledFuture<?> nextTimer;

    private TypeSerializer<K> keySerializer;
    private TypeSerializer<N> namespaceSerializer;
    private Triggerable<K, N> triggerTarget;
    private volatile boolean isInitialized;

    /** Serializers taken from a restored snapshot; cleared once the service is started. */
    private TypeSerializer<K> keyDeserializer;
    private TypeSerializer<N> namespaceDeserializer;
    /** Most recently restored snapshot; consulted for the compatibility check on start. */
    private InternalTimersSnapshot<K, N> restoredTimersSnapshot;

    /**
     * Creates a timer service for the given local key groups.
     *
     * @param totalKeyGroups total number of key groups, used when assigning a key to a key group
     * @param localKeyGroupRange key groups this service is responsible for
     * @param keyContext source and sink of the "current key"
     * @param processingTimeService service used to schedule processing-time callbacks
     * @throws NullPointerException if any of the reference arguments is {@code null}
     */
    public HeapInternalTimerService(int totalKeyGroups, KeyGroupsList localKeyGroupRange, KeyContext keyContext, ProcessingTimeService processingTimeService) {
        this.keyContext = Preconditions.checkNotNull(keyContext);
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        this.totalKeyGroups = totalKeyGroups;
        this.localKeyGroupRange = Preconditions.checkNotNull(localKeyGroupRange);

        // The smallest local key group index is the offset into the per-key-group arrays.
        int startIdx = Integer.MAX_VALUE;
        for (Integer keyGroupIdx : localKeyGroupRange) {
            startIdx = Math.min(keyGroupIdx, startIdx);
        }
        this.localKeyGroupRangeStartIdx = startIdx;

        int localKeyGroups = this.localKeyGroupRange.getNumberOfKeyGroups();
        this.eventTimeTimersQueue = new PriorityQueue<InternalTimer<K, N>>(INITIAL_QUEUE_CAPACITY);
        this.eventTimeTimersByKeyGroup = HeapInternalTimerService.<K, N>newTimerSetArray(localKeyGroups);
        this.processingTimeTimersQueue = new PriorityQueue<InternalTimer<K, N>>(INITIAL_QUEUE_CAPACITY);
        this.processingTimeTimersByKeyGroup = HeapInternalTimerService.<K, N>newTimerSetArray(localKeyGroups);
    }

    /**
     * Starts the service with the given serializers and trigger target.
     *
     * <p>On the first call the serializers are checked against a previously restored snapshot
     * (if any), the service state is initialized, and a processing-time callback is scheduled
     * for the head of the processing-time queue if that queue is not empty. On later calls the
     * method only verifies that the same serializers are passed again.
     *
     * @param keySerializer serializer for the timer keys
     * @param namespaceSerializer serializer for the timer namespaces
     * @param triggerTarget receiver of fired timers
     * @throws IllegalArgumentException if a serializer is {@code null} on the first call, or if
     *     a later call passes serializers that differ from the ones used for initialization
     * @throws IllegalStateException if the service is in a partially initialized state, or if
     *     the restored state requires migration for the given serializers
     * @throws NullPointerException if {@code triggerTarget} is {@code null} on the first call
     */
    public void startTimerService(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerTarget) {
        if (this.isInitialized) {
            if (!this.keySerializer.equals(keySerializer) || !this.namespaceSerializer.equals(namespaceSerializer)) {
                throw new IllegalArgumentException("Already initialized Timer Service tried to be initialized with different key and namespace serializers.");
            }
            return;
        }

        if (keySerializer == null || namespaceSerializer == null) {
            throw new IllegalArgumentException("The TimersService serializers cannot be null.");
        }
        if (this.keySerializer != null || this.namespaceSerializer != null || this.triggerTarget != null) {
            throw new IllegalStateException("The TimerService has already been initialized.");
        }

        if (this.restoredTimersSnapshot != null) {
            CompatibilityResult<K> keySerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
                    this.keyDeserializer,
                    null,
                    this.restoredTimersSnapshot.getKeySerializerConfigSnapshot(),
                    keySerializer);
            CompatibilityResult<N> namespaceSerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
                    this.namespaceDeserializer,
                    null,
                    this.restoredTimersSnapshot.getNamespaceSerializerConfigSnapshot(),
                    namespaceSerializer);
            if (keySerializerCompatibility.isRequiresMigration() || namespaceSerializerCompatibility.isRequiresMigration()) {
                throw new IllegalStateException("Tried to initialize restored TimerService with incompatible serializers than those used to snapshot its state.");
            }
        }

        // Validate the target before touching any field, so that a failed call does not leave
        // the service half-initialized (which would make every retry fail as "already initialized").
        Triggerable<K, N> checkedTarget = Preconditions.checkNotNull(triggerTarget);

        this.keySerializer = keySerializer;
        this.namespaceSerializer = namespaceSerializer;
        this.keyDeserializer = null;
        this.namespaceDeserializer = null;
        this.triggerTarget = checkedTarget;

        // Restored processing-time timers need a callback to be scheduled for the earliest one.
        if (!this.processingTimeTimersQueue.isEmpty()) {
            this.nextTimer = this.processingTimeService.registerTimer(this.processingTimeTimersQueue.peek().getTimestamp(), this);
        }
        this.isInitialized = true;
    }

    /** Returns the current processing time as reported by the processing time service. */
    @Override
    public long currentProcessingTime() {
        return this.processingTimeService.getCurrentProcessingTime();
    }

    /** Returns the last watermark passed to {@link #advanceWatermark(long)}. */
    @Override
    public long currentWatermark() {
        return this.currentWatermark;
    }

    /**
     * Registers a processing-time timer for the current key and the given namespace.
     *
     * <p>Registering a timer that already exists is a no-op. If the new timer is earlier than
     * the previous head of the queue, the pending callback is cancelled and a new one is
     * scheduled for {@code time}.
     *
     * @param namespace namespace of the timer
     * @param time timestamp at which the timer should fire
     */
    @Override
    public void registerProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> timer = this.newTimerForCurrentKey(namespace, time);
        Set<InternalTimer<K, N>> timerSet = this.getProcessingTimeTimerSetForTimer(timer);
        if (timerSet.add(timer)) {
            InternalTimer<K, N> oldHead = this.processingTimeTimersQueue.peek();
            long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
            this.processingTimeTimersQueue.add(timer);
            // Only reschedule when the new timer becomes the earliest one.
            if (time < nextTriggerTime) {
                if (this.nextTimer != null) {
                    this.nextTimer.cancel(false);
                }
                this.nextTimer = this.processingTimeService.registerTimer(time, this);
            }
        }
    }

    /**
     * Registers an event-time timer for the current key and the given namespace. Registering a
     * timer that already exists is a no-op.
     *
     * @param namespace namespace of the timer
     * @param time timestamp at which the timer should fire
     */
    @Override
    public void registerEventTimeTimer(N namespace, long time) {
        InternalTimer<K, N> timer = this.newTimerForCurrentKey(namespace, time);
        Set<InternalTimer<K, N>> timerSet = this.getEventTimeTimerSetForTimer(timer);
        if (timerSet.add(timer)) {
            this.eventTimeTimersQueue.add(timer);
        }
    }

    /**
     * Deletes the processing-time timer for the current key, the given namespace and timestamp,
     * if such a timer is registered. An already scheduled callback is not cancelled.
     *
     * @param namespace namespace of the timer
     * @param time timestamp of the timer
     */
    @Override
    public void deleteProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> timer = this.newTimerForCurrentKey(namespace, time);
        Set<InternalTimer<K, N>> timerSet = this.getProcessingTimeTimerSetForTimer(timer);
        if (timerSet.remove(timer)) {
            this.processingTimeTimersQueue.remove(timer);
        }
    }

    /**
     * Deletes the event-time timer for the current key, the given namespace and timestamp, if
     * such a timer is registered.
     *
     * @param namespace namespace of the timer
     * @param time timestamp of the timer
     */
    @Override
    public void deleteEventTimeTimer(N namespace, long time) {
        InternalTimer<K, N> timer = this.newTimerForCurrentKey(namespace, time);
        Set<InternalTimer<K, N>> timerSet = this.getEventTimeTimerSetForTimer(timer);
        if (timerSet.remove(timer)) {
            this.eventTimeTimersQueue.remove(timer);
        }
    }

    /**
     * Callback of the processing time service: fires every queued processing-time timer whose
     * timestamp is not after {@code time}, then schedules a callback for the next pending timer.
     *
     * @param time the processing time the callback was invoked for
     * @throws Exception if the trigger target fails while handling a timer
     */
    @Override
    public void onProcessingTime(long time) throws Exception {
        // The callback that got us here has fired; forget it so that timers registered by the
        // trigger target below can schedule a fresh one.
        this.nextTimer = null;

        InternalTimer<K, N> timer;
        while ((timer = this.processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            Set<InternalTimer<K, N>> timerSet = this.getProcessingTimeTimerSetForTimer(timer);
            timerSet.remove(timer);
            this.processingTimeTimersQueue.remove();
            this.keyContext.setCurrentKey(timer.getKey());
            this.triggerTarget.onProcessingTime(timer);
        }

        // 'timer' is now the first timer that is still in the future (or null if none is left).
        // Skip scheduling if the trigger target already caused a callback to be registered.
        if (timer != null && this.nextTimer == null) {
            this.nextTimer = this.processingTimeService.registerTimer(timer.getTimestamp(), this);
        }
    }

    /**
     * Records the new watermark and fires every queued event-time timer whose timestamp is not
     * after {@code time}.
     *
     * @param time the new watermark
     * @throws Exception if the trigger target fails while handling a timer
     */
    public void advanceWatermark(long time) throws Exception {
        this.currentWatermark = time;

        InternalTimer<K, N> timer;
        while ((timer = this.eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            Set<InternalTimer<K, N>> timerSet = this.getEventTimeTimerSetForTimer(timer);
            timerSet.remove(timer);
            this.eventTimeTimersQueue.remove();
            this.keyContext.setCurrentKey(timer.getKey());
            this.triggerTarget.onEventTime(timer);
        }
    }

    /**
     * Creates a snapshot of the timers of one key group together with the current serializers
     * and their configuration snapshots.
     *
     * @param keyGroupIdx index of the key group to snapshot
     * @return snapshot holding the (live) timer sets of the key group
     * @throws IllegalArgumentException if the key group is not part of the local range
     */
    public InternalTimersSnapshot<K, N> snapshotTimersForKeyGroup(int keyGroupIdx) {
        return new InternalTimersSnapshot<K, N>(
                this.keySerializer,
                this.keySerializer.snapshotConfiguration(),
                this.namespaceSerializer,
                this.namespaceSerializer.snapshotConfiguration(),
                this.getEventTimeTimerSetForKeyGroup(keyGroupIdx),
                this.getProcessingTimeTimerSetForKeyGroup(keyGroupIdx));
    }

    /**
     * Restores the timers of one key group from a snapshot.
     *
     * <p>All arguments are validated before any state of this service is modified. Restored
     * timers that are already present in the key group's set are not queued a second time, so a
     * timer can never fire twice because of a repeated restore.
     *
     * @param restoredTimersSnapshot snapshot to restore from
     * @param keyGroupIdx index of the key group the snapshot belongs to
     * @throws IllegalArgumentException if the snapshot's serializers differ from those of an
     *     earlier restore, or if the key group is not part of the local range
     * @throws IOException declared for interface compatibility; not thrown by this implementation
     */
    public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> restoredTimersSnapshot, int keyGroupIdx) throws IOException {
        // The caller is responsible for handing in a snapshot whose types match this service.
        @SuppressWarnings("unchecked")
        InternalTimersSnapshot<K, N> snapshot = (InternalTimersSnapshot<K, N>) restoredTimersSnapshot;

        boolean keySerializerChanged = this.keyDeserializer != null
                && !this.keyDeserializer.equals(snapshot.getKeySerializer());
        boolean namespaceSerializerChanged = this.namespaceDeserializer != null
                && !this.namespaceDeserializer.equals(snapshot.getNamespaceSerializer());
        if (keySerializerChanged || namespaceSerializerChanged) {
            throw new IllegalArgumentException("Tried to restore timers for the same service with different serializers.");
        }
        Preconditions.checkArgument(this.localKeyGroupRange.contains(keyGroupIdx), "Key Group " + keyGroupIdx + " does not belong to the local range.");

        this.restoredTimersSnapshot = snapshot;
        this.keyDeserializer = snapshot.getKeySerializer();
        this.namespaceDeserializer = snapshot.getNamespaceSerializer();

        HeapInternalTimerService.addRestoredTimers(
                snapshot.getEventTimeTimers(),
                this.getEventTimeTimerSetForKeyGroup(keyGroupIdx),
                this.eventTimeTimersQueue);
        HeapInternalTimerService.addRestoredTimers(
                snapshot.getProcessingTimeTimers(),
                this.getProcessingTimeTimerSetForKeyGroup(keyGroupIdx),
                this.processingTimeTimersQueue);
    }

    /** Adds each restored timer to the set and, only if it was not already there, to the queue. */
    private static <T> void addRestoredTimers(Iterable<? extends T> restoredTimers, Set<T> timerSet, PriorityQueue<T> timerQueue) {
        for (T timer : restoredTimers) {
            if (timerSet.add(timer)) {
                timerQueue.add(timer);
            }
        }
    }

    /** Allocates an (initially all-null) array of timer sets; generic arrays need an unchecked cast. */
    @SuppressWarnings("unchecked")
    private static <KEY, NS> Set<InternalTimer<KEY, NS>>[] newTimerSetArray(int size) {
        return (Set<InternalTimer<KEY, NS>>[]) new HashSet[size];
    }

    /** Builds a timer for the key currently set in the key context. */
    private InternalTimer<K, N> newTimerForCurrentKey(N namespace, long time) {
        // The key context is untyped; keys set on it for this service are expected to be of type K.
        @SuppressWarnings("unchecked")
        K currentKey = (K) this.keyContext.getCurrentKey();
        return new InternalTimer<K, N>(time, currentKey, namespace);
    }

    /** Returns the event-time timer set of the key group that the timer's key is assigned to. */
    private Set<InternalTimer<K, N>> getEventTimeTimerSetForTimer(InternalTimer<K, N> timer) {
        Preconditions.checkArgument(this.localKeyGroupRange != null, "The operator has not been initialized.");
        int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), this.totalKeyGroups);
        return this.getEventTimeTimerSetForKeyGroup(keyGroupIdx);
    }

    /** Returns the event-time timer set of the given key group, creating it on first access. */
    private Set<InternalTimer<K, N>> getEventTimeTimerSetForKeyGroup(int keyGroupIdx) {
        int localIdx = this.getIndexForKeyGroup(keyGroupIdx);
        Set<InternalTimer<K, N>> timers = this.eventTimeTimersByKeyGroup[localIdx];
        if (timers == null) {
            timers = new HashSet<InternalTimer<K, N>>();
            this.eventTimeTimersByKeyGroup[localIdx] = timers;
        }
        return timers;
    }

    /** Returns the processing-time timer set of the key group that the timer's key is assigned to. */
    private Set<InternalTimer<K, N>> getProcessingTimeTimerSetForTimer(InternalTimer<K, N> timer) {
        Preconditions.checkArgument(this.localKeyGroupRange != null, "The operator has not been initialized.");
        int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), this.totalKeyGroups);
        return this.getProcessingTimeTimerSetForKeyGroup(keyGroupIdx);
    }

    /** Returns the processing-time timer set of the given key group, creating it on first access. */
    private Set<InternalTimer<K, N>> getProcessingTimeTimerSetForKeyGroup(int keyGroupIdx) {
        int localIdx = this.getIndexForKeyGroup(keyGroupIdx);
        Set<InternalTimer<K, N>> timers = this.processingTimeTimersByKeyGroup[localIdx];
        if (timers == null) {
            timers = new HashSet<InternalTimer<K, N>>();
            this.processingTimeTimersByKeyGroup[localIdx] = timers;
        }
        return timers;
    }

    /**
     * Translates a key group index into an index of the per-key-group arrays by subtracting the
     * smallest local key group index. NOTE(review): this presumes the local range is contiguous.
     *
     * @throws IllegalArgumentException if the key group is not part of the local range
     */
    private int getIndexForKeyGroup(int keyGroupIdx) {
        Preconditions.checkArgument(this.localKeyGroupRange.contains(keyGroupIdx), "Key Group " + keyGroupIdx + " does not belong to the local range.");
        return keyGroupIdx - this.localKeyGroupRangeStartIdx;
    }

    /** Returns the number of queued processing-time timers across all local key groups. */
    public int numProcessingTimeTimers() {
        return this.processingTimeTimersQueue.size();
    }

    /** Returns the number of queued event-time timers across all local key groups. */
    public int numEventTimeTimers() {
        return this.eventTimeTimersQueue.size();
    }

    /**
     * Counts the queued processing-time timers whose namespace equals the given one.
     *
     * @param namespace namespace to count timers for
     * @return number of matching timers (found by scanning the whole queue)
     */
    public int numProcessingTimeTimers(N namespace) {
        int count = 0;
        for (InternalTimer<K, N> timer : this.processingTimeTimersQueue) {
            if (timer.getNamespace().equals(namespace)) {
                ++count;
            }
        }
        return count;
    }

    /**
     * Counts the queued event-time timers whose namespace equals the given one.
     *
     * @param namespace namespace to count timers for
     * @return number of matching timers (found by scanning the whole queue)
     */
    public int numEventTimeTimers(N namespace) {
        int count = 0;
        for (InternalTimer<K, N> timer : this.eventTimeTimersQueue) {
            if (timer.getNamespace().equals(namespace)) {
                ++count;
            }
        }
        return count;
    }

    /** Returns the smallest key group index of the local range. */
    @VisibleForTesting
    public int getLocalKeyGroupRangeStartIdx() {
        return this.localKeyGroupRangeStartIdx;
    }

    /** Returns the internal (live) array of event-time timer sets; entries may be {@code null}. */
    @VisibleForTesting
    public Set<InternalTimer<K, N>>[] getEventTimeTimersPerKeyGroup() {
        return this.eventTimeTimersByKeyGroup;
    }

    /** Returns the internal (live) array of processing-time timer sets; entries may be {@code null}. */
    @VisibleForTesting
    public Set<InternalTimer<K, N>>[] getProcessingTimeTimersPerKeyGroup() {
        return this.processingTimeTimersByKeyGroup;
    }
}

