/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.AbstractMembershipManager;
import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
import org.apache.kafka.clients.consumer.internals.MemberState;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.clients.consumer.internals.RequestState;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;

/**
 * Base {@link RequestManager} that decides, on every {@link #poll(long)}, whether a heartbeat
 * request should be generated for the member tracked by {@link #membershipManager()}, and that
 * routes the heartbeat response (or failure) back to the membership manager, the coordinator
 * request manager and the background event queue.
 *
 * <p>Subclasses supply the concrete request ({@link #buildHeartbeatRequest()}) and the accessors
 * needed to interpret a response of type {@code R}.
 *
 * @param <R> concrete heartbeat response type handled by the subclass
 */
public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse>
implements RequestManager {
    public static final String CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG = "The cluster does not support the new CONSUMER group protocol. Set group.protocol=classic on the consumer configs to revert to the CLASSIC protocol until the cluster is upgraded.";

    protected final Logger logger;
    /** Value of {@code max.poll.interval.ms}; the poll timer is (re)armed with it. */
    protected final int maxPollIntervalMs;
    protected final CoordinatorRequestManager coordinatorRequestManager;
    /** Heartbeat interval timer plus the retry/backoff bookkeeping inherited from {@link RequestState}. */
    private final HeartbeatRequestState heartbeatRequestState;
    /** Sink for {@link ErrorEvent}s raised while handling heartbeat errors. */
    private final BackgroundEventHandler backgroundEventHandler;
    /** Tracks time since the last {@link #resetPollTimer(long)} against {@link #maxPollIntervalMs}. */
    private final Timer pollTimer;
    private final HeartbeatMetricsManager metricsManager;

    /**
     * Creates a manager whose heartbeat state and poll timer are derived from {@code config}.
     * The heartbeat interval starts at 0 and is later replaced by the interval taken from
     * successful responses (see {@code onResponse}).
     *
     * @param logContext                context used to create the logger
     * @param time                      source of the timers
     * @param config                    provides {@code max.poll.interval.ms}, {@code retry.backoff.ms}
     *                                  and {@code retry.backoff.max.ms}
     * @param coordinatorRequestManager used to look up / invalidate the coordinator
     * @param backgroundEventHandler    receives error events
     * @param metricsManager            records heartbeat metrics
     */
    AbstractHeartbeatRequestManager(LogContext logContext, Time time, ConsumerConfig config, CoordinatorRequestManager coordinatorRequestManager, BackgroundEventHandler backgroundEventHandler, HeartbeatMetricsManager metricsManager) {
        this.coordinatorRequestManager = coordinatorRequestManager;
        this.logger = logContext.logger(this.getClass());
        this.backgroundEventHandler = backgroundEventHandler;
        this.maxPollIntervalMs = config.getInt("max.poll.interval.ms");
        long retryBackoffMs = config.getLong("retry.backoff.ms");
        long retryBackoffMaxMs = config.getLong("retry.backoff.max.ms");
        // NOTE(review): maxPollIntervalMs is passed in the position HeartbeatRequestState names
        // "jitter"; behaviour is kept as-is, but confirm this value is intended.
        this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0L, retryBackoffMs, retryBackoffMaxMs, (double) this.maxPollIntervalMs);
        this.pollTimer = time.timer(this.maxPollIntervalMs);
        this.metricsManager = metricsManager;
    }

    /**
     * Creates a manager from pre-built collaborators; unlike the other constructor, the poll
     * timer and the heartbeat request state are supplied by the caller instead of being derived
     * from {@code config} (only {@code max.poll.interval.ms} is read from it).
     */
    AbstractHeartbeatRequestManager(LogContext logContext, Timer timer, ConsumerConfig config, CoordinatorRequestManager coordinatorRequestManager, HeartbeatRequestState heartbeatRequestState, BackgroundEventHandler backgroundEventHandler, HeartbeatMetricsManager metricsManager) {
        this.logger = logContext.logger(this.getClass());
        this.maxPollIntervalMs = config.getInt("max.poll.interval.ms");
        this.coordinatorRequestManager = coordinatorRequestManager;
        this.heartbeatRequestState = heartbeatRequestState;
        this.backgroundEventHandler = backgroundEventHandler;
        this.pollTimer = timer;
        this.metricsManager = metricsManager;
    }

    /**
     * Produces at most one heartbeat request for this poll iteration.
     *
     * <ul>
     *   <li>If the coordinator lookup is empty or the membership manager asks to skip the
     *       heartbeat, nothing is sent; the membership manager is notified and any pending
     *       coordinator fatal error is forwarded as an {@link ErrorEvent}.</li>
     *   <li>If the poll timer has expired and the member is not already leaving, the member is
     *       transitioned to sending a leave-group and a heartbeat whose response is only logged
     *       is returned.</li>
     *   <li>Otherwise a regular heartbeat is returned when the request state allows it, or when
     *       the member is {@link MemberState#LEAVING}, or when the membership manager requests an
     *       immediate heartbeat and none is in flight.</li>
     * </ul>
     *
     * @param currentTimeMs current time in milliseconds
     * @return the poll result carrying the request (if any) and the time to wait before polling again
     */
    @Override
    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
        if (this.coordinatorRequestManager.coordinator().isEmpty() || this.membershipManager().shouldSkipHeartbeat()) {
            this.membershipManager().onHeartbeatRequestSkipped();
            this.maybePropagateCoordinatorFatalErrorEvent();
            return NetworkClientDelegate.PollResult.EMPTY;
        }
        this.pollTimer.update(currentTimeMs);
        if (this.pollTimer.isExpired() && !this.membershipManager().isLeavingGroup()) {
            this.logger.warn("Consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.");
            this.membershipManager().transitionToSendingLeaveGroup(true);
            NetworkClientDelegate.UnsentRequest leaveHeartbeat = this.makeHeartbeatRequest(currentTimeMs, true);
            // The leave response is only logged (ignoreResponse=true), so the request state is
            // reset right away instead of waiting for a completion callback.
            this.heartbeatRequestState.reset();
            this.resetHeartbeatState();
            return new NetworkClientDelegate.PollResult(this.heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(leaveHeartbeat));
        }
        // Send without waiting for the interval when the member is leaving, or when the
        // membership manager asks for an immediate heartbeat and no request is in flight.
        boolean heartbeatNow = this.membershipManager().state() == MemberState.LEAVING
                || (this.membershipManager().shouldHeartbeatNow() && !this.heartbeatRequestState.requestInFlight());
        if (!this.heartbeatRequestState.canSendRequest(currentTimeMs) && !heartbeatNow) {
            return new NetworkClientDelegate.PollResult(this.heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
        }
        NetworkClientDelegate.UnsentRequest request = this.makeHeartbeatRequest(currentTimeMs, false);
        return new NetworkClientDelegate.PollResult(this.heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request));
    }

    /**
     * @return the membership manager whose state drives heartbeating and which is notified of
     *         heartbeat outcomes
     */
    public abstract AbstractMembershipManager<R> membershipManager();

    /**
     * Generates a final heartbeat on close if the member is leaving the group; its response is
     * only logged.
     *
     * @param currentTimeMs current time in milliseconds
     * @return a result with the leave heartbeat, or {@code PollResult.EMPTY} if the member is not leaving
     */
    @Override
    public NetworkClientDelegate.PollResult pollOnClose(long currentTimeMs) {
        if (this.membershipManager().isLeavingGroup()) {
            NetworkClientDelegate.UnsentRequest request = this.makeHeartbeatRequest(currentTimeMs, true);
            return new NetworkClientDelegate.PollResult(this.heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request));
        }
        return NetworkClientDelegate.PollResult.EMPTY;
    }

    /**
     * Returns how long the caller may wait before this manager needs to be polled again.
     *
     * @param currentTimeMs current time in milliseconds
     * @return 0 if the poll timer has expired or an immediate heartbeat is due with none in
     *         flight; otherwise the smaller of half the remaining poll time and the time to the
     *         next heartbeat
     */
    @Override
    public long maximumTimeToWait(long currentTimeMs) {
        this.pollTimer.update(currentTimeMs);
        if (this.pollTimer.isExpired() || (this.membershipManager().shouldHeartbeatNow() && !this.heartbeatRequestState.requestInFlight())) {
            return 0L;
        }
        return Math.min(this.pollTimer.remainingMs() / 2L, this.heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
    }

    /**
     * Re-arms the poll timer with {@link #maxPollIntervalMs}. If the timer had already expired,
     * a warning is logged and the membership manager is asked to rejoin the stale member first.
     *
     * @param pollMs time, in milliseconds, used to update the timer before it is reset
     */
    public void resetPollTimer(long pollMs) {
        this.pollTimer.update(pollMs);
        if (this.pollTimer.isExpired()) {
            this.logger.warn("Time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, exceeded approximately by {} ms. Member {} will rejoin the group now.", this.pollTimer.isExpiredBy(), this.membershipManager().memberId());
            this.membershipManager().maybeRejoinStaleMember();
        }
        this.pollTimer.reset(this.maxPollIntervalMs);
    }

    /** Forwards (and clears) a fatal error held by the coordinator request manager, if present. */
    private void maybePropagateCoordinatorFatalErrorEvent() {
        this.coordinatorRequestManager.getAndClearFatalError().ifPresent(fatalError -> this.backgroundEventHandler.add(new ErrorEvent((Throwable) fatalError)));
    }

    /**
     * Builds a heartbeat request and records the send attempt: request state, membership
     * manager and metrics are notified, and the heartbeat interval timer is restarted.
     *
     * @param currentTimeMs  time recorded as the send time
     * @param ignoreResponse when {@code true} the response is only logged, not processed
     */
    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(long currentTimeMs, boolean ignoreResponse) {
        NetworkClientDelegate.UnsentRequest request = this.makeHeartbeatRequest(ignoreResponse);
        this.heartbeatRequestState.onSendAttempt(currentTimeMs);
        this.membershipManager().onHeartbeatRequestGenerated();
        this.metricsManager.recordHeartbeatSentMs(currentTimeMs);
        this.heartbeatRequestState.resetTimer();
        return request;
    }

    /**
     * Builds the request and attaches the completion callback: either the log-only callback
     * ({@code ignoreResponse == true}) or the one dispatching to {@code onResponse}/{@code onFailure}.
     */
    @SuppressWarnings("unchecked")
    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(boolean ignoreResponse) {
        NetworkClientDelegate.UnsentRequest request = this.buildHeartbeatRequest();
        if (ignoreResponse) {
            return this.logResponse(request);
        }
        return request.whenComplete((response, exception) -> {
            long completionTimeMs = request.handler().completionTimeMs();
            if (response != null) {
                this.metricsManager.recordRequestLatency(response.requestLatencyMs());
                // Unchecked: the subclass that built the request determines the response type R.
                this.onResponse((R) response.responseBody(), completionTimeMs);
            } else {
                this.onFailure((Throwable) exception, completionTimeMs);
            }
        });
    }

    /**
     * Attaches a callback that records latency and logs the outcome without touching the
     * membership or request state.
     */
    @SuppressWarnings("unchecked")
    private NetworkClientDelegate.UnsentRequest logResponse(NetworkClientDelegate.UnsentRequest request) {
        return request.whenComplete((response, exception) -> {
            if (response != null) {
                this.metricsManager.recordRequestLatency(response.requestLatencyMs());
                // Unchecked: the subclass that built the request determines the response type R.
                Errors error = this.errorForResponse((R) response.responseBody());
                if (error == Errors.NONE) {
                    this.logger.debug("{} responded successfully: {}", this.heartbeatRequestName(), response);
                } else {
                    this.logger.error("{} failed because of {}: {}", this.heartbeatRequestName(), error, response);
                }
            } else {
                this.logger.error("{} failed because of unexpected exception.", this.heartbeatRequestName(), exception);
            }
        });
    }

    /**
     * Handles a request that completed without a response. Retriable exceptions are reported to
     * the coordinator request manager and logged; any other exception is offered to
     * {@link #handleSpecificFailure(Throwable)} and, if not handled there, treated as fatal.
     * The membership manager is always notified last.
     */
    private void onFailure(Throwable exception, long responseTimeMs) {
        this.heartbeatRequestState.onFailedAttempt(responseTimeMs);
        this.resetHeartbeatState();
        if (exception instanceof RetriableException) {
            this.coordinatorRequestManager.handleCoordinatorDisconnect(exception, responseTimeMs);
            String message = String.format("%s failed because of the retriable exception. Will retry in %s ms: %s", this.heartbeatRequestName(), this.heartbeatRequestState.remainingBackoffMs(responseTimeMs), exception.getMessage());
            this.logger.debug(message);
        } else if (!this.handleSpecificFailure(exception)) {
            this.logger.error("{} failed due to fatal error: {}", this.heartbeatRequestName(), exception.getMessage());
            this.handleFatalFailure(exception);
        }
        // Notify only after the error has been handled/propagated above.
        this.membershipManager().onHeartbeatFailure(exception instanceof RetriableException);
    }

    /**
     * Handles a received response: on {@link Errors#NONE} the heartbeat interval is refreshed
     * from the response and success is reported; anything else goes to {@code onErrorResponse}.
     */
    private void onResponse(R response, long currentTimeMs) {
        if (this.errorForResponse(response) == Errors.NONE) {
            this.heartbeatRequestState.updateHeartbeatIntervalMs(this.heartbeatIntervalForResponse(response));
            this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
            this.membershipManager().onHeartbeatSuccess(response);
            return;
        }
        this.onErrorResponse(response, currentTimeMs);
    }

    /**
     * Reacts to an error code carried by a heartbeat response. Every path first resets the
     * subclass heartbeat state and records a failed attempt, and finishes by notifying the
     * membership manager with {@code onHeartbeatFailure(false)}.
     */
    private void onErrorResponse(R response, long currentTimeMs) {
        Errors error = this.errorForResponse(response);
        String errorMessage = this.errorMessageForResponse(response);
        this.resetHeartbeatState();
        this.heartbeatRequestState.onFailedAttempt(currentTimeMs);
        switch (error) {
            case NOT_COORDINATOR: {
                String message = String.format("%s failed because the group coordinator %s is incorrect. Will attempt to find the coordinator again and retry", this.heartbeatRequestName(), this.coordinatorRequestManager.coordinator());
                this.logInfo(message, response, currentTimeMs);
                this.coordinatorRequestManager.markCoordinatorUnknown(errorMessage, currentTimeMs);
                this.heartbeatRequestState.reset();
                break;
            }
            case COORDINATOR_NOT_AVAILABLE: {
                String message = String.format("%s failed because the group coordinator %s is not available. Will attempt to find the coordinator again and retry", this.heartbeatRequestName(), this.coordinatorRequestManager.coordinator());
                this.logInfo(message, response, currentTimeMs);
                this.coordinatorRequestManager.markCoordinatorUnknown(errorMessage, currentTimeMs);
                this.heartbeatRequestState.reset();
                break;
            }
            case COORDINATOR_LOAD_IN_PROGRESS: {
                // The failed attempt recorded above is kept (no reset), unlike the two cases before.
                String message = String.format("%s failed because the group coordinator %s is still loading. Will retry", this.heartbeatRequestName(), this.coordinatorRequestManager.coordinator());
                this.logInfo(message, response, currentTimeMs);
                break;
            }
            case GROUP_AUTHORIZATION_FAILED: {
                GroupAuthorizationException exception = GroupAuthorizationException.forGroupId(this.membershipManager().groupId());
                this.logger.error("{} failed due to group authorization failure: {}", this.heartbeatRequestName(), exception.getMessage());
                this.handleFatalFailure(error.exception(exception.getMessage()));
                break;
            }
            case TOPIC_AUTHORIZATION_FAILED: {
                this.logger.error("{} failed for member {} with state {} due to {}: {}", this.heartbeatRequestName(), this.membershipManager().memberId(), this.membershipManager().state(), error, errorMessage);
                // Only an error event is raised here; the member is not transitioned to fatal.
                this.backgroundEventHandler.add(new ErrorEvent(error.exception()));
                break;
            }
            case INVALID_REQUEST:
            case GROUP_MAX_SIZE_REACHED:
            case UNSUPPORTED_ASSIGNOR: {
                this.logger.error("{} failed due to {}: {}", this.heartbeatRequestName(), error, errorMessage);
                this.handleFatalFailure(error.exception(errorMessage));
                break;
            }
            case FENCED_MEMBER_EPOCH: {
                String message = String.format("%s failed for member %s because epoch %s is fenced.", this.heartbeatRequestName(), this.membershipManager().memberId(), this.membershipManager().memberEpoch());
                this.logInfo(message, response, currentTimeMs);
                this.membershipManager().transitionToFenced();
                this.heartbeatRequestState.reset();
                break;
            }
            case UNKNOWN_MEMBER_ID: {
                String message = String.format("%s failed because member %s is unknown.", this.heartbeatRequestName(), this.membershipManager().memberId());
                this.logInfo(message, response, currentTimeMs);
                this.membershipManager().transitionToFenced();
                this.heartbeatRequestState.reset();
                break;
            }
            case INVALID_REGULAR_EXPRESSION: {
                this.logger.error("{} failed due to {}: {}", this.heartbeatRequestName(), error, errorMessage);
                this.handleFatalFailure(error.exception("Invalid RE2J SubscriptionPattern provided in the call to subscribe. " + errorMessage));
                break;
            }
            default: {
                // Give the subclass a chance to handle protocol-specific errors before failing fatally.
                if (this.handleSpecificExceptionInResponse(response, currentTimeMs)) {
                    break;
                }
                this.logger.error("{} failed due to unexpected error {}: {}", this.heartbeatRequestName(), error, errorMessage);
                this.handleFatalFailure(error.exception(errorMessage));
            }
        }
        this.membershipManager().onHeartbeatFailure(false);
    }

    /**
     * Logs {@code message} at INFO level together with the remaining backoff and the error
     * message extracted from {@code response}.
     *
     * @param message       already formatted description of the failure
     * @param response      response whose error message is appended
     * @param currentTimeMs time used to compute the remaining backoff
     */
    protected void logInfo(String message, R response, long currentTimeMs) {
        this.logger.info("{} in {}ms: {}", message, this.heartbeatRequestState.remainingBackoffMs(currentTimeMs), this.errorMessageForResponse(response));
    }

    /**
     * Publishes {@code error} as an {@link ErrorEvent} and transitions the member to fatal.
     *
     * @param error the failure to propagate
     */
    protected void handleFatalFailure(Throwable error) {
        this.backgroundEventHandler.add(new ErrorEvent(error));
        this.membershipManager().transitionToFatal();
    }

    /**
     * Hook for subclasses to handle a non-retriable request failure themselves.
     *
     * @param exception the failure of the heartbeat request
     * @return {@code true} if the failure was handled and must not be treated as fatal;
     *         this default implementation always returns {@code false}
     */
    public boolean handleSpecificFailure(Throwable exception) {
        return false;
    }

    /**
     * Hook for subclasses to handle an error code that has no dedicated case in this class.
     *
     * @param response      the response carrying the error
     * @param currentTimeMs time at which the response is being handled
     * @return {@code true} if the error was handled and must not be treated as fatal;
     *         this default implementation always returns {@code false}
     */
    public boolean handleSpecificExceptionInResponse(R response, long currentTimeMs) {
        return false;
    }

    /**
     * Subclass hook invoked on every heartbeat failure or error response, and after a leave
     * heartbeat is generated because the poll timer expired. What is reset is defined by the subclass.
     */
    public abstract void resetHeartbeatState();

    /** @return a new, not yet sent, heartbeat request */
    public abstract NetworkClientDelegate.UnsentRequest buildHeartbeatRequest();

    /** @return the request name used as prefix in log messages */
    public abstract String heartbeatRequestName();

    /** @return the error carried by {@code response}; {@link Errors#NONE} is treated as success */
    public abstract Errors errorForResponse(R var1);

    /** @return the error message carried by {@code response}, used for logging and error propagation */
    public abstract String errorMessageForResponse(R var1);

    /** @return the heartbeat interval, in milliseconds, carried by {@code response} */
    public abstract long heartbeatIntervalForResponse(R var1);

    /**
     * {@link RequestState} extended with a heartbeat interval timer: a request may be sent only
     * once the interval timer has expired and the parent state allows it.
     */
    static class HeartbeatRequestState
    extends RequestState {
        /** Expires when the next heartbeat is due. */
        private final Timer heartbeatTimer;
        /** Current interval, in milliseconds, used when re-arming {@link #heartbeatTimer}. */
        private long heartbeatIntervalMs;

        /**
         * @param heartbeatIntervalMs initial interval used to arm the heartbeat timer
         * @param retryBackoffMs      forwarded to {@link RequestState}
         * @param retryBackoffMaxMs   forwarded to {@link RequestState}
         * @param jitter              forwarded to {@link RequestState}
         */
        HeartbeatRequestState(LogContext logContext, Time time, long heartbeatIntervalMs, long retryBackoffMs, long retryBackoffMaxMs, double jitter) {
            // The literal 2 is the fourth RequestState argument (presumably the backoff multiplier — confirm).
            super(logContext, HeartbeatRequestState.class.getName(), retryBackoffMs, 2, retryBackoffMaxMs, jitter);
            this.heartbeatIntervalMs = heartbeatIntervalMs;
            this.heartbeatTimer = time.timer(heartbeatIntervalMs);
        }

        /** Advances the heartbeat timer to {@code currentTimeMs}. */
        private void update(long currentTimeMs) {
            this.heartbeatTimer.update(currentTimeMs);
        }

        /** Restarts the heartbeat timer with the current interval. */
        void resetTimer() {
            this.heartbeatTimer.reset(this.heartbeatIntervalMs);
        }

        @Override
        public String toStringBase() {
            return super.toStringBase() + ", remainingMs=" + this.heartbeatTimer.remainingMs() + ", heartbeatIntervalMs=" + this.heartbeatIntervalMs;
        }

        /**
         * @return {@code true} only if the heartbeat timer (updated to {@code currentTimeMs})
         *         has expired and the parent state also allows sending
         */
        @Override
        public boolean canSendRequest(long currentTimeMs) {
            this.update(currentTimeMs);
            return this.heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs);
        }

        /**
         * Note: the heartbeat timer is not updated here; the result reflects its last update.
         *
         * @return the remaining backoff if the heartbeat timer has expired, otherwise the time
         *         left on the heartbeat timer
         */
        long timeToNextHeartbeatMs(long currentTimeMs) {
            if (this.heartbeatTimer.isExpired()) {
                return this.remainingBackoffMs(currentTimeMs);
            }
            return this.heartbeatTimer.remainingMs();
        }

        @Override
        public void onFailedAttempt(long currentTimeMs) {
            // Expire the interval timer so that, after a failure, only the parent's backoff
            // decides when the next heartbeat may be sent.
            this.heartbeatTimer.reset(0L);
            super.onFailedAttempt(currentTimeMs);
        }

        /** Stores a new interval and re-arms the timer with it; a no-op if the interval is unchanged. */
        private void updateHeartbeatIntervalMs(long heartbeatIntervalMs) {
            if (this.heartbeatIntervalMs == heartbeatIntervalMs) {
                return;
            }
            this.heartbeatIntervalMs = heartbeatIntervalMs;
            this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
        }
    }
}

