/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.serialization;

import com.google.common.base.Charsets;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import io.confluent.controlcenter.ControlCenterConfig;
import io.confluent.controlcenter.ControlCenterConfigModule;
import io.confluent.controlcenter.serialization.SerializationModule;
import io.confluent.controlcenter.streams.StreamsConfigModule;
import io.confluent.controlcenter.streams.TopicStoreMaster;
import io.confluent.controlcenter.streams.TopicStoreModule;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Properties;
import kafka.common.MessageReader;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.SerializationException;
import org.joda.time.DateTime;

public class UberReader
implements MessageReader {
    private ControlCenterConfig config;
    private String topic;
    private boolean parseTopic;
    private boolean parsePartition;
    private boolean parseTs;
    private boolean parseKey;
    private String fieldSeparator;
    private BufferedReader reader;
    private int lineNumber = 0;
    private TopicStoreMaster topicStoreMaster;

    public UberReader() {
    }

    public UberReader(ControlCenterConfig config) {
        this.config = config;
    }

    public void init(InputStream inputStream, Properties props) {
        this.topic = props.getProperty("topic");
        this.parseTopic = this.getBoolean(props, "parse.topic", this.topic == null);
        this.parsePartition = this.getBoolean(props, "parse.partition", true);
        this.parseTs = this.getBoolean(props, "parse.timestamp", true);
        this.parseKey = this.getBoolean(props, "parse.key", true);
        this.fieldSeparator = props.getProperty("field.separator", "\t");
        if (this.topic == null && !this.parseTopic) {
            throw new SerializationException("Either the topic or parse.topic properties must be set");
        }
        this.reader = new BufferedReader(new InputStreamReader(inputStream, Charsets.UTF_8));
        String configFile = props.getProperty("config");
        try {
            if (this.config == null) {
                this.config = new ControlCenterConfig(configFile);
            }
            Injector injector = Guice.createInjector((Module[])new Module[]{new ControlCenterConfigModule(this.config), new StreamsConfigModule(), new SerializationModule(), new TopicStoreModule()});
            this.topicStoreMaster = (TopicStoreMaster)injector.getInstance(TopicStoreMaster.class);
        }
        catch (Exception e) {
            throw new SerializationException("Failed to load config " + e.getMessage(), (Throwable)e);
        }
    }

    private boolean getBoolean(Properties props, String name, boolean defaultValue) {
        if (props.containsKey(name)) {
            return Boolean.parseBoolean(props.getProperty(name).trim());
        }
        return defaultValue;
    }

    private ProducerRecord<byte[], byte[]> parseLine(String line) {
        TopicStoreMaster.SerdePair serdes;
        String topicName = this.topic;
        Integer partition = null;
        Long timestamp = null;
        byte[] keyBytes = null;
        int readIdx = 0;
        if (this.parseTopic) {
            int topicEndIdx = line.indexOf(this.fieldSeparator, readIdx);
            if (topicEndIdx < 0) {
                throw new KafkaException("No topic found in line " + line);
            }
            String topicField = this.nullableSubstring(line, readIdx, topicEndIdx);
            if (topicField != null) {
                topicName = topicField;
            }
            readIdx = topicEndIdx + this.fieldSeparator.length();
        }
        if ((serdes = this.topicStoreMaster.getSerdesForTopic(topicName)) == null) {
            throw new SerializationException("No serde found for topic: " + topicName);
        }
        if (this.parsePartition) {
            int partitionEndIdx = line.indexOf(this.fieldSeparator, readIdx);
            if (partitionEndIdx < 0) {
                throw new KafkaException("No partition found in line " + line);
            }
            String partitionField = this.nullableSubstring(line, readIdx, partitionEndIdx);
            if (partitionField != null) {
                partition = Integer.parseInt(partitionField);
            }
            readIdx = partitionEndIdx + this.fieldSeparator.length();
        }
        if (this.parseTs) {
            int tsEndIdx = line.indexOf(this.fieldSeparator, readIdx);
            if (tsEndIdx < 0) {
                throw new KafkaException("No timestamp found in line " + line);
            }
            String timestampField = this.nullableSubstring(line, readIdx, tsEndIdx);
            if (timestampField != null) {
                timestamp = DateTime.parse((String)timestampField).getMillis();
            }
            readIdx = tsEndIdx + this.fieldSeparator.length();
        }
        if (this.parseKey) {
            int keyEndIdx = line.indexOf(this.fieldSeparator, readIdx);
            if (keyEndIdx < 0) {
                throw new KafkaException("No key found in line " + line);
            }
            String keyField = this.nullableSubstring(line, readIdx, keyEndIdx);
            if (keyField != null) {
                keyBytes = serdes.keySerde.fromJson(keyField);
            }
            readIdx = keyEndIdx + this.fieldSeparator.length();
        }
        byte[] valBytes = serdes.valueSerde.fromJson(line.substring(readIdx));
        return new ProducerRecord(topicName, partition, timestamp, keyBytes, (Object)valBytes);
    }

    private String nullableSubstring(String string, int startIdx, int endIdx) {
        String result = string.substring(startIdx, endIdx);
        if (result.toLowerCase().equals("null")) {
            return null;
        }
        return result;
    }

    public ProducerRecord<byte[], byte[]> readMessage() {
        try {
            String line;
            do {
                ++this.lineNumber;
                line = this.reader.readLine();
                if (line != null) continue;
                return null;
            } while (line.startsWith("#") || line.trim().length() == 0);
            return this.parseLine(line);
        }
        catch (Exception e) {
            throw new KafkaException(String.format("Error line %s: %s", this.lineNumber, e.getMessage()), (Throwable)e);
        }
    }

    public void close() {
    }
}

