/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.server.Assignment;
import org.apache.kafka.server.AssignmentsManagerDeadlineFunction;
import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
import org.apache.kafka.server.common.NodeToControllerChannelManager;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.slf4j.Logger;

/**
 * Collects replica-to-directory assignments registered through {@link #onAssignment} and
 * forwards them to the controller in batched {@code AssignReplicasToDirs} requests.
 *
 * <p>Assignments wait in {@code ready} until a {@code MaybeSendAssignmentsEvent} moves up to
 * {@link #MAX_ASSIGNMENTS_PER_REQUEST} of them into {@code inflight} and sends a request through
 * the {@link NodeToControllerChannelManager}. Only one request is outstanding at a time. When the
 * response (or a timeout) arrives, a {@code HandleResponseEvent} runs the success callbacks for
 * acknowledged partitions and puts every other assignment back into {@code ready} for a retry.
 *
 * <p>Sending and response handling are executed as events on the internal {@link KafkaEventQueue}.
 */
public final class AssignmentsManager {
    /**
     * Backoff used by the public constructor: 100 ms initial interval, multiplier 2,
     * 10 s maximum interval and 0.02 jitter, with the intervals expressed in nanoseconds.
     */
    static final ExponentialBackoff STANDARD_BACKOFF = new ExponentialBackoff(
        TimeUnit.MILLISECONDS.toNanos(100L), 2, TimeUnit.SECONDS.toNanos(10L), 0.02);

    /**
     * Age (relative to its submission time) an assignment must exceed before a per-partition
     * failure is logged at error level when debug logging is disabled.
     */
    static final long MIN_NOISY_FAILURE_INTERVAL_NS = TimeUnit.MINUTES.toNanos(2L);

    /**
     * Upper bound on the number of assignments placed into a single request.
     * NOTE(review): presumably chosen to keep the request below a size limit; confirm the origin of 2250.
     */
    static final int MAX_ASSIGNMENTS_PER_REQUEST = 2250;

    /** Name of the gauge reporting {@link #numPending()}. */
    static final MetricName QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC = metricName("QueuedReplicaToDirAssignments");

    /** Tag under which the deferred send event is scheduled on the event queue. */
    static final String MAYBE_SEND_ASSIGNMENTS_EVENT = "MaybeSendAssignmentsEvent";

    private final Logger log;
    private final ExponentialBackoff backoff;
    private final Time time;
    private final NodeToControllerChannelManager channelManager;
    private final int nodeId;
    private final Supplier<MetadataImage> metadataImageSupplier;
    private final Function<Uuid, String> directoryIdToDescription;

    /** Assignments registered (or re-queued after a failure) that have not been sent yet. */
    private final ConcurrentHashMap<TopicIdPartition, Assignment> ready;

    /** Assignments contained in the request currently awaiting a response; empty when none is outstanding. */
    private volatile Map<TopicIdPartition, Assignment> inflight;

    private final MetricsRegistry metricsRegistry;

    /** Number of consecutive request-level failures; reset to zero by the first request without a global error. */
    private int previousGlobalFailures;

    private final KafkaEventQueue eventQueue;

    /**
     * Builds the metric name for a metric owned by this class.
     *
     * @param name the metric's short name
     * @return the metric name in group {@code org.apache.kafka.server}, type {@code AssignmentsManager}
     */
    static MetricName metricName(String name) {
        return KafkaYammerMetrics.getMetricName("org.apache.kafka.server", "AssignmentsManager", name);
    }

    /**
     * Creates a manager using {@link #STANDARD_BACKOFF} and the default Yammer metrics registry.
     *
     * @param time clock used for timestamps and by the event queue
     * @param channelManager channel used to send requests to the controller; started by this constructor
     * @param nodeId id of this broker
     * @param metadataImageSupplier supplies the current metadata image
     * @param directoryIdToDescription renders a directory id for trace logging
     */
    public AssignmentsManager(Time time, NodeToControllerChannelManager channelManager, int nodeId, Supplier<MetadataImage> metadataImageSupplier, Function<Uuid, String> directoryIdToDescription) {
        this(STANDARD_BACKOFF, time, channelManager, nodeId, metadataImageSupplier, directoryIdToDescription, KafkaYammerMetrics.defaultRegistry());
    }

    /**
     * Creates a manager with an explicit backoff and metrics registry. Registers the
     * queued-assignments gauge, creates the event queue and starts the channel manager.
     */
    AssignmentsManager(ExponentialBackoff backoff, Time time, NodeToControllerChannelManager channelManager, int nodeId, Supplier<MetadataImage> metadataImageSupplier, Function<Uuid, String> directoryIdToDescription, MetricsRegistry metricsRegistry) {
        this.log = new LogContext("[AssignmentsManager id=" + nodeId + "] ").logger(AssignmentsManager.class);
        this.backoff = backoff;
        this.time = time;
        this.channelManager = channelManager;
        this.nodeId = nodeId;
        this.directoryIdToDescription = directoryIdToDescription;
        this.metadataImageSupplier = metadataImageSupplier;
        this.ready = new ConcurrentHashMap<>();
        this.inflight = Collections.emptyMap();
        this.metricsRegistry = metricsRegistry;
        this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge<Integer>() {
            public Integer value() {
                return numPending();
            }
        });
        this.previousGlobalFailures = 0;
        // The ShutdownEvent is handed to the queue as its cleanup event.
        this.eventQueue = new KafkaEventQueue(
            time,
            new LogContext("[AssignmentsManager id=" + nodeId + "]"),
            "broker-" + nodeId + "-directory-assignments-manager-",
            new ShutdownEvent());
        channelManager.start();
    }

    /**
     * @return the number of assignments waiting to be sent plus those currently in flight
     */
    public int numPending() {
        return ready.size() + inflight.size();
    }

    /**
     * Closes the internal event queue.
     *
     * @throws InterruptedException if thrown by the event queue's {@code close()}
     */
    public void close() throws InterruptedException {
        eventQueue.close();
    }

    /**
     * Registers a new assignment of a partition to a directory and (re)schedules the send event.
     * An assignment already waiting in {@code ready} for the same partition is replaced.
     *
     * @param topicIdPartition the partition being assigned
     * @param directoryId the directory the partition is assigned to
     * @param reason free-form description, used only for trace logging
     * @param successCallback run once the controller acknowledges the assignment without error
     */
    public void onAssignment(TopicIdPartition topicIdPartition, Uuid directoryId, String reason, Runnable successCallback) {
        long nowNs = time.nanoseconds();
        Assignment assignment = new Assignment(topicIdPartition, directoryId, nowNs, successCallback);
        ready.put(topicIdPartition, assignment);
        if (log.isTraceEnabled()) {
            // Resolve the topic name only when tracing; fall back to the topic id if the image has no such topic.
            Uuid topicId = assignment.topicIdPartition().topicId();
            String topicDescription = Optional.ofNullable(metadataImageSupplier.get().topics().getTopic(topicId))
                .map(TopicImage::name)
                .orElse(assignment.topicIdPartition().topicId().toString());
            log.trace("Registered assignment {}: {}, moving {}-{} into {}",
                assignment,
                reason,
                topicDescription,
                topicIdPartition.partitionId(),
                directoryIdToDescription.apply(assignment.directoryId()));
        }
        rescheduleMaybeSendAssignmentsEvent(nowNs);
    }

    /**
     * Schedules (or reschedules) the deferred {@code MaybeSendAssignmentsEvent}. The deadline is
     * computed by an {@link AssignmentsManagerDeadlineFunction} built from the backoff, the current
     * time, the failure count, whether a request is in flight, and the number of ready assignments.
     *
     * @param nowNs the current time in nanoseconds
     */
    void rescheduleMaybeSendAssignmentsEvent(long nowNs) {
        eventQueue.scheduleDeferred(
            MAYBE_SEND_ASSIGNMENTS_EVENT,
            new AssignmentsManagerDeadlineFunction(backoff, nowNs, previousGlobalFailures, !inflight.isEmpty(), ready.size()),
            new MaybeSendAssignmentsEvent());
    }

    /**
     * Sends the next batch of ready assignments unless a request is already in flight.
     * Assignments that are no longer valid against the current metadata image are dropped.
     */
    void maybeSendAssignments() {
        int inflightSize = inflight.size();
        if (log.isTraceEnabled()) {
            log.trace("maybeSendAssignments: inflightSize = {}.", inflightSize);
        }
        if (inflightSize > 0) {
            log.trace("maybeSendAssignments: cannot send new assignments because there are {} still in flight.", inflightSize);
            return;
        }
        MetadataImage image = metadataImageSupplier.get();
        Map<TopicIdPartition, Assignment> newInFlight = new HashMap<>();
        int numInvalid = 0;
        // NOTE(review): iterator.remove() removes whatever value is mapped to the key at that moment; if
        // onAssignment() replaces the entry between next() and remove(), the newer assignment could be
        // removed as well. Verify whether that interleaving is possible in practice.
        Iterator<Assignment> iterator = ready.values().iterator();
        while (iterator.hasNext() && newInFlight.size() < MAX_ASSIGNMENTS_PER_REQUEST) {
            Assignment assignment = iterator.next();
            iterator.remove();
            if (assignment.valid(nodeId, image)) {
                newInFlight.put(assignment.topicIdPartition(), assignment);
            } else {
                ++numInvalid;
            }
        }
        log.info("maybeSendAssignments: sending {} assignments; invalidated {} assignments prior to sending.", newInFlight.size(), numInvalid);
        if (!newInFlight.isEmpty()) {
            sendAssignments(image.cluster().brokerEpoch(nodeId), newInFlight);
        }
    }

    /**
     * Sends one request carrying the given assignments and records them as in flight.
     *
     * @param brokerEpoch this broker's epoch, taken from the metadata image
     * @param newInflight the assignments to send, keyed by partition
     */
    void sendAssignments(long brokerEpoch, Map<TopicIdPartition, Assignment> newInflight) {
        CompletionHandler completionHandler = new CompletionHandler(newInflight);
        // NOTE(review): if sendRequest throws, these assignments have already been taken out of `ready`
        // and are not recorded as in flight; confirm whether that path needs a re-queue.
        channelManager.sendRequest(
            new AssignReplicasToDirsRequest.Builder(buildRequestData(nodeId, brokerEpoch, newInflight)),
            completionHandler);
        inflight = newInflight;
    }

    /**
     * Processes the outcome of a request. On a request-level failure every sent assignment is
     * re-queued and the failure counter is incremented. Otherwise each partition result is handled
     * individually and assignments missing from the response are re-queued.
     *
     * <p>Re-queueing uses {@code putIfAbsent}, so an assignment registered for the same partition
     * while the request was outstanding is not overwritten.
     *
     * @param sent the assignments that were sent; entries are removed as results are processed
     * @param assignmentResponse the response, or empty if the request timed out
     */
    void handleResponse(Map<TopicIdPartition, Assignment> sent, Optional<ClientResponse> assignmentResponse) {
        inflight = Collections.emptyMap();
        Optional<String> globalError = globalResponseError(assignmentResponse);
        if (globalError.isPresent()) {
            ++previousGlobalFailures;
            log.error("handleResponse: {} assignments failed; global error: {}. Retrying.", sent.size(), globalError.get());
            sent.forEach(ready::putIfAbsent);
            return;
        }
        previousGlobalFailures = 0;
        // globalResponseError() has already established that the body is an AssignReplicasToDirsResponse.
        AssignReplicasToDirsResponseData responseData =
            ((AssignReplicasToDirsResponse) assignmentResponse.get().responseBody()).data();
        long nowNs = time.nanoseconds();
        for (AssignReplicasToDirsResponseData.DirectoryData directoryData : responseData.directories()) {
            for (AssignReplicasToDirsResponseData.TopicData topicData : directoryData.topics()) {
                for (AssignReplicasToDirsResponseData.PartitionData partitionData : topicData.partitions()) {
                    TopicIdPartition topicIdPartition = new TopicIdPartition(topicData.topicId(), partitionData.partitionIndex());
                    handleAssignmentResponse(topicIdPartition, sent, Errors.forCode(partitionData.errorCode()), nowNs);
                    sent.remove(topicIdPartition);
                }
            }
        }
        // Whatever is still in `sent` received no result in the response; queue it again.
        for (Assignment assignment : sent.values()) {
            ready.putIfAbsent(assignment.topicIdPartition(), assignment);
            log.error("handleResponse: no result in response for partition {}.", assignment.topicIdPartition());
        }
    }

    /**
     * Handles the result for a single partition: runs the success callback when there is no error,
     * otherwise re-queues the assignment.
     *
     * @param topicIdPartition the partition named in the response
     * @param sent the assignments that were sent in the request
     * @param error the per-partition error reported by the controller
     * @param nowNs the current time in nanoseconds, used to rate-limit error logging
     */
    void handleAssignmentResponse(TopicIdPartition topicIdPartition, Map<TopicIdPartition, Assignment> sent, Errors error, long nowNs) {
        Assignment assignment = sent.get(topicIdPartition);
        if (assignment == null) {
            log.error("handleResponse: response contained topicIdPartition {}, but this was not in the request.", topicIdPartition);
        } else if (error == Errors.NONE) {
            try {
                assignment.successCallback().run();
            } catch (Exception e) {
                log.error("handleResponse: unexpected callback exception", e);
            }
        } else {
            ready.putIfAbsent(topicIdPartition, assignment);
            // Log only when debug is enabled or the assignment was submitted more than MIN_NOISY_FAILURE_INTERVAL_NS ago.
            if (log.isDebugEnabled() || nowNs > assignment.submissionTimeNs() + MIN_NOISY_FAILURE_INTERVAL_NS) {
                log.error("handleResponse: error assigning {}: {}.", assignment.topicIdPartition(), error);
            }
        }
    }

    /**
     * Reads the failure counter from within an event appended to the event queue and blocks
     * until that event has run.
     *
     * @return the current number of consecutive request-level failures
     * @throws ExecutionException if the future completes exceptionally
     * @throws InterruptedException if interrupted while waiting
     */
    int previousGlobalFailures() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        eventQueue.append(() -> future.complete(this.previousGlobalFailures));
        return future.get();
    }

    /**
     * @return the number of assignments in the request currently awaiting a response
     */
    int numInFlight() {
        return inflight.size();
    }

    /**
     * Classifies a request-level failure.
     *
     * @param response the response, or empty if the request timed out
     * @return a short description of the failure, or empty if the response is usable and
     *         carries no response-level error
     */
    static Optional<String> globalResponseError(Optional<ClientResponse> response) {
        if (response.isEmpty()) {
            return Optional.of("Timeout");
        }
        ClientResponse clientResponse = response.get();
        if (clientResponse.authenticationException() != null) {
            return Optional.of("AuthenticationException");
        }
        if (clientResponse.wasTimedOut()) {
            // The spelling below is kept exactly as-is: the string is returned to callers that may compare against it.
            return Optional.of("Disonnected[Timeout]");
        }
        if (clientResponse.wasDisconnected()) {
            return Optional.of("Disconnected");
        }
        if (clientResponse.versionMismatch() != null) {
            return Optional.of("UnsupportedVersionException");
        }
        if (clientResponse.responseBody() == null) {
            return Optional.of("EmptyResponse");
        }
        if (!(clientResponse.responseBody() instanceof AssignReplicasToDirsResponse)) {
            return Optional.of("ClassCastException");
        }
        AssignReplicasToDirsResponseData data = ((AssignReplicasToDirsResponse) clientResponse.responseBody()).data();
        Errors error = Errors.forCode(data.errorCode());
        if (error != Errors.NONE) {
            return Optional.of("Response-level error: " + error.name());
        }
        return Optional.empty();
    }

    /**
     * Builds the request payload, grouping partitions by directory and then by topic.
     *
     * @param nodeId id of this broker
     * @param brokerEpoch this broker's epoch
     * @param assignments the assignments to include, keyed by partition
     * @return request data containing one directory entry per distinct directory id
     */
    static AssignReplicasToDirsRequestData buildRequestData(int nodeId, long brokerEpoch, Map<TopicIdPartition, Assignment> assignments) {
        Map<Uuid, AssignReplicasToDirsRequestData.DirectoryData> directoryMap = new HashMap<>();
        // directory id -> (topic id -> topic entry already attached to that directory)
        Map<Uuid, Map<Uuid, AssignReplicasToDirsRequestData.TopicData>> topicMap = new HashMap<>();
        for (Map.Entry<TopicIdPartition, Assignment> entry : assignments.entrySet()) {
            TopicIdPartition topicPartition = entry.getKey();
            Uuid directoryId = entry.getValue().directoryId();
            AssignReplicasToDirsRequestData.DirectoryData directory = directoryMap.computeIfAbsent(
                directoryId, d -> new AssignReplicasToDirsRequestData.DirectoryData().setId(directoryId));
            AssignReplicasToDirsRequestData.TopicData topic = topicMap
                .computeIfAbsent(directoryId, d -> new HashMap<>())
                .computeIfAbsent(topicPartition.topicId(), topicId -> {
                    // First partition of this topic under this directory: create the topic entry and attach it.
                    AssignReplicasToDirsRequestData.TopicData data =
                        new AssignReplicasToDirsRequestData.TopicData().setTopicId(topicId);
                    directory.topics().add(data);
                    return data;
                });
            AssignReplicasToDirsRequestData.PartitionData partition =
                new AssignReplicasToDirsRequestData.PartitionData().setPartitionIndex(topicPartition.partitionId());
            topic.partitions().add(partition);
        }
        return new AssignReplicasToDirsRequestData()
            .setBrokerId(nodeId)
            .setBrokerEpoch(brokerEpoch)
            .setDirectories(new ArrayList<>(directoryMap.values()));
    }

    /**
     * Cleanup event given to the event queue: shuts down the channel manager and removes the gauge.
     * Each step is attempted independently so that a failure in one does not skip the other.
     */
    private class ShutdownEvent implements EventQueue.Event {
        public void run() {
            log.info("shutting down.");
            try {
                channelManager.shutdown();
            } catch (Exception e) {
                log.error("Unexpected exception shutting down NodeToControllerChannelManager", e);
            }
            try {
                metricsRegistry.removeMetric(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
            } catch (Exception e) {
                log.error("Unexpected exception removing metrics.", e);
            }
        }
    }

    /**
     * Deferred event that attempts to send the next batch; exceptions are logged, not propagated.
     */
    private class MaybeSendAssignmentsEvent implements EventQueue.Event {
        public void run() {
            try {
                maybeSendAssignments();
            } catch (Exception e) {
                log.error("Unexpected exception in MaybeSendAssignmentsEvent", e);
            }
        }
    }

    /**
     * Completion callback for a sent request. It only appends a {@link HandleResponseEvent} to the
     * event queue; the response itself is processed when that event runs.
     */
    private class CompletionHandler implements ControllerRequestCompletionHandler {
        private final Map<TopicIdPartition, Assignment> sent;

        CompletionHandler(Map<TopicIdPartition, Assignment> sent) {
            this.sent = sent;
        }

        public void onTimeout() {
            eventQueue.append(new HandleResponseEvent(sent, Optional.empty()));
        }

        public void onComplete(ClientResponse response) {
            eventQueue.append(new HandleResponseEvent(sent, Optional.of(response)));
        }
    }

    /**
     * Event that processes a response (or timeout) and, if assignments remain in {@code ready}
     * afterwards, reschedules the send event.
     */
    private class HandleResponseEvent implements EventQueue.Event {
        private final Map<TopicIdPartition, Assignment> sent;
        private final Optional<ClientResponse> response;

        HandleResponseEvent(Map<TopicIdPartition, Assignment> sent, Optional<ClientResponse> response) {
            this.sent = sent;
            this.response = response;
        }

        public void run() {
            try {
                handleResponse(sent, response);
            } catch (Exception e) {
                log.error("Unexpected exception in HandleResponseEvent", e);
            } finally {
                // Runs even when handleResponse threw, so re-queued assignments still get another attempt.
                if (!ready.isEmpty()) {
                    rescheduleMaybeSendAssignmentsEvent(time.nanoseconds());
                }
            }
        }
    }
}

