package org.apache.flink.runtime.state.changelog;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.PhysicalStateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link LocalChangelogRegistry} implementation that remembers, for every registered
 * {@link StreamStateHandle}, the highest checkpoint ID it was registered with, and hands handles
 * that are no longer needed to an {@link Executor} for disposal.
 */
@Internal
public class LocalChangelogRegistryImpl implements LocalChangelogRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(LocalChangelogRegistry.class);

    /**
     * Registered handles keyed by their physical ID. The value pairs the handle with the highest
     * checkpoint ID passed to {@link #register(StreamStateHandle, long)} for it.
     */
    private final Map<PhysicalStateHandleID, Tuple2<StreamStateHandle, Long>> handleToLastUsedCheckpointID =
            new ConcurrentHashMap<>();

    /** Executor on which {@code discardState()} calls are run. */
    private final Executor asyncDisposalExecutor;

    /**
     * Creates a registry that disposes handles on the given executor.
     *
     * @param executor executor used to run handle disposal
     */
    public LocalChangelogRegistryImpl(Executor executor) {
        this.asyncDisposalExecutor = executor;
    }

    /**
     * Registers the handle for the given checkpoint. If the handle's ID is already known, the
     * stored checkpoint ID becomes the maximum of the stored and the given one.
     *
     * @param streamStateHandle handle to register
     * @param j ID of the checkpoint using the handle
     * @throws IllegalStateException if a different (non-equal) handle is already registered under
     *     the same ID
     */
    @Override // org.apache.flink.runtime.state.changelog.LocalChangelogRegistry
    public void register(StreamStateHandle streamStateHandle, long j) {
        handleToLastUsedCheckpointID.compute(
                streamStateHandle.getStreamStateHandleID(),
                (id, current) -> {
                    if (current == null) {
                        return Tuple2.of(streamStateHandle, j);
                    }
                    Preconditions.checkState(streamStateHandle.equals(current.f0));
                    return Tuple2.of(streamStateHandle, Math.max(current.f1, j));
                });
    }

    /**
     * Removes every handle whose stored checkpoint ID is strictly less than {@code j} and
     * schedules its disposal.
     *
     * @param j exclusive upper bound on the checkpoint IDs to discard
     */
    @Override // org.apache.flink.runtime.state.changelog.LocalChangelogRegistry
    public void discardUpToCheckpoint(long j) {
        ArrayList<StreamStateHandle> toDiscard = new ArrayList<>();
        for (Map.Entry<PhysicalStateHandleID, Tuple2<StreamStateHandle, Long>> entry :
                handleToLastUsedCheckpointID.entrySet()) {
            Tuple2<StreamStateHandle, Long> registered = entry.getValue();
            // Conditional removal: only drop the mapping if it still holds the value that was
            // inspected. A concurrent register() that raised the checkpoint ID in the meantime
            // makes the removal fail, so a handle that is in use again is not deleted. Because
            // the removal succeeds at most once per mapping, no additional locking is needed to
            // keep concurrent callers from scheduling the same handle twice.
            if (registered.f1 < j
                    && handleToLastUsedCheckpointID.remove(entry.getKey(), registered)) {
                toDiscard.add(registered.f0);
            }
        }
        // Disposal is scheduled only after the registry has been fully updated.
        for (StreamStateHandle handle : toDiscard) {
            scheduleAsyncDelete(handle);
        }
    }

    /**
     * Schedules disposal of every handle whose stored checkpoint ID equals {@code j}.
     *
     * <p>The matching entries are not removed from the registry by this method.
     * NOTE(review): a later {@link #discardUpToCheckpoint(long)} with a larger ID will therefore
     * schedule the same handles again — confirm this is intended.
     *
     * @param j checkpoint ID whose handles should be disposed
     */
    @Override // org.apache.flink.runtime.state.changelog.LocalChangelogRegistry
    public void prune(long j) {
        Set<StreamStateHandle> handles =
                handleToLastUsedCheckpointID.values().stream()
                        .filter(registered -> registered.f1 == j)
                        .map(registered -> registered.f0)
                        .collect(Collectors.toSet());
        for (StreamStateHandle handle : handles) {
            scheduleAsyncDelete(handle);
        }
    }

    /**
     * Submits {@code discardState()} of the given handle to the disposal executor. If the
     * executor rejects the task, the disposal runs synchronously in the calling thread. Failures
     * of the disposal itself are logged and not propagated.
     *
     * @param streamStateHandle handle to dispose; {@code null} is ignored
     */
    private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
        if (streamStateHandle == null) {
            return;
        }
        LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
        Runnable disposal = () -> {
            try {
                streamStateHandle.discardState();
            } catch (Exception e) {
                // Pass the exception as the trailing argument so its stack trace is logged.
                LOG.warn("A problem occurred during asynchronous disposal of a stream handle {}.", streamStateHandle, e);
            }
        };
        try {
            asyncDisposalExecutor.execute(disposal);
        } catch (RejectedExecutionException rejected) {
            // The executor does not accept the task; dispose in the caller's thread instead.
            disposal.run();
        }
    }
}
