/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.slf4j.Logger;

public class StreamsRebalanceListener
implements ConsumerRebalanceListener {
    private final Time time;
    private final TaskManager taskManager;
    private final StreamThread streamThread;
    private final Logger log;

    StreamsRebalanceListener(Time time, TaskManager taskManager, StreamThread streamThread, Logger log) {
        this.time = time;
        this.taskManager = taskManager;
        this.streamThread = streamThread;
        this.log = log;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onPartitionsAssigned(Collection<TopicPartition> assignedPartitions) {
        long start;
        block8: {
            this.log.debug("Current state {}: assigned partitions {} at the end of consumer rebalance.\n\tpreviously assigned active tasks: {}\n\tpreviously assigned standby tasks: {}\n", new Object[]{this.streamThread.state(), assignedPartitions, this.taskManager.previousActiveTaskIds(), this.taskManager.previousStandbyTaskIds()});
            if (this.streamThread.getAssignmentErrorCode() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
                this.log.error("Received error code {} - shutdown", (Object)this.streamThread.getAssignmentErrorCode());
                this.streamThread.shutdown();
                return;
            }
            start = this.time.milliseconds();
            List<TopicPartition> revokedStandbyPartitions = null;
            try {
                if (this.streamThread.setState(StreamThread.State.PARTITIONS_ASSIGNED) == null) {
                    this.log.debug("Skipping task creation in rebalance because we are already in {} state.", (Object)this.streamThread.state());
                } else {
                    revokedStandbyPartitions = this.taskManager.closeRevokedStandbyTasks();
                    this.taskManager.closeRevokedSuspendedTasks();
                    this.taskManager.createTasks(assignedPartitions);
                }
                if (revokedStandbyPartitions == null) break block8;
                this.streamThread.clearStandbyRecords(revokedStandbyPartitions);
            }
            catch (Throwable t) {
                block9: {
                    try {
                        this.log.error("Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance", t);
                        this.streamThread.setRebalanceException(t);
                        if (revokedStandbyPartitions == null) break block9;
                        this.streamThread.clearStandbyRecords(revokedStandbyPartitions);
                    }
                    catch (Throwable throwable) {
                        if (revokedStandbyPartitions != null) {
                            this.streamThread.clearStandbyRecords(revokedStandbyPartitions);
                        }
                        this.log.info("partition assignment took {} ms.\n\tcurrently assigned active tasks: {}\n\tcurrently assigned standby tasks: {}\n\trevoked active tasks: {}\n\trevoked standby tasks: {}\n", new Object[]{this.time.milliseconds() - start, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), this.taskManager.revokedActiveTaskIds(), this.taskManager.revokedStandbyTaskIds()});
                        throw throwable;
                    }
                }
                this.log.info("partition assignment took {} ms.\n\tcurrently assigned active tasks: {}\n\tcurrently assigned standby tasks: {}\n\trevoked active tasks: {}\n\trevoked standby tasks: {}\n", new Object[]{this.time.milliseconds() - start, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), this.taskManager.revokedActiveTaskIds(), this.taskManager.revokedStandbyTaskIds()});
            }
        }
        this.log.info("partition assignment took {} ms.\n\tcurrently assigned active tasks: {}\n\tcurrently assigned standby tasks: {}\n\trevoked active tasks: {}\n\trevoked standby tasks: {}\n", new Object[]{this.time.milliseconds() - start, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), this.taskManager.revokedActiveTaskIds(), this.taskManager.revokedStandbyTaskIds()});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onPartitionsRevoked(Collection<TopicPartition> revokedPartitions) {
        this.log.debug("Current state {}: revoked partitions {} because of consumer rebalance.\n\tcurrently assigned active tasks: {}\n\tcurrently assigned standby tasks: {}\n", new Object[]{this.streamThread.state(), revokedPartitions, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds()});
        Set<Object> suspendedTasks = new HashSet();
        if (this.streamThread.setState(StreamThread.State.PARTITIONS_REVOKED) != null && !revokedPartitions.isEmpty()) {
            long start = this.time.milliseconds();
            try {
                suspendedTasks = this.taskManager.suspendActiveTasksAndState(revokedPartitions);
            }
            catch (Throwable t) {
                this.log.error("Error caught during partition revocation, will abort the current process and re-throw at the end of rebalance: ", t);
                this.streamThread.setRebalanceException(t);
            }
            finally {
                this.log.info("partition revocation took {} ms.\n\tcurrent suspended active tasks: {}\n", (Object)(this.time.milliseconds() - start), suspendedTasks);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onPartitionsLost(Collection<TopicPartition> lostPartitions) {
        this.log.info("at state {}: partitions {} lost due to missed rebalance.\n\tlost active tasks: {}\n\tlost assigned standby tasks: {}\n", new Object[]{this.streamThread.state(), lostPartitions, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds()});
        Set<Object> lostTasks = new HashSet();
        long start = this.time.milliseconds();
        try {
            lostTasks = this.taskManager.closeLostTasks();
        }
        catch (Throwable t) {
            this.log.error("Error caught during partitions lost, will abort the current process and re-throw at the end of rebalance: ", t);
            this.streamThread.setRebalanceException(t);
        }
        finally {
            this.log.info("partitions lost took {} ms.\n\tclosed lost active tasks: {}\n", (Object)(this.time.milliseconds() - start), lostTasks);
        }
    }
}

