package org.apache.kafka.common.requests;

import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.PathAwareSniHostName;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SampledLogAction;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/common/requests/SamplingRequestLogFilterTest.class */
public class SamplingRequestLogFilterTest {
    private final MockTime time = new MockTime();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/common/requests/SamplingRequestLogFilterTest$Request.class */
    /**
     * A synthetic request fed to the filter under test: the request context plus the
     * timestamp that is handed to the filter together with it.
     */
    public static class Request {
        /** Timestamp in nanoseconds; generators seed it from {@code MockTime.nanoseconds()}. */
        private final long time;
        /** Context passed to the filter for this request. */
        private final RequestContext context;

        private Request(RequestContext requestContext, long j) {
            time = j;
            context = requestContext;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/common/requests/SamplingRequestLogFilterTest$Sampling.class */
    /**
     * Outcome of running requests until the filter decides to log one: the logged request,
     * the action the filter returned for it, and the requests that were passed over first.
     */
    public static class Sampling {
        /** Requests processed (and not logged) before {@link #sample}. */
        private final List<Request> skipped;
        /** Action returned by the filter for {@link #sample}. */
        private final SampledLogAction action;
        /** The first request the filter chose to log. */
        private final Request sample;

        private Sampling(Request request, SampledLogAction sampledLogAction, List<Request> list) {
            skipped = list;
            action = sampledLogAction;
            sample = request;
        }
    }

    /**
     * Feeds one CreateTopics request per second through the filter.
     *
     * <p>With only a default rate of 10 samples per minute configured, no request may be
     * rejected for a minute. After the filter is configured with a default rate of 0 and
     * admin API logging disabled, no request may be sampled for the following minute.
     */
    @Test
    public void testAdminRequestSampling() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Iterator<Request> requests =
            fixedIntervalRequestGenerator(ApiKeys.CREATE_TOPICS, Duration.ofSeconds(1L)).iterator();

        filter.configure(Collections.singletonMap("confluent.request.log.samples.per.min", 10));
        // Collect the requests the filter refuses to log; there must be none.
        List<Request> notLogged = filterForDuration(requests, Duration.ofMinutes(1L),
                request -> !filter.shouldLogRequest(request.context, request.time))
            .collect(Collectors.toList());
        Assertions.assertEquals(Collections.emptyList(), notLogged);

        Map<String, Object> disabledConfig = new HashMap<>();
        disabledConfig.put("confluent.request.log.samples.per.min", 0);
        disabledConfig.put("confluent.request.log.enable.admin.apis", false);
        filter.configure(disabledConfig);
        List<Request> sampled =
            sampleForDuration(filter, requests, Duration.ofMinutes(1L)).collect(Collectors.toList());
        Assertions.assertEquals(Collections.emptyList(), sampled);
    }

    /**
     * Runs the shared augmentation scenario with only the default sample rate
     * (10 per minute) configured, expecting samples to be flagged as default-sampled.
     */
    @Test
    public void testDefaultSampledRequestsAugmentRequestLog() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Map<String, ?> defaultRateOnly =
            Collections.singletonMap("confluent.request.log.samples.per.min", 10);
        filter.configure(defaultRateOnly);
        testSampledRequestsAugmentRequestLog(filter, true);
    }

    /**
     * Runs the shared augmentation scenario with only a per-API rate for Metadata
     * (10 per minute) configured, expecting samples not to be flagged as default-sampled.
     */
    @Test
    public void testApiSampledRequestsAugmentRequestLog() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Map<String, ?> metadataRateOnly =
            Collections.singletonMap("confluent.request.log.api.samples.per.min", "Metadata=10");
        filter.configure(metadataRateOnly);
        testSampledRequestsAugmentRequestLog(filter, false);
    }

    /**
     * Shared scenario: drives Metadata requests (one every 100 ms) through the filter and
     * checks the bookkeeping carried by the returned {@code SampledLogAction}.
     *
     * <p>The very first sample must report zero nanos and zero requests since the last
     * sample. After a further minute of traffic, two consecutive samples are taken; the
     * later one must report as many requests since the last sample as were skipped on the
     * way to it, and a nanosecond gap equal to the difference between the two sample times.
     *
     * @param samplingRequestLogFilter already-configured filter under test
     * @param z expected value of {@code isDefaultSampled} on the returned actions
     */
    private void testSampledRequestsAugmentRequestLog(SamplingRequestLogFilter samplingRequestLogFilter, boolean z) {
        Iterator<Request> requests =
            fixedIntervalRequestGenerator(ApiKeys.METADATA, Duration.ofMillis(100L)).iterator();
        Duration oneMinute = Duration.ofMinutes(1L);

        Sampling first = runUntilFirstSample(requests, oneMinute, samplingRequestLogFilter);
        Assertions.assertNotNull(first);
        Assertions.assertEquals(z, first.action.isDefaultSampled);
        Assertions.assertEquals(0L, first.action.nanosSinceLastSample);
        Assertions.assertEquals(0L, first.action.requestsSinceLastSample);

        // The returned stream is deliberately dropped: filterForDuration consumes the
        // requests eagerly, so this only serves to push a minute of traffic through the filter.
        filterForDuration(requests, oneMinute,
            request -> samplingRequestLogFilter.shouldLogRequest(request.context, request.time));

        Sampling previous = runUntilFirstSample(requests, oneMinute, samplingRequestLogFilter);
        Assertions.assertNotNull(previous);
        Sampling latest = runUntilFirstSample(requests, oneMinute, samplingRequestLogFilter);
        Assertions.assertNotNull(latest);
        Assertions.assertEquals(z, latest.action.isDefaultSampled);
        Assertions.assertEquals(latest.skipped.size(), latest.action.requestsSinceLastSample);
        Assertions.assertEquals(latest.sample.time - previous.sample.time, latest.action.nanosSinceLastSample);
    }

    /**
     * Pulls requests from {@code it} and hands them to the filter until one is logged or
     * the mock clock has moved past {@code duration} from the time of the call.
     *
     * <p>Before each request is processed the mock clock is slept for the gap (truncated to
     * whole milliseconds, never negative) between the current time and the request timestamp.
     *
     * @param it source of requests; assumed to always have a next element
     * @param duration how long, in mock time, to keep looking for a sample
     * @param requestLogFilter filter under test
     * @return the first logged request with its action and the requests skipped before it,
     *         or {@code null} if nothing was logged within {@code duration}
     */
    private Sampling runUntilFirstSample(Iterator<Request> it, Duration duration, RequestLogFilter requestLogFilter) {
        List<Request> skipped = new ArrayList<>();
        long deadlineMs = this.time.milliseconds() + duration.toMillis();
        while (this.time.milliseconds() <= deadlineMs) {
            Request next = it.next();
            long gapNanos = Math.max(0L, next.time - this.time.nanoseconds());
            this.time.sleep(Duration.ofNanos(gapNanos).toMillis());
            SampledLogAction action = requestLogFilter.processRequest(next.context, next.time);
            if (action.shouldLog()) {
                // NOTE(review): trivially true for the declared type; presumably meaningful
                // when processRequest is declared to return a more general action type.
                Assertions.assertTrue(action instanceof SampledLogAction);
                return new Sampling(next, action, skipped);
            }
            skipped.add(next);
        }
        return null;
    }

    /**
     * With a default rate of 10 samples per minute and one Metadata request per second,
     * the first minute must yield 10 samples spaced 1 second apart and the second minute
     * 10 samples spaced 6 seconds apart.
     */
    @Test
    public void testDefaultSampling() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        filter.configure(Collections.singletonMap("confluent.request.log.samples.per.min", 10));
        Iterator<Request> requests =
            fixedIntervalRequestGenerator(ApiKeys.METADATA, Duration.ofSeconds(1L)).iterator();

        List<Request> firstMinute =
            sampleForDuration(filter, requests, Duration.ofMinutes(1L)).collect(Collectors.toList());
        Assertions.assertEquals(10, firstMinute.size());
        assertSampleInterval(firstMinute, Duration.ofSeconds(1L));

        List<Request> secondMinute =
            sampleForDuration(filter, requests, Duration.ofMinutes(1L)).collect(Collectors.toList());
        Assertions.assertEquals(10, secondMinute.size());
        assertSampleInterval(secondMinute, Duration.ofSeconds(6L));
    }

    /**
     * Configures per-API rates only ({@code Metadata=10,Produce=6}) and feeds a merged
     * stream of Metadata (1 s), Produce (5 s) and Fetch (1 s) requests.
     *
     * <p>Each of two consecutive minutes must yield exactly 16 samples: 6 Produce and
     * 10 Metadata (so no Fetch). Expected spacing is 5 s / 1 s in the first minute and
     * 10 s / 6 s in the second, for Produce / Metadata respectively.
     */
    @Test
    public void testOverrideSamplingNoDefault() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        filter.configure(Collections.singletonMap("confluent.request.log.api.samples.per.min", "Metadata=10,Produce=6"));
        Iterator<Request> requests = joinAll(Arrays.asList(
            fixedIntervalRequestGenerator(ApiKeys.METADATA, Duration.ofSeconds(1L)),
            fixedIntervalRequestGenerator(ApiKeys.PRODUCE, Duration.ofSeconds(5L)),
            fixedIntervalRequestGenerator(ApiKeys.FETCH, Duration.ofSeconds(1L)))).iterator();

        List<Request> firstMinute =
            sampleForDuration(filter, requests, Duration.ofMinutes(1L)).collect(Collectors.toList());
        Assertions.assertEquals(16, firstMinute.size());
        List<Request> firstProduce = requestsWithApiKey(firstMinute, ApiKeys.PRODUCE);
        List<Request> firstMetadata = requestsWithApiKey(firstMinute, ApiKeys.METADATA);
        Assertions.assertEquals(6, firstProduce.size());
        Assertions.assertEquals(10, firstMetadata.size());
        assertSampleInterval(firstProduce, Duration.ofSeconds(5L));
        assertSampleInterval(firstMetadata, Duration.ofSeconds(1L));

        List<Request> secondMinute =
            sampleForDuration(filter, requests, Duration.ofMinutes(1L)).collect(Collectors.toList());
        Assertions.assertEquals(16, secondMinute.size());
        List<Request> secondProduce = requestsWithApiKey(secondMinute, ApiKeys.PRODUCE);
        List<Request> secondMetadata = requestsWithApiKey(secondMinute, ApiKeys.METADATA);
        Assertions.assertEquals(6, secondProduce.size());
        Assertions.assertEquals(10, secondMetadata.size());
        assertSampleInterval(secondProduce, Duration.ofSeconds(10L));
        assertSampleInterval(secondMetadata, Duration.ofSeconds(6L));
    }

    /** Returns, in encounter order, the requests whose header carries {@code apiKey}. */
    private static List<Request> requestsWithApiKey(List<Request> requests, ApiKeys apiKey) {
        return requests.stream()
            .filter(request -> request.context.header.apiKey() == apiKey)
            .collect(Collectors.toList());
    }

    /**
     * With a default rate of 10 per minute and a per-API override of {@code Metadata=0},
     * one minute of merged Metadata/Produce/Fetch traffic must yield 10 samples, none of
     * which is a Metadata request.
     */
    @Test
    public void testDisabledApi() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Map<String, String> config = new HashMap<>();
        config.put("confluent.request.log.api.samples.per.min", "Metadata=0");
        config.put("confluent.request.log.samples.per.min", "10");
        filter.configure(config);

        Iterator<Request> requests = joinAll(Arrays.asList(
            fixedIntervalRequestGenerator(ApiKeys.METADATA, Duration.ofSeconds(1L)),
            fixedIntervalRequestGenerator(ApiKeys.PRODUCE, Duration.ofSeconds(5L)),
            fixedIntervalRequestGenerator(ApiKeys.FETCH, Duration.ofSeconds(1L)))).iterator();
        List<Request> sampled =
            sampleForDuration(filter, requests, Duration.ofMinutes(1L)).collect(Collectors.toList());

        Assertions.assertEquals(10, sampled.size());
        Assertions.assertTrue(sampled.stream()
            .allMatch(request -> request.context.header.apiKey() != ApiKeys.METADATA));
    }

    /**
     * With admin API logging enabled and a per-API override of {@code CreateTopics=0},
     * one minute of merged CreateTopics/DeleteTopics traffic must yield at least one
     * sample, and every sample must be a DeleteTopics request.
     */
    @Test
    public void testDisabledAdminApi() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Map<String, String> config = new HashMap<>();
        config.put("confluent.request.log.api.samples.per.min", "CreateTopics=0");
        config.put("confluent.request.log.enable.admin.apis", "true");
        filter.configure(config);

        Iterator<Request> requests = joinAll(Arrays.asList(
            fixedIntervalRequestGenerator(ApiKeys.CREATE_TOPICS, Duration.ofSeconds(1L)),
            fixedIntervalRequestGenerator(ApiKeys.DELETE_TOPICS, Duration.ofSeconds(5L)))).iterator();
        List<Request> sampled =
            sampleForDuration(filter, requests, Duration.ofMinutes(1L)).collect(Collectors.toList());

        // Guard against the allMatch below passing vacuously on an empty result.
        Assertions.assertFalse(sampled.isEmpty());
        Assertions.assertTrue(sampled.stream()
            .allMatch(request -> request.context.header.apiKey() == ApiKeys.DELETE_TOPICS));
    }

    /**
     * With a default rate of 10 per minute and a per-API override of {@code Produce=6},
     * one minute of merged Metadata (2 s), Produce (5 s) and Fetch (1 s) traffic must
     * yield 16 samples: 6 Produce requests and 10 requests of other APIs.
     */
    @Test
    public void testOverrideSamplingWithDefault() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Map<String, String> config = new HashMap<>();
        config.put("confluent.request.log.api.samples.per.min", "Produce=6");
        config.put("confluent.request.log.samples.per.min", "10");
        filter.configure(config);

        Iterator<Request> requests = joinAll(Arrays.asList(
            fixedIntervalRequestGenerator(ApiKeys.METADATA, Duration.ofSeconds(2L)),
            fixedIntervalRequestGenerator(ApiKeys.PRODUCE, Duration.ofSeconds(5L)),
            fixedIntervalRequestGenerator(ApiKeys.FETCH, Duration.ofSeconds(1L)))).iterator();
        List<Request> sampled =
            sampleForDuration(filter, requests, Duration.ofMinutes(1L)).collect(Collectors.toList());
        Assertions.assertEquals(16, sampled.size());

        List<Request> produce = sampled.stream()
            .filter(request -> request.context.header.apiKey() == ApiKeys.PRODUCE)
            .collect(Collectors.toList());
        List<Request> others = sampled.stream()
            .filter(request -> request.context.header.apiKey() != ApiKeys.PRODUCE)
            .collect(Collectors.toList());
        Assertions.assertEquals(6, produce.size());
        Assertions.assertEquals(10, others.size());
    }

    /**
     * Every malformed value below, for either the default rate or the per-API rate
     * setting, must be rejected with a {@code ConfigException}.
     */
    @Test
    public void testInvalidDefaultConfigs() {
        // Default rate: not a number, the literal string "null", and a non-integer.
        for (String badRate : Arrays.asList("blah", "null", "1.3")) {
            assertThrowsConfigException(Collections.singletonMap("confluent.request.log.samples.per.min", badRate));
        }
        // Per-API rates: unknown API name, non-numeric rate, and an entry without '='.
        for (String badApiRates : Arrays.asList("MD=5", "Metadata=5,Fetch=dd", "Metadata=5,blah,")) {
            assertThrowsConfigException(Collections.singletonMap("confluent.request.log.api.samples.per.min", badApiRates));
        }
    }

    /**
     * Asserts that a fresh filter rejects {@code map} with a {@code ConfigException} on each
     * of its three configuration entry points, checked in this order:
     * {@code validateReconfiguration}, {@code configure}, {@code reconfigure}.
     *
     * @param map configuration expected to be invalid
     */
    private void assertThrowsConfigException(Map<String, ?> map) {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Assertions.assertThrows(ConfigException.class, () -> filter.validateReconfiguration(map));
        Assertions.assertThrows(ConfigException.class, () -> filter.configure(map));
        Assertions.assertThrows(ConfigException.class, () -> filter.reconfigure(map));
    }

    /**
     * Builds a request context for {@code apiKeys} at its latest version, using fixed test
     * values for everything else: client id {@code "clientId"}, correlation id 1,
     * connection id {@code "cxnId"}, the loopback address, the anonymous principal and a
     * PLAINTEXT listener and security protocol.
     *
     * @param apiKeys API the request header should carry
     * @return a new context for a request of that API
     */
    private RequestContext newRequestContext(ApiKeys apiKeys) {
        RequestHeader header = new RequestHeader(apiKeys, apiKeys.latestVersion(), "clientId", 1);
        InetAddress clientAddress = InetAddress.getLoopbackAddress();
        ListenerName listener = new ListenerName("PLAINTEXT");
        return new RequestContext(
            header,
            "cxnId",
            clientAddress,
            KafkaPrincipal.ANONYMOUS,
            listener,
            SecurityProtocol.PLAINTEXT,
            ClientInformation.EMPTY,
            (PathAwareSniHostName) null,
            false,
            Optional.empty());
    }

    /**
     * Creates an endless stream of requests for {@code apiKeys}. The first request is
     * stamped with the mock clock's nanosecond reading at the time this method is called;
     * each following request is stamped {@code duration} later than the one before it.
     *
     * @param apiKeys API of every generated request
     * @param duration spacing between consecutive request timestamps
     * @return an infinite, lazily generated stream of requests
     */
    private Stream<Request> fixedIntervalRequestGenerator(ApiKeys apiKeys, Duration duration) {
        AtomicLong nextTimestamp = new AtomicLong(this.time.nanoseconds());
        return Stream.generate(() -> {
            long timestamp = nextTimestamp.get();
            Request generated = new Request(newRequestContext(apiKeys), timestamp);
            nextTimestamp.set(timestamp + duration.toNanos());
            return generated;
        });
    }

    /**
     * Runs requests through the filter for {@code duration} of mock time and returns those
     * for which {@code shouldLogRequest} answered {@code true}.
     *
     * @param samplingRequestLogFilter filter under test
     * @param it source of requests
     * @param duration how long, in mock time, to keep feeding requests
     * @return the logged requests, in the order they were processed
     */
    private Stream<Request> sampleForDuration(SamplingRequestLogFilter samplingRequestLogFilter, Iterator<Request> it, Duration duration) {
        Predicate<Request> shouldLog =
            request -> samplingRequestLogFilter.shouldLogRequest(request.context, request.time);
        return filterForDuration(it, duration, shouldLog);
    }

    /**
     * Eagerly consumes requests from {@code it} for {@code duration} of mock time and
     * returns the ones accepted by {@code predicate}.
     *
     * <p>Before each request is tested the mock clock is slept for the gap (truncated to
     * whole milliseconds, never negative) between the current time and the request
     * timestamp. The deadline is checked before the next request is pulled, so the last
     * request processed may carry a timestamp past the deadline. All work happens inside
     * this call; the returned stream merely replays the accepted requests.
     *
     * @param it source of requests; assumed to always have a next element
     * @param duration how long, in mock time, to keep consuming requests
     * @param predicate test applied to every consumed request, exactly once and in order
     * @return the accepted requests, in the order they were processed
     */
    private Stream<Request> filterForDuration(Iterator<Request> it, Duration duration, Predicate<Request> predicate) {
        long deadlineMs = this.time.milliseconds() + duration.toMillis();
        Stream.Builder<Request> accepted = Stream.builder();
        while (this.time.milliseconds() <= deadlineMs) {
            Request next = it.next();
            long gapNanos = Math.max(0L, next.time - this.time.nanoseconds());
            this.time.sleep(Duration.ofNanos(gapNanos).toMillis());
            if (predicate.test(next)) {
                accepted.accept(next);
            }
        }
        return accepted.build();
    }

    /**
     * Computes the gaps between consecutive requests.
     *
     * @param list requests in the order they were sampled
     * @return for each adjacent pair, the later timestamp minus the earlier one (in the
     *         timestamps' own unit); empty when {@code list} has fewer than two elements
     */
    private Stream<Long> sampleIntervals(List<Request> list) {
        Stream.Builder<Long> intervals = Stream.builder();
        Request previous = null;
        for (Request current : list) {
            if (previous != null) {
                intervals.accept(current.time - previous.time);
            }
            previous = current;
        }
        return intervals.build();
    }

    /**
     * Asserts that every gap between consecutive requests in {@code list}, read as
     * nanoseconds, equals {@code duration}. A list with fewer than two requests has no
     * gaps and therefore passes.
     *
     * @param list sampled requests, in order
     * @param duration expected spacing between neighbours
     */
    private void assertSampleInterval(List<Request> list, Duration duration) {
        boolean evenlySpaced = sampleIntervals(list)
            .map(Duration::ofNanos)
            .allMatch(gap -> gap.equals(duration));
        Assertions.assertTrue(evenlySpaced);
    }

    /**
     * Merges two request streams into one endless stream ordered by timestamp, assuming
     * each input is itself ordered by timestamp and never runs out.
     *
     * <p>Each generated element is the earlier of the two current heads; the other head is
     * parked and reused on the next call. When both heads carry the same timestamp the
     * element from {@code stream2} is emitted first.
     *
     * @param stream first input
     * @param stream2 second input; wins timestamp ties
     * @return the merged stream
     */
    private Stream<Request> join(Stream<Request> stream, Stream<Request> stream2) {
        // Holders for the head that lost the previous comparison; null means "pull a fresh one".
        AtomicReference<Request> parkedFirst = new AtomicReference<>();
        AtomicReference<Request> parkedSecond = new AtomicReference<>();
        Iterator<Request> firstSource = stream.iterator();
        Iterator<Request> secondSource = stream2.iterator();
        return Stream.generate(() -> {
            Request first = parkedFirst.getAndSet(null);
            if (first == null) {
                first = firstSource.next();
            }
            Request second = parkedSecond.getAndSet(null);
            if (second == null) {
                second = secondSource.next();
            }
            if (first.time < second.time) {
                parkedSecond.set(second);
                return first;
            }
            parkedFirst.set(first);
            return second;
        });
    }

    /**
     * Merges any number of request streams by folding them pairwise, left to right,
     * through {@code join}.
     *
     * @param collection streams to merge
     * @return an empty stream for an empty collection, the sole stream itself for a
     *         single-element collection, otherwise the pairwise merge of all streams
     */
    private Stream<Request> joinAll(Collection<Stream<Request>> collection) {
        Iterator<Stream<Request>> remaining = collection.iterator();
        if (!remaining.hasNext()) {
            return Stream.empty();
        }
        Stream<Request> merged = remaining.next();
        while (remaining.hasNext()) {
            merged = join(merged, remaining.next());
        }
        return merged;
    }
}
