package io.confluent.kafkarest.resources.v3;

import com.google.common.collect.ImmutableList;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.controllers.SimpleConsumeManager;
import io.confluent.kafkarest.controllers.TopicManager;
import io.confluent.kafkarest.entities.Partition;
import io.confluent.kafkarest.entities.Topic;
import io.confluent.kafkarest.entities.v3.SimpleConsumeMultiPartitionRequest;
import io.confluent.kafkarest.entities.v3.SimpleConsumeMultiPartitionResponse;
import io.confluent.kafkarest.entities.v3.SimpleConsumeSinglePartitionResponse;
import io.confluent.kafkarest.extension.ResourceAccesslistFeature;
import io.confluent.kafkarest.resources.AsyncResponses;
import io.confluent.rest.annotations.PerformanceMetric;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.inject.Inject;
import javax.inject.Provider;
import javax.validation.Valid;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.runtime.tracing.TraceRecordBuilderImpl;
import org.projectnessie.cel.common.types.Overloads;

/**
 * JAX-RS resource exposing the internal "simple consume" endpoints of a topic.
 *
 * <p>Three operations are offered, all resolved asynchronously through
 * {@link AsyncResponses#asyncResume}:
 *
 * <ul>
 *   <li>{@code GET .../partitions/{partitionId}/records} reads from one partition, starting at
 *       either an offset or a timestamp;
 *   <li>{@code POST .../partitions/-/records:consume} reads from several partitions;
 *   <li>{@code POST .../partitions/-/records:consume_guarantee_progress} is the same
 *       multi-partition read with the "guarantee progress" flag passed as {@code true} to
 *       {@link SimpleConsumeManager}.
 * </ul>
 *
 * <p>Every operation first checks that the topic exists in the given cluster and that any
 * explicitly requested partition ID belongs to that topic.
 */
@Path("/v3/clusters/{clusterId}/internal/topics/{topicName}")
@ResourceAccesslistFeature.ResourceName("api.v3.simple-consume.*")
public final class SimpleConsumeAction {
    private final Provider<TopicManager> topicManager;
    private final Provider<SimpleConsumeManager> simpleConsumeManager;

    /**
     * Creates the resource.
     *
     * @param topicManager provider of the manager used to look up topics; must not be null
     * @param simpleConsumeManager provider of the manager performing the reads; must not be null
     * @throws NullPointerException if either provider is null
     */
    @Inject
    public SimpleConsumeAction(
            Provider<TopicManager> topicManager, Provider<SimpleConsumeManager> simpleConsumeManager) {
        this.topicManager = Objects.requireNonNull(topicManager);
        this.simpleConsumeManager = Objects.requireNonNull(simpleConsumeManager);
    }

    /**
     * Reads records from a single partition, starting from either an offset or a timestamp.
     *
     * @param asyncResponse suspended response that is resumed with the result or the failure
     * @param clusterId cluster that owns the topic
     * @param topicName topic to read from
     * @param partitionId partition to read from; must be a partition of the topic
     * @param offsetParam value of the {@code offset} query parameter, or null
     * @param timestampParam value of the {@code timestamp} query parameter, or null
     * @param maxPollRecords value of {@code max_poll_records}, forwarded to the consume manager
     * @param fetchMaxBytes value of {@code fetch_max_bytes}, forwarded to the consume manager
     * @throws BadRequestException if not exactly one of {@code offset} and {@code timestamp} is
     *     given, or if the given one is not a valid long
     */
    @GET
    @Path("/partitions/{partitionId}/records")
    @Consumes({"application/json"})
    @ResourceAccesslistFeature.ResourceName("api.v3.simple-consume.1-partition")
    @Produces({"application/json"})
    @PerformanceMetric("v3.simple-consume.1-partition")
    public void consumeFromPartition(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("clusterId") String clusterId,
            @PathParam("topicName") String topicName,
            @PathParam("partitionId") Integer partitionId,
            @QueryParam("offset") String offsetParam,
            @QueryParam("timestamp") String timestampParam,
            @QueryParam("max_poll_records") Integer maxPollRecords,
            @QueryParam("fetch_max_bytes") Integer fetchMaxBytes) {
        // Both present or both absent is rejected: exactly one start position is required.
        if ((offsetParam == null) == (timestampParam == null)) {
            throw new BadRequestException("Exactly one of 'offset' and 'timestamp' should be provided");
        }
        // The names used in the error messages are the query parameter names themselves.
        Long offset = validateLongQueryParameter(offsetParam, "offset");
        Long timestamp = validateLongQueryParameter(timestampParam, "timestamp");
        SimpleConsumeManager consumeManager = this.simpleConsumeManager.get();

        AsyncResponses.asyncResume(
                asyncResponse,
                checkClusterTopicExist(clusterId, topicName)
                        .thenCompose(topic -> {
                            validatePartitions(Collections.singleton(partitionId), topic.getPartitions());
                            TopicPartition topicPartition = new TopicPartition(topicName, partitionId);
                            if (offset != null) {
                                return consumeManager.consumeFromPartitionWithOffset(
                                        topicPartition, offset, maxPollRecords, fetchMaxBytes);
                            }
                            return consumeManager.consumeFromPartitionWithTimestamp(
                                    topicPartition, timestamp, maxPollRecords, fetchMaxBytes);
                        })
                        .thenApply(partitionData -> SimpleConsumeSinglePartitionResponse.builder()
                                .setClusterId(clusterId)
                                .setTopicName(topicName)
                                .setPartitionData(partitionData)
                                .build()));
    }

    /**
     * Reads records from multiple partitions of a topic.
     *
     * @param asyncResponse suspended response that is resumed with the result or the failure
     * @param clusterId cluster that owns the topic
     * @param topicName topic to read from
     * @param request request body describing where to start and how much to read
     */
    @Path("/partitions/-/records:consume")
    @Consumes({"application/json"})
    @ResourceAccesslistFeature.ResourceName("api.v3.simple-consume.n-partitions")
    @POST
    @Produces({"application/json"})
    @PerformanceMetric("v3.simple-consume.n-partitions")
    public void consumeFromPartitions(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("clusterId") String clusterId,
            @PathParam("topicName") String topicName,
            @Valid SimpleConsumeMultiPartitionRequest request) {
        AsyncResponses.asyncResume(
                asyncResponse, doConsumeFromPartitions(clusterId, topicName, request, false));
    }

    /**
     * Reads records from multiple partitions of a topic, passing {@code true} as the
     * "guarantee progress" flag to the consume manager.
     *
     * @param asyncResponse suspended response that is resumed with the result or the failure
     * @param clusterId cluster that owns the topic
     * @param topicName topic to read from
     * @param request request body describing where to start and how much to read
     */
    @Path("/partitions/-/records:consume_guarantee_progress")
    @Consumes({"application/json"})
    @ResourceAccesslistFeature.ResourceName("api.v3.simple-consume.n-partitions-guarantee-progress")
    @POST
    @Produces({"application/json"})
    @PerformanceMetric("v3.simple-consume.n-partitions-guarantee-progress")
    public void consumeFromPartitionsGuaranteeProgress(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("clusterId") String clusterId,
            @PathParam("topicName") String topicName,
            @Valid SimpleConsumeMultiPartitionRequest request) {
        AsyncResponses.asyncResume(
                asyncResponse, doConsumeFromPartitions(clusterId, topicName, request, true));
    }

    /**
     * Shared implementation of the two multi-partition endpoints.
     *
     * @param clusterId cluster that owns the topic
     * @param topicName topic to read from
     * @param request request body; must not be null
     * @param guaranteeProgress flag forwarded unchanged as the last argument of
     *     {@code SimpleConsumeManager.consumeFromMultiplePartitions}
     * @return future completing with the response built from the per-partition data
     * @throws BadRequestException if more than one of {@code offsets}, {@code from_beginning} and
     *     {@code timestamp} is set in the request
     */
    private CompletableFuture<SimpleConsumeMultiPartitionResponse> doConsumeFromPartitions(
            String clusterId,
            String topicName,
            SimpleConsumeMultiPartitionRequest request,
            boolean guaranteeProgress) {
        if (request == null) {
            throw Errors.invalidPayloadException("Null input provided. Data is required.");
        }

        // The three start-position selectors are mutually exclusive; none at all is accepted.
        int selectorCount = 0;
        if (request.getOffsets() != null) {
            selectorCount++;
        }
        if (request.getFromBeginning() != null) {
            selectorCount++;
        }
        if (request.getTimestamp() != null) {
            selectorCount++;
        }
        if (selectorCount > 1) {
            throw new BadRequestException(
                    "At most one of 'offsets', 'from_beginning' or 'timestamp' should be provided.");
        }

        SimpleConsumeManager consumeManager = this.simpleConsumeManager.get();
        return checkClusterTopicExist(clusterId, topicName)
                .thenCompose(topic -> {
                    List<SimpleConsumeMultiPartitionRequest.PartitionOffset> offsets = request.getOffsets();
                    // NOTE(review): Collectors.toMap throws IllegalStateException when two entries
                    // share a partition ID, which fails the future rather than raising a
                    // BadRequestException - confirm that is the intended response for such input.
                    Map<Integer, Long> offsetsByPartition =
                            offsets == null
                                    ? null
                                    : offsets.stream()
                                            .collect(Collectors.toMap(
                                                    SimpleConsumeMultiPartitionRequest.PartitionOffset::getPartitionId,
                                                    SimpleConsumeMultiPartitionRequest.PartitionOffset::getOffset));
                    List<Partition> partitions = topic.getPartitions();
                    if (offsetsByPartition != null) {
                        validatePartitions(offsetsByPartition.keySet(), partitions);
                    }
                    return consumeManager.consumeFromMultiplePartitions(
                            topicName,
                            partitions.size(),
                            offsetsByPartition,
                            request.getFromBeginning(),
                            request.getTimestamp(),
                            request.getMaxPollRecords(),
                            request.getFetchMaxBytes(),
                            guaranteeProgress);
                })
                .thenApply(partitionDataList -> SimpleConsumeMultiPartitionResponse.builder()
                        .setClusterId(clusterId)
                        .setTopicName(topicName)
                        .setPartitionDataList(partitionDataList)
                        .build());
    }

    /**
     * Looks up the topic and fails the returned future when it is absent.
     *
     * @param clusterId cluster that owns the topic
     * @param topicName topic to look up
     * @return future completing with the topic, or failing with the exception supplied by
     *     {@code Errors.topicNotFoundException()} when the lookup yields an empty optional
     */
    private CompletableFuture<Topic> checkClusterTopicExist(String clusterId, String topicName) {
        return this.topicManager
                .get()
                .getTopic(clusterId, topicName)
                .thenApply(maybeTopic -> maybeTopic.orElseThrow(Errors::topicNotFoundException));
    }

    /**
     * Verifies that every requested partition ID belongs to the topic.
     *
     * @param requestedIds partition IDs supplied by the client
     * @param partitions partitions of the topic
     * @throws BadRequestException on the first requested ID that is not among {@code partitions}
     */
    private static void validatePartitions(
            @Nonnull Set<Integer> requestedIds, @Nonnull List<Partition> partitions) {
        // Only membership is needed, so collect the IDs rather than a map of ID to partition.
        Set<Integer> knownIds =
                partitions.stream().map(Partition::getPartitionId).collect(Collectors.toSet());
        for (int requestedId : requestedIds) {
            if (!knownIds.contains(requestedId)) {
                throw new BadRequestException("Cannot find partition with ID " + requestedId);
            }
        }
    }

    /**
     * Parses an optional query parameter as a long.
     *
     * @param value raw parameter value, or null when the parameter was not supplied
     * @param parameterName parameter name used in the error message
     * @return the parsed value, or null when {@code value} is null
     * @throws BadRequestException if {@code value} is not parseable by {@link Long#valueOf(String)}
     */
    private static Long validateLongQueryParameter(String value, String parameterName) {
        if (value == null) {
            return null;
        }
        try {
            return Long.valueOf(value);
        } catch (NumberFormatException e) {
            // Keep the parse failure as the cause so it is not lost from logs.
            throw new BadRequestException("'" + parameterName + "' must be numeric", e);
        }
    }
}
