/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.Task;
import org.slf4j.Logger;

public abstract class AbstractTask
implements Task {
    final TaskId id;
    final String applicationId;
    final ProcessorTopology topology;
    final ProcessorStateManager stateMgr;
    final Set<TopicPartition> partitions;
    final Consumer<byte[], byte[]> consumer;
    final String logPrefix;
    final boolean eosEnabled;
    final Logger log;
    final LogContext logContext;
    final StateDirectory stateDirectory;
    boolean taskInitialized;
    boolean taskClosed;
    boolean commitNeeded;
    InternalProcessorContext processorContext;

    AbstractTask(TaskId id, Collection<TopicPartition> partitions, ProcessorTopology topology, Consumer<byte[], byte[]> consumer, ChangelogReader changelogReader, boolean isStandby, StateDirectory stateDirectory, StreamsConfig config) {
        this.id = id;
        this.applicationId = config.getString("application.id");
        this.partitions = new HashSet<TopicPartition>(partitions);
        this.topology = topology;
        this.consumer = consumer;
        this.eosEnabled = "exactly_once".equals(config.getString("processing.guarantee"));
        this.stateDirectory = stateDirectory;
        this.logPrefix = String.format("%s [%s] ", isStandby ? "standby-task" : "task", id);
        this.logContext = new LogContext(this.logPrefix);
        this.log = this.logContext.logger(this.getClass());
        try {
            this.stateMgr = new ProcessorStateManager(id, partitions, isStandby, stateDirectory, topology.storeToChangelogTopic(), changelogReader, this.eosEnabled, this.logContext);
        }
        catch (IOException e) {
            throw new ProcessorStateException(String.format("%sError while creating the state manager", this.logPrefix), e);
        }
    }

    @Override
    public TaskId id() {
        return this.id;
    }

    @Override
    public String applicationId() {
        return this.applicationId;
    }

    @Override
    public Set<TopicPartition> partitions() {
        return this.partitions;
    }

    @Override
    public ProcessorTopology topology() {
        return this.topology;
    }

    @Override
    public ProcessorContext context() {
        return this.processorContext;
    }

    @Override
    public StateStore getStore(String name) {
        return this.stateMgr.getStore(name);
    }

    public String toString() {
        return this.toString("");
    }

    public boolean isEosEnabled() {
        return this.eosEnabled;
    }

    @Override
    public String toString(String indent) {
        StringBuilder sb = new StringBuilder();
        sb.append(indent);
        sb.append("TaskId: ");
        sb.append(this.id);
        sb.append("\n");
        if (this.topology != null) {
            sb.append(indent).append(this.topology.toString(indent + "\t"));
        }
        if (this.partitions != null && !this.partitions.isEmpty()) {
            sb.append(indent).append("Partitions [");
            for (TopicPartition topicPartition : this.partitions) {
                sb.append(topicPartition.toString()).append(", ");
            }
            sb.setLength(sb.length() - 2);
            sb.append("]\n");
        }
        return sb.toString();
    }

    protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
        return Collections.emptyMap();
    }

    protected void updateOffsetLimits() {
        for (TopicPartition partition : this.partitions) {
            try {
                OffsetAndMetadata metadata = this.consumer.committed(partition);
                long offset = metadata != null ? metadata.offset() : 0L;
                this.stateMgr.putOffsetLimit(partition, offset);
                if (!this.log.isTraceEnabled()) continue;
                this.log.trace("Updating store offset limits {} for changelog {}", (Object)offset, (Object)partition);
            }
            catch (AuthorizationException e) {
                throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", this.id, partition), e);
            }
            catch (WakeupException e) {
                throw e;
            }
            catch (KafkaException e) {
                throw new ProcessorStateException(String.format("task [%s] Failed to initialize offsets for %s", this.id, partition), e);
            }
        }
    }

    void flushState() {
        this.stateMgr.flush();
    }

    void registerStateStores() {
        if (this.topology.stateStores().isEmpty()) {
            return;
        }
        try {
            if (!this.stateDirectory.lock(this.id)) {
                throw new LockException(String.format("%sFailed to lock the state directory for task %s", this.logPrefix, this.id));
            }
        }
        catch (IOException e) {
            throw new StreamsException(String.format("%sFatal error while trying to lock the state directory for task %s", this.logPrefix, this.id));
        }
        this.log.trace("Initializing state stores");
        this.updateOffsetLimits();
        for (StateStore store : this.topology.stateStores()) {
            this.log.trace("Initializing store {}", (Object)store.name());
            this.processorContext.uninitialize();
            store.init(this.processorContext, store);
        }
    }

    void reinitializeStateStoresForPartitions(Collection<TopicPartition> partitions) {
        this.stateMgr.reinitializeStateStoresForPartitions(partitions, this.processorContext);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    void closeStateManager(boolean clean) throws ProcessorStateException {
        ProcessorStateException exception;
        block13: {
            exception = null;
            this.log.trace("Closing state manager");
            this.stateMgr.close(clean);
            try {
                this.stateDirectory.unlock(this.id);
            }
            catch (IOException e) {
                if (exception == null) {
                    exception = new ProcessorStateException(String.format("%sFailed to release state dir lock", this.logPrefix), e);
                }
                break block13;
            }
            catch (ProcessorStateException e) {
                try {
                    exception = e;
                }
                catch (Throwable throwable) {
                    block14: {
                        try {
                            this.stateDirectory.unlock(this.id);
                        }
                        catch (IOException e2) {
                            if (exception != null) break block14;
                            exception = new ProcessorStateException(String.format("%sFailed to release state dir lock", this.logPrefix), e2);
                        }
                    }
                    throw throwable;
                }
                try {
                    this.stateDirectory.unlock(this.id);
                }
                catch (IOException e3) {
                    if (exception == null) {
                        exception = new ProcessorStateException(String.format("%sFailed to release state dir lock", this.logPrefix), e3);
                    }
                }
            }
        }
        if (exception != null) {
            throw exception;
        }
    }

    public boolean isClosed() {
        return this.taskClosed;
    }

    @Override
    public boolean commitNeeded() {
        return this.commitNeeded;
    }

    @Override
    public boolean hasStateStores() {
        return !this.topology.stateStores().isEmpty();
    }

    @Override
    public Collection<TopicPartition> changelogPartitions() {
        return this.stateMgr.changelogPartitions();
    }
}

