package io.confluent.controlcenter.serialization;

import com.google.common.base.Charsets;
import com.sun.jna.platform.win32.COM.tlb.imp.TlbBase;
import io.confluent.controlcenter.ControlCenterConfig;
import io.confluent.controlcenter.streams.TopicStoreMaster;
import io.confluent.controlcenter.util.ConfigUtils;
import io.confluent.controlcenter.util.InjectorFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Properties;
import kafka.common.MessageReader;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.SerializationException;
import org.joda.time.DateTime;

/* loaded from: input_file:io/confluent/controlcenter/serialization/UberReader.class */
/**
 * {@link MessageReader} that turns delimited text lines into {@link ProducerRecord}s.
 *
 * <p>Each non-comment, non-blank line has the shape
 * {@code [topic SEP][partition SEP][timestamp SEP][key SEP]value}, where each bracketed
 * field is present only when the matching {@code parse.*} property is enabled and
 * {@code SEP} is the configured {@code field.separator} (a tab by default). A field whose
 * text is {@code null} (case-insensitive) is treated as absent.
 *
 * <p>Keys and values are converted to bytes through the serde pair that
 * {@link TopicStoreMaster#getSerdesForTopic} returns for the record's topic.
 *
 * <p>Recognised properties in {@link #init}:
 * <ul>
 *   <li>{@code topic} - fixed topic for every record (optional when {@code parse.topic} is on)</li>
 *   <li>{@code parse.topic} - defaults to {@code true} only when {@code topic} is unset</li>
 *   <li>{@code parse.partition}, {@code parse.timestamp}, {@code parse.key} - default {@code true}</li>
 *   <li>{@code field.separator} - defaults to a single tab</li>
 *   <li>{@code config} - path of the properties file used to build the
 *       {@link ControlCenterConfig} when none was passed to the constructor</li>
 * </ul>
 */
public class UberReader implements MessageReader {
    /** Separator used when {@code field.separator} is not supplied. */
    private static final String DEFAULT_FIELD_SEPARATOR = "\t";

    /** Field text that stands for "no value". Compared case-insensitively. */
    private static final String NULL_TOKEN = "null";

    private ControlCenterConfig config;
    private String topic;
    private boolean parseTopic;
    private boolean parsePartition;
    private boolean parseTs;
    private boolean parseKey;
    private String fieldSeparator;
    private BufferedReader reader;
    private int lineNumber = 0;
    private TopicStoreMaster topicStoreMaster;

    /** Creates a reader whose configuration is loaded from the {@code config} property in {@link #init}. */
    public UberReader() {
    }

    /**
     * Creates a reader that uses the supplied configuration instead of loading one in {@link #init}.
     *
     * @param controlCenterConfig configuration handed to the injector that provides the serdes
     */
    public UberReader(ControlCenterConfig controlCenterConfig) {
        this.config = controlCenterConfig;
    }

    /**
     * Reads the parsing options, wraps {@code inputStream} in a UTF-8 reader and resolves the
     * {@link TopicStoreMaster} used for serde lookup.
     *
     * @param inputStream source of the text lines
     * @param properties  reader options (see the class documentation)
     * @throws SerializationException if neither a topic nor topic parsing is configured, or if
     *         the configuration / {@link TopicStoreMaster} cannot be created
     */
    @Override
    public void init(InputStream inputStream, Properties properties) {
        this.topic = properties.getProperty("topic");
        this.parseTopic = getBoolean(properties, "parse.topic", this.topic == null);
        this.parsePartition = getBoolean(properties, "parse.partition", true);
        this.parseTs = getBoolean(properties, "parse.timestamp", true);
        this.parseKey = getBoolean(properties, "parse.key", true);
        this.fieldSeparator = properties.getProperty("field.separator", DEFAULT_FIELD_SEPARATOR);
        if (this.topic == null && !this.parseTopic) {
            throw new SerializationException("Either the topic or parse.topic properties must be set");
        }
        this.reader = new BufferedReader(new InputStreamReader(inputStream, Charsets.UTF_8));
        try {
            if (this.config == null) {
                this.config = getControlCenterConfig(properties);
            }
            this.topicStoreMaster = (TopicStoreMaster) InjectorFactory.createInjectorForScripts(this.config)
                    .getInstance(TopicStoreMaster.class);
        } catch (Exception e) {
            throw new SerializationException("Failed to load config " + e.getMessage(), e);
        }
    }

    /**
     * Builds a {@link ControlCenterConfig} from the file named by the {@code config} property,
     * forcing {@link ControlCenterConfig#CONTROL_CENTER_MODE_ENABLED} to {@code "all"}.
     */
    private static ControlCenterConfig getControlCenterConfig(Properties properties) {
        Properties fileProps = ConfigUtils.getPropsFromFile(properties.getProperty("config"));
        fileProps.setProperty(ControlCenterConfig.CONTROL_CENTER_MODE_ENABLED, "all");
        return new ControlCenterConfig(fileProps);
    }

    /**
     * Reads a boolean option, returning {@code defaultValue} when the property has no string value.
     *
     * <p>Uses {@link Properties#getProperty(String)} alone so that a non-String entry falls back
     * to the default instead of triggering a {@link NullPointerException}, and so that values
     * coming from a {@code Properties} defaults chain are honoured.
     */
    private boolean getBoolean(Properties properties, String key, boolean defaultValue) {
        String raw = properties.getProperty(key);
        if (raw == null) {
            return defaultValue;
        }
        return Boolean.parseBoolean(raw.trim());
    }

    /**
     * Parses one data line into a record, consuming the enabled fields left to right.
     *
     * @throws KafkaException if an enabled field's separator is missing from the line
     * @throws SerializationException if no serde pair is registered for the record's topic
     */
    private ProducerRecord<byte[], byte[]> parseLine(String line) {
        final int separatorLength = this.fieldSeparator.length();
        String recordTopic = this.topic;
        int cursor = 0;

        if (this.parseTopic) {
            int end = indexOfSeparator(line, cursor, "topic");
            String parsedTopic = nullableSubstring(line, cursor, end);
            // A "null" topic field keeps the topic configured through the "topic" property.
            if (parsedTopic != null) {
                recordTopic = parsedTopic;
            }
            cursor = end + separatorLength;
        }

        // The serde lookup happens before the remaining fields are parsed, so an unknown
        // topic is reported ahead of any partition/timestamp/key format problem.
        TopicStoreMaster.SerdePair serdes = this.topicStoreMaster.getSerdesForTopic(recordTopic);
        if (serdes == null) {
            throw new SerializationException("No serde found for topic: " + recordTopic);
        }

        Integer partition = null;
        if (this.parsePartition) {
            int end = indexOfSeparator(line, cursor, "partition");
            String partitionText = nullableSubstring(line, cursor, end);
            if (partitionText != null) {
                partition = Integer.valueOf(Integer.parseInt(partitionText));
            }
            cursor = end + separatorLength;
        }

        Long timestamp = null;
        if (this.parseTs) {
            int end = indexOfSeparator(line, cursor, "timestamp");
            String timestampText = nullableSubstring(line, cursor, end);
            if (timestampText != null) {
                timestamp = Long.valueOf(DateTime.parse(timestampText).getMillis());
            }
            cursor = end + separatorLength;
        }

        byte[] key = null;
        if (this.parseKey) {
            int end = indexOfSeparator(line, cursor, "key");
            String keyText = nullableSubstring(line, cursor, end);
            if (keyText != null) {
                key = serdes.keySerde.fromJson(keyText);
            }
            cursor = end + separatorLength;
        }

        // Everything after the last consumed separator is the value.
        return new ProducerRecord<>(recordTopic, partition, timestamp, key,
                serdes.valueSerde.fromJson(line.substring(cursor)));
    }

    /**
     * Returns the index of the next field separator at or after {@code from}.
     *
     * @param fieldName name of the field being read, used only in the error message
     * @throws KafkaException if the separator does not occur in the rest of the line
     */
    private int indexOfSeparator(String line, int from, String fieldName) {
        int index = line.indexOf(this.fieldSeparator, from);
        if (index < 0) {
            throw new KafkaException("No " + fieldName + " found in line " + line);
        }
        return index;
    }

    /**
     * Returns {@code str[begin, end)}, or {@code null} when that text is the word "null" in any
     * letter case.
     */
    private String nullableSubstring(String str, int begin, int end) {
        String field = str.substring(begin, end);
        // equalsIgnoreCase avoids the default-locale dependence of toLowerCase().
        return NULL_TOKEN.equalsIgnoreCase(field) ? null : field;
    }

    /**
     * Returns the record for the next data line, skipping lines that start with {@code #} and
     * lines that are empty after trimming.
     *
     * @return the next record, or {@code null} once the input is exhausted
     * @throws KafkaException wrapping any failure, prefixed with the current line number
     */
    @Override
    public ProducerRecord<byte[], byte[]> readMessage() {
        try {
            String line;
            do {
                // Counted before the read so the number reported on failure is the line being read.
                this.lineNumber++;
                line = this.reader.readLine();
                if (line == null) {
                    return null;
                }
            } while (line.startsWith("#") || line.trim().length() == 0);
            return parseLine(line);
        } catch (Exception e) {
            throw new KafkaException(
                    String.format("Error line %s: %s", Integer.valueOf(this.lineNumber), e.getMessage()), e);
        }
    }

    /**
     * Closes the reader created in {@link #init}, which also closes the wrapped input stream.
     * Does nothing when {@link #init} has not been called.
     *
     * @throws KafkaException if closing the underlying stream fails
     */
    @Override
    public void close() {
        if (this.reader == null) {
            return;
        }
        try {
            this.reader.close();
        } catch (IOException e) {
            throw new KafkaException("Failed to close input reader: " + e.getMessage(), e);
        }
    }
}
