/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for the error paths of {@link SinkNode#process}.
 *
 * <p>Every test wires a {@code SinkNode} with byte-array serializers to a
 * {@link RecordCollectorImpl} backed by a {@link MockProducer}, then feeds it a
 * record that is expected to be rejected with a {@link StreamsException}.
 */
public class SinkNodeTest {
    private final Serializer<byte[]> anySerializer = Serdes.ByteArray().serializer();
    private final StateSerdes<Bytes, Bytes> anyStateSerde =
        StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
    private final RecordCollector recordCollector = new RecordCollectorImpl(
        null,
        new LogContext("sinknode-test "),
        new DefaultProductionExceptionHandler(),
        new Metrics().sensor("skipped-records")
    );
    private final InternalMockProcessorContext context =
        new InternalMockProcessorContext(anyStateSerde, recordCollector);

    // Deliberately a raw type: the tests below hand the sink keys and values whose
    // types do not match the byte[] serializers, which a parameterized SinkNode
    // would reject at compile time.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private final SinkNode sink = new SinkNode(
        "anyNodeName",
        new StaticTopicNameExtractor("any-output-topic"),
        anySerializer,
        anySerializer,
        null
    );

    /** Attaches a mock producer to the collector and initializes the sink with the mock context. */
    @Before
    public void before() {
        recordCollector.init(new MockProducer<>(true, anySerializer, anySerializer));
        sink.init(context);
    }

    /** A record processed while the context time is negative must be rejected. */
    @Test
    @SuppressWarnings("unchecked")
    public void shouldThrowStreamsExceptionOnInputRecordWithInvalidTimestamp() {
        final Bytes anyKey = new Bytes("any key".getBytes(StandardCharsets.UTF_8));
        final Bytes anyValue = new Bytes("any value".getBytes(StandardCharsets.UTF_8));

        // Sets a negative timestamp on the context before the record is processed.
        context.setTime(-1L);
        try {
            sink.process(anyKey, anyValue);
            Assert.fail("Should have thrown StreamsException");
        } catch (final StreamsException ignored) {
            // expected: the StreamsException itself is the outcome under test
        }
    }

    /** String key and value against byte[] serializers must surface a wrapped ClassCastException. */
    @Test
    @SuppressWarnings("unchecked")
    public void shouldThrowStreamsExceptionOnKeyValueTypeSerializerMismatch() {
        final String keyOfDifferentTypeThanSerializer = "key with different type";
        final String valueOfDifferentTypeThanSerializer = "value with different type";

        context.setTime(0L);
        try {
            sink.process(keyOfDifferentTypeThanSerializer, valueOfDifferentTypeThanSerializer);
            Assert.fail("Should have thrown StreamsException");
        } catch (final StreamsException e) {
            MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(ClassCastException.class));
        }
    }

    /** With a null key, the mismatch message must report the key type as unknown. */
    @Test
    @SuppressWarnings("unchecked")
    public void shouldHandleNullKeysWhenThrowingStreamsExceptionOnKeyValueTypeSerializerMismatch() {
        final String invalidValueToTriggerSerializerMismatch = "";

        context.setTime(1L);
        try {
            sink.process(null, invalidValueToTriggerSerializerMismatch);
            Assert.fail("Should have thrown StreamsException");
        } catch (final StreamsException e) {
            MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(ClassCastException.class));
            MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("unknown because key is null"));
        }
    }

    /** With a null value, the mismatch message must report the value type as unknown. */
    @Test
    @SuppressWarnings("unchecked")
    public void shouldHandleNullValuesWhenThrowingStreamsExceptionOnKeyValueTypeSerializerMismatch() {
        final String invalidKeyToTriggerSerializerMismatch = "";

        context.setTime(1L);
        try {
            sink.process(invalidKeyToTriggerSerializerMismatch, null);
            Assert.fail("Should have thrown StreamsException");
        } catch (final StreamsException e) {
            MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(ClassCastException.class));
            MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("unknown because value is null"));
        }
    }
}

