package org.apache.kafka.server;

import com.yammer.metrics.core.Gauge;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.security.authorizer.AclEntry;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/server/AssignmentsManager.class */
public class AssignmentsManager {
    private static final Logger log = LoggerFactory.getLogger(AssignmentsManager.class);

    /** Delay, in nanoseconds, applied before a scheduled dispatch of pending assignments. */
    private static final long DISPATCH_INTERVAL_NS = TimeUnit.MILLISECONDS.toNanos(500);

    /**
     * Upper bound for the retry backoff, in milliseconds.
     * The value produced by {@code resendExponentialBackoff} is converted with
     * {@code TimeUnit.MILLISECONDS.toNanos} in {@code requeueAllAfterFailure}, so the cap
     * has to be expressed in milliseconds as well (previously it was computed in nanoseconds,
     * which made the cap effectively unbounded).
     */
    private static final long MAX_BACKOFF_INTERVAL_MS = TimeUnit.SECONDS.toMillis(10);

    /** Name of the gauge reporting the number of in-flight plus pending assignments. */
    static final String QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME = "QueuedReplicaToDirAssignments";

    private final Time time;
    private final NodeToControllerChannelManager channelManager;
    private final int brokerId;
    private final Supplier<Long> brokerEpochSupplier;
    private final KafkaEventQueue eventQueue;

    /** Resolves a directory id to its path; used only to render log messages. */
    private final Function<Uuid, Optional<String>> dirIdToPath;

    /** Resolves a topic id to its name; used only to render log messages. */
    private final Function<Uuid, Optional<String>> topicIdToName;

    private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(getClass());

    /**
     * Assignments included in the request currently awaiting a controller response,
     * or {@code null} when no request is outstanding.
     * NOTE(review): volatile presumably because the metrics gauge reads it from another thread.
     */
    private volatile Map<TopicIdPartition, AssignmentEvent> inflight = null;

    /** Assignments accepted but not yet sent to the controller, keyed by partition. */
    private volatile Map<TopicIdPartition, AssignmentEvent> pending = new HashMap<>();

    /** Backoff applied on top of the dispatch interval after a failed request (values in ms). */
    private final ExponentialBackoff resendExponentialBackoff = new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02d);

    /** Number of consecutive failed requests; reset to zero on the first successful response. */
    private int failedAttempts = 0;

    /* loaded from: input_file:org/apache/kafka/server/AssignmentsManager$AssignReplicasToDirsRequestCompletionHandler.class */
    /**
     * Callback handed to the controller channel for each AssignReplicasToDirs request.
     * Both outcomes are turned into an {@link AssignmentResponseEvent} that is placed at the
     * front of the event queue; a timeout is represented by a {@code null} response.
     */
    private class AssignReplicasToDirsRequestCompletionHandler implements ControllerRequestCompletionHandler {
        private AssignReplicasToDirsRequestCompletionHandler() {
        }

        /** Logs the timeout and reports it as a missing ({@code null}) response. */
        @Override // org.apache.kafka.server.ControllerRequestCompletionHandler
        public void onTimeout() {
            AssignmentsManager.log.warn("Request to controller timed out");
            appendResponseEvent(null);
        }

        /** Forwards the controller's response to the event queue. */
        public void onComplete(ClientResponse response) {
            AssignmentsManager.log.debug("Received controller response: {}", response);
            appendResponseEvent(response);
        }

        /**
         * Wraps the response in an event and prepends it to the queue, so it is placed
         * ahead of events already queued.
         *
         * @param response the controller response, or {@code null} after a timeout
         */
        void appendResponseEvent(ClientResponse response) {
            AssignmentResponseEvent responseEvent = new AssignmentResponseEvent(response);
            AssignmentsManager.this.eventQueue.prepend(responseEvent);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/server/AssignmentsManager$AssignmentEvent.class */
    public class AssignmentEvent extends Event {
        final long timestampNs;
        final TopicIdPartition partition;
        final Uuid dirId;
        final String reason;
        final List<Runnable> completionHandlers;

        AssignmentEvent(long j, TopicIdPartition topicIdPartition, Uuid uuid, String str, Runnable runnable) {
            super();
            this.timestampNs = j;
            this.partition = (TopicIdPartition) Objects.requireNonNull(topicIdPartition);
            this.dirId = (Uuid) Objects.requireNonNull(uuid);
            this.reason = str;
            this.completionHandlers = new ArrayList();
            if (runnable != null) {
                this.completionHandlers.add(runnable);
            }
        }

        void merge(AssignmentEvent assignmentEvent) {
            if (!this.partition.equals(assignmentEvent.partition)) {
                throw new IllegalArgumentException("Cannot merge events for different partitions");
            }
            this.completionHandlers.addAll(assignmentEvent.completionHandlers);
        }

        void onComplete() {
            Iterator<Runnable> it = this.completionHandlers.iterator();
            while (it.hasNext()) {
                it.next().run();
            }
        }

        public void run() {
            AssignmentsManager.log.trace("Received assignment {}", this);
            AssignmentEvent assignmentEvent = (AssignmentEvent) AssignmentsManager.this.pending.getOrDefault(this.partition, null);
            boolean z = false;
            if (assignmentEvent == null && AssignmentsManager.this.inflight != null) {
                assignmentEvent = (AssignmentEvent) AssignmentsManager.this.inflight.getOrDefault(this.partition, null);
                z = true;
            }
            if (assignmentEvent != null) {
                if (assignmentEvent.dirId.equals(this.dirId)) {
                    assignmentEvent.merge(this);
                    AssignmentsManager.log.debug("Ignoring duplicate assignment {}", this);
                    return;
                } else if (assignmentEvent.timestampNs > this.timestampNs) {
                    assignmentEvent.merge(this);
                    AssignmentsManager.log.debug("Dropping assignment {} because it's older than existing {}", this, assignmentEvent);
                    return;
                } else if (!z) {
                    merge(assignmentEvent);
                    AssignmentsManager.log.debug("Dropping existing assignment {} because it's older than {}", assignmentEvent, this);
                }
            }
            AssignmentsManager.log.debug("Queueing new assignment {}", this);
            AssignmentsManager.this.pending.put(this.partition, this);
            if (AssignmentsManager.this.inflight == null || AssignmentsManager.this.inflight.isEmpty()) {
                AssignmentsManager.this.scheduleDispatch();
            }
        }

        public String toString() {
            return "Assignment{timestampNs=" + this.timestampNs + ", partition=" + ((String) ((Optional) AssignmentsManager.this.topicIdToName.apply(this.partition.topicId())).map(str -> {
                return str + AclEntry.RESOURCE_SEPARATOR + this.partition.partitionId();
            }).orElseGet(() -> {
                return "<topic name unknown id: " + this.partition.topicId() + " partition: " + this.partition.partitionId() + ">";
            })) + ", dir=" + ((String) ((Optional) AssignmentsManager.this.dirIdToPath.apply(this.dirId)).orElseGet(() -> {
                return "<dir path unknown id:" + this.dirId + ">";
            })) + ", reason='" + this.reason + "'}";
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            AssignmentEvent assignmentEvent = (AssignmentEvent) obj;
            return this.timestampNs == assignmentEvent.timestampNs && Objects.equals(this.partition, assignmentEvent.partition) && Objects.equals(this.dirId, assignmentEvent.dirId) && Objects.equals(this.reason, assignmentEvent.reason);
        }

        public int hashCode() {
            return Objects.hash(Long.valueOf(this.timestampNs), this.partition, this.dirId, this.reason);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/server/AssignmentsManager$AssignmentResponseEvent.class */
    /**
     * Handles the outcome of the request currently in flight.
     * A {@code null} or erroneous response re-queues everything with backoff; otherwise
     * acknowledged assignments are completed and individually failed ones are re-queued.
     */
    public class AssignmentResponseEvent extends Event {
        /** Controller response; {@code null} when the request timed out. */
        private final ClientResponse response;

        public AssignmentResponseEvent(ClientResponse clientResponse) {
            super();
            this.response = clientResponse;
        }

        /**
         * @throws IllegalStateException if no request is in flight
         */
        public void run() {
            if (AssignmentsManager.this.inflight == null) {
                throw new IllegalStateException("Bug. Cannot be handling a client response if there are no assignments in flight");
            }
            if (AssignmentsManager.responseIsError(this.response)) {
                AssignmentsManager.this.requeueAllAfterFailure();
                return;
            }
            AssignmentsManager.this.failedAttempts = 0;

            // responseIsError has verified the body is an AssignReplicasToDirsResponse, so the
            // cast is safe; it is required to reach the typed response data.
            AssignReplicasToDirsResponseData responseData =
                ((AssignReplicasToDirsResponse) this.response.responseBody()).data();
            Set<AssignmentEvent> failed = AssignmentsManager.filterFailures(responseData, AssignmentsManager.this.inflight);
            Set<AssignmentEvent> sent = new HashSet<>(AssignmentsManager.this.inflight.values());
            Set<AssignmentEvent> completed = Utils.diff(HashSet::new, sent, failed);
            for (AssignmentEvent event : completed) {
                if (AssignmentsManager.log.isDebugEnabled()) {
                    AssignmentsManager.log.debug("Successfully propagated assignment {}", event);
                }
                event.onComplete();
            }

            if (!failed.isEmpty()) {
                AssignmentsManager.log.warn("Re-queueing assignments: {}", failed);
                for (AssignmentEvent event : failed) {
                    // An entry already pending for this partition was queued while the request
                    // was in flight and is therefore newer: keep it and only carry over the
                    // completion handlers instead of overwriting it with the stale assignment.
                    AssignmentEvent newer = AssignmentsManager.this.pending.get(event.partition);
                    if (newer == null) {
                        AssignmentsManager.this.pending.put(event.partition, event);
                    } else {
                        newer.merge(event);
                    }
                }
            }

            AssignmentsManager.this.inflight = null;
            if (AssignmentsManager.this.pending.isEmpty()) {
                return;
            }
            AssignmentsManager.this.scheduleDispatch();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/server/AssignmentsManager$DispatchEvent.class */
    /**
     * Moves pending assignments into a new in-flight batch and sends it to the controller.
     * At most 2250 assignments go into one request; the remainder stays pending.
     */
    public class DispatchEvent extends Event {
        static final String TAG = "dispatch";

        /** Largest number of assignments placed in a single request. */
        private static final int MAX_PER_REQUEST = 2250;

        private DispatchEvent() {
            super();
        }

        /**
         * @throws IllegalStateException if a request is already in flight
         */
        public void run() {
            if (AssignmentsManager.this.inflight != null) {
                throw new IllegalStateException("Bug. Should not be dispatching while there are assignments in flight");
            }
            if (AssignmentsManager.this.pending.isEmpty()) {
                AssignmentsManager.log.trace("No pending assignments, no-op dispatch");
                return;
            }

            // Take the queued events, then start from fresh maps; overflow goes back to pending.
            Collection<AssignmentEvent> queued = AssignmentsManager.this.pending.values();
            AssignmentsManager.this.pending = new HashMap<>();
            AssignmentsManager.this.inflight = new HashMap<>();
            for (AssignmentEvent event : queued) {
                Map<TopicIdPartition, AssignmentEvent> target =
                    AssignmentsManager.this.inflight.size() < MAX_PER_REQUEST
                        ? AssignmentsManager.this.inflight
                        : AssignmentsManager.this.pending;
                target.put(event.partition, event);
            }

            int leftOver = AssignmentsManager.this.pending.size();
            if (leftOver > 0) {
                AssignmentsManager.log.warn("Too many assignments ({}) to fit in one call, sending only {} and queueing the rest", MAX_PER_REQUEST + leftOver, MAX_PER_REQUEST);
            }

            // The request only needs partition -> directory id.
            Map<TopicIdPartition, Uuid> dirByPartition = new HashMap<>();
            for (Map.Entry<TopicIdPartition, AssignmentEvent> entry : AssignmentsManager.this.inflight.entrySet()) {
                dirByPartition.put(entry.getKey(), entry.getValue().dirId);
            }
            AssignmentsManager.log.debug("Dispatching {} assignments:  {}", dirByPartition.size(), dirByPartition);

            long brokerEpoch = AssignmentsManager.this.brokerEpochSupplier.get();
            AssignReplicasToDirsRequestData requestData =
                AssignmentsManager.buildRequestData(AssignmentsManager.this.brokerId, brokerEpoch, dirByPartition);
            AssignmentsManager.this.channelManager.sendRequest(
                new AssignReplicasToDirsRequest.Builder(requestData),
                new AssignReplicasToDirsRequestCompletionHandler());
        }
    }

    /* loaded from: input_file:org/apache/kafka/server/AssignmentsManager$Event.class */
    /**
     * Common base for the events this manager places on its event queue.
     * Supplies a shared exception handler that only logs the failure.
     */
    private static abstract class Event implements EventQueue.Event {
        private Event() {
        }

        /**
         * Logs the error together with the event that raised it; nothing is rethrown.
         *
         * @param th the exception being reported for this event
         */
        public void handleException(Throwable th) {
            AssignmentsManager.log.error("Unexpected error handling {}", this, th);
        }
    }

    /* loaded from: input_file:org/apache/kafka/server/AssignmentsManager$ShutdownEvent.class */
    /**
     * Event that shuts down the controller channel manager.
     * An instance is handed to the KafkaEventQueue constructor as its final argument;
     * NOTE(review): presumably the queue runs it as its cleanup step on close — confirm.
     */
    private class ShutdownEvent extends Event {
        private ShutdownEvent() {
            super();
        }

        /** Shuts down the channel manager used to reach the controller. */
        public void run() {
            AssignmentsManager.this.channelManager.shutdown();
        }
    }

    /**
     * Creates the manager, starts the controller channel manager and registers the
     * queued-assignments gauge.
     *
     * @param time                           clock used for timestamps and deadlines
     * @param nodeToControllerChannelManager channel used to send requests to the controller; started here
     * @param i                              id of this broker
     * @param supplier                       supplies the current broker epoch for each request
     * @param function                       resolves a directory id to a path for log output; may be null
     * @param function2                      resolves a topic id to a name for log output; may be null
     */
    public AssignmentsManager(Time time, NodeToControllerChannelManager nodeToControllerChannelManager, int i, Supplier<Long> supplier, Function<Uuid, Optional<String>> function, Function<Uuid, Optional<String>> function2) {
        this.time = time;
        this.channelManager = nodeToControllerChannelManager;
        this.brokerId = i;
        this.brokerEpochSupplier = supplier;
        this.eventQueue = new KafkaEventQueue(time, new LogContext("[AssignmentsManager id=" + i + "]"), "broker-" + i + "-directory-assignments-manager-", new ShutdownEvent());
        nodeToControllerChannelManager.start();
        this.metricsGroup.newGauge(QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME, new Gauge<Integer>() {
            // Must be named value() to implement Gauge's abstract method; the decompiler had
            // renamed it, leaving the anonymous class without an implementation.
            @Override
            public Integer value() {
                return getMapSize(AssignmentsManager.this.inflight) + getMapSize(AssignmentsManager.this.pending);
            }

            /** Size of the map, treating {@code null} (no request in flight) as empty. */
            private int getMapSize(Map<TopicIdPartition, AssignmentEvent> map) {
                if (map == null) {
                    return 0;
                }
                return map.size();
            }
        });
        // Fall back to "unknown" resolvers so log rendering never has to null-check them.
        this.dirIdToPath = function == null ? uuid -> {
            return Optional.empty();
        } : function;
        this.topicIdToName = function2 == null ? uuid2 -> {
            return Optional.empty();
        } : function2;
    }

    /**
     * Closes the event queue and then removes the queued-assignments gauge.
     * The gauge is removed in a finally block, so it is unregistered even if closing
     * the queue throws.
     *
     * @throws InterruptedException propagated from closing the event queue
     */
    public void close() throws InterruptedException {
        try {
            this.eventQueue.close();
        } finally {
            this.metricsGroup.removeMetric(QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME);
        }
    }

    /**
     * Queues an assignment without a completion callback.
     * Delegates to the four-argument overload with a null callback.
     *
     * @param topicIdPartition partition being assigned
     * @param uuid             id of the directory the partition is assigned to
     * @param str              reason for the assignment, used in log output
     */
    public void onAssignment(TopicIdPartition topicIdPartition, Uuid uuid, String str) {
        onAssignment(topicIdPartition, uuid, str, null);
    }

    /**
     * Queues an assignment of a partition to a directory for propagation to the controller.
     * The assignment is timestamped with the current time and appended to the event queue.
     *
     * @param topicIdPartition partition being assigned
     * @param uuid             id of the directory the partition is assigned to
     * @param str              reason for the assignment, used in log output
     * @param runnable         callback stored with the assignment and run by its onComplete();
     *                         a no-op is substituted when null
     */
    public void onAssignment(TopicIdPartition topicIdPartition, Uuid uuid, String str, Runnable runnable) {
        Runnable handler = runnable;
        if (handler == null) {
            handler = () -> {
            };
        }
        long nowNs = this.time.nanoseconds();
        AssignmentEvent event = new AssignmentEvent(nowNs, topicIdPartition, uuid, str, handler);
        if (log.isDebugEnabled()) {
            log.debug("Queued assignment {}", event);
        }
        this.eventQueue.append(event);
    }

    /** Calls wakeup() on the underlying event queue. */
    void wakeup() {
        this.eventQueue.wakeup();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules the next dispatch of pending assignments.
     * With 2250 or more assignments pending a dispatch event is appended right away;
     * otherwise the dispatch is deferred by the regular dispatch interval.
     */
    public void scheduleDispatch() {
        boolean batchIsFull = this.pending.size() >= 2250;
        if (batchIsFull) {
            log.debug("Too many pending assignments, dispatching immediately");
            this.eventQueue.enqueue(
                EventQueue.EventInsertionType.APPEND,
                "dispatch-immediate",
                new EventQueue.NoDeadlineFunction(),
                new DispatchEvent());
            return;
        }
        scheduleDispatch(DISPATCH_INTERVAL_NS);
    }

    /**
     * Enqueues a deferred dispatch event under the "dispatch" tag.
     *
     * @param j delay from now, in nanoseconds, used to compute the event's deadline
     */
    private void scheduleDispatch(long j) {
        log.debug("Scheduling dispatch in {}ns", j);
        long deadlineNs = this.time.nanoseconds() + j;
        this.eventQueue.enqueue(
            EventQueue.EventInsertionType.DEFERRED,
            "dispatch",
            new EventQueue.LatestDeadlineFunction(deadlineNs),
            new DispatchEvent());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Moves every in-flight assignment back to pending after a failed request and schedules
     * a retry after the dispatch interval plus an exponential backoff.
     * Does nothing when no request is in flight.
     */
    public void requeueAllAfterFailure() {
        if (this.inflight == null) {
            return;
        }
        log.debug("Re-queueing all in-flight assignments after failure");
        for (AssignmentEvent failed : this.inflight.values()) {
            // A pending entry for the same partition was queued while the request was in
            // flight, so it is newer. Keep it and only carry over the completion handlers
            // rather than overwriting it with the stale in-flight assignment.
            AssignmentEvent newer = this.pending.get(failed.partition);
            if (newer == null) {
                this.pending.put(failed.partition, failed);
            } else {
                newer.merge(failed);
            }
        }
        this.inflight = null;
        this.failedAttempts++;
        // backoff() yields milliseconds; the event queue deadline is expressed in nanoseconds.
        long backoffNs = TimeUnit.MILLISECONDS.toNanos(this.resendExponentialBackoff.backoff(this.failedAttempts));
        scheduleDispatch(DISPATCH_INTERVAL_NS + backoffNs);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Decides whether a controller response means the whole request failed.
     * Each failure cause is logged at error level.
     *
     * @param clientResponse the response to inspect; {@code null} counts as an error
     * @return {@code true} if the response is missing, failed at the transport level
     *         (authentication, version mismatch, disconnect, timeout), has no body or a body
     *         of the wrong type, or carries a top-level error code other than NONE
     */
    public static boolean responseIsError(ClientResponse clientResponse) {
        if (clientResponse == null) {
            log.error("Response is null");
            return true;
        }
        if (clientResponse.authenticationException() != null) {
            log.error("Failed to propagate directory assignments because authentication failed", clientResponse.authenticationException());
            return true;
        }
        if (clientResponse.versionMismatch() != null) {
            log.error("Failed to propagate directory assignments because the request version is unsupported", clientResponse.versionMismatch());
            return true;
        }
        if (clientResponse.wasDisconnected()) {
            log.error("Failed to propagate directory assignments because the connection to the controller was disconnected");
            return true;
        }
        if (clientResponse.wasTimedOut()) {
            log.error("Failed to propagate directory assignments because the request timed out");
            return true;
        }
        if (clientResponse.responseBody() == null) {
            log.error("Failed to propagate directory assignments because the Controller returned an empty response");
            return true;
        }
        if (!(clientResponse.responseBody() instanceof AssignReplicasToDirsResponse)) {
            log.error("Failed to propagate directory assignments because the Controller returned an invalid response type");
            return true;
        }
        // The instanceof check above makes this cast safe; it is needed to reach the typed
        // response data that exposes errorCode().
        AssignReplicasToDirsResponse body = (AssignReplicasToDirsResponse) clientResponse.responseBody();
        Errors forCode = Errors.forCode(body.data().errorCode());
        if (forCode == Errors.NONE) {
            return false;
        }
        log.error("Failed to propagate directory assignments because the Controller returned error {}", forCode.name());
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Works out which of the sent assignments must be retried, based on the per-partition
     * results in the response.
     * NOT_LEADER_OR_FOLLOWER and UNKNOWN_TOPIC_ID are logged and not retried; any other
     * error is a failure, as is a sent assignment the response does not mention. Partitions
     * in the response that were never sent are logged and otherwise ignored.
     *
     * @param assignReplicasToDirsResponseData response payload from the controller
     * @param map                              assignments that were sent, keyed by partition
     * @return the subset of sent assignments that should be re-queued
     */
    public static Set<AssignmentEvent> filterFailures(AssignReplicasToDirsResponseData assignReplicasToDirsResponseData, Map<TopicIdPartition, AssignmentEvent> map) {
        Set<AssignmentEvent> failures = new HashSet<>();
        Set<TopicIdPartition> acknowledged = new HashSet<>();
        for (AssignReplicasToDirsResponseData.DirectoryData directory : assignReplicasToDirsResponseData.directories()) {
            for (AssignReplicasToDirsResponseData.TopicData topic : directory.topics()) {
                for (AssignReplicasToDirsResponseData.PartitionData partitionResult : topic.partitions()) {
                    TopicIdPartition key = new TopicIdPartition(topic.topicId(), partitionResult.partitionIndex());
                    AssignmentEvent sent = map.get(key);
                    if (sent == null) {
                        log.error("AssignReplicasToDirsResponse contains unexpected partition {} into directory {}", partitionResult, directory.id());
                        continue;
                    }
                    acknowledged.add(key);
                    Errors error = Errors.forCode(partitionResult.errorCode());
                    if (error == Errors.NONE) {
                        continue;
                    }
                    if (error == Errors.NOT_LEADER_OR_FOLLOWER) {
                        log.info("Dropping late directory assignment for partition {} into directory {} because this broker is no longer a replica", partitionResult, sent.dirId);
                    } else if (error == Errors.UNKNOWN_TOPIC_ID) {
                        log.info("Dropping late directory assignment for partition {} into directory {} because this topic no longer exists.", partitionResult, sent.dirId);
                    } else {
                        log.error("Controller returned error {} for assignment of partition {} into directory {}", error.name(), partitionResult, sent.dirId);
                        failures.add(sent);
                    }
                }
            }
        }
        // Anything we sent that the controller did not answer for is treated as failed.
        for (AssignmentEvent sent : map.values()) {
            if (acknowledged.contains(sent.partition)) {
                continue;
            }
            log.error("AssignReplicasToDirsResponse is missing assignment of partition {} into directory {}", sent.partition, sent.dirId);
            failures.add(sent);
        }
        return failures;
    }

    /**
     * Builds the request payload by grouping the assignments by directory and, within each
     * directory, by topic.
     *
     * @param i   broker id to set on the request
     * @param j   broker epoch to set on the request
     * @param map target directory id for each partition
     * @return request data containing one directory entry per distinct directory id
     */
    static AssignReplicasToDirsRequestData buildRequestData(int i, long j, Map<TopicIdPartition, Uuid> map) {
        Map<Uuid, AssignReplicasToDirsRequestData.DirectoryData> directories = new HashMap<>();
        // Per directory: topic id -> the topic entry already attached to that directory.
        Map<Uuid, Map<Uuid, AssignReplicasToDirsRequestData.TopicData>> topicsByDirectory = new HashMap<>();
        for (Map.Entry<TopicIdPartition, Uuid> assignment : map.entrySet()) {
            TopicIdPartition partition = assignment.getKey();
            Uuid dirId = assignment.getValue();

            AssignReplicasToDirsRequestData.DirectoryData directory = directories.computeIfAbsent(
                dirId, id -> new AssignReplicasToDirsRequestData.DirectoryData().setId(id));

            Map<Uuid, AssignReplicasToDirsRequestData.TopicData> topics =
                topicsByDirectory.computeIfAbsent(dirId, id -> new HashMap<>());
            AssignReplicasToDirsRequestData.TopicData topic = topics.get(partition.topicId());
            if (topic == null) {
                // First partition of this topic in this directory: create and attach the entry.
                topic = new AssignReplicasToDirsRequestData.TopicData().setTopicId(partition.topicId());
                directory.topics().add(topic);
                topics.put(partition.topicId(), topic);
            }

            topic.partitions().add(
                new AssignReplicasToDirsRequestData.PartitionData().setPartitionIndex(partition.partitionId()));
        }
        return new AssignReplicasToDirsRequestData()
            .setBrokerId(i)
            .setBrokerEpoch(j)
            .setDirectories(new ArrayList<>(directories.values()));
    }
}
