package io.confluent.ksql.api.server;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlPreconditions;
import io.confluent.ksql.util.Pair;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.utils.Time;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/confluent/ksql/api/server/SlidingWindowRateLimiter.class */
/**
 * Bandwidth limiter that tracks how many response bytes were recorded within a sliding
 * time window and rejects new requests once that total exceeds a configured limit.
 *
 * <p>Callers report served bytes through {@link #add(long)} and ask for admission through
 * {@code allow(...)}. Entries that have aged out of the window are evicted lazily, on each
 * admission check.
 *
 * <p>All mutating methods are {@code synchronized}. The byte counter is additionally
 * {@code volatile} because the "remaining bandwidth" metric registered in the constructor
 * reads it without taking the lock.
 */
public class SlidingWindowRateLimiter {
    private static final Logger LOG = LogManager.getLogger(SlidingWindowRateLimiter.class);
    private static final long NUM_BYTES_IN_ONE_MEGABYTE = 1024L * 1024L;
    private static final String METRIC_GROUP = "_confluent-ksql-limits";

    /** Log of (timestamp, response size in bytes) pairs, oldest first. */
    private final Queue<Pair<Long, Long>> responseSizesLog;
    /** Maximum number of bytes tolerated inside the window before requests are rejected. */
    private final long throttleLimit;
    /** Length of the sliding window, in milliseconds. */
    private final long slidingWindowSizeMs;
    /** Records one event per rejected request. */
    private final Sensor rejectSensor;
    /** Sum of the sizes of all entries currently held in {@link #responseSizesLog}. */
    private volatile long numBytesInWindow;

    /**
     * Creates a limiter and registers its metrics.
     *
     * <p>Two metrics are registered in the {@code _confluent-ksql-limits} group, both named
     * with the given prefix: {@code <prefix>-bandwidth-limit-remaining} (limit minus bytes
     * currently in the window) and {@code <prefix>-bandwidth-limit-reject-count} (cumulative
     * count of rejections).
     *
     * @param requestLimitInMb bandwidth limit in megabytes; must not be negative
     * @param slidingWindowSizeMs window length in milliseconds; must not be negative
     * @param metricNamePrefix prefix prepended to the names of the registered metrics
     * @param metrics registry the metrics and the reject sensor are added to
     * @param metricsTags tags attached to both metric names
     */
    public SlidingWindowRateLimiter(
            int requestLimitInMb,
            long slidingWindowSizeMs,
            String metricNamePrefix,
            Metrics metrics,
            Map<String, String> metricsTags) {
        KsqlPreconditions.checkArgument(requestLimitInMb >= 0, "Query bandwidth limit can't be negative.");
        KsqlPreconditions.checkArgument(slidingWindowSizeMs >= 0, "Query throttle window size can't be negative");
        // int * long widens to long, so large megabyte limits do not overflow int arithmetic.
        this.throttleLimit = requestLimitInMb * NUM_BYTES_IN_ONE_MEGABYTE;
        this.slidingWindowSizeMs = slidingWindowSizeMs;
        this.responseSizesLog = new LinkedList<>();
        this.numBytesInWindow = 0L;
        metrics.addMetric(
                new MetricName(
                        metricNamePrefix + "-bandwidth-limit-remaining",
                        METRIC_GROUP,
                        "The current value of the bandwidth limiter",
                        metricsTags),
                (metricConfig, now) -> {
                    return this.throttleLimit - this.numBytesInWindow;
                });
        this.rejectSensor = metrics.sensor("bandwidth-limit-rejects");
        this.rejectSensor.add(
                new MetricName(
                        metricNamePrefix + "-bandwidth-limit-reject-count",
                        METRIC_GROUP,
                        "The number of requests rejected by this limiter",
                        metricsTags),
                new CumulativeCount());
    }

    /**
     * Checks whether a new request may proceed, using the current system time.
     *
     * @param ksqlQueryType query type named in the rejection message
     * @throws KsqlException if the bytes recorded in the window exceed the limit
     */
    public synchronized void allow(KsqlConstants.KsqlQueryType ksqlQueryType) throws KsqlException {
        allow(ksqlQueryType, Time.SYSTEM.milliseconds());
    }

    /**
     * Checks whether a new request may proceed at the given time.
     *
     * <p>First evicts every logged entry that is at least one full window older than
     * {@code timestamp}, then rejects the request if the remaining byte total is strictly
     * greater than the limit. A rejection is logged at WARN level and recorded on the
     * reject sensor before the exception is thrown.
     *
     * @param ksqlQueryType query type named in the rejection message
     * @param timestamp time of the check in milliseconds; must not be negative
     * @throws KsqlException if the bytes recorded in the window exceed the limit
     */
    @VisibleForTesting
    protected synchronized void allow(KsqlConstants.KsqlQueryType ksqlQueryType, long timestamp) throws KsqlException {
        KsqlPreconditions.checkArgument(timestamp >= 0, "Timestamp can't be negative.");
        // Entries are appended in call order, so expired ones are drained from the head.
        while (!this.responseSizesLog.isEmpty()
                && timestamp - this.responseSizesLog.peek().left >= this.slidingWindowSizeMs) {
            this.numBytesInWindow -= this.responseSizesLog.poll().right;
        }
        if (this.numBytesInWindow > this.throttleLimit) {
            LOG.warn(
                    "Hit bandwidth rate limit of {}B with use of {}B",
                    this.throttleLimit,
                    this.numBytesInWindow);
            this.rejectSensor.record();
            throw new KsqlApiException(
                    "Host is at bandwidth rate limit for " + ksqlQueryType.toString().toLowerCase() + " queries.",
                    Errors.ERROR_CODE_BAD_REQUEST);
        }
    }

    /**
     * Records a response of the given size, stamped with the current system time.
     *
     * @param responseSizeInBytes size of the response in bytes; must not be negative
     */
    public synchronized void add(long responseSizeInBytes) {
        add(Time.SYSTEM.milliseconds(), responseSizeInBytes);
    }

    /**
     * Records a response of the given size at the given time and adds it to the running
     * byte total for the window.
     *
     * @param timestamp time the response was produced, in milliseconds; must not be negative
     * @param responseSizeInBytes size of the response in bytes; must not be negative
     */
    @VisibleForTesting
    protected synchronized void add(long timestamp, long responseSizeInBytes) {
        KsqlPreconditions.checkArgument(timestamp >= 0, "Timestamp can't be negative.");
        KsqlPreconditions.checkArgument(responseSizeInBytes >= 0, "Response size can't be negative.");
        this.responseSizesLog.add(new Pair<>(timestamp, responseSizeInBytes));
        this.numBytesInWindow += responseSizeInBytes;
    }
}
