package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TaskId;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/AbstractTask.class */
public abstract class AbstractTask {
    protected final TaskId id;
    protected final String jobId;
    protected final ProcessorTopology topology;
    protected final Consumer consumer;
    protected final ProcessorStateManager stateMgr;
    protected final Set<TopicPartition> partitions;
    protected ProcessorContext processorContext;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractTask(TaskId taskId, String str, Collection<TopicPartition> collection, ProcessorTopology processorTopology, Consumer<byte[], byte[]> consumer, Consumer<byte[], byte[]> consumer2, StreamsConfig streamsConfig, boolean z) {
        this.id = taskId;
        this.jobId = str;
        this.partitions = new HashSet(collection);
        this.topology = processorTopology;
        this.consumer = consumer;
        try {
            this.stateMgr = new ProcessorStateManager(str, taskId.partition, collection, new File(StreamThread.makeStateDir(str, streamsConfig.getString(StreamsConfig.STATE_DIR_CONFIG)).getCanonicalPath(), taskId.toString()), consumer2, z);
        } catch (IOException e) {
            throw new ProcessorStateException("Error while creating the state manager", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void initializeStateStores() {
        initializeOffsetLimits();
        Iterator<StateStoreSupplier> it = this.topology.stateStoreSuppliers().iterator();
        while (it.hasNext()) {
            StateStore stateStore = it.next().get();
            stateStore.init(this.processorContext, stateStore);
        }
    }

    public final TaskId id() {
        return this.id;
    }

    public final String jobId() {
        return this.jobId;
    }

    public final Set<TopicPartition> partitions() {
        return this.partitions;
    }

    public final ProcessorTopology topology() {
        return this.topology;
    }

    public final ProcessorContext context() {
        return this.processorContext;
    }

    public abstract void commit();

    public void close() {
        try {
            this.stateMgr.close(recordCollectorOffsets());
        } catch (IOException e) {
            throw new ProcessorStateException("Error while closing the state manager", e);
        }
    }

    protected Map<TopicPartition, Long> recordCollectorOffsets() {
        return Collections.emptyMap();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void initializeOffsetLimits() {
        for (TopicPartition topicPartition : this.partitions) {
            OffsetAndMetadata committed = this.consumer.committed(topicPartition);
            this.stateMgr.putOffsetLimit(topicPartition, committed != null ? committed.offset() : 0L);
        }
    }
}
