/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.tools;

import com.google.common.base.Charsets;
import io.confluent.common.config.AbstractConfig;
import io.confluent.controlcenter.KafkaHelper;
import io.confluent.controlcenter.serialization.UberFormatter;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.Writer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataExporter {
    private static final String GROUP_ID = "control-center-export-cg";
    private static final String CLIENT_ID = "control-center-export-client";
    private static final long PROGRESS_UPDATE_MS = 5000L;
    private static final Logger log = LoggerFactory.getLogger(DataExporter.class);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args) throws Exception {
        CommandLine cmd;
        Options options = new Options().addOption(Option.builder((String)"config").hasArg().required().desc("control center properties file").build()).addOption(Option.builder((String)"topic").hasArg().required().desc("topic to consume").build()).addOption(Option.builder((String)"from").hasArg().type(Number.class).desc("start consuming from this timestamp").build()).addOption(Option.builder((String)"to").hasArg().type(Number.class).desc("consume until this timestamp").build()).addOption(Option.builder((String)"outfile").hasArg().required().desc("file to dump output").build());
        try {
            cmd = new DefaultParser().parse(options, args);
        }
        catch (ParseException e) {
            HelpFormatter formatter = new HelpFormatter();
            System.out.println(e.getMessage());
            formatter.printUsage(new PrintWriter((Writer)new OutputStreamWriter((OutputStream)System.out, Charset.defaultCharset()), true), Integer.MAX_VALUE, "", options);
            return;
        }
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.putAll((Map<?, ?>)AbstractConfig.getPropsFromFile((String)cmd.getOptionValue("config")));
        config.put("group.id", GROUP_ID);
        config.put("client.id", CLIENT_ID);
        config.put("enable.auto.commit", "false");
        config.put("auto.offset.reset", "earliest");
        long fromTs = cmd.hasOption("from") ? (Long)cmd.getParsedOptionValue("from") : 0L;
        long toTs = cmd.hasOption("to") ? (Long)cmd.getParsedOptionValue("to") : Long.MAX_VALUE;
        String topic = cmd.getOptionValue("topic");
        String outfile = cmd.getOptionValue("outfile");
        UberFormatter formatter = new UberFormatter();
        formatter.init(cmd.getOptionProperties("property"));
        long lastTimestamp = -1L;
        long consumedCount = 0L;
        try (KafkaConsumer consumer = new KafkaConsumer(config, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
             PrintStream ps = new PrintStream(outfile, Charsets.UTF_8.name());){
            Set<TopicPartition> allPartitions = KafkaHelper.partitionsForTopic(consumer, topic);
            Map<TopicPartition, OffsetAndMetadata> newOffsets = DataExporter.offsetsForTimestamp(consumer, allPartitions, fromTs);
            if (newOffsets.size() > 0) {
                log.debug("seeking to new offsets");
                consumer.commitSync(newOffsets);
            }
            long lastStatusUpdate = 0L;
            while (true) {
                ConsumerRecords records = consumer.poll(1000L);
                log.debug("polled {} records", (Object)records.count());
                if (records.isEmpty()) {
                    break;
                }
                for (ConsumerRecord record : records) {
                    if (record.timestamp() < fromTs || record.timestamp() > toTs) continue;
                    formatter.writeTo((ConsumerRecord<byte[], byte[]>)record, ps);
                    lastTimestamp = record.timestamp();
                    ++consumedCount;
                }
                consumer.commitSync();
                long now = System.currentTimeMillis();
                if (now - lastStatusUpdate <= 5000L) continue;
                lastStatusUpdate = now;
                log.info("exported {} records so far, last exported record time={}", (Object)consumedCount, (Object)new DateTime(lastTimestamp).toString());
            }
        }
        catch (Exception e) {
            try {
                log.error("failed exporting from topic={} after {} records and last_timestamp={}", new Object[]{topic, consumedCount, lastTimestamp, e});
            }
            catch (Throwable throwable) {
                log.info("exported a total of {} records, last_timestamp={}, output_file={}", new Object[]{consumedCount, lastTimestamp, outfile});
                log.info("shutting down");
                throw throwable;
            }
            log.info("exported a total of {} records, last_timestamp={}, output_file={}", new Object[]{consumedCount, lastTimestamp, outfile});
            log.info("shutting down");
        }
        log.info("exported a total of {} records, last_timestamp={}, output_file={}", new Object[]{consumedCount, lastTimestamp, outfile});
        log.info("shutting down");
    }

    protected static <K, V> Map<TopicPartition, OffsetAndMetadata> offsetsForTimestamp(KafkaConsumer<K, V> consumer, Set<TopicPartition> topicPartitions, long timestamp) {
        HashMap<TopicPartition, OffsetAndMetadata> newOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        if (topicPartitions == null || topicPartitions.isEmpty()) {
            return newOffsets;
        }
        consumer.assign(topicPartitions);
        HashMap<TopicPartition, Long> timestampsToSearch = new HashMap<TopicPartition, Long>();
        for (TopicPartition tp : topicPartitions) {
            timestampsToSearch.put(tp, timestamp);
        }
        Map offsetsAndTimestamps = consumer.offsetsForTimes(timestampsToSearch);
        for (TopicPartition tp : topicPartitions) {
            if (!offsetsAndTimestamps.containsKey(tp)) continue;
            long newOffset = ((OffsetAndTimestamp)offsetsAndTimestamps.get(tp)).offset();
            newOffsets.put(tp, new OffsetAndMetadata(newOffset));
            log.info("new offset for tp={} offset={}", (Object)tp, (Object)newOffset);
        }
        return newOffsets;
    }
}

