/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.tracing;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Confluent;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.TransformationStage;
import org.apache.kafka.connect.runtime.tracing.ConnectorTracingException;
import org.apache.kafka.connect.runtime.tracing.TraceRecord;
import org.apache.kafka.connect.runtime.tracing.TraceRecordBuilder;
import org.apache.kafka.connect.runtime.tracing.Tracer;
import org.apache.kafka.connect.runtime.tracing.TracerConfig;
import org.apache.kafka.connect.runtime.tracing.TracingContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;

/**
 * Base implementation of {@link Tracer} that converts {@link TraceRecord}s into raw
 * {@code byte[]} producer records and sends them with the supplied trace producer.
 *
 * <p>Subclasses supply the concrete {@link TraceRecordBuilder} via
 * {@link #newTraceRecordBuilder()}. A fresh builder is installed by {@link #start()} and
 * after every {@link #buildRecords()} call.
 *
 * <p>NOTE(review): {@code currentTraceRecordBuilder} is a plain mutable field with no
 * synchronization in this class; confirm with callers whether a tracer instance is
 * confined to a single thread.
 */
@Confluent
public abstract class AbstractTracer
implements Tracer {
    /** Upper bound used when closing the trace producer and the topic admin client. */
    private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(30L);

    protected final TracerConfig tracerConfig;
    protected final Producer<byte[], byte[]> traceProducer;
    protected final TopicAdmin admin;
    private final TracingContext tracingContext;
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final HeaderConverter headerConverter;
    private final List<TransformationStage<TraceRecord>> traceRecordTransformations;
    private final Logger log;
    /** Topics for which no further create/find attempt is made by {@link #maybeCreateTopic(String)}. */
    private final Set<String> skipTopicsCreationSet = ConcurrentHashMap.newKeySet();
    /** Builder currently accumulating trace data; {@code null} until {@link #start()} is called. */
    private TraceRecordBuilder currentTraceRecordBuilder;

    /**
     * Creates a tracer bound to one connector task.
     *
     * @param connectorTaskId task this tracer belongs to; used for the tracing context and log prefix
     * @param tracerConfig    source of the converters, trace transformations and topic settings
     * @param traceProducer   producer used to send converted trace records
     * @param admin           admin client used to create or look up trace topics
     */
    public AbstractTracer(ConnectorTaskId connectorTaskId, TracerConfig tracerConfig, Producer<byte[], byte[]> traceProducer, TopicAdmin admin) {
        this.tracerConfig = tracerConfig;
        this.traceProducer = traceProducer;
        this.admin = admin;
        this.keyConverter = tracerConfig.keyConverter();
        this.valueConverter = tracerConfig.valueConverter();
        this.headerConverter = tracerConfig.headerConverter();
        this.traceRecordTransformations = Collections.unmodifiableList(tracerConfig.traceTransformations());
        this.tracingContext = new TracingContext(connectorTaskId, tracerConfig);
        this.log = new LogContext(String.format("[%s-tracer-%s] ", connectorTaskId.toString(), this.tracingContext().traceID())).logger(AbstractTracer.class);
    }

    /**
     * Serializes the Connect headers of {@code record} with the configured header converter.
     *
     * @param record trace record whose headers are converted
     * @return the converted headers; empty when the record has no headers
     */
    private RecordHeaders convertHeaders(TraceRecord record) {
        RecordHeaders converted = new RecordHeaders();
        Headers connectHeaders = record.headers();
        if (connectHeaders == null) {
            return converted;
        }
        String topic = record.topic();
        for (Header header : connectHeaders) {
            String key = header.key();
            converted.add(key, this.headerConverter.fromConnectHeader(topic, key, header.schema(), header.value()));
        }
        return converted;
    }

    /**
     * Optionally applies the configured transformations to {@code record} and converts the
     * result into a producer record.
     *
     * @param record                    trace record to process
     * @param doTransformations         whether to run the trace record transformations first
     * @param createTraceRecordForError whether a failure should be turned into a tracing-error
     *                                  record (built by the current {@link TraceRecordBuilder})
     *                                  instead of being thrown
     * @return the producer record to send, or {@code null} when a transformation dropped the record
     * @throws ConnectorTracingException if processing fails and no fallback was requested, or if
     *                                   converting the fallback error record fails as well
     */
    private ProducerRecord<byte[], byte[]> convertAndTransform(TraceRecord record, boolean doTransformations, boolean createTraceRecordForError) {
        this.log.trace("Begin transformation and conversion of TraceRecord {}", record);
        try {
            if (doTransformations) {
                for (TransformationStage<TraceRecord> transformation : this.traceRecordTransformations) {
                    record = transformation.apply(record);
                    if (record == null) {
                        // A transformation filtered the record out; there is nothing to send.
                        return null;
                    }
                }
            }
            byte[] key = this.keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
            byte[] value = this.valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
            RecordHeaders headers = this.convertHeaders(record);
            this.log.trace("Successfully created ProducerRecord for traceRecord");
            return new ProducerRecord<>(record.topic(), record.kafkaPartition(), ConnectUtils.checkAndConvertTimestamp(record.timestamp()), key, value, headers);
        }
        catch (Exception e) {
            if (!createTraceRecordForError) {
                throw new ConnectorTracingException("Connector Tracing failed to convert and transform record ", e);
            }
            this.log.debug("Conversion/transformation on trace record failed with an exception. Failed record can be logged by switching the log level to TRACE Creating new trace record for the encountered exception. Please check the trace configs ", e);
            // Return the fallback error record rather than discarding it. It is converted without
            // transformations and without a second fallback, so a failure here propagates.
            return this.convertAndTransform(this.traceRecordBuilder().createTracingError(record, e), false, false);
        }
    }

    /**
     * Creates the trace topic, or confirms it exists, unless it is already in the skip set.
     *
     * <p>A failure whose cause is a {@link RetriableException} leaves the topic out of the skip
     * set so the next call tries again. Any other failure is logged and the topic is added to
     * the skip set.
     *
     * @param topic name of the trace topic
     */
    private void maybeCreateTopic(String topic) {
        try {
            if (this.skipTopicsCreationSet.contains(topic)) {
                return;
            }
            this.log.info("Attempting to create new or find existing trace topic: '{}'", topic);
            NewTopic newTopic = TopicAdmin.defineTopic(topic)
                    .partitions(this.tracerConfig.getInt("trace.records.topic.partition"))
                    .replicationFactor(this.tracerConfig.getShort("trace.records.topic.replication.factor"))
                    .config(this.tracerConfig.originalsWithPrefix("trace.records.trace.topic.config."))
                    .build();
            if (this.admin.createOrFindTopic(newTopic)) {
                this.skipTopicsCreationSet.add(topic);
                this.log.info("Trace topic '{}' created/existed on broker, commencing message production", topic);
            }
        }
        catch (Exception e) {
            if (e.getCause() instanceof RetriableException) {
                this.log.debug("Creation/lookup of trace topic '{}' failed but will be retried in next iteration. Current trace message will be sent to broker assuming failure conditions may be resolved shortly.", topic, e);
                // Keep the topic out of the skip set so the retry promised above actually happens.
                return;
            }
            this.log.warn("Creation/lookup of trace topic '{}' failed. Skipping creation and assuming broker side topic creation is enabled", topic, e);
            this.skipTopicsCreationSet.add(topic);
        }
    }

    /**
     * Creates a new, empty builder for the next batch of trace records.
     *
     * @return a fresh {@link TraceRecordBuilder}
     */
    public abstract TraceRecordBuilder newTraceRecordBuilder();

    /**
     * Converts {@code traceRecord} and sends it with the trace producer.
     *
     * @param traceRecord record to write
     * @param callback    optional callback invoked with the send result; may be {@code null}
     * @return the producer's send future, or an already-completed future holding {@code null}
     *         when the record was dropped by a transformation or a {@link RetriableException}
     *         was logged and swallowed
     * @throws ConnectorTracingException if conversion or sending fails with a non-retriable error
     */
    @Override
    public Future<RecordMetadata> writeTraceRecord(TraceRecord traceRecord, Callback callback) {
        try {
            ProducerRecord<byte[], byte[]> producerRecord = this.convertAndTransform(traceRecord, true, true);
            if (producerRecord != null) {
                this.maybeCreateTopic(producerRecord.topic());
                return this.traceProducer.send(producerRecord, (meta, error) -> {
                    if (error == null) {
                        this.log.trace("Successfully sent trace record {}", meta);
                    } else {
                        this.log.trace("Error {} occurred while sending record {}", error, meta);
                    }
                    if (callback != null) {
                        callback.onCompletion(meta, error);
                    }
                });
            }
        }
        catch (RetriableException e) {
            // Best effort: a retriable failure is logged, not propagated.
            if (this.log.isTraceEnabled()) {
                this.log.trace("Exception sending trace record {}", traceRecord, e);
            } else {
                this.log.error("Exception sending trace record. (Enable trace log level to see records.)", e);
            }
        }
        catch (ConnectorTracingException e) {
            throw e;
        }
        catch (Exception e) {
            throw new ConnectorTracingException("Failed to write trace record ", e);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * @return the builder currently in use, or {@code null} if {@link #start()} has not been called
     */
    @Override
    public TraceRecordBuilder traceRecordBuilder() {
        return this.currentTraceRecordBuilder;
    }

    /**
     * Builds the records held by the current builder and installs a fresh builder.
     *
     * @return the records produced by the builder that was current on entry
     */
    @Override
    public List<TraceRecord> buildRecords() {
        List<TraceRecord> records = this.traceRecordBuilder().build();
        this.log.debug("Trace record build returned {} records", records.size());
        this.currentTraceRecordBuilder = this.newTraceRecordBuilder();
        return records;
    }

    /** Logs startup and installs the first {@link TraceRecordBuilder}. */
    @Override
    public void start() {
        this.log.info("Starting tracer for {}", this.tracingContext.connectorTaskId());
        this.currentTraceRecordBuilder = this.newTraceRecordBuilder();
    }

    /**
     * @return the tracing context created for this tracer's connector task
     */
    @Override
    public final TracingContext tracingContext() {
        return this.tracingContext;
    }

    /**
     * Writes any pending trace records, then closes the trace producer and the topic admin.
     *
     * <p>Each step is attempted even if an earlier one failed, so one failure does not leave
     * the remaining resources open. The first failure is rethrown wrapped in a
     * {@link ConnectorTracingException}, with later failures attached as suppressed.
     *
     * @throws ConnectorTracingException if flushing or closing either resource fails
     */
    @Override
    public void close() {
        Exception failure = null;
        try {
            this.log.info("Closing producer and topic admin for connector tracing");
            if (this.traceRecordBuilder() != null) {
                // buildAndWriteRecords() is not declared in this class; presumably inherited from Tracer.
                this.buildAndWriteRecords();
            }
        }
        catch (Exception e) {
            failure = e;
        }
        try {
            this.traceProducer.close(CLOSE_TIMEOUT);
        }
        catch (Exception e) {
            failure = AbstractTracer.chainFailure(failure, e);
        }
        try {
            this.admin.close(CLOSE_TIMEOUT);
        }
        catch (Exception e) {
            failure = AbstractTracer.chainFailure(failure, e);
        }
        if (failure != null) {
            throw new ConnectorTracingException("Failed to stop tracer for " + this.tracingContext.connectorTaskId(), failure);
        }
        this.log.info("Successfully stopped tracer for {} ", this.tracingContext.connectorTaskId());
    }

    /**
     * Combines failures from successive shutdown steps, keeping the first as the primary one.
     *
     * @param first failure recorded so far, or {@code null} if none
     * @param next  failure that just occurred
     * @return {@code next} when there was no earlier failure; otherwise {@code first} with
     *         {@code next} added as a suppressed exception
     */
    private static Exception chainFailure(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        if (first != next) {
            // Throwable.addSuppressed rejects self-suppression, so skip an identical instance.
            first.addSuppressed(next);
        }
        return first;
    }

    /**
     * @return the configuration this tracer was created with
     */
    @Override
    public TracerConfig tracerConfig() {
        return this.tracerConfig;
    }
}

