/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentAction;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VerifiableConsumer
implements Closeable,
OffsetCommitCallback,
ConsumerRebalanceListener {
    // Logger used by run() for trace/error diagnostics (kept separate from the JSON event stream).
    private static final Logger log = LoggerFactory.getLogger(VerifiableConsumer.class);
    // Jackson mapper used by printJson(); a TopicPartition serializer is registered in the constructor.
    private final ObjectMapper mapper = new ObjectMapper();
    // Stream that receives one JSON event per line.
    private final PrintStream out;
    // The Kafka consumer driven by run() and woken up by close().
    private final KafkaConsumer<String, String> consumer;
    // Topic subscribed to in run().
    private final String topic;
    // When true, run() never commits offsets explicitly.
    private final boolean useAutoCommit;
    // When true (and auto-commit is off), run() commits with commitAsync(); otherwise with commitSync().
    private final boolean useAsyncCommit;
    // When true, onRecordsReceived() emits a RecordData event for every consumed record.
    private final boolean verbose;
    // Maximum number of records to consume; a negative value means "no limit" (see hasMessageLimit()).
    private final int maxMessages;
    // Number of records counted as consumed so far; updated in onRecordsReceived().
    private int consumedMessages = 0;
    // When true, run() calls streamOffsetForTimesData() after each poll.
    private final boolean sendOffsetForTimesData;
    // Per partition, the last record timestamp that was submitted to offsetsForTimes().
    private HashMap<TopicPartition, Long> partitionToLastTimestamp;
    // Counted down at the end of run(); close() waits on it.
    private CountDownLatch shutdownLatch = new CountDownLatch(1);

    /**
     * Wraps an already-configured consumer so its activity can be reported as JSON events.
     *
     * @param consumer               the Kafka consumer to drive
     * @param out                    stream that receives the JSON events, one per line
     * @param topic                  topic to subscribe to when {@link #run()} is called
     * @param maxMessages            number of records to consume; a negative value means no limit
     * @param useAutoCommit          if true, offsets are never committed explicitly by this class
     * @param useAsyncCommit         if true (and auto-commit is off), commit asynchronously
     * @param verbose                if true, emit one event per consumed record
     * @param sendOffsetForTimesData if true, emit offsets-for-times events after each poll
     */
    public VerifiableConsumer(KafkaConsumer<String, String> consumer,
                              PrintStream out,
                              String topic,
                              int maxMessages,
                              boolean useAutoCommit,
                              boolean useAsyncCommit,
                              boolean verbose,
                              boolean sendOffsetForTimesData) {
        this.consumer = consumer;
        this.out = out;
        this.topic = topic;
        this.maxMessages = maxMessages;
        this.verbose = verbose;
        this.useAutoCommit = useAutoCommit;
        this.useAsyncCommit = useAsyncCommit;
        this.sendOffsetForTimesData = sendOffsetForTimesData;
        this.partitionToLastTimestamp = new HashMap<>();
        // The mapper field is already initialised at this point, so the module can be registered.
        addKafkaSerializerModule();
    }

    /**
     * Registers a Jackson module on {@code mapper} that serializes a {@link TopicPartition}
     * as an object with a {@code topic} field and a {@code partition} field.
     */
    private void addKafkaSerializerModule() {
        JsonSerializer<TopicPartition> topicPartitionSerializer = new JsonSerializer<TopicPartition>() {
            @Override
            public void serialize(TopicPartition value, JsonGenerator generator, SerializerProvider provider) throws IOException {
                generator.writeStartObject();
                generator.writeObjectField("topic", value.topic());
                generator.writeObjectField("partition", value.partition());
                generator.writeEndObject();
            }
        };
        SimpleModule kafkaModule = new SimpleModule();
        kafkaModule.addSerializer(TopicPartition.class, topicPartitionSerializer);
        mapper.registerModule(kafkaModule);
    }

    /**
     * @return true when a message limit is configured, i.e. {@code maxMessages} is not negative
     */
    private boolean hasMessageLimit() {
        return maxMessages >= 0;
    }

    /**
     * @return true once a message limit is configured and at least that many records were consumed
     */
    private boolean isFinished() {
        if (!hasMessageLimit()) {
            return false;
        }
        return consumedMessages >= maxMessages;
    }

    /**
     * Processes one poll result: counts the records, emits the per-record events (verbose mode
     * only) and a single {@code records_consumed} event, and computes the offsets to commit.
     *
     * <p>When a message limit is configured, the records of a partition are truncated so that
     * the limit is never exceeded, and remaining partitions are skipped once it is reached.
     *
     * @param records the records returned by the last poll
     * @return for every partition that contributed records, the offset following the last
     *         record that was counted
     */
    private Map<TopicPartition, OffsetAndMetadata> onRecordsReceived(ConsumerRecords<String, String> records) {
        Map<TopicPartition, OffsetAndMetadata> nextOffsets = new HashMap<>();
        List<RecordSetSummary> partitionSummaries = new ArrayList<>();

        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> batch = records.records(tp);

            // Trim the batch so the configured limit is hit exactly.
            if (hasMessageLimit() && consumedMessages + batch.size() > maxMessages) {
                batch = batch.subList(0, maxMessages - consumedMessages);
            }

            if (!batch.isEmpty()) {
                long firstOffset = batch.get(0).offset();
                long lastOffset = batch.get(batch.size() - 1).offset();

                // The committed offset is the position of the next record to read.
                nextOffsets.put(tp, new OffsetAndMetadata(lastOffset + 1L));
                partitionSummaries.add(new RecordSetSummary(tp.topic(), tp.partition(), batch.size(), firstOffset, lastOffset));

                if (verbose) {
                    for (ConsumerRecord<String, String> record : batch) {
                        printJson(new RecordData(record));
                    }
                }

                consumedMessages += batch.size();
                if (isFinished()) {
                    break;
                }
            }
        }

        // NOTE(review): the reported count is records.count(), i.e. the size of the whole poll
        // result, even when the batch was truncated above - confirm this is what readers expect.
        printJson(new RecordsConsumed(records.count(), partitionSummaries));
        return nextOffsets;
    }

    /**
     * Emits an {@code offsets_committed} event describing the outcome of an offset commit.
     *
     * @param offsets   the offsets that were submitted for commit
     * @param exception the failure, or {@code null} if the commit succeeded
     */
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        List<CommitData> committed = new ArrayList<>();
        offsets.forEach((tp, offsetAndMetadata) ->
                committed.add(new CommitData(tp.topic(), tp.partition(), offsetAndMetadata.offset())));

        boolean succeeded = exception == null;
        String errorMessage = succeeded ? null : exception.getMessage();
        printJson(new OffsetsCommitted(committed, errorMessage, succeeded));
    }

    /**
     * Emits a {@code partitions_assigned} event listing the given partitions.
     *
     * @param partitions the partitions passed to this rebalance callback
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        PartitionsAssigned event = new PartitionsAssigned(partitions);
        printJson(event);
    }

    /**
     * Emits a {@code partitions_revoked} event listing the given partitions.
     *
     * @param partitions the partitions passed to this rebalance callback
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        PartitionsRevoked event = new PartitionsRevoked(partitions);
        printJson(event);
    }

    /**
     * Serializes {@code data} with the configured mapper and prints it as one line on
     * {@code out}. If serialization fails, a plain-text error line is printed instead.
     *
     * @param data the event object to print
     */
    private void printJson(Object data) {
        String json;
        try {
            json = mapper.writeValueAsString(data);
        } catch (JsonProcessingException e) {
            out.println("Bad data can't be written as json: " + e.getMessage());
            return;
        }
        out.println(json);
    }

    /**
     * Commits the given offsets synchronously and reports the outcome through
     * {@link #onComplete(Map, Exception)}.
     *
     * @param offsets the offsets to commit
     * @throws WakeupException           rethrown after the commit has been retried
     * @throws FencedInstanceIdException rethrown as-is, without reporting an event
     */
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        try {
            this.consumer.commitSync(offsets);
            this.onComplete(offsets, null);
        }
        catch (WakeupException e) {
            // Retry the commit before propagating the wakeup so the offsets are not lost.
            // NOTE(review): the recursion presumably terminates because close() issues a
            // single wakeup() - confirm no other caller wakes the consumer repeatedly.
            this.commitSync(offsets);
            throw e;
        }
        catch (FencedInstanceIdException e) {
            // Propagate to the caller instead of reporting a failed commit.
            throw e;
        }
        catch (Exception e) {
            // Any other failure is reported as an unsuccessful commit event.
            this.onComplete(offsets, e);
        }
    }

    /**
     * For every assigned partition, picks the first record of this poll whose timestamp is
     * newer than the last one already looked up, asks the consumer for the offset matching
     * that timestamp and emits one {@code offset_for_times_data} event per answer.
     *
     * <p>Partitions for which the lookup yields no result (a {@code null} map value) are
     * skipped instead of causing a {@link NullPointerException}.
     *
     * @param records the records returned by the last poll
     */
    private void streamOffsetForTimesData(ConsumerRecords<String, String> records) {
        Map<TopicPartition, Long> partitionToTimeStamp = new HashMap<>();
        for (TopicPartition tp : consumer.assignment()) {
            // The stored timestamp only changes right before the loop below is left, so it
            // can be read once per partition.
            Long lastTimestamp = partitionToLastTimestamp.get(tp);
            for (ConsumerRecord<String, String> record : records.records(tp)) {
                if (lastTimestamp != null && record.timestamp() <= lastTimestamp) {
                    continue;
                }
                partitionToTimeStamp.put(tp, record.timestamp());
                partitionToLastTimestamp.put(tp, record.timestamp());
                // Only one timestamp per partition is looked up per poll.
                break;
            }
        }

        Map<TopicPartition, OffsetAndTimestamp> partitionToOffsetAndTimestamp = consumer.offsetsForTimes(partitionToTimeStamp);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : partitionToOffsetAndTimestamp.entrySet()) {
            OffsetAndTimestamp offsetAndTimestamp = entry.getValue();
            if (offsetAndTimestamp == null) {
                // No record at or after the requested timestamp was found for this partition.
                continue;
            }
            printJson(new OffsetForTimesData(entry.getKey(), offsetAndTimestamp.timestamp(), offsetAndTimestamp.offset()));
        }
    }

    /**
     * Main loop: announces startup, subscribes to the topic and polls until the message limit
     * is reached or the consumer is woken up. After each poll the records are reported and,
     * unless auto-commit is enabled, the resulting offsets are committed (asynchronously or
     * synchronously depending on configuration).
     *
     * <p>On exit the consumer is closed and a {@code shutdown_complete} event is emitted. The
     * shutdown latch is released even if closing the consumer fails, so that a concurrent
     * {@link #close()} cannot block forever.
     */
    public void run() {
        try {
            printJson(new StartupComplete());
            consumer.subscribe(Collections.singletonList(topic), this);

            while (!isFinished()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                Map<TopicPartition, OffsetAndMetadata> offsets = onRecordsReceived(records);

                if (sendOffsetForTimesData) {
                    streamOffsetForTimesData(records);
                }

                if (useAutoCommit) {
                    continue;
                }
                if (useAsyncCommit) {
                    consumer.commitAsync(offsets, this);
                } else {
                    commitSync(offsets);
                }
            }
        } catch (WakeupException e) {
            // Expected when close() wakes the consumer up; nothing to report.
            log.trace("Caught WakeupException because consumer is shutdown, ignore and terminate.", e);
        } catch (Throwable t) {
            // Goes to the logger rather than to the JSON event stream.
            log.error("Error during processing, terminating consumer process: ", t);
        } finally {
            try {
                consumer.close();
                printJson(new ShutdownComplete());
            } finally {
                // Always release waiters in close(), even when consumer.close() throws.
                shutdownLatch.countDown();
            }
        }
    }

    /**
     * Wakes the consumer up and blocks until {@link #run()} has released the shutdown latch.
     *
     * <p>An interrupt received while waiting does not abort the wait; it is remembered and the
     * thread's interrupt flag is restored before returning. Note that the latch is only
     * released by {@link #run()}.
     */
    @Override
    public void close() {
        boolean interrupted = false;
        try {
            consumer.wakeup();
            while (true) {
                try {
                    shutdownLatch.await();
                    return;
                } catch (InterruptedException e) {
                    // Keep waiting for the shutdown to finish; re-assert the interrupt later.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Builds the command-line parser for the tool.
     *
     * <p>Exactly one of {@code --bootstrap-server} and {@code --broker-list} must be given
     * (they form a required mutually exclusive group); {@code --topic} and {@code --group-id}
     * are required as well. All other options are optional.
     *
     * @return the configured argument parser
     */
    private static ArgumentParser argParser() {
        ArgumentParser parser = ArgumentParsers
                .newArgumentParser("verifiable-consumer")
                .defaultHelp(true)
                .description("This tool consumes messages from a specific topic and emits consumer events (e.g. group rebalances, received messages, and offsets committed) as JSON objects to STDOUT.");

        MutuallyExclusiveGroup connectionGroup = parser
                .addMutuallyExclusiveGroup("Connection Group")
                .description("Group of arguments for connection to brokers")
                .required(true);

        connectionGroup.addArgument("--bootstrap-server")
                .action(Arguments.store())
                .required(false)
                .type(String.class)
                .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
                .dest("bootstrapServer")
                .help("REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");

        connectionGroup.addArgument("--broker-list")
                .action(Arguments.store())
                .required(false)
                .type(String.class)
                .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
                .dest("brokerList")
                .help("DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified.  Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");

        parser.addArgument("--topic")
                .action(Arguments.store())
                .required(true)
                .type(String.class)
                .metavar("TOPIC")
                .help("Consumes messages from this topic.");

        parser.addArgument("--group-id")
                .action(Arguments.store())
                .required(true)
                .type(String.class)
                .metavar("GROUP_ID")
                .dest("groupId")
                .help("The groupId shared among members of the consumer group");

        parser.addArgument("--group-instance-id")
                .action(Arguments.store())
                .required(false)
                .type(String.class)
                .metavar("GROUP_INSTANCE_ID")
                .dest("groupInstanceId")
                .help("A unique identifier of the consumer instance");

        parser.addArgument("--max-messages")
                .action(Arguments.store())
                .required(false)
                .type(Integer.class)
                .setDefault(-1)
                .metavar("MAX-MESSAGES")
                .dest("maxMessages")
                .help("Consume this many messages. If -1 (the default), the consumer will consume until the process is killed externally");

        parser.addArgument("--session-timeout")
                .action(Arguments.store())
                .required(false)
                .setDefault(30000)
                .type(Integer.class)
                .metavar("TIMEOUT_MS")
                .dest("sessionTimeout")
                .help("Set the consumer's session timeout");

        parser.addArgument("--verbose")
                .action(Arguments.storeTrue())
                .type(Boolean.class)
                .metavar("VERBOSE")
                .help("Enable to log individual consumed records");

        parser.addArgument("--enable-autocommit")
                .action(Arguments.storeTrue())
                .type(Boolean.class)
                .metavar("ENABLE-AUTOCOMMIT")
                .dest("useAutoCommit")
                .help("Enable offset auto-commit on consumer");

        parser.addArgument("--send-offset-for-times-data")
                .action(Arguments.storeTrue())
                .type(Boolean.class)
                .metavar("SEND-OFFSET-FOR-TIMES-DATA")
                .dest("sendOffsetForTimesData")
                .help("Consumer sends offsetForTimes() information for all the partitions it has subscribed to. Use when version = DEV_BRANCH");

        // Help text fixed: the original message was missing its closing parenthesis.
        parser.addArgument("--reset-policy")
                .action(Arguments.store())
                .required(false)
                .setDefault("earliest")
                .type(String.class)
                .dest("resetPolicy")
                .help("Set reset policy (must be either 'earliest', 'latest', or 'none')");

        parser.addArgument("--assignment-strategy")
                .action(Arguments.store())
                .required(false)
                .setDefault(RangeAssignor.class.getName())
                .type(String.class)
                .dest("assignmentStrategy")
                .help("Set assignment strategy (e.g. " + RoundRobinAssignor.class.getName() + ")");

        parser.addArgument("--consumer.config")
                .action(Arguments.store())
                .required(false)
                .type(String.class)
                .metavar("CONFIG_FILE")
                .help("Consumer config properties file (config options shared with command line parameters will be overridden).");

        return parser;
    }

    /**
     * Parses the command line and builds a {@link VerifiableConsumer} writing to
     * {@code System.out}.
     *
     * <p>Properties from the optional {@code --consumer.config} file are loaded first; the
     * values derived from command-line options are written afterwards and therefore win.
     * Asynchronous commit is always disabled for consumers created here.
     *
     * @param parser the parser produced for this tool
     * @param args   the raw command-line arguments
     * @return the configured consumer wrapper
     * @throws ArgumentParserException if the arguments are invalid or the config file cannot
     *                                 be read (the underlying {@link IOException} is kept as
     *                                 the cause)
     */
    public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[] args) throws ArgumentParserException {
        Namespace res = parser.parseArgs(args);

        // Read every option up front so that no failure can occur after the consumer exists.
        boolean useAutoCommit = res.getBoolean("useAutoCommit");
        boolean sendOffsetForTimesData = res.getBoolean("sendOffsetForTimesData");
        String configFile = res.getString("consumer.config");
        String topic = res.getString("topic");
        int maxMessages = res.getInt("maxMessages");
        boolean verbose = res.getBoolean("verbose");
        String brokerHostandPort = null;

        Properties consumerProps = new Properties();
        if (configFile != null) {
            try {
                consumerProps.putAll(Utils.loadProps(configFile));
            } catch (IOException e) {
                // Keep the original exception as the cause for diagnostics.
                throw new ArgumentParserException(e.getMessage(), e, parser);
            }
        }

        consumerProps.put("group.id", res.getString("groupId"));

        String groupInstanceId = res.getString("groupInstanceId");
        if (groupInstanceId != null) {
            consumerProps.put("group.instance.id", groupInstanceId);
        }

        if (res.get("bootstrapServer") != null) {
            brokerHostandPort = res.getString("bootstrapServer");
        } else if (res.getString("brokerList") != null) {
            brokerHostandPort = res.getString("brokerList");
        } else {
            parser.printHelp();
            System.exit(0);
        }
        consumerProps.put("bootstrap.servers", brokerHostandPort);

        consumerProps.put("enable.auto.commit", useAutoCommit);
        consumerProps.put("auto.offset.reset", res.getString("resetPolicy"));
        consumerProps.put("session.timeout.ms", Integer.toString(res.getInt("sessionTimeout")));
        consumerProps.put("partition.assignment.strategy", res.getString("assignmentStrategy"));

        StringDeserializer deserializer = new StringDeserializer();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer);

        return new VerifiableConsumer(consumer, System.out, topic, maxMessages, useAutoCommit, false, verbose, sendOffsetForTimesData);
    }

    /**
     * Command-line entry point. Prints the help text and exits with status 0 when called
     * without arguments; otherwise builds the consumer, installs a shutdown hook that closes
     * it, and runs it on the calling thread. Argument errors are reported through the parser
     * and end the process with status 1.
     *
     * @param args the raw command-line arguments
     */
    public static void main(String[] args) {
        ArgumentParser parser = argParser();
        if (args.length == 0) {
            parser.printHelp();
            System.exit(0);
        }

        try {
            final VerifiableConsumer consumer = createFromArgs(parser, args);
            Thread shutdownHook = new Thread(consumer::close, "verifiable-consumer-shutdown-hook");
            Runtime.getRuntime().addShutdownHook(shutdownHook);
            consumer.run();
        } catch (ArgumentParserException e) {
            parser.handleError(e);
            System.exit(1);
        }
    }

    /**
     * Summary of the records consumed from one partition in a single poll: how many records
     * were counted and the first and last offsets among them.
     */
    private static class RecordSetSummary extends PartitionData {
        private final long count;
        private final long minOffset;
        private final long maxOffset;

        public RecordSetSummary(String topic, int partition, long count, long minOffset, long maxOffset) {
            super(topic, partition);
            this.count = count;
            this.minOffset = minOffset;
            this.maxOffset = maxOffset;
        }

        /** @return number of records covered by this summary */
        @JsonProperty
        public long count() {
            return count;
        }

        /** @return offset of the first record covered by this summary */
        @JsonProperty
        public long minOffset() {
            return minOffset;
        }

        /** @return offset of the last record covered by this summary */
        @JsonProperty
        public long maxOffset() {
            return maxOffset;
        }
    }

    /** The offset submitted for commit for a single topic partition. */
    private static class CommitData extends PartitionData {
        private final long offset;

        public CommitData(String topic, int partition, long offset) {
            super(topic, partition);
            this.offset = offset;
        }

        /** @return the offset submitted for commit */
        @JsonProperty
        public long offset() {
            return offset;
        }
    }

    /**
     * Event named {@code offsets_committed}: the offsets of a commit attempt together with
     * its outcome. The {@code error} property is left out of the JSON when it is null.
     */
    private static class OffsetsCommitted extends ConsumerEvent {
        private final List<CommitData> offsets;
        private final String error;
        private final boolean success;

        public OffsetsCommitted(List<CommitData> offsets, String error, boolean success) {
            this.offsets = offsets;
            this.error = error;
            this.success = success;
        }

        @Override
        public String name() {
            return "offsets_committed";
        }

        /** @return the per-partition offsets of the commit attempt */
        @JsonProperty
        public List<CommitData> offsets() {
            return offsets;
        }

        /** @return the failure message, or null when none was recorded */
        @JsonProperty
        @JsonInclude(value = JsonInclude.Include.NON_NULL)
        public String error() {
            return error;
        }

        /** @return whether the commit succeeded */
        @JsonProperty
        public boolean success() {
            return success;
        }
    }

    /** Base class for JSON payloads that refer to one topic partition. */
    private static class PartitionData {
        private final String topic;
        private final int partition;

        public PartitionData(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        /** @return the topic name */
        @JsonProperty
        public String topic() {
            return topic;
        }

        /** @return the partition number */
        @JsonProperty
        public int partition() {
            return partition;
        }
    }

    /**
     * Event named {@code offset_for_times_data}: the timestamp and offset reported for one
     * partition. The {@code timestamp} property overrides the one inherited from the base
     * event, so it carries the value passed to the constructor rather than the creation time.
     */
    public static class OffsetForTimesData extends ConsumerEvent {
        private final long timestamp;
        private final long offset;
        private final TopicPartition partition;

        public OffsetForTimesData(TopicPartition partition, long timestamp, long offset) {
            this.partition = partition;
            this.timestamp = timestamp;
            this.offset = offset;
        }

        @Override
        public String name() {
            return "offset_for_times_data";
        }

        /** @return the partition this result belongs to */
        @JsonProperty
        public TopicPartition partition() {
            return partition;
        }

        /** @return the timestamp given at construction */
        @Override
        @JsonProperty
        public long timestamp() {
            return timestamp;
        }

        /** @return the offset given at construction */
        @JsonProperty
        public long offset() {
            return offset;
        }
    }

    /**
     * Event named {@code record_data}: exposes the topic, partition, key, value, offset and
     * timestamp of a single consumed record.
     */
    @JsonPropertyOrder(value = {"recordts", "timestamp", "name", "key", "value", "topic", "partition", "offset"})
    public static class RecordData extends ConsumerEvent {
        private final ConsumerRecord<String, String> record;

        public RecordData(ConsumerRecord<String, String> record) {
            this.record = record;
        }

        @Override
        public String name() {
            return "record_data";
        }

        /** @return the record's topic */
        @JsonProperty
        public String topic() {
            return record.topic();
        }

        /** @return the record's partition */
        @JsonProperty
        public int partition() {
            return record.partition();
        }

        /** @return the record's key */
        @JsonProperty
        public String key() {
            return record.key();
        }

        /** @return the record's value */
        @JsonProperty
        public String value() {
            return record.value();
        }

        /** @return the record's offset */
        @JsonProperty
        public long offset() {
            return record.offset();
        }

        /** @return the record's timestamp */
        @JsonProperty
        public long recordts() {
            return record.timestamp();
        }
    }

    /**
     * Event named {@code records_consumed}: a record count plus the per-partition summaries,
     * which are serialized under the {@code partitions} property.
     */
    public static class RecordsConsumed extends ConsumerEvent {
        private final long count;
        private final List<RecordSetSummary> partitionSummaries;

        public RecordsConsumed(long count, List<RecordSetSummary> partitionSummaries) {
            this.count = count;
            this.partitionSummaries = partitionSummaries;
        }

        @Override
        public String name() {
            return "records_consumed";
        }

        /** @return the record count given at construction */
        @JsonProperty
        public long count() {
            return count;
        }

        /** @return the per-partition summaries */
        @JsonProperty
        public List<RecordSetSummary> partitions() {
            return partitionSummaries;
        }
    }

    /** Event named {@code partitions_assigned}, carrying a collection of partitions. */
    private static class PartitionsAssigned extends ConsumerEvent {
        private final Collection<TopicPartition> partitions;

        public PartitionsAssigned(Collection<TopicPartition> partitions) {
            this.partitions = partitions;
        }

        /** @return the partitions given at construction */
        @JsonProperty
        public Collection<TopicPartition> partitions() {
            return partitions;
        }

        @Override
        public String name() {
            return "partitions_assigned";
        }
    }

    /** Event named {@code partitions_revoked}, carrying a collection of partitions. */
    private static class PartitionsRevoked extends ConsumerEvent {
        private final Collection<TopicPartition> partitions;

        public PartitionsRevoked(Collection<TopicPartition> partitions) {
            this.partitions = partitions;
        }

        /** @return the partitions given at construction */
        @JsonProperty
        public Collection<TopicPartition> partitions() {
            return partitions;
        }

        @Override
        public String name() {
            return "partitions_revoked";
        }
    }

    /** Event named {@code shutdown_complete}; it has no payload beyond the base properties. */
    private static class ShutdownComplete extends ConsumerEvent {
        private ShutdownComplete() {
        }

        @Override
        public String name() {
            return "shutdown_complete";
        }
    }

    /** Event named {@code startup_complete}; it has no payload beyond the base properties. */
    private static class StartupComplete extends ConsumerEvent {
        private StartupComplete() {
        }

        @Override
        public String name() {
            return "startup_complete";
        }
    }

    /**
     * Base class of all emitted events. Every event exposes a {@code name} and a
     * {@code timestamp}; the timestamp is the wall-clock time, in milliseconds, captured when
     * the event object is created.
     */
    @JsonPropertyOrder(value = {"timestamp", "name"})
    private static abstract class ConsumerEvent {
        private final long timestamp = System.currentTimeMillis();

        private ConsumerEvent() {
        }

        /** @return the event name written to the JSON output */
        @JsonProperty
        public abstract String name();

        /** @return the creation time of this event in milliseconds */
        @JsonProperty
        public long timestamp() {
            return timestamp;
        }
    }
}

