/*
 * Decompiled with CFR 0.152.
 */
package kafka.shell;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.regex.Pattern;
import kafka.log.Defaults;
import kafka.raft.KafkaMetadataLog;
import kafka.raft.MetadataLogConfig;
import kafka.server.KafkaRaftServer;
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import kafka.utils.KafkaScheduler;
import kafka.utils.Scheduler;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.util.ClusterMetadataSource;
import org.apache.kafka.metadata.util.LocalMetadataLogReader;
import org.apache.kafka.raft.ReplicatedLog;
import org.apache.kafka.shell.NonInteractiveShell;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Integration tests that start a test cluster, create a few topics, shut the
 * cluster down, and then run shell commands against the metadata log left on
 * disk by the first controller node.
 */
@Timeout(value = 120L)
@Tag(value = "integration")
public class MetadataShellIntegrationTest {
    /**
     * Compares two lists of lines position by position.
     *
     * <p>An expected line starting with {@code ^} is treated as a regular
     * expression that must match the whole actual line; any other expected line
     * must be equal to the actual line. When one list is shorter than the other,
     * the missing lines are treated as empty strings.
     *
     * @param expected the expected lines (literal text or {@code ^}-prefixed regex)
     * @param actual   the lines that were actually produced
     * @throws RuntimeException on the first line that does not match
     */
    static void assertLinesMatch(List<String> expected, List<String> actual) {
        int lineCount = Math.max(expected.size(), actual.size());
        for (int i = 0; i < lineCount; ++i) {
            String expectedLine = i < expected.size() ? expected.get(i) : "";
            String actualLine = i < actual.size() ? actual.get(i) : "";
            boolean matched = expectedLine.startsWith("^")
                ? Pattern.matches(expectedLine, actualLine)
                : expectedLine.equals(actualLine);
            if (!matched) {
                throw new RuntimeException("Mismatch on line " + i + ": Expected: " + expectedLine + ", but got: " + actualLine + ". FULL OUTPUT: " + String.join("\n", actual));
            }
        }
    }

    /**
     * Runs one shell command, capturing its output, and checks the captured
     * lines with {@link #assertLinesMatch(List, List)}.
     *
     * @param shell    the shell to run the command in
     * @param args     the command and its arguments
     * @param expected the expected output lines
     * @throws Exception if running the command or reading the output fails
     */
    static void assertCommandOutput(NonInteractiveShell shell, List<String> args, String... expected) throws Exception {
        try (CaptureStream captureStream = new CaptureStream()) {
            // The explicit OutputStream cast keeps the same overload of run() selected as before.
            shell.run((OutputStream) captureStream.out, args);
            assertLinesMatch(Arrays.asList(expected), captureStream.outputLines());
        }
    }

    /**
     * Locates the metadata partition directory of the first controller node and
     * asserts that it contains exactly one {@code .log} file.
     *
     * <p>Note that, despite the method name, the value returned is the
     * <em>directory</em> path, not the path of the log file itself.
     *
     * @param cluster the cluster whose first controller node is inspected
     * @return the path of the metadata partition directory
     * @throws Exception if the directory cannot be listed
     */
    private static String getFirstMetadataLogFile(KafkaClusterTestKit cluster) throws Exception {
        String metadataDir = cluster.nodes().controllerNodes().values().iterator().next().metadataDirectory()
            + File.separator + KafkaRaftServer.MetadataTopic() + "-0";
        List<Path> paths = new ArrayList<>();
        // A DirectoryStream holds an open directory handle, so it must be closed after iteration.
        try (DirectoryStream<Path> logFiles = Files.newDirectoryStream(
                Paths.get(metadataDir), p -> p.getFileName().toString().endsWith(".log"))) {
            for (Path path : logFiles) {
                paths.add(path);
            }
        }
        Assertions.assertEquals(1, paths.size(), "Expected exactly one .log file");
        return metadataDir;
    }

    /**
     * Builds a cluster with three broker nodes and one controller node, creates
     * topic {@code foo} (two partitions) and topic {@code bar} (one partition)
     * with explicit replica assignments, and then shuts all nodes down.
     *
     * <p>The returned cluster is not closed; the caller is responsible for
     * closing it. If setup fails, the cluster is closed before the failure is
     * rethrown.
     *
     * @return the shut-down cluster
     * @throws Exception if any step of the setup fails
     */
    private static KafkaClusterTestKit createCluster1() throws Exception {
        KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
            new TestKitNodes.Builder().setNumBrokerNodes(3).setNumControllerNodes(1).build()).build();
        try {
            cluster.format();
            cluster.startup();
            try (Admin admin = Admin.create((Properties) cluster.clientProperties())) {
                List<NewTopic> newTopics = new ArrayList<>();
                HashMap<Integer, List<Integer>> fooPartitions = new HashMap<>();
                fooPartitions.put(0, Arrays.asList(0, 1, 2));
                fooPartitions.put(1, Arrays.asList(1, 2, 0));
                newTopics.add(new NewTopic("foo", fooPartitions));
                HashMap<Integer, List<Integer>> barPartitions = new HashMap<>();
                barPartitions.put(0, Arrays.asList(2, 0, 1));
                newTopics.add(new NewTopic("bar", barPartitions));
                admin.createTopics(newTopics).all().get();
            }
            cluster.shutdownAll();
            return cluster;
        } catch (Throwable e) {
            // Keep the original failure as the primary error even if cleanup fails too.
            try {
                cluster.close();
            } catch (Throwable closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Opens a shell that reads the on-disk metadata log of the given cluster's
     * first controller node. The caller is responsible for closing the shell.
     *
     * @param cluster a cluster prepared by {@link #createCluster1()}
     * @return a shell backed by a reader over the metadata log
     * @throws Exception if the log or the shell cannot be opened
     */
    private static NonInteractiveShell openShell(KafkaClusterTestKit cluster) throws Exception {
        TopicPartition topicPartition = Topic.METADATA_TOPIC_PARTITION;
        Uuid topicId = Uuid.METADATA_TOPIC_ID;
        File dataDir = new File(getFirstMetadataLogFile(cluster));
        Time time = new SystemTime();
        Metrics metrics = new Metrics();
        Scheduler scheduler = new KafkaScheduler(1, "scheduler", true, false);
        MetadataLogConfig config = new MetadataLogConfig(102400, 102400, 10000L, Long.MAX_VALUE, Long.MAX_VALUE,
            8 * 1024 * 1024, 8 * 1024 * 1024, Defaults.FileDeleteDelayMs(), 1);
        ReplicatedLog metadataLog = KafkaMetadataLog.createWithoutRecovery(
            topicPartition, topicId, dataDir, time, metrics, scheduler, config);
        ClusterMetadataSource reader = new LocalMetadataLogReader(metadataLog, OptionalLong.of(Long.MAX_VALUE));
        return new NonInteractiveShell(reader);
    }

    /** Checks the output of {@code ls} for the root, the brokers and the created topics. */
    @Test
    void testLoadSnapshotAndList() throws Exception {
        try (KafkaClusterTestKit cluster = createCluster1();
             NonInteractiveShell shell = openShell(cluster)) {
            assertCommandOutput(shell, Arrays.asList("ls"), "acls", "brokers", "configs", "metadataQuorum", "producerIds", "topicIds", "topics");
            assertCommandOutput(shell, Arrays.asList("ls", "brokers"), "0", "1", "2");
            assertCommandOutput(shell, Arrays.asList("ls", "topics/foo"), "0", "1", "id", "name");
            assertCommandOutput(shell, Arrays.asList("ls", "topics/bar"), "0", "id", "name");
            assertCommandOutput(shell, Arrays.asList("ls", "topics/baz"), "ls: topics/baz: no such file or directory.");
        }
    }

    /** Checks {@code cd}, {@code pwd}, relative {@code ls} and {@code man} within one shell session. */
    @Test
    void testLoadSnapshotAndTryPathCommands() throws Exception {
        try (KafkaClusterTestKit cluster = createCluster1();
             NonInteractiveShell shell = openShell(cluster)) {
            assertCommandOutput(shell, Arrays.asList("cd", "brokers"), "");
            assertCommandOutput(shell, Arrays.asList("pwd"), "/brokers");
            assertCommandOutput(shell, Arrays.asList("ls"), "0", "1", "2");
            assertCommandOutput(shell, Arrays.asList("ls", "."), "0", "1", "2");
            assertCommandOutput(shell, Arrays.asList("man", "pwd"), "pwd: Print the current working directory.", "", "usage: pwd");
            assertCommandOutput(shell, Arrays.asList("cd"), new String[0]);
            assertCommandOutput(shell, Arrays.asList("pwd"), "/");
        }
    }

    /**
     * An in-memory sink for command output that can return what was written
     * as a list of lines.
     */
    static class CaptureStream implements AutoCloseable {
        /** Charset name used both for the print stream and for decoding the captured bytes. */
        private static final String CHARSET_NAME = StandardCharsets.UTF_8.name();

        private final ByteArrayOutputStream out = new ByteArrayOutputStream();
        private final PrintStream stream;

        CaptureStream() throws Exception {
            this.stream = new PrintStream(this.out, true, CHARSET_NAME);
        }

        @Override
        public void close() throws Exception {
            Utils.closeQuietly(this.stream, "stream");
            Utils.closeQuietly(this.out, "out");
        }

        /**
         * Returns the captured output split on {@code \n}.
         *
         * @return the captured lines; a single empty string when nothing was written
         * @throws IOException if the captured bytes cannot be decoded
         */
        List<String> outputLines() throws IOException {
            this.stream.flush();
            // Decode with the charset the stream was declared with, not the platform default.
            return Arrays.asList(this.out.toString(CHARSET_NAME).split("\n"));
        }
    }
}

