package io.confluent.security.store.kafka.coordinator;

import io.confluent.security.authorizer.utils.JsonMapper;
import io.confluent.security.store.kafka.coordinator.MetadataServiceAssignment;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;

/* loaded from: input_file:io/confluent/security/store/kafka/coordinator/MetadataServiceCoordinator.class */
public class MetadataServiceCoordinator extends AbstractCoordinator {
    private static final String PROTOCOL_TYPE = "metadata-service";
    public static final String PROTOCOL = "v0";
    private final Logger log;
    private final MetadataServiceRebalanceListener rebalanceListener;
    private final NodeMetadata nodeMetadata;
    private final AtomicBoolean isAlive;
    private MetadataServiceAssignment currentAssignment;

    public MetadataServiceCoordinator(LogContext logContext, ConsumerNetworkClient consumerNetworkClient, NodeMetadata nodeMetadata, ConsumerConfig consumerConfig, Metrics metrics, String str, Time time, MetadataServiceRebalanceListener metadataServiceRebalanceListener) {
        super(new GroupRebalanceConfig(consumerConfig, GroupRebalanceConfig.ProtocolType.CONSUMER), logContext, consumerNetworkClient, metrics, str, time);
        this.log = logContext.logger(MetadataServiceCoordinator.class);
        this.rebalanceListener = (MetadataServiceRebalanceListener) Objects.requireNonNull(metadataServiceRebalanceListener, "rebalanceListener");
        this.nodeMetadata = (NodeMetadata) Objects.requireNonNull(nodeMetadata, "nodeMetadata");
        this.isAlive = new AtomicBoolean(true);
    }

    @Override // org.apache.kafka.clients.consumer.internals.AbstractCoordinator
    protected String protocolType() {
        return PROTOCOL_TYPE;
    }

    @Override // org.apache.kafka.clients.consumer.internals.AbstractCoordinator
    protected JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
        JoinGroupRequestData.JoinGroupRequestProtocolCollection joinGroupRequestProtocolCollection = new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
        joinGroupRequestProtocolCollection.add((JoinGroupRequestData.JoinGroupRequestProtocolCollection) new JoinGroupRequestData.JoinGroupRequestProtocol().setName("v0").setMetadata(this.nodeMetadata.serialize().array()));
        return joinGroupRequestProtocolCollection;
    }

    @Override // org.apache.kafka.clients.consumer.internals.AbstractCoordinator
    protected boolean onJoinPrepare(Timer timer, int i, String str) {
        this.rebalanceListener.onRevoked(i);
        this.currentAssignment = null;
        return true;
    }

    @Override // org.apache.kafka.clients.consumer.internals.AbstractCoordinator
    protected Map<String, ByteBuffer> onLeaderElected(String str, String str2, List<JoinGroupResponseData.JoinGroupResponseMember> list, boolean z) {
        if (!"v0".equals(str2)) {
            throw new IllegalArgumentException("Invalid protocol received for join complete");
        }
        if (z) {
            throw new IllegalStateException("Can't skip assignment because MetadataServiceCoordinator does not support static membership.");
        }
        Map map = (Map) list.stream().collect(Collectors.toMap(joinGroupResponseMember -> {
            return joinGroupResponseMember.memberId();
        }, joinGroupResponseMember2 -> {
            return NodeMetadata.deserialize(ByteBuffer.wrap(joinGroupResponseMember2.metadata()));
        }));
        this.log.debug("Perform assignment on leader {} members {}", str, list);
        MetadataServiceAssignment.AssignmentError assignmentError = MetadataServiceAssignment.AssignmentError.NONE;
        HashSet hashSet = new HashSet();
        Iterator it = map.values().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            NodeMetadata nodeMetadata = (NodeMetadata) it.next();
            Stream<URI> stream = nodeMetadata.uris().stream();
            hashSet.getClass();
            if (stream.anyMatch((v1) -> {
                return r1.contains(v1);
            })) {
                assignmentError = MetadataServiceAssignment.AssignmentError.DUPLICATE_URLS;
                this.log.error("Some members are using duplicate URL: {}. Every Metadata Service instance  must be configured with a unique URL.", map);
                break;
            }
            hashSet.addAll(nodeMetadata.uris());
        }
        Map.Entry entry = (Map.Entry) Collections.min(map.entrySet(), Comparator.comparing((v0) -> {
            return v0.getValue();
        }));
        String str3 = (String) entry.getKey();
        NodeMetadata nodeMetadata2 = (NodeMetadata) entry.getValue();
        MetadataServiceAssignment metadataServiceAssignment = new MetadataServiceAssignment(assignmentError.errorCode, map, str3, nodeMetadata2);
        this.log.debug("Node {} with memberId {} elected as writer", nodeMetadata2, str3);
        return (Map) list.stream().collect(Collectors.toMap(joinGroupResponseMember3 -> {
            return joinGroupResponseMember3.memberId();
        }, joinGroupResponseMember4 -> {
            return metadataServiceAssignment.serialize();
        }));
    }

    @Override // org.apache.kafka.clients.consumer.internals.AbstractCoordinator
    protected void onJoinComplete(int i, String str, String str2, ByteBuffer byteBuffer) {
        if (!"v0".equals(str2)) {
            throw new IllegalArgumentException("Invalid protocol received for join complete");
        }
        this.currentAssignment = (MetadataServiceAssignment) JsonMapper.fromByteBuffer(byteBuffer, MetadataServiceAssignment.class);
        this.rebalanceListener.onAssigned(this.currentAssignment, i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void poll(Duration duration) {
        long milliseconds = this.time.milliseconds();
        long millis = milliseconds + duration.toMillis();
        do {
            if (coordinatorUnknown()) {
                ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
                milliseconds = this.time.milliseconds();
            }
            if (rejoinNeededOrPending() && this.isAlive.get()) {
                ensureActiveGroup();
                milliseconds = this.time.milliseconds();
            }
            if (this.isAlive.get()) {
                pollHeartbeat(milliseconds);
                this.client.poll(this.time.timer(Math.min(Math.max(0L, millis - milliseconds), timeToNextHeartbeat(milliseconds))));
            }
            milliseconds = this.time.milliseconds();
            if (milliseconds >= millis) {
                return;
            }
        } while (this.isAlive.get());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.kafka.clients.consumer.internals.AbstractCoordinator
    public boolean rejoinNeededOrPending() {
        return super.rejoinNeededOrPending() || this.currentAssignment == null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.kafka.clients.consumer.internals.AbstractCoordinator
    public void close(Timer timer) {
        this.isAlive.set(false);
        super.close(timer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onWriterResigned() {
        this.currentAssignment = null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void wakeup() {
        this.client.wakeup();
    }
}
