/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.StaleMetadataException;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils;
import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient;
import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochUtils;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.RequestFutureAdapter;
import org.apache.kafka.clients.consumer.internals.RequestFutureListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;

/**
 * Looks up partition offsets on behalf of the consumer by sending ListOffsets and
 * OffsetsForLeaderEpoch requests through a {@link ConsumerNetworkClient}.
 *
 * <p>Three kinds of work are handled here:
 * <ul>
 *   <li>blocking lookups by timestamp ({@link #offsetsForTimes}, {@link #beginningOffsets},
 *       {@link #endOffsets}), bounded by a caller-supplied {@link Timer};</li>
 *   <li>asynchronous position resets ({@link #resetPositionsIfNeeded()});</li>
 *   <li>asynchronous position validation ({@link #validatePositionsIfNeeded()}).</li>
 * </ul>
 * Response interpretation and subscription-state bookkeeping are delegated to
 * {@link OffsetFetcherUtils}.
 */
public class OffsetFetcher {
    private final Logger log;
    private final ConsumerMetadata metadata;
    private final SubscriptionState subscriptions;
    private final ConsumerNetworkClient client;
    private final Time time;
    private final int requestTimeoutMs;
    private final IsolationLevel isolationLevel;
    private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
    private final ApiVersions apiVersions;
    private final OffsetFetcherUtils offsetFetcherUtils;

    /**
     * Creates a fetcher bound to the given client, metadata and subscription state.
     *
     * @param logContext       used to create this class's logger and passed on to the helpers
     * @param client           network client used to send requests and poll for responses
     * @param metadata         cluster metadata used to find partition leaders
     * @param subscriptions    subscription state updated when responses arrive
     * @param time             clock used to compute retry deadlines
     * @param retryBackoffMs   passed through to {@link OffsetFetcherUtils}
     * @param requestTimeoutMs request timeout; also used as the retry hold-off for in-flight
     *                         reset/validation requests
     * @param isolationLevel   isolation level placed on ListOffsets requests
     * @param apiVersions      per-node API versions, consulted before validating positions
     */
    public OffsetFetcher(LogContext logContext, ConsumerNetworkClient client, ConsumerMetadata metadata, SubscriptionState subscriptions, Time time, long retryBackoffMs, int requestTimeoutMs, IsolationLevel isolationLevel, ApiVersions apiVersions) {
        this.log = logContext.logger(this.getClass());
        this.time = time;
        this.client = client;
        this.metadata = metadata;
        this.subscriptions = subscriptions;
        this.requestTimeoutMs = requestTimeoutMs;
        this.isolationLevel = isolationLevel;
        this.apiVersions = apiVersions;
        this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(client, logContext);
        this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, subscriptions, time, retryBackoffMs, apiVersions);
    }

    /**
     * Sends asynchronous ListOffsets requests for every partition that
     * {@link OffsetFetcherUtils#getOffsetResetStrategyForPartitions()} reports as needing a reset.
     * Does nothing when no partition needs one. This method does not wait for the responses.
     */
    public void resetPositionsIfNeeded() {
        Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap = this.offsetFetcherUtils.getOffsetResetStrategyForPartitions();
        if (partitionAutoOffsetResetStrategyMap.isEmpty()) {
            return;
        }
        this.resetPositionsAsync(partitionAutoOffsetResetStrategyMap);
    }

    /**
     * Sends asynchronous validation requests for the positions returned by
     * {@link OffsetFetcherUtils#getPartitionsToValidate()}. This method does not wait for the
     * responses.
     */
    public void validatePositionsIfNeeded() {
        Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate = this.offsetFetcherUtils.getPartitionsToValidate();
        this.validatePositionsAsync(partitionsToValidate);
    }

    /**
     * Looks up offsets for the given timestamps, blocking until all are found or the timer
     * expires.
     *
     * <p>The topics of the requested partitions are registered as transient topics on the
     * metadata for the duration of the call and are always cleared afterwards, including when
     * the lookup throws.
     *
     * @param timestampsToSearch partition to target timestamp
     * @param timer              bounds how long the lookup may block
     * @return the result built by {@link OffsetFetcherUtils#buildOffsetsForTimesResult} from the
     *         requested timestamps and the offsets that were fetched
     * @throws TimeoutException if the offsets could not be fetched before the timer expired
     */
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Timer timer) {
        this.metadata.addTransientTopics(OffsetFetcherUtils.topicsForPartitions(timestampsToSearch.keySet()));
        try {
            Map<TopicPartition, OffsetFetcherUtils.ListOffsetData> fetchedOffsets = this.fetchOffsetsByTimes(timestampsToSearch, timer, true).fetchedOffsets;
            return OffsetFetcherUtils.buildOffsetsForTimesResult(timestampsToSearch, fetchedOffsets);
        } finally {
            this.metadata.clearTransientTopics();
        }
    }

    /**
     * Repeatedly sends ListOffsets requests until every requested partition has an offset or the
     * timer runs out.
     *
     * @param timestampsToSearch partition to target timestamp; an empty map yields an empty result
     * @param timer              bounds the total time spent polling and awaiting metadata
     * @param requireTimestamps  forwarded to the ListOffsets request builder
     * @return the accumulated result; it may be empty when {@code timer.timeoutMs()} is zero,
     *         because no poll is attempted in that case
     * @throws TimeoutException if a request is still outstanding after polling, or the timer
     *                          expires while partitions remain unresolved
     */
    private OffsetFetcherUtils.ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch, Timer timer, boolean requireTimestamps) {
        final OffsetFetcherUtils.ListOffsetResult result = new OffsetFetcherUtils.ListOffsetResult();
        if (timestampsToSearch.isEmpty()) {
            return result;
        }
        // Shrinks as partitions are resolved; only partitions flagged for retry stay in it.
        final Map<TopicPartition, Long> remainingToSearch = new HashMap<>(timestampsToSearch);
        do {
            final RequestFuture<OffsetFetcherUtils.ListOffsetResult> future = this.sendListOffsetsRequests(remainingToSearch, requireTimestamps);
            future.addListener(new RequestFutureListener<OffsetFetcherUtils.ListOffsetResult>() {
                @Override
                public void onSuccess(OffsetFetcherUtils.ListOffsetResult value) {
                    synchronized (future) {
                        result.fetchedOffsets.putAll(value.fetchedOffsets);
                        remainingToSearch.keySet().retainAll(value.partitionsToRetry);
                        OffsetFetcher.this.offsetFetcherUtils.updateSubscriptionState(value.fetchedOffsets, OffsetFetcher.this.isolationLevel);
                    }
                }

                @Override
                public void onFailure(RuntimeException e) {
                    // Retriable failures are swallowed so the enclosing loop can try again;
                    // anything else is rethrown from the listener.
                    if (!(e instanceof RetriableException)) {
                        throw future.exception();
                    }
                }
            });
            // A zero timeout means "do not touch the network": return whatever is available.
            if (timer.timeoutMs() == 0L) {
                return result;
            }
            this.client.poll(future, timer);
            if (!future.isDone()) {
                // The poll returned without a response; fall through to the timeout below.
                break;
            }
            if (remainingToSearch.isEmpty()) {
                return result;
            }
            // Some partitions must be retried; wait for fresher metadata before the next round.
            this.client.awaitMetadataUpdate(timer);
        } while (timer.notExpired());
        throw new TimeoutException("Failed to get offsets by times in " + timer.elapsedMs() + "ms");
    }

    /**
     * Fetches the offset for each partition using the earliest-timestamp sentinel.
     *
     * @param partitions partitions to look up; duplicates are ignored
     * @param timer      bounds how long the lookup may block
     * @return partition to fetched offset
     * @throws TimeoutException if the offsets could not be fetched before the timer expired
     */
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Timer timer) {
        return this.beginningOrEndOffset(partitions, ListOffsetsRequest.EARLIEST_TIMESTAMP, timer);
    }

    /**
     * Fetches the offset for each partition using the latest-timestamp sentinel.
     *
     * @param partitions partitions to look up; duplicates are ignored
     * @param timer      bounds how long the lookup may block
     * @return partition to fetched offset
     * @throws TimeoutException if the offsets could not be fetched before the timer expired
     */
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Timer timer) {
        return this.beginningOrEndOffset(partitions, ListOffsetsRequest.LATEST_TIMESTAMP, timer);
    }

    /**
     * Shared implementation of {@link #beginningOffsets} and {@link #endOffsets}: searches every
     * distinct partition for the same sentinel timestamp and returns only the offsets.
     *
     * <p>The partitions' topics are registered as transient topics on the metadata for the
     * duration of the call and are always cleared afterwards.
     *
     * @param partitions partitions to look up
     * @param timestamp  sentinel timestamp sent for every partition
     * @param timer      bounds how long the lookup may block
     * @return partition to fetched offset
     */
    private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> partitions, long timestamp, Timer timer) {
        this.metadata.addTransientTopics(OffsetFetcherUtils.topicsForPartitions(partitions));
        try {
            // distinct() guards toMap against duplicate keys in the caller's collection.
            Map<TopicPartition, Long> timestampsToSearch = partitions.stream()
                    .distinct()
                    .collect(Collectors.toMap(Function.identity(), tp -> timestamp));
            OffsetFetcherUtils.ListOffsetResult result = this.fetchOffsetsByTimes(timestampsToSearch, timer, false);
            return result.fetchedOffsets.entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset));
        } finally {
            this.metadata.clearTransientTopics();
        }
    }

    /**
     * Sends one ListOffsets request per leader node for the partitions that need a reset and
     * registers listeners that hand the outcome to {@link OffsetFetcherUtils}.
     *
     * @param partitionAutoOffsetResetStrategyMap partition to reset strategy; each strategy's
     *        {@code timestamp()} is unwrapped with {@code get()}, so it must be present
     */
    private void resetPositionsAsync(final Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
        Map<TopicPartition, Long> partitionResetTimestamps = partitionAutoOffsetResetStrategyMap.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().timestamp().get()));
        // Partitions that cannot be sent right now are collected into a throwaway set; this
        // method does not retry them itself.
        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode = this.groupListOffsetRequests(partitionResetTimestamps, new HashSet<>());
        for (Map.Entry<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> entry : timestampsToSearchByNode.entrySet()) {
            Node node = entry.getKey();
            final Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> resetTimestamps = entry.getValue();
            // Set the next allowed retry for these partitions to one request timeout from now.
            this.subscriptions.setNextAllowedRetry(resetTimestamps.keySet(), this.time.milliseconds() + (long) this.requestTimeoutMs);
            RequestFuture<OffsetFetcherUtils.ListOffsetResult> future = this.sendListOffsetRequest(node, resetTimestamps, false);
            future.addListener(new RequestFutureListener<OffsetFetcherUtils.ListOffsetResult>() {
                @Override
                public void onSuccess(OffsetFetcherUtils.ListOffsetResult result) {
                    OffsetFetcher.this.offsetFetcherUtils.onSuccessfulResponseForResettingPositions(result, partitionAutoOffsetResetStrategyMap);
                }

                @Override
                public void onFailure(RuntimeException e) {
                    OffsetFetcher.this.offsetFetcherUtils.onFailedResponseForResettingPositions(resetTimestamps, e);
                }
            });
        }
    }

    /**
     * Groups the positions by leader and, for each leader, either sends an asynchronous
     * OffsetsForLeaderEpoch request or takes a fallback action:
     * <ul>
     *   <li>empty node: request a metadata update;</li>
     *   <li>no API versions known for the node: try to connect to it;</li>
     *   <li>node lacks a usable OffsetsForLeaderEpoch version: mark validation complete without
     *       sending anything.</li>
     * </ul>
     *
     * @param partitionsToValidate partition to the fetch position that should be validated
     */
    private void validatePositionsAsync(Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate) {
        Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regrouped = OffsetFetcherUtils.regroupFetchPositionsByLeader(partitionsToValidate);
        long nextResetTimeMs = this.time.milliseconds() + (long) this.requestTimeoutMs;
        regrouped.forEach((node, fetchPositions) -> {
            if (node.isEmpty()) {
                this.metadata.requestUpdate(true);
                return;
            }
            NodeApiVersions nodeApiVersions = this.apiVersions.get(node.idString());
            if (nodeApiVersions == null) {
                this.client.tryConnect(node);
                return;
            }
            if (!OffsetFetcherUtils.hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
                this.log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not support the required protocol version (introduced in Kafka 2.3)", fetchPositions.keySet());
                for (TopicPartition partition : fetchPositions.keySet()) {
                    this.subscriptions.completeValidation(partition);
                }
                return;
            }
            // Set the next allowed retry for these partitions to one request timeout from the
            // time this method started.
            this.subscriptions.setNextAllowedRetry(fetchPositions.keySet(), nextResetTimeMs);
            RequestFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> future = this.offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPositions);
            future.addListener(new RequestFutureListener<OffsetsForLeaderEpochUtils.OffsetForEpochResult>() {
                @Override
                public void onSuccess(OffsetsForLeaderEpochUtils.OffsetForEpochResult offsetsResult) {
                    OffsetFetcher.this.offsetFetcherUtils.onSuccessfulResponseForValidatingPositions(fetchPositions, offsetsResult);
                }

                @Override
                public void onFailure(RuntimeException e) {
                    OffsetFetcher.this.offsetFetcherUtils.onFailedResponseForValidatingPositions(fetchPositions, e);
                }
            });
        });
    }

    /**
     * Sends one ListOffsets request per leader node and returns a single future that aggregates
     * all of the per-node responses.
     *
     * <p>The aggregate future completes once every node has answered successfully, and fails with
     * the first per-node failure. If no partition can be assigned to a node, an already-failed
     * future carrying a {@link StaleMetadataException} is returned.
     *
     * @param timestampsToSearch partition to target timestamp
     * @param requireTimestamps  forwarded to the ListOffsets request builder
     * @return future for the combined fetched offsets and partitions to retry
     */
    private RequestFuture<OffsetFetcherUtils.ListOffsetResult> sendListOffsetsRequests(Map<TopicPartition, Long> timestampsToSearch, boolean requireTimestamps) {
        final Set<TopicPartition> partitionsToRetry = new HashSet<>();
        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode = this.groupListOffsetRequests(timestampsToSearch, partitionsToRetry);
        if (timestampsToSearchByNode.isEmpty()) {
            return RequestFuture.failure(new StaleMetadataException());
        }
        final RequestFuture<OffsetFetcherUtils.ListOffsetResult> listOffsetRequestsFuture = new RequestFuture<>();
        final Map<TopicPartition, OffsetFetcherUtils.ListOffsetData> fetchedTimestampOffsets = new HashMap<>();
        final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size());
        for (Map.Entry<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> entry : timestampsToSearchByNode.entrySet()) {
            RequestFuture<OffsetFetcherUtils.ListOffsetResult> future = this.sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps);
            future.addListener(new RequestFutureListener<OffsetFetcherUtils.ListOffsetResult>() {
                @Override
                public void onSuccess(OffsetFetcherUtils.ListOffsetResult partialResult) {
                    synchronized (listOffsetRequestsFuture) {
                        fetchedTimestampOffsets.putAll(partialResult.fetchedOffsets);
                        partitionsToRetry.addAll(partialResult.partitionsToRetry);
                        // Complete only on the last response, and only if no earlier failure
                        // already finished the aggregate future.
                        if (remainingResponses.decrementAndGet() == 0 && !listOffsetRequestsFuture.isDone()) {
                            OffsetFetcherUtils.ListOffsetResult result = new OffsetFetcherUtils.ListOffsetResult(fetchedTimestampOffsets, partitionsToRetry);
                            listOffsetRequestsFuture.complete(result);
                        }
                    }
                }

                @Override
                public void onFailure(RuntimeException e) {
                    synchronized (listOffsetRequestsFuture) {
                        // The first failure wins; later failures are ignored.
                        if (!listOffsetRequestsFuture.isDone()) {
                            listOffsetRequestsFuture.raise(e);
                        }
                    }
                }
            });
        }
        return listOffsetRequestsFuture;
    }

    /**
     * Builds the per-partition ListOffsets request data and groups it by leader node.
     *
     * <p>A partition is skipped and added to {@code partitionsToRetry} when its leader is unknown
     * (a metadata update is also requested) or when the client reports the leader as unavailable
     * (after giving the client the chance to throw an authentication failure for that node).
     *
     * @param timestampsToSearch partition to target timestamp
     * @param partitionsToRetry  output set that receives the skipped partitions
     * @return leader node to the request data of the partitions it leads
     */
    private Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> groupListOffsetRequests(Map<TopicPartition, Long> timestampsToSearch, Set<TopicPartition> partitionsToRetry) {
        Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> partitionDataMap = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
            TopicPartition tp = entry.getKey();
            Long offset = entry.getValue();
            Metadata.LeaderAndEpoch leaderAndEpoch = this.metadata.currentLeader(tp);
            if (leaderAndEpoch.leader.isEmpty()) {
                this.log.debug("Leader for partition {} is unknown for fetching offset {}", tp, offset);
                this.metadata.requestUpdate(true);
                partitionsToRetry.add(tp);
                continue;
            }
            Node leader = leaderAndEpoch.leader.get();
            if (this.client.isUnavailable(leader)) {
                this.client.maybeThrowAuthFailure(leader);
                this.log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires", leader, tp);
                partitionsToRetry.add(tp);
                continue;
            }
            int currentLeaderEpoch = leaderAndEpoch.epoch.orElse(ListOffsetsResponse.UNKNOWN_EPOCH);
            partitionDataMap.put(tp, new ListOffsetsRequestData.ListOffsetsPartition()
                    .setPartitionIndex(tp.partition())
                    .setTimestamp(offset)
                    .setCurrentLeaderEpoch(currentLeaderEpoch));
        }
        return this.offsetFetcherUtils.regroupPartitionMapByNode(partitionDataMap);
    }

    /**
     * Sends a single ListOffsets request to one node.
     *
     * @param node               destination broker
     * @param timestampsToSearch request data for the partitions led by {@code node}
     * @param requireTimestamp   forwarded to the ListOffsets request builder
     * @return future completed with the parsed response, or failed if sending or parsing fails
     */
    private RequestFuture<OffsetFetcherUtils.ListOffsetResult> sendListOffsetRequest(final Node node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> timestampsToSearch, boolean requireTimestamp) {
        ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder.forConsumer(requireTimestamp, this.isolationLevel)
                .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch))
                .setTimeoutMs(this.requestTimeoutMs);
        this.log.debug("Sending ListOffsetRequest {} to broker {}", builder, node);
        return this.client.send(node, builder).compose(new RequestFutureAdapter<ClientResponse, OffsetFetcherUtils.ListOffsetResult>() {
            @Override
            public void onSuccess(ClientResponse response, RequestFuture<OffsetFetcherUtils.ListOffsetResult> future) {
                ListOffsetsResponse lor = (ListOffsetsResponse) response.responseBody();
                OffsetFetcher.this.log.trace("Received ListOffsetResponse {} from broker {}", lor, node);
                OffsetFetcher.this.handleListOffsetResponse(lor, future);
            }
        });
    }

    /**
     * Parses a ListOffsets response via {@link OffsetFetcherUtils} and settles the future:
     * completed with the parsed result, or failed with whatever {@link RuntimeException} the
     * parsing threw.
     *
     * @param listOffsetsResponse response to interpret
     * @param future              future to complete or fail
     */
    private void handleListOffsetResponse(ListOffsetsResponse listOffsetsResponse, RequestFuture<OffsetFetcherUtils.ListOffsetResult> future) {
        try {
            OffsetFetcherUtils.ListOffsetResult result = this.offsetFetcherUtils.handleListOffsetResponse(listOffsetsResponse);
            future.complete(result);
        } catch (RuntimeException e) {
            future.raise(e);
        }
    }

    /**
     * Delegates to {@link OffsetFetcherUtils#validatePositionsOnMetadataChange()}.
     */
    public void validatePositionsOnMetadataChange() {
        this.offsetFetcherUtils.validatePositionsOnMetadataChange();
    }
}

