/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.slf4j.Logger;

/**
 * {@link RecordCollector} that serializes keys and values, hands the resulting
 * {@code byte[]} records to a {@link Producer}, and remembers the offset reported
 * for every successfully acknowledged record.
 *
 * <p>Failures reported through the producer callback are not thrown from the
 * callback itself. A fatal failure is stored in {@link #sendException} and rethrown
 * by the next call to {@code send}, {@link #flush()} or {@link #close()}.
 */
public class RecordCollectorImpl implements RecordCollector {
    private static final String LOG_MESSAGE = "Error sending record (key {} value {} timestamp {}) to topic {} due to {}; No more records will be sent and no more offsets will be recorded for this task.";
    private static final String EXCEPTION_MESSAGE = "%sAbort sending since %s with a previous record (key %s value %s timestamp %d) to topic %s due to %s";
    private static final String PARAMETER_HINT = "\nYou can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.";

    private final Logger log;
    private final String logPrefix;
    private final Sensor skippedRecordsSensor;
    private final Producer<byte[], byte[]> producer;
    private final Map<TopicPartition, Long> offsets;
    private final ProductionExceptionHandler productionExceptionHandler;

    // First fatal error seen by the send callback; once set, no further offsets are recorded.
    // NOTE(review): declared volatile, presumably because the callback may run on a different
    // thread than the one calling send/flush/close -- confirm against the Producer contract.
    private volatile KafkaException sendException;

    /**
     * @param producer                   producer that receives the serialized records
     * @param streamTaskId               task id, used only to build the {@code "task [id] "} message prefix
     * @param logContext                 source of the logger for this instance
     * @param productionExceptionHandler consulted for send errors that are neither a producer fence
     *                                   nor classified as fatal by this class
     * @param skippedRecordsSensor       recorded once for every record the handler chooses to skip
     */
    public RecordCollectorImpl(Producer<byte[], byte[]> producer, String streamTaskId, LogContext logContext, ProductionExceptionHandler productionExceptionHandler, Sensor skippedRecordsSensor) {
        this.producer = producer;
        this.offsets = new HashMap<TopicPartition, Long>();
        this.logPrefix = String.format("task [%s] ", streamTaskId);
        this.log = logContext.logger(this.getClass());
        this.productionExceptionHandler = productionExceptionHandler;
        this.skippedRecordsSensor = skippedRecordsSensor;
    }

    /**
     * Sends a record, letting {@code partitioner} (when non-null) choose the partition.
     *
     * <p>With a {@code null} partitioner the record is forwarded with a {@code null}
     * partition and the producer is not asked for partition metadata.
     *
     * @throws StreamsException if a partitioner is given but the producer reports no
     *                          partitions for {@code topic}
     */
    @Override
    public <K, V> void send(String topic, K key, V value, Headers headers, Long timestamp, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner) {
        Integer partition = null;
        if (partitioner != null) {
            // Only the partition count is needed, so the element type is left as a wildcard.
            final List<?> partitions = this.producer.partitionsFor(topic);
            if (partitions.isEmpty()) {
                throw new StreamsException("Could not get partition information for topic '" + topic + "'. This can happen if the topic does not exist.");
            }
            partition = partitioner.partition(topic, key, value, partitions.size());
        }
        this.send(topic, key, value, headers, partition, timestamp, keySerializer, valueSerializer);
    }

    /**
     * Returns {@code true} for exception types that always fail the task without
     * consulting the {@link ProductionExceptionHandler}.
     */
    private boolean productionExceptionIsFatal(Exception exception) {
        final boolean securityException = exception instanceof AuthenticationException
            || exception instanceof AuthorizationException
            || exception instanceof SecurityDisabledException;
        final boolean communicationException = exception instanceof InvalidTopicException
            || exception instanceof UnknownServerException
            || exception instanceof SerializationException
            || exception instanceof OffsetMetadataTooLarge
            || exception instanceof IllegalStateException;
        return securityException || communicationException;
    }

    /**
     * Logs a fatal send error and stores it in {@link #sendException}, wrapped in a
     * {@link StreamsException} that keeps {@code exception} as its cause. For a
     * {@link RetriableException} both messages get the producer-retry hint appended.
     */
    private <K, V> void recordSendError(K key, V value, Long timestamp, String topic, Exception exception) {
        String errorLogMessage = LOG_MESSAGE;
        String errorMessage = EXCEPTION_MESSAGE;
        if (exception instanceof RetriableException) {
            errorLogMessage += PARAMETER_HINT;
            errorMessage += PARAMETER_HINT;
        }
        this.log.error(errorLogMessage, key, value, timestamp, topic, exception.toString());
        this.sendException = new StreamsException(String.format(errorMessage, this.logPrefix, "an error caught", key, value, timestamp, topic, exception.toString()), exception);
    }

    /**
     * Serializes {@code key} and {@code value} and sends them to the given partition.
     *
     * <p>A previously stored send failure is rethrown before anything is serialized.
     * The completion callback either records the acknowledged offset or classifies
     * the error (fenced producer, fatal, or delegated to the exception handler).
     *
     * @throws KafkaException   the stored failure from an earlier send, if any
     * @throws StreamsException if {@code producer.send} itself throws; the original
     *                          exception is attached as the cause
     */
    @Override
    public <K, V> void send(final String topic, final K key, final V value, Headers headers, final Integer partition, final Long timestamp, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this.checkForException();
        final byte[] keyBytes = keySerializer.serialize(topic, key);
        final byte[] valBytes = valueSerializer.serialize(topic, value);
        final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<byte[], byte[]>(topic, partition, timestamp, keyBytes, valBytes, headers);
        try {
            this.producer.send(serializedRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        // After a failure has been stored, later acknowledgements are ignored.
                        if (RecordCollectorImpl.this.sendException != null) {
                            return;
                        }
                        final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
                        RecordCollectorImpl.this.offsets.put(tp, metadata.offset());
                    } else if (RecordCollectorImpl.this.sendException == null) {
                        // Only the first failure is kept; subsequent ones are dropped.
                        if (exception instanceof ProducerFencedException) {
                            RecordCollectorImpl.this.log.warn(LOG_MESSAGE, key, value, timestamp, topic, exception.toString());
                            RecordCollectorImpl.this.sendException = new ProducerFencedException(String.format(EXCEPTION_MESSAGE, RecordCollectorImpl.this.logPrefix, "producer got fenced", key, value, timestamp, topic, exception.toString()));
                        } else if (RecordCollectorImpl.this.productionExceptionIsFatal(exception)) {
                            RecordCollectorImpl.this.recordSendError(key, value, timestamp, topic, exception);
                        } else if (RecordCollectorImpl.this.productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL) {
                            RecordCollectorImpl.this.recordSendError(key, value, timestamp, topic, exception);
                        } else {
                            RecordCollectorImpl.this.log.warn("Error sending records (key=[{}] value=[{}] timestamp=[{}]) to topic=[{}] and partition=[{}]; The exception handler chose to CONTINUE processing in spite of this error.", key, value, timestamp, topic, partition, exception);
                            RecordCollectorImpl.this.skippedRecordsSensor.record();
                        }
                    }
                }
            });
        } catch (TimeoutException e) {
            this.log.error("Timeout exception caught when sending record to topic {}. This might happen if the producer cannot send data to the Kafka cluster and thus, its internal buffer fills up. You can increase producer parameter `max.block.ms` to increase this timeout.", topic);
            // Keep the producer's exception as the cause so its stack trace is not lost.
            throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout.", this.logPrefix, topic), e);
        } catch (Exception uncaughtException) {
            throw new StreamsException(String.format(EXCEPTION_MESSAGE, this.logPrefix, "an error caught", key, value, timestamp, topic, uncaughtException.toString()), uncaughtException);
        }
    }

    /** Rethrows the stored send failure, if there is one. */
    private void checkForException() {
        if (this.sendException != null) {
            throw this.sendException;
        }
    }

    /**
     * Flushes the producer, then rethrows any stored send failure.
     */
    @Override
    public void flush() {
        this.log.debug("Flushing producer");
        this.producer.flush();
        this.checkForException();
    }

    /**
     * Closes the producer, then rethrows any stored send failure.
     */
    @Override
    public void close() {
        this.log.debug("Closing producer");
        this.producer.close();
        this.checkForException();
    }

    /**
     * Returns the live internal map of the latest acknowledged offset per partition;
     * no copy is made.
     */
    @Override
    public Map<TopicPartition, Long> offsets() {
        return this.offsets;
    }

    /** Package-private accessor for the underlying producer. */
    Producer<byte[], byte[]> producer() {
        return this.producer;
    }
}

