/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.Serializable;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.Log;
import kafka.server.HostedPartition;
import kafka.server.InitialFetchState;
import kafka.server.RaftReplicaChangeDelegateHelper;
import kafka.server.checkpoints.OffsetCheckpoints;
import kafka.server.metadata.MetadataBroker;
import kafka.server.metadata.MetadataBrokers;
import kafka.server.metadata.MetadataPartition;
import kafka.utils.Implicits$MapExtensionMethods$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.collection.IterableOnceOps;
import scala.collection.Map;
import scala.collection.Set;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Set$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0005}3AAB\u0004\u0001\u0019!A1\u0003\u0001B\u0001B\u0003%A\u0003C\u0003\u0019\u0001\u0011\u0005\u0011\u0004C\u0003\u001d\u0001\u0011\u0005Q\u0004C\u00038\u0001\u0011\u0005\u0001\bC\u0003U\u0001\u0011\u0005QKA\rSC\u001a$(+\u001a9mS\u000e\f7\t[1oO\u0016$U\r\\3hCR,'B\u0001\u0005\n\u0003\u0019\u0019XM\u001d<fe*\t!\"A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001i\u0001C\u0001\b\u0012\u001b\u0005y!\"\u0001\t\u0002\u000bM\u001c\u0017\r\\1\n\u0005Iy!AB!osJ+g-\u0001\u0004iK2\u0004XM\u001d\t\u0003+Yi\u0011aB\u0005\u0003/\u001d\u0011qDU1giJ+\u0007\u000f\\5dC\u000eC\u0017M\\4f\t\u0016dWmZ1uK\"+G\u000e]3s\u0003\u0019a\u0014N\\5u}Q\u0011!d\u0007\t\u0003+\u0001AQa\u0005\u0002A\u0002Q\tA\"\\1lK\u0012+g-\u001a:sK\u0012$2AH\u00113!\tqq$\u0003\u0002!\u001f\t!QK\\5u\u0011\u0015\u00113\u00011\u0001$\u0003A\u0001\u0018M\u001d;ji&|gn\u001d(fo6\u000b\u0007\u000f\u0005\u0003%O%zS\"A\u0013\u000b\u0005\u0019z\u0011AC2pY2,7\r^5p]&\u0011\u0001&\n\u0002\u0004\u001b\u0006\u0004\bC\u0001\u0016.\u001b\u0005Y#B\u0001\u0017\n\u0003\u001d\u0019G.^:uKJL!AL\u0016\u0003\u0013A\u000b'\u000f^5uS>t\u0007C\u0001\b1\u0013\t\ttBA\u0004C_>dW-\u00198\t\u000bM\u001a\u0001\u0019\u0001\u001b\u0002\u001d5,G/\u00193bi\u0006|eMZ:fiB\u0011a\"N\u0005\u0003m=\u0011A\u0001T8oO\u0006YQ.Y6f\u0019\u0016\fG-\u001a:t)\u0015ID(\u0012%Q!\r!#(K\u0005\u0003w\u0015\u00121aU3u\u0011\u0015iD\u00011\u0001?\u0003u\u0001(/\u001a<QCJ$\u0018\u000e^5p]N\fEN]3bIf,\u00050[:uS:<\u0007c\u0001\u0013;\u007fA\u0011\u0001iQ\u0007\u0002\u0003*\u0011!iB\u0001\t[\u0016$\u0018\rZ1uC&\u0011A)\u0011\u0002\u0012\u001b\u0016$\u0018\rZ1uCB\u000b'\u000f^5uS>t\u0007\"\u0002$\u0005\u0001\u00049\u0015a\u00049beRLG/[8o'R\fG/Z:\u0011\t\u0011:\u0013f\u0010\u0005\u0006\u0013\u0012\u0001\rAS\u0001\u0019Q&<\u0007nV1uKJl\u0017M]6DQ\u0016\u001c7\u000e]8j]R\u001c\bCA&O\u001b\u0005a%BA'\b\u0003-\u0019\u0007.Z2la>Lg\u000e^:\n\u0005=c%!E(gMN,Go\u00115fG.\u0004x.\u001b8ug\")1\u0007\u0002a\u0001#B\u0019aB\u0015\u001b\n\u0005M{!AB(qi&|g.A\u0007nC.,gi\u001c7m_^,'o\u001d\u000b\u0007sY;F,\u00180\t\u000bu*\u0001\u0019\u0001 \t\u000ba+\u0001\u0019A-\u0002\u001d\r,(O]3oi\n\u0013xn[3sgB\u0011\u0001IW\u0005\u00037\u0006\u0013q\"T3uC\u0012\fG/\u0019\"s_.,'o\u001d\u0005\u0006\r\u0016\u0001\ra\u0012\u0005\u0006\u0013\u0016\u0001\rA\u0013\u0005\u0006g\u0015\u0001\r!\u0015")
public class RaftReplicaChangeDelegate {
    private final RaftReplicaChangeDelegateHelper helper;

    public void makeDeferred(Map<Partition, Object> partitionsNewMap, long metadataOffset) {
        boolean traceLoggingEnabled = this.helper.stateChangeLogger().isTraceEnabled();
        if (traceLoggingEnabled) {
            Function2 & Serializable forKeyValue$extension_f = (Function2 & Serializable)(partition, isNew) -> {
                RaftReplicaChangeDelegate.$anonfun$makeDeferred$1(this, metadataOffset, partition, BoxesRunTime.unboxToBoolean((Object)isNew));
                return BoxedUnit.UNIT;
            };
            partitionsNewMap.foreachEntry((arg_0, arg_1) -> Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(forKeyValue$extension_f, arg_0, arg_1));
            Object var5_4 = null;
        }
        this.helper.replicaFetcherManager().removeFetcherForPartitions((Set<TopicPartition>)((Set)partitionsNewMap.keySet().map((Function1 & Serializable)x$1 -> x$1.topicPartition())));
        this.helper.stateChangeLogger().info((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(89).append("Metadata batch ").append(metadataOffset).append(": as part of become-deferred request, ").append("stopped any fetchers for ").append(partitionsNewMap.size()).append(" partitions").toString());
        Function2 & Serializable forKeyValue$extension_f = (Function2 & Serializable)(partition, isNew) -> {
            this.helper.markDeferred(new HostedPartition.Deferred(partition, BoxesRunTime.unboxToBoolean((Object)isNew)));
            return BoxedUnit.UNIT;
        };
        partitionsNewMap.foreachEntry((arg_0, arg_1) -> Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(forKeyValue$extension_f, arg_0, arg_1));
        Object var6_5 = null;
        this.helper.replicaFetcherManager().shutdownIdleFetcherThreads();
        this.helper.replicaAlterLogDirsManager().shutdownIdleFetcherThreads();
        if (traceLoggingEnabled) {
            partitionsNewMap.keys().foreach((Function1 & Serializable)partition -> {
                RaftReplicaChangeDelegate.$anonfun$makeDeferred$6(this, metadataOffset, partition);
                return BoxedUnit.UNIT;
            });
            return;
        }
    }

    public Set<Partition> makeLeaders(Set<MetadataPartition> prevPartitionsAlreadyExisting, Map<Partition, MetadataPartition> partitionStates, OffsetCheckpoints highWatermarkCheckpoints, Option<Object> metadataOffset) {
        scala.collection.mutable.Set partitionsMadeLeaders = (scala.collection.mutable.Set)Set$.MODULE$.apply((Seq)Nil$.MODULE$);
        boolean traceLoggingEnabled = this.helper.stateChangeLogger().isTraceEnabled();
        boolean deferredBatches = metadataOffset.isEmpty();
        String topLevelLogPrefix = deferredBatches ? "Metadata batch <multiple deferred>" : new StringBuilder(15).append("Metadata batch ").append(metadataOffset.get()).toString();
        try {
            this.helper.replicaFetcherManager().removeFetcherForPartitions((Set<TopicPartition>)((Set)partitionStates.keySet().map((Function1 & Serializable)x$2 -> x$2.topicPartition())));
            this.helper.stateChangeLogger().info((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(21).append(topLevelLogPrefix).append(": stopped ").append(partitionStates.size()).append(" fetcher(s)").toString());
            Function2 & Serializable forKeyValue$extension_f = (Function2 & Serializable)(partition, state) -> {
                RaftReplicaChangeDelegate.$anonfun$makeLeaders$3(this, deferredBatches, metadataOffset, prevPartitionsAlreadyExisting, highWatermarkCheckpoints, partitionsMadeLeaders, traceLoggingEnabled, partition, state);
                return BoxedUnit.UNIT;
            };
            partitionStates.foreachEntry((arg_0, arg_1) -> Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(forKeyValue$extension_f, arg_0, arg_1));
            Object var10_9 = null;
        }
        catch (Throwable e) {
            this.helper.stateChangeLogger().error((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(31).append(topLevelLogPrefix).append(": error while processing batch.").toString(), (Function0<Throwable>)(Function0 & Serializable)() -> e);
            throw e;
        }
        return partitionsMadeLeaders;
    }

    public Set<Partition> makeFollowers(Set<MetadataPartition> prevPartitionsAlreadyExisting, MetadataBrokers currentBrokers, Map<Partition, MetadataPartition> partitionStates, OffsetCheckpoints highWatermarkCheckpoints, Option<Object> metadataOffset) {
        String topLevelLogPrefix;
        boolean traceLoggingEnabled = this.helper.stateChangeLogger().isTraceEnabled();
        boolean deferredBatches = metadataOffset.isEmpty();
        String string = topLevelLogPrefix = deferredBatches ? "Metadata batch <multiple deferred>" : new StringBuilder(15).append("Metadata batch ").append(metadataOffset.get()).toString();
        if (traceLoggingEnabled) {
            Function2 & Serializable forKeyValue$extension_f = (Function2 & Serializable)(partition, state) -> {
                RaftReplicaChangeDelegate.$anonfun$makeFollowers$1(this, deferredBatches, metadataOffset, partition, state);
                return BoxedUnit.UNIT;
            };
            partitionStates.foreachEntry((arg_0, arg_1) -> Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(forKeyValue$extension_f, arg_0, arg_1));
            Object var14_9 = null;
        }
        scala.collection.mutable.Set partitionsMadeFollower = (scala.collection.mutable.Set)Set$.MODULE$.apply((Seq)Nil$.MODULE$);
        scala.collection.immutable.Set acceptableLeaderBrokerIds = currentBrokers.iterator().map((Function1 & Serializable)broker -> BoxesRunTime.boxToInteger((int)broker.id())).toSet();
        scala.collection.immutable.Map allBrokersByIdMap = currentBrokers.iterator().map((Function1 & Serializable)broker -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)broker.id())), broker)).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
        try {
            Function2 & Serializable forKeyValue$extension_f = (Function2 & Serializable)(partition, state) -> {
                RaftReplicaChangeDelegate.$anonfun$makeFollowers$5(this, deferredBatches, metadataOffset, prevPartitionsAlreadyExisting, acceptableLeaderBrokerIds, highWatermarkCheckpoints, partitionsMadeFollower, traceLoggingEnabled, partition, state);
                return BoxedUnit.UNIT;
            };
            partitionStates.foreachEntry((arg_0, arg_1) -> Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(forKeyValue$extension_f, arg_0, arg_1));
            Object var15_13 = null;
            if (partitionsMadeFollower.nonEmpty()) {
                this.helper.replicaFetcherManager().removeFetcherForPartitions((Set<TopicPartition>)((Set)partitionsMadeFollower.map((Function1 & Serializable)x$3 -> x$3.topicPartition())));
                this.helper.stateChangeLogger().info((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(35).append(topLevelLogPrefix).append(": stopped followers for ").append(partitionsMadeFollower.size()).append(" partitions").toString());
                partitionsMadeFollower.foreach((Function1 & Serializable)partition -> {
                    this.helper.completeDelayedFetchOrProduceRequests(partition.topicPartition());
                    return BoxedUnit.UNIT;
                });
                if (this.helper.isShuttingDown()) {
                    if (traceLoggingEnabled) {
                        partitionsMadeFollower.foreach((Function1 & Serializable)partition -> {
                            RaftReplicaChangeDelegate.$anonfun$makeFollowers$15(this, deferredBatches, metadataOffset, partition);
                            return BoxedUnit.UNIT;
                        });
                    }
                } else {
                    scala.collection.immutable.Map partitionsToMakeFollowerWithLeaderAndOffset = ((IterableOnceOps)partitionsMadeFollower.map((Function1 & Serializable)partition -> {
                        BrokerEndPoint leader = ((MetadataBroker)allBrokersByIdMap.apply(partition.leaderReplicaIdOpt().get())).brokerEndPoint($this.helper.config().interBrokerListenerName());
                        Log log = partition.localLogOrException();
                        long fetchOffset = $this.helper.initialFetchOffset(log);
                        if (deferredBatches) {
                            $this.helper.markOnline((Partition)partition);
                        }
                        return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)partition.topicPartition()), (Object)new InitialFetchState(leader, partition.getLeaderEpoch(), fetchOffset));
                    })).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
                    this.helper.replicaFetcherManager().addFetcherForPartitions((Map<TopicPartition, InitialFetchState>)partitionsToMakeFollowerWithLeaderAndOffset);
                }
            }
        }
        catch (Throwable e) {
            this.helper.stateChangeLogger().error((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(30).append(topLevelLogPrefix).append(": error while processing batch").toString(), (Function0<Throwable>)(Function0 & Serializable)() -> e);
            throw e;
        }
        if (traceLoggingEnabled) {
            partitionsMadeFollower.foreach((Function1 & Serializable)partition -> {
                RaftReplicaChangeDelegate.$anonfun$makeFollowers$20(this, partitionStates, deferredBatches, metadataOffset, partition);
                return BoxedUnit.UNIT;
            });
        }
        return partitionsMadeFollower;
    }

    public static final /* synthetic */ void $anonfun$makeDeferred$1(RaftReplicaChangeDelegate $this, long metadataOffset$1, Partition partition, boolean isNew) {
        $this.helper.stateChangeLogger().trace((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(78).append("Metadata batch ").append(metadataOffset$1).append(": starting the ").append("become-deferred transition for partition ").append(partition.topicPartition()).append(" isNew=").append(isNew).toString());
    }

    public static final /* synthetic */ void $anonfun$makeDeferred$6(RaftReplicaChangeDelegate $this, long metadataOffset$1, Partition partition) {
        $this.helper.stateChangeLogger().trace((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(58).append("Completed batch ").append(metadataOffset$1).append(" become-deferred ").append("transition for partition ").append(partition.topicPartition()).toString());
    }

    public static final /* synthetic */ void $anonfun$makeLeaders$3(RaftReplicaChangeDelegate $this, boolean deferredBatches$1, Option metadataOffset$2, Set prevPartitionsAlreadyExisting$1, OffsetCheckpoints highWatermarkCheckpoints$1, scala.collection.mutable.Set partitionsMadeLeaders$1, boolean traceLoggingEnabled$1, Partition partition, MetadataPartition state) {
        block4: {
            TopicPartition topicPartition = partition.topicPartition();
            String partitionLogMsgPrefix = deferredBatches$1 ? new StringBuilder(32).append("Apply deferred leader partition ").append(topicPartition).toString() : new StringBuilder(16).append("Metadata batch ").append(metadataOffset$2.get()).append(" ").append(topicPartition).toString();
            try {
                LeaderAndIsrRequestData.LeaderAndIsrPartitionState isrState = state.toLeaderAndIsrPartitionState(!prevPartitionsAlreadyExisting$1.apply((Object)state));
                if (partition.makeLeader(isrState, highWatermarkCheckpoints$1)) {
                    partitionsMadeLeaders$1.$plus$eq((Object)partition);
                    if (traceLoggingEnabled$1) {
                        $this.helper.stateChangeLogger().trace((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(43).append(partitionLogMsgPrefix).append(": completed the become-leader state change.").toString());
                    }
                    break block4;
                }
                $this.helper.stateChangeLogger().info((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(72).append(partitionLogMsgPrefix).append(": skipped the ").append("become-leader state change since it is already the leader.").toString());
                return;
            }
            catch (KafkaStorageException e) {
                $this.helper.stateChangeLogger().error((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(91).append(partitionLogMsgPrefix).append(": unable to make ").append("leader because the replica for the partition is offline due to disk error ").append((Object)e).toString());
                Option<String> dirOpt = $this.helper.getLogDir(topicPartition);
                $this.helper.error((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(59).append("Error while making broker the leader for partition ").append(partition).append(" in dir ").append(dirOpt).toString(), (Function0<Throwable>)(Function0 & Serializable)() -> e);
                $this.helper.markOffline(topicPartition);
            }
        }
    }

    public static final /* synthetic */ void $anonfun$makeFollowers$1(RaftReplicaChangeDelegate $this, boolean deferredBatches$2, Option metadataOffset$3, Partition partition, MetadataPartition state) {
        TopicPartition topicPartition = partition.topicPartition();
        String partitionLogMsgPrefix = deferredBatches$2 ? new StringBuilder(34).append("Apply deferred follower partition ").append(topicPartition).toString() : new StringBuilder(16).append("Metadata batch ").append(metadataOffset$3.get()).append(" ").append(topicPartition).toString();
        $this.helper.stateChangeLogger().trace((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(54).append(partitionLogMsgPrefix).append(": starting the ").append("become-follower transition with leader ").append(state.leaderId()).toString());
    }

    public static final /* synthetic */ void $anonfun$makeFollowers$5(RaftReplicaChangeDelegate $this, boolean deferredBatches$2, Option metadataOffset$3, Set prevPartitionsAlreadyExisting$2, scala.collection.immutable.Set acceptableLeaderBrokerIds$1, OffsetCheckpoints highWatermarkCheckpoints$2, scala.collection.mutable.Set partitionsMadeFollower$1, boolean traceLoggingEnabled$2, Partition partition, MetadataPartition state) {
        block5: {
            TopicPartition topicPartition = partition.topicPartition();
            String partitionLogMsgPrefix = deferredBatches$2 ? new StringBuilder(34).append("Apply deferred follower partition ").append(topicPartition).toString() : new StringBuilder(16).append("Metadata batch ").append(metadataOffset$3.get()).append(" ").append(topicPartition).toString();
            try {
                boolean isNew;
                boolean bl = isNew = !prevPartitionsAlreadyExisting$2.apply((Object)state);
                if (!acceptableLeaderBrokerIds$1.contains((Object)BoxesRunTime.boxToInteger((int)state.leaderId()))) {
                    $this.helper.stateChangeLogger().error((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(62).append(partitionLogMsgPrefix).append(": cannot become follower ").append("since the new leader ").append(state.leaderId()).append(" is unavailable.").toString());
                    partition.createLogIfNotExists(isNew, false, highWatermarkCheckpoints$2);
                    break block5;
                }
                LeaderAndIsrRequestData.LeaderAndIsrPartitionState isrState = state.toLeaderAndIsrPartitionState(isNew);
                if (partition.makeFollower(isrState, highWatermarkCheckpoints$2)) {
                    partitionsMadeFollower$1.$plus$eq((Object)partition);
                    if (traceLoggingEnabled$2) {
                        $this.helper.stateChangeLogger().trace((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(62).append(partitionLogMsgPrefix).append(": completed the ").append("become-follower state change with new leader ").append(state.leaderId()).append(".").toString());
                    }
                    break block5;
                }
                $this.helper.stateChangeLogger().info((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(95).append(partitionLogMsgPrefix).append(": skipped the ").append("become-follower state change since ").append("the new leader ").append(state.leaderId()).append(" is the same as the old leader.").toString());
                return;
            }
            catch (KafkaStorageException e) {
                $this.helper.stateChangeLogger().error((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(119).append(partitionLogMsgPrefix).append(": unable to complete the ").append("become-follower state change since the ").append("replica for the partition is offline due to disk error ").append((Object)e).toString());
                Option<String> dirOpt = $this.helper.getLogDir(partition.topicPartition());
                $this.helper.error((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(59).append("Error while making broker the follower with leader ").append(state.leaderId()).append(" in dir ").append(dirOpt).toString(), (Function0<Throwable>)(Function0 & Serializable)() -> e);
                $this.helper.markOffline(topicPartition);
            }
        }
    }

    public static final /* synthetic */ void $anonfun$makeFollowers$15(RaftReplicaChangeDelegate $this, boolean deferredBatches$2, Option metadataOffset$3, Partition partition) {
        TopicPartition topicPartition = partition.topicPartition();
        String partitionLogMsgPrefix = deferredBatches$2 ? new StringBuilder(34).append("Apply deferred follower partition ").append(topicPartition).toString() : new StringBuilder(16).append("Metadata batch ").append(metadataOffset$3.get()).append(" ").append(topicPartition).toString();
        $this.helper.stateChangeLogger().trace((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(95).append(partitionLogMsgPrefix).append(": skipped the ").append("adding-fetcher step of the become-follower state for ").append(topicPartition).append(" since we are shutting down.").toString());
    }

    public static final /* synthetic */ void $anonfun$makeFollowers$20(RaftReplicaChangeDelegate $this, Map partitionStates$2, boolean deferredBatches$2, Option metadataOffset$3, Partition partition) {
        TopicPartition topicPartition = partition.topicPartition();
        MetadataPartition state = (MetadataPartition)partitionStates$2.apply((Object)partition);
        String partitionLogMsgPrefix = deferredBatches$2 ? new StringBuilder(34).append("Apply deferred follower partition ").append(topicPartition).toString() : new StringBuilder(16).append("Metadata batch ").append(metadataOffset$3.get()).append(" ").append(topicPartition).toString();
        $this.helper.stateChangeLogger().trace((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(70).append(partitionLogMsgPrefix).append(": completed become-follower ").append("transition for partition ").append(topicPartition).append(" with new leader ").append(state.leaderId()).toString());
    }

    public RaftReplicaChangeDelegate(RaftReplicaChangeDelegateHelper helper) {
        this.helper = helper;
    }
}

