/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RecordCollector {
    private static final int MAX_SEND_ATTEMPTS = 3;
    private static final long SEND_RETRY_BACKOFF = 100L;
    private static final Logger log = LoggerFactory.getLogger(RecordCollector.class);
    private final Producer<byte[], byte[]> producer;
    private final Map<TopicPartition, Long> offsets;
    private final String logPrefix;

    public RecordCollector(Producer<byte[], byte[]> producer, String streamTaskId) {
        this.producer = producer;
        this.offsets = new HashMap<TopicPartition, Long>();
        this.logPrefix = String.format("task [%s]", streamTaskId);
    }

    public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this.send(record, keySerializer, valueSerializer, null);
    }

    public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<K, V> partitioner) {
        List partitions;
        byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
        byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
        Integer partition = record.partition();
        if (partition == null && partitioner != null && (partitions = this.producer.partitionsFor(record.topic())) != null && partitions.size() > 0) {
            partition = partitioner.partition(record.key(), record.value(), partitions.size());
        }
        ProducerRecord serializedRecord = new ProducerRecord(record.topic(), partition, record.timestamp(), (Object)keyBytes, (Object)valBytes);
        final String topic = serializedRecord.topic();
        for (int attempt = 1; attempt <= 3; ++attempt) {
            try {
                this.producer.send(serializedRecord, new Callback(){

                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
                            RecordCollector.this.offsets.put(tp, metadata.offset());
                        } else {
                            log.error("{} Error sending record to topic {}", new Object[]{RecordCollector.this.logPrefix, topic, exception});
                        }
                    }
                });
                return;
            }
            catch (TimeoutException e) {
                if (attempt == 3) {
                    throw new StreamsException(String.format("%s Failed to send record to topic %s after %d attempts", this.logPrefix, topic, attempt));
                }
                log.warn("{} Timeout exception caught when sending record to topic {} attempt {}", new Object[]{this.logPrefix, topic, attempt});
                Utils.sleep((long)100L);
                continue;
            }
        }
    }

    public void flush() {
        log.debug("{} Flushing producer", (Object)this.logPrefix);
        this.producer.flush();
    }

    public void close() {
        this.producer.close();
    }

    Map<TopicPartition, Long> offsets() {
        return this.offsets;
    }

    public static interface Supplier {
        public RecordCollector recordCollector();
    }
}

