package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.RecordCollector;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/SinkNode.class */
/**
 * Terminal node of a processor topology: every record handed to {@link #process(Object, Object)}
 * is forwarded to the task's {@link RecordCollector}, addressed to the topic chosen by the
 * configured {@link TopicNameExtractor}.
 *
 * <p>A sink has no downstream nodes, so {@link #addChild(ProcessorNode)} always throws.
 *
 * @param <K> type of the record keys written by this sink
 * @param <V> type of the record values written by this sink
 */
public class SinkNode<K, V> extends ProcessorNode<K, V> {
    // Not final: a null serializer passed to the constructor is replaced in init().
    private Serializer<K> keySerializer;
    private Serializer<V> valSerializer;
    private final TopicNameExtractor<K, V> topicExtractor;
    private final StreamPartitioner<? super K, ? super V> partitioner;
    // Captured in init(); process() must not be called before init().
    private InternalProcessorContext context;

    /**
     * Creates a sink node.
     *
     * <p>NOTE(review): the decompiler reported that this constructor was originally
     * package-private; the widened visibility is kept here so existing callers keep compiling.
     *
     * @param str                node name, passed to the {@link ProcessorNode} constructor
     * @param topicNameExtractor picks the destination topic for each record
     * @param serializer         key serializer; may be {@code null}, in which case the context's
     *                           key serde is used once {@link #init(InternalProcessorContext)} runs
     * @param serializer2        value serializer; may be {@code null}, in which case the context's
     *                           value serde is used once {@link #init(InternalProcessorContext)} runs
     * @param streamPartitioner  partitioner handed to the record collector on every send
     */
    public SinkNode(String str, TopicNameExtractor<K, V> topicNameExtractor, Serializer<K> serializer, Serializer<V> serializer2, StreamPartitioner<? super K, ? super V> streamPartitioner) {
        super(str);
        this.topicExtractor = topicNameExtractor;
        this.keySerializer = serializer;
        this.valSerializer = serializer2;
        this.partitioner = streamPartitioner;
    }

    /**
     * Always rejects the child, since a sink is the end of the topology.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void addChild(ProcessorNode<?, ?> processorNode) {
        throw new UnsupportedOperationException("sink node does not allow addChild");
    }

    /**
     * Stores the processor context and resolves any serializer that was not supplied at
     * construction time from the context's key/value serdes.
     *
     * @param internalProcessorContext context this node runs in
     */
    @Override
    @SuppressWarnings("unchecked") // context serdes are wildcard-typed; K/V compatibility is the caller's contract
    public void init(InternalProcessorContext internalProcessorContext) {
        super.init(internalProcessorContext);
        this.context = internalProcessorContext;

        if (this.keySerializer == null) {
            this.keySerializer = (Serializer<K>) internalProcessorContext.keySerde().serializer();
        }
        if (this.valSerializer == null) {
            this.valSerializer = (Serializer<V>) internalProcessorContext.valueSerde().serializer();
        }

        // A wrapping value serializer may still be missing its inner serializers; offer it the
        // context's serializers (setIfUnset is expected to leave already-set ones alone).
        if (this.valSerializer instanceof WrappingNullableSerializer) {
            ((WrappingNullableSerializer) this.valSerializer).setIfUnset(
                internalProcessorContext.keySerde().serializer(),
                internalProcessorContext.valueSerde().serializer());
        }
    }

    /**
     * Sends one record to the topic selected by the topic extractor, using the current context's
     * timestamp and headers.
     *
     * @param k record key
     * @param v record value
     * @throws StreamsException if the context's current timestamp is negative
     */
    @Override
    public void process(K k, V v) {
        RecordCollector recordCollector = ((RecordCollector.Supplier) this.context).recordCollector();

        long timestamp = this.context.timestamp();
        if (timestamp < 0) {
            throw new StreamsException("Invalid (negative) timestamp of " + timestamp + " for output record <" + k + ":" + v + ">.");
        }

        String topic = this.topicExtractor.extract(k, v, this.context.recordContext());

        // Key, value, serializers and partitioner are passed uncast so the generic send() is
        // inferred as <K, V>. Casting the key to String would fail for every non-String key.
        recordCollector.send(topic, k, v, this.context.headers(), timestamp, this.keySerializer, this.valSerializer, this.partitioner);
    }

    /**
     * @return the description produced by {@link #toString(String)} with an empty indent
     */
    @Override
    public String toString() {
        return toString("");
    }

    /**
     * Extends the parent's description with a line naming the topic extractor.
     *
     * @param str indent prefix placed in front of the added line
     * @return the parent's description followed by the topic line
     */
    @Override
    public String toString(String str) {
        StringBuilder sb = new StringBuilder(super.toString(str));
        sb.append(str).append("\ttopic:\t\t");
        sb.append(this.topicExtractor);
        sb.append("\n");
        return sb.toString();
    }
}
