/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateManagerUtil;
import org.apache.kafka.streams.processor.internals.Task;
import org.slf4j.Logger;

public abstract class AbstractTask
implements Task {
    private static final long NO_DEADLINE = -1L;
    private Task.State state = Task.State.CREATED;
    private long deadlineMs = -1L;
    protected Set<TopicPartition> inputPartitions;
    protected Map<TopicPartition, Long> offsetSnapshotSinceLastFlush = null;
    protected final TaskId id;
    protected final ProcessorTopology topology;
    protected final StateDirectory stateDirectory;
    protected final ProcessorStateManager stateMgr;
    private final long taskTimeoutMs;

    AbstractTask(TaskId id, ProcessorTopology topology, StateDirectory stateDirectory, ProcessorStateManager stateMgr, Set<TopicPartition> inputPartitions, long taskTimeoutMs) {
        this.id = id;
        this.stateMgr = stateMgr;
        this.topology = topology;
        this.inputPartitions = inputPartitions;
        this.stateDirectory = stateDirectory;
        this.taskTimeoutMs = taskTimeoutMs;
    }

    protected void maybeWriteCheckpoint(boolean enforceCheckpoint) {
        Map<TopicPartition, Long> offsetSnapshot = this.stateMgr.changelogOffsets();
        if (StateManagerUtil.checkpointNeeded(enforceCheckpoint, this.offsetSnapshotSinceLastFlush, offsetSnapshot)) {
            this.stateMgr.flush();
            this.stateMgr.checkpoint();
            this.offsetSnapshotSinceLastFlush = new HashMap<TopicPartition, Long>(offsetSnapshot);
        }
    }

    @Override
    public TaskId id() {
        return this.id;
    }

    @Override
    public Set<TopicPartition> inputPartitions() {
        return this.inputPartitions;
    }

    @Override
    public Collection<TopicPartition> changelogPartitions() {
        return this.stateMgr.changelogPartitions();
    }

    @Override
    public void markChangelogAsCorrupted(Collection<TopicPartition> partitions) {
        this.stateMgr.markChangelogAsCorrupted(partitions);
    }

    @Override
    public StateStore getStore(String name) {
        return this.stateMgr.getStore(name);
    }

    @Override
    public boolean isClosed() {
        return this.state() == Task.State.CLOSED;
    }

    @Override
    public final Task.State state() {
        return this.state;
    }

    @Override
    public void revive() {
        if (this.state != Task.State.CLOSED) {
            throw new IllegalStateException("Illegal state " + (Object)((Object)this.state()) + " while reviving task " + this.id);
        }
        this.transitionTo(Task.State.CREATED);
    }

    final void transitionTo(Task.State newState) {
        Task.State oldState = this.state();
        if (!oldState.isValidTransition(newState)) {
            throw new IllegalStateException("Invalid transition from " + (Object)((Object)oldState) + " to " + (Object)((Object)newState));
        }
        this.state = newState;
    }

    @Override
    public void update(Set<TopicPartition> topicPartitions, Map<String, List<String>> allTopologyNodesToSourceTopics) {
        this.inputPartitions = topicPartitions;
        this.topology.updateSourceTopics(allTopologyNodesToSourceTopics);
    }

    void maybeInitTaskTimeoutOrThrow(long currentWallClockMs, TimeoutException timeoutException, Logger log) throws StreamsException {
        if (this.deadlineMs == -1L) {
            this.deadlineMs = currentWallClockMs + this.taskTimeoutMs;
        } else if (currentWallClockMs > this.deadlineMs) {
            String errorMessage = String.format("Task %s did not make progress within %d ms. Adjust `%s` if needed.", this.id, currentWallClockMs - this.deadlineMs + this.taskTimeoutMs, "task.timeout.ms");
            if (timeoutException != null) {
                throw new TimeoutException(errorMessage, (Throwable)timeoutException);
            }
            throw new TimeoutException(errorMessage);
        }
        if (timeoutException != null) {
            log.debug("Timeout exception. Remaining time to deadline {}; retrying.", (Object)(this.deadlineMs - currentWallClockMs), (Object)timeoutException);
        } else {
            log.debug("Task did not make progress. Remaining time to deadline {}; retrying.", (Object)(this.deadlineMs - currentWallClockMs));
        }
    }

    void clearTaskTimeout(Logger log) {
        if (this.deadlineMs != -1L) {
            log.debug("Clearing task timeout.");
            this.deadlineMs = -1L;
        }
    }
}

