/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry.leaderelector.kafka;

import io.confluent.kafka.schemaregistry.leaderelector.kafka.SchemaRegistryProtocol;
import io.confluent.kafka.schemaregistry.leaderelector.kafka.SchemaRegistryRebalanceListener;
import io.confluent.kafka.schemaregistry.metrics.SchemaRegistryMetric;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryIdentity;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class SchemaRegistryCoordinator
extends AbstractCoordinator
implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(SchemaRegistryCoordinator.class);
    public static final String SR_SUBPROTOCOL_V0 = "v0";
    private final SchemaRegistryIdentity identity;
    private SchemaRegistryProtocol.Assignment assignmentSnapshot;
    private final SchemaRegistryRebalanceListener listener;
    private final SchemaRegistryMetric nodeCountMetric;

    public SchemaRegistryCoordinator(LogContext logContext, ConsumerNetworkClient client, String groupId, int rebalanceTimeoutMs, int sessionTimeoutMs, int heartbeatIntervalMs, Metrics metrics, String metricGrpPrefix, Time time, long retryBackoffMs, SchemaRegistryIdentity identity, SchemaRegistryRebalanceListener listener, SchemaRegistryMetric nodeCountMetric) {
        super(new GroupRebalanceConfig(sessionTimeoutMs, rebalanceTimeoutMs, heartbeatIntervalMs, groupId, Optional.empty(), retryBackoffMs, true), logContext, client, metrics, metricGrpPrefix, time);
        this.identity = identity;
        this.assignmentSnapshot = null;
        this.listener = listener;
        this.nodeCountMetric = nodeCountMetric;
    }

    public String protocolType() {
        return "sr";
    }

    public void poll(long timeout) {
        long elapsed;
        long remaining;
        long start;
        long now = start = this.time.milliseconds();
        do {
            if (this.coordinatorUnknown()) {
                this.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
                now = this.time.milliseconds();
            }
            if (this.rejoinNeededOrPending()) {
                this.ensureActiveGroup();
                now = this.time.milliseconds();
            }
            this.pollHeartbeat(now);
            elapsed = now - start;
            remaining = timeout - elapsed;
            this.client.poll(this.time.timer(Math.min(Math.max(0L, remaining), this.timeToNextHeartbeat(now))));
        } while ((remaining = timeout - (elapsed = (now = this.time.milliseconds()) - start)) > 0L);
    }

    public JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
        ByteBuffer metadata = SchemaRegistryProtocol.serializeMetadata(this.identity);
        return new JoinGroupRequestData.JoinGroupRequestProtocolCollection(Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol().setName(SR_SUBPROTOCOL_V0).setMetadata(metadata.array())).iterator());
    }

    protected void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
        this.assignmentSnapshot = SchemaRegistryProtocol.deserializeAssignment(memberAssignment);
        this.listener.onAssigned(this.assignmentSnapshot, generation);
    }

    protected Map<String, ByteBuffer> performAssignment(String kafkaLeaderId, String protocol, List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata) {
        log.debug("Performing assignment");
        HashMap<String, SchemaRegistryIdentity> memberConfigs = new HashMap<String, SchemaRegistryIdentity>();
        for (JoinGroupResponseData.JoinGroupResponseMember entry : allMemberMetadata) {
            SchemaRegistryIdentity identity = SchemaRegistryProtocol.deserializeMetadata(ByteBuffer.wrap(entry.metadata()));
            memberConfigs.put(entry.memberId(), identity);
        }
        log.debug("Member information: {}", memberConfigs);
        if (this.nodeCountMetric != null) {
            this.nodeCountMetric.set(memberConfigs.size());
        }
        SchemaRegistryIdentity leaderIdentity = null;
        String leaderKafkaId = null;
        HashSet<String> urls = new HashSet<String>();
        for (Map.Entry entry : memberConfigs.entrySet()) {
            boolean smallerIdentity;
            String kafkaMemberId = (String)entry.getKey();
            SchemaRegistryIdentity memberIdentity = (SchemaRegistryIdentity)entry.getValue();
            urls.add(memberIdentity.getUrl());
            boolean eligible = memberIdentity.getLeaderEligibility();
            boolean bl = smallerIdentity = leaderIdentity == null || memberIdentity.getUrl().compareTo(leaderIdentity.getUrl()) < 0;
            if (!eligible || !smallerIdentity) continue;
            leaderKafkaId = kafkaMemberId;
            leaderIdentity = memberIdentity;
        }
        short error = 0;
        if (urls.size() != memberConfigs.size()) {
            log.error("Found duplicate URLs for schema registry group members. This indicates a misconfiguration and is common when executing in containers. Use the host.name configuration to set each instance's advertised host name to a value that is routable from all other schema registry instances.");
            error = 1;
        }
        HashMap<String, ByteBuffer> groupAssignment = new HashMap<String, ByteBuffer>();
        SchemaRegistryProtocol.Assignment assignment = new SchemaRegistryProtocol.Assignment(error, leaderKafkaId, leaderIdentity);
        log.debug("Assignment: {}", (Object)assignment);
        for (String member : memberConfigs.keySet()) {
            groupAssignment.put(member, SchemaRegistryProtocol.serializeAssignment(assignment));
        }
        return groupAssignment;
    }

    protected void onJoinPrepare(int generation, String memberId) {
        log.debug("Revoking previous assignment {}", (Object)this.assignmentSnapshot);
        if (this.assignmentSnapshot != null) {
            this.listener.onRevoked();
        }
    }

    protected synchronized boolean ensureCoordinatorReady(Timer timer) {
        return super.ensureCoordinatorReady(timer);
    }

    protected boolean rejoinNeededOrPending() {
        return super.rejoinNeededOrPending() || this.assignmentSnapshot == null;
    }
}

