package org.apache.kafka.test;

import java.time.Duration;
import java.util.ArrayList;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.junit.Assert;

/* loaded from: input_file:org/apache/kafka/test/MockProcessor.class */
public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
    public final ArrayList<String> processed;
    public final ArrayList<K> processedKeys;
    public final ArrayList<V> processedValues;
    public final ArrayList<Long> punctuatedStreamTime;
    public final ArrayList<Long> punctuatedSystemTime;
    public Cancellable scheduleCancellable;
    private final PunctuationType punctuationType;
    private final long scheduleInterval;
    private boolean commitRequested;

    public MockProcessor(PunctuationType punctuationType, long j) {
        this.processed = new ArrayList<>();
        this.processedKeys = new ArrayList<>();
        this.processedValues = new ArrayList<>();
        this.punctuatedStreamTime = new ArrayList<>();
        this.punctuatedSystemTime = new ArrayList<>();
        this.commitRequested = false;
        this.punctuationType = punctuationType;
        this.scheduleInterval = j;
    }

    public MockProcessor() {
        this(PunctuationType.STREAM_TIME, -1L);
    }

    public void init(ProcessorContext processorContext) {
        super.init(processorContext);
        if (this.scheduleInterval > 0) {
            this.scheduleCancellable = processorContext.schedule(Duration.ofMillis(this.scheduleInterval), this.punctuationType, j -> {
                if (this.punctuationType == PunctuationType.STREAM_TIME) {
                    Assert.assertEquals(j, context().timestamp());
                }
                Assert.assertEquals(-1L, context().partition());
                Assert.assertEquals(-1L, context().offset());
                (this.punctuationType == PunctuationType.STREAM_TIME ? this.punctuatedStreamTime : this.punctuatedSystemTime).add(Long.valueOf(j));
            });
        }
    }

    public void process(K k, V v) {
        this.processedKeys.add(k);
        this.processedValues.add(v);
        this.processed.add((k == null ? "null" : k) + ":" + (v == null ? "null" : v));
        if (this.commitRequested) {
            context().commit();
            this.commitRequested = false;
        }
    }

    public void checkAndClearProcessResult(String... strArr) {
        Assert.assertEquals("the number of outputs:" + this.processed, strArr.length, this.processed.size());
        for (int i = 0; i < strArr.length; i++) {
            Assert.assertEquals("output[" + i + "]:", strArr[i], this.processed.get(i));
        }
        this.processed.clear();
    }

    public void requestCommit() {
        this.commitRequested = true;
    }

    public void checkEmptyAndClearProcessResult() {
        Assert.assertEquals("the number of outputs:", 0L, this.processed.size());
        this.processed.clear();
    }

    public void checkAndClearPunctuateResult(PunctuationType punctuationType, long... jArr) {
        ArrayList<Long> arrayList = punctuationType == PunctuationType.STREAM_TIME ? this.punctuatedStreamTime : this.punctuatedSystemTime;
        Assert.assertEquals("the number of outputs:", jArr.length, arrayList.size());
        for (int i = 0; i < jArr.length; i++) {
            Assert.assertEquals("output[" + i + "]:", jArr[i], arrayList.get(i).longValue());
        }
        this.processed.clear();
    }
}
