/*
 * Decompiled with CFR 0.152.
 */
package com.github.loki4j.client.pipeline;

import com.github.loki4j.client.batch.Batcher;
import com.github.loki4j.client.batch.BinaryBatch;
import com.github.loki4j.client.batch.ByteBufferQueue;
import com.github.loki4j.client.batch.LogRecord;
import com.github.loki4j.client.batch.LogRecordBatch;
import com.github.loki4j.client.batch.LogRecordStream;
import com.github.loki4j.client.http.Loki4jHttpClient;
import com.github.loki4j.client.http.LokiResponse;
import com.github.loki4j.client.pipeline.Loki4jMetrics;
import com.github.loki4j.client.pipeline.PipelineConfig;
import com.github.loki4j.client.util.ByteBufferFactory;
import com.github.loki4j.client.util.Loki4jLogger;
import com.github.loki4j.client.util.Loki4jThreadFactory;
import com.github.loki4j.client.writer.Writer;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;

public final class DefaultPipeline {
    private static final Comparator<LogRecord> compareByTime = (e1, e2) -> {
        int tsCmp = Long.compare(e1.timestampMs, e2.timestampMs);
        return tsCmp == 0 ? Integer.compare(e1.nanos, e2.nanos) : tsCmp;
    };
    private static final Comparator<LogRecord> compareByStream = (e1, e2) -> Long.compare(e1.stream.id, e2.stream.id);
    private final long PARK_NS = TimeUnit.MILLISECONDS.toNanos(25L);
    private final ConcurrentLinkedQueue<LogRecord> buffer = new ConcurrentLinkedQueue();
    private final ByteBufferQueue sendQueue;
    private final Batcher batcher;
    private final Optional<Comparator<LogRecord>> recordComparator;
    private final Writer writer;
    private final Loki4jHttpClient httpClient;
    private final Loki4jMetrics metrics;
    private final Loki4jLogger log;
    private final boolean drainOnStop;
    private volatile boolean started = false;
    private AtomicBoolean acceptNewEvents = new AtomicBoolean(true);
    private AtomicBoolean drainRequested = new AtomicBoolean(false);
    private AtomicLong lastSendTimeMs = new AtomicLong(System.currentTimeMillis());
    private AtomicLong unsentEvents = new AtomicLong(0L);
    private ScheduledExecutorService scheduler;
    private ExecutorService encoderThreadPool;
    private ExecutorService senderThreadPool;
    private ScheduledFuture<?> drainScheduledFuture;

    public DefaultPipeline(PipelineConfig conf) {
        Optional<Object> logRecordComparator = Optional.empty();
        if (conf.staticLabels) {
            if (conf.sortByTime) {
                logRecordComparator = Optional.of(compareByTime);
            }
        } else {
            logRecordComparator = Optional.of(conf.sortByTime ? compareByStream.thenComparing(compareByTime) : compareByStream);
        }
        ByteBufferFactory bufferFactory = new ByteBufferFactory(conf.useDirectBuffers);
        this.batcher = new Batcher(conf.batchMaxItems, conf.batchMaxBytes, conf.batchTimeoutMs);
        this.recordComparator = logRecordComparator;
        this.writer = conf.writerFactory.factory.apply(conf.batchMaxBytes, bufferFactory);
        this.sendQueue = new ByteBufferQueue(conf.sendQueueMaxBytes, bufferFactory);
        this.httpClient = conf.httpClientFactory.apply(conf.httpConfig);
        this.drainOnStop = conf.drainOnStop;
        this.log = conf.internalLoggingFactory.apply(this);
        this.metrics = conf.metricsEnabled ? new Loki4jMetrics(conf.name) : null;
    }

    public void start() {
        this.log.info("Pipeline is starting...", new Object[0]);
        this.started = true;
        this.senderThreadPool = Executors.newFixedThreadPool(1, new Loki4jThreadFactory("loki4j-sender"));
        this.senderThreadPool.execute(() -> this.runSendLoop());
        this.encoderThreadPool = Executors.newFixedThreadPool(1, new Loki4jThreadFactory("loki4j-encoder"));
        this.encoderThreadPool.execute(() -> this.runEncodeLoop());
        this.scheduler = Executors.newScheduledThreadPool(1, new Loki4jThreadFactory("loki4j-scheduler"));
        this.drainScheduledFuture = this.scheduler.scheduleAtFixedRate(() -> this.drain(), 100L, 100L, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        this.drainScheduledFuture.cancel(false);
        if (this.drainOnStop) {
            this.log.info("Pipeline is draining...", new Object[0]);
            this.waitSendQueueLessThan(this.batcher.getCapacity(), Long.MAX_VALUE);
            this.lastSendTimeMs.set(0L);
            this.drain();
            this.waitSendQueueIsEmpty(Long.MAX_VALUE);
            this.log.info("Drain completed", new Object[0]);
        }
        this.started = false;
        this.scheduler.shutdown();
        this.encoderThreadPool.shutdown();
        this.senderThreadPool.shutdown();
        try {
            this.httpClient.close();
        }
        catch (Exception e) {
            this.log.error(e, "Error while closing HttpClient", new Object[0]);
        }
    }

    public boolean append(long timestamp, int nanos, Supplier<LogRecordStream> stream, Supplier<String> message) {
        long startedNs = System.nanoTime();
        boolean accepted = false;
        if (this.acceptNewEvents.get()) {
            LogRecord record = LogRecord.create(timestamp, nanos, stream.get(), message.get());
            if (this.batcher.validateLogRecordSize(record)) {
                this.buffer.offer(record);
                this.unsentEvents.incrementAndGet();
                accepted = true;
            } else {
                this.log.warn("Dropping the record that exceeds max batch size: %s", record);
            }
        }
        if (this.metrics != null) {
            this.metrics.eventAppended(startedNs, !accepted);
        }
        return accepted;
    }

    private void drain() {
        this.drainRequested.set(true);
        this.log.trace("drain planned", new Object[0]);
    }

    private void runEncodeLoop() {
        LogRecordBatch batch = new LogRecordBatch(this.batcher.getCapacity());
        while (this.started) {
            try {
                this.encodeStep(batch);
            }
            catch (InterruptedException e) {
                this.stop();
            }
        }
    }

    private void runSendLoop() {
        while (this.started) {
            try {
                this.sendStep();
            }
            catch (InterruptedException e) {
                this.stop();
            }
        }
    }

    private void encodeStep(LogRecordBatch batch) throws InterruptedException {
        while (this.started && this.buffer.isEmpty() && !this.drainRequested.get()) {
            LockSupport.parkNanos(this, this.PARK_NS);
        }
        if (!this.started) {
            return;
        }
        this.log.trace("check encode actions", new Object[0]);
        LogRecord record = this.buffer.peek();
        while (record != null && batch.isEmpty()) {
            this.batcher.checkSizeBeforeAdd(record, batch);
            if (batch.isEmpty()) {
                this.batcher.add((LogRecord)this.buffer.remove(), batch);
            }
            if (!batch.isEmpty()) continue;
            record = this.buffer.peek();
        }
        if (batch.isEmpty() && this.drainRequested.get()) {
            this.batcher.drain(this.lastSendTimeMs.get(), batch);
        }
        this.drainRequested.set(false);
        if (batch.isEmpty()) {
            return;
        }
        this.writeBatch(batch, this.writer);
        if (this.writer.isEmpty()) {
            return;
        }
        while (this.started && !this.sendQueue.offer(batch.batchId(), batch.size(), this.writer.size(), b -> this.writer.toByteBuffer((ByteBuffer)b))) {
            this.acceptNewEvents.set(false);
            LockSupport.parkNanos(this, this.PARK_NS);
        }
        batch.clear();
        this.acceptNewEvents.set(true);
    }

    private void writeBatch(LogRecordBatch batch, Writer writer) {
        long startedNs = System.nanoTime();
        this.recordComparator.ifPresent(cmp -> batch.sort((Comparator<LogRecord>)cmp));
        try {
            writer.serializeBatch(batch);
            this.log.info(">>> Batch %s converted to %,d bytes", batch, writer.size());
            if (this.metrics != null) {
                this.metrics.batchEncoded(startedNs, writer.size());
            }
        }
        catch (Exception e) {
            this.log.error(e, "Error occurred while serializing batch %s", batch);
            writer.reset();
        }
    }

    private void sendStep() throws InterruptedException {
        BinaryBatch batch = this.sendQueue.borrowBuffer();
        while (this.started && batch == null) {
            LockSupport.parkNanos(this, this.PARK_NS);
            batch = this.sendQueue.borrowBuffer();
        }
        if (!this.started) {
            return;
        }
        try {
            this.sendBatch(batch);
            this.lastSendTimeMs.set(System.currentTimeMillis());
            this.log.trace("sent items: %s", batch.sizeItems);
        }
        finally {
            this.unsentEvents.addAndGet(-batch.sizeItems);
            this.sendQueue.returnBuffer(batch);
        }
    }

    private LokiResponse sendBatch(BinaryBatch batch) {
        long startedNs = System.nanoTime();
        LokiResponse r = null;
        Exception e = null;
        try {
            r = this.httpClient.send(batch.data);
        }
        catch (Exception re) {
            e = re;
        }
        if (e != null) {
            this.log.error(e, "Error while sending Batch %s to Loki (%s)", batch, this.httpClient.getConfig().pushUrl);
        } else if (r.status < 200 || r.status > 299) {
            this.log.error("Loki responded with non-success status %s on batch %s. Error: %s", r.status, batch, r.body);
        } else {
            this.log.info("<<< Batch %s: Loki responded with status %s", batch, r.status);
        }
        if (this.metrics != null) {
            this.metrics.batchSent(startedNs, batch.sizeBytes, e != null || r.status > 299);
        }
        return r;
    }

    public void waitSendQueueIsEmpty(long timeoutMs) {
        this.waitSendQueueLessThan(1, timeoutMs);
    }

    void waitSendQueueLessThan(int size, long timeoutMs) {
        long elapsedNs;
        long timeoutNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        for (elapsedNs = 0L; this.started && this.unsentEvents.get() >= (long)size && elapsedNs < timeoutNs; elapsedNs += this.PARK_NS) {
            LockSupport.parkNanos(this.PARK_NS);
        }
        this.log.trace("wait send queue: started=%s, buffer(%s)>=%s, %s ms %s elapsed", this.started, this.unsentEvents.get(), size, timeoutMs, elapsedNs < timeoutNs ? "not" : "");
        if (elapsedNs >= timeoutNs) {
            throw new RuntimeException("Not completed within timeout " + timeoutMs + " ms");
        }
    }
}

