/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry;

import io.confluent.kafka.schemaregistry.ClusterTestHarness;
import io.confluent.kafka.schemaregistry.CompatibilityLevel;
import io.confluent.kafka.schemaregistry.RestApp;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.storage.Mode;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryIdentity;
import io.confluent.kafka.schemaregistry.utils.TestUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class LeaderElectorTest
extends ClusterTestHarness {
    /**
     * Supplies the value handed to every {@link RestApp} constructed in this class as its
     * second constructor argument.
     *
     * @return always {@code null}
     */
    // NOTE(review): presumably null means "no ZooKeeper connection" for these tests — confirm
    // against the RestApp constructor.
    private String zkConnect() {
        return null;
    }

    /**
     * Supplies the bootstrap-servers value handed to every {@link RestApp} constructed in this
     * class.
     *
     * @return the {@code brokerList} field of this test instance
     */
    private String bootstrapServers() {
        return brokerList;
    }

    /**
     * Exercises registration, config updates and leader fail-over across two leader-eligible
     * {@link RestApp} instances.
     *
     * <p>The scenario, in order:
     * <ol>
     *   <li>start two instances and assert that instance 1 is the leader and that instance 2
     *       reports instance 1 as its leader;</li>
     *   <li>register schemas through both instances and check they are readable on both;</li>
     *   <li>update the subject-level compatibility through both instances;</li>
     *   <li>clear the leader on instance 1 and assert that new registrations and config updates
     *       fail with status 500 on both instances, while re-registering an existing schema
     *       still returns its id;</li>
     *   <li>restore the leader, register a third schema, stop instance 1 and wait until
     *       instance 2 reports itself as leader;</li>
     *   <li>check that the third schema is readable on the new leader and register a fourth.</li>
     * </ol>
     *
     * @throws Exception if any REST call or app lifecycle operation fails unexpectedly
     */
    @Test
    public void testAutoFailover() throws Exception {
        final String subject = "testTopic";
        final String configSubject = "configTopic";
        List<String> avroSchemas = TestUtils.getRandomCanonicalAvroString(4);

        // Instance 1 always receives the lower of the two ports.
        // NOTE(review): presumably the lower port wins the election asserted below — confirm.
        int firstPort = this.choosePort();
        int secondPort = this.choosePort();
        int port1 = Math.min(firstPort, secondPort);
        int port2 = Math.max(firstPort, secondPort);

        RestApp restApp1 = new RestApp(port1, this.zkConnect(), this.bootstrapServers(), "_schemas", CompatibilityLevel.NONE.name, true, null);
        restApp1.start();
        final RestApp restApp2 = new RestApp(port2, this.zkConnect(), this.bootstrapServers(), "_schemas", CompatibilityLevel.NONE.name, true, null);
        restApp2.start();
        Assertions.assertTrue(restApp1.isLeader(), "Schema registry instance 1 should be the leader");
        Assertions.assertFalse(restApp2.isLeader(), "Schema registry instance 2 shouldn't be the leader");
        Assertions.assertEquals(restApp1.myIdentity(), restApp2.leaderIdentity(), "Instance 2's leader should be instance 1");

        // First schema: registered on the leader, then read back from the non-leader.
        String firstSchema = avroSchemas.get(0);
        int firstSchemaExpectedId = 1;
        TestUtils.registerAndVerifySchema(restApp1.restClient, firstSchema, firstSchemaExpectedId, subject);
        this.verifyIdAndSchema(restApp2.restClient, firstSchemaExpectedId, firstSchema, "Registered schema should be found on the non-leader");

        // Second schema: registered through the non-leader, then read back from both.
        String secondSchema = avroSchemas.get(1);
        int secondSchemaExpectedId = 2;
        int secondSchemaExpectedVersion = 2;
        Assertions.assertEquals(secondSchemaExpectedId, restApp2.restClient.registerSchema(secondSchema, subject), "Registering a new schema to the non-leader should succeed");
        Assertions.assertEquals(secondSchema, restApp1.restClient.getId(secondSchemaExpectedId).getSchemaString(), "Registered schema should be found on the leader");
        Assertions.assertEquals(secondSchema, restApp1.restClient.getVersion(subject, secondSchemaExpectedVersion).getSchema(), "Registered schema should be found on the leader");
        this.verifyIdAndSchema(restApp2.restClient, secondSchemaExpectedId, secondSchema, "Registered schema should be found on the non-leader");

        // Re-registering an already known schema must return the existing id on both instances.
        Assertions.assertEquals(secondSchemaExpectedId, restApp1.restClient.registerSchema(secondSchema, subject), "Registering an existing schema to the leader should return its id");
        Assertions.assertEquals(secondSchemaExpectedId, restApp2.restClient.registerSchema(secondSchema, subject), "Registering an existing schema to the non-leader should return its id");

        // Config updates through the leader, then through the non-leader.
        restApp1.restClient.updateCompatibility(CompatibilityLevel.FORWARD.name, configSubject);
        Assertions.assertEquals(CompatibilityLevel.FORWARD.name, restApp1.restClient.getConfig(configSubject).getCompatibilityLevel(), "New compatibility level should be FORWARD on the leader");
        this.waitUntilCompatibilityLevelSet(restApp2.restClient, configSubject, CompatibilityLevel.FORWARD.name, "New compatibility level should be FORWARD on the non-leader");
        restApp2.restClient.updateCompatibility(CompatibilityLevel.NONE.name, configSubject);
        Assertions.assertEquals(CompatibilityLevel.NONE.name, restApp1.restClient.getConfig(configSubject).getCompatibilityLevel(), "New compatibility level should be NONE on the leader");
        this.waitUntilCompatibilityLevelSet(restApp2.restClient, configSubject, CompatibilityLevel.NONE.name, "New compatibility level should be NONE on the non-leader");

        // With the leader cleared on instance 1, new registrations must be rejected everywhere.
        restApp1.setLeader(null);
        String failedSchema = "{\"type\":\"string\"}";
        int statusCodeFromRestApp1 = 0;
        try {
            restApp1.restClient.registerSchema(failedSchema, subject);
            Assertions.fail("Registration should fail on the leader");
        }
        catch (RestClientException e) {
            statusCodeFromRestApp1 = e.getStatus();
        }
        int statusCodeFromRestApp2 = 0;
        try {
            restApp2.restClient.registerSchema(failedSchema, subject);
            Assertions.fail("Registration should fail on the non-leader");
        }
        catch (RestClientException e) {
            statusCodeFromRestApp2 = e.getStatus();
        }
        Assertions.assertEquals(500, statusCodeFromRestApp1, "Status code from a non-leader rest app for register schema should be 500");
        Assertions.assertEquals(statusCodeFromRestApp1, statusCodeFromRestApp2, "Error code from the leader and the non-leader should be the same");

        // Config updates must be rejected in the same way.
        int updateConfigStatusCodeFromRestApp1 = 0;
        try {
            restApp1.restClient.updateCompatibility(CompatibilityLevel.FORWARD.name, configSubject);
            Assertions.fail("Update config should fail on the leader");
        }
        catch (RestClientException e) {
            updateConfigStatusCodeFromRestApp1 = e.getStatus();
        }
        int updateConfigStatusCodeFromRestApp2 = 0;
        try {
            restApp2.restClient.updateCompatibility(CompatibilityLevel.FORWARD.name, configSubject);
            Assertions.fail("Update config should fail on the non-leader");
        }
        catch (RestClientException e) {
            updateConfigStatusCodeFromRestApp2 = e.getStatus();
        }
        Assertions.assertEquals(500, updateConfigStatusCodeFromRestApp1, "Status code from a non-leader rest app for update config should be 500");
        Assertions.assertEquals(updateConfigStatusCodeFromRestApp1, updateConfigStatusCodeFromRestApp2, "Error code from the leader and the non-leader should be the same");

        // An already registered schema is still resolvable while no leader is set.
        Assertions.assertEquals(secondSchemaExpectedId, restApp2.restClient.registerSchema(secondSchema, subject), "Registering an existing schema to the non-leader should return its id");

        // Restore the leader, register once more, then stop it to trigger fail-over.
        restApp1.setLeader(restApp1.myIdentity());
        String thirdSchema = avroSchemas.get(2);
        int thirdSchemaExpectedVersion = 3;
        int thirdSchemaExpectedId = 3;
        Assertions.assertEquals(thirdSchemaExpectedId, restApp1.restClient.registerSchema(thirdSchema, subject), "Registering a new schema to the leader should succeed");
        restApp1.stop();
        Callable<Boolean> condition = new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                return restApp2.isLeader();
            }
        };
        TestUtils.waitUntilTrue(condition, 15000L, "Schema registry instance 2 should become the leader");

        // The new leader must serve the data written before the fail-over and accept new writes.
        Assertions.assertEquals(thirdSchema, restApp2.restClient.getId(thirdSchemaExpectedId).getSchemaString(), "Latest version should be found on the new leader");
        Assertions.assertEquals(thirdSchema, restApp2.restClient.getVersion(subject, thirdSchemaExpectedVersion).getSchema(), "Latest version should be found on the new leader");
        String fourthSchema = avroSchemas.get(3);
        int fourthSchemaExpectedId = 4;
        TestUtils.registerAndVerifySchema(restApp2.restClient, fourthSchema, fourthSchemaExpectedId, subject);
        restApp2.stop();
    }

    /**
     * Checks that apps started with the leader-eligibility flag set to {@code false} never
     * report themselves as leader, regardless of how the leader-eligible apps come and go.
     *
     * <p>Two followers are started first and must see no leader at all. Thirty leader-eligible
     * apps are then started one by one; afterwards the current leader is repeatedly identified,
     * verified against every remaining node's view, and stopped, until no leader-eligible app
     * is left.
     *
     * @throws Exception if any app lifecycle operation fails unexpectedly
     */
    @Test
    public void testFollowerIsNeverLeader() throws Exception {
        int numFollowers = 2;
        int numLeaders = 30;

        Set<RestApp> followerApps = new HashSet<RestApp>();
        RestApp aFollower = null;
        for (int i = 0; i < numFollowers; ++i) {
            RestApp follower = new RestApp(this.choosePort(), this.zkConnect(), this.bootstrapServers(), "_schemas", CompatibilityLevel.NONE.name, false, null);
            followerApps.add(follower);
            follower.start();
            aFollower = follower;
        }
        Assertions.assertNotNull(aFollower);

        // With only followers running there must be no leader anywhere.
        for (RestApp follower : followerApps) {
            Assertions.assertFalse(follower.isLeader(), "No follower should be leader.");
            Assertions.assertNull(follower.leaderIdentity(), "No leader should be present in a follower cluster.");
        }

        try {
            aFollower.setLeader(aFollower.myIdentity());
        }
        catch (IllegalStateException ignored) {
            // Either outcome is acceptable here: the call may be rejected with an exception or
            // return normally. The assertions below verify that it had no effect.
        }
        Assertions.assertFalse(aFollower.isLeader(), "Should not be able to set a follower to be leader.");
        Assertions.assertNull(aFollower.leaderIdentity(), "There should be no leader present.");

        // Bring up the leader-eligible apps one at a time, waiting for a stable election each time.
        Set<RestApp> leaderApps = new HashSet<RestApp>();
        for (int i = 0; i < numLeaders; ++i) {
            RestApp leader = new RestApp(this.choosePort(), this.zkConnect(), this.bootstrapServers(), "_schemas", CompatibilityLevel.NONE.name, true, null);
            leaderApps.add(leader);
            leader.start();
            this.waitUntilLeaderElectionCompletes(leaderApps);
        }

        // Take the leaders down one by one; after each removal exactly one of the remaining
        // leader-eligible apps must take over and no follower may ever claim leadership.
        while (!leaderApps.isEmpty()) {
            RestApp reportedLeader = LeaderElectorTest.checkOneLeader(leaderApps);
            leaderApps.remove(reportedLeader);
            LeaderElectorTest.checkLeaderIdentity(followerApps, reportedLeader.myIdentity());
            LeaderElectorTest.checkLeaderIdentity(leaderApps, reportedLeader.myIdentity());
            LeaderElectorTest.checkNoneIsLeader(followerApps);
            reportedLeader.stop();
            this.waitUntilLeaderElectionCompletes(leaderApps);
        }

        LeaderElectorTest.checkNoneIsLeader(followerApps);
        LeaderElectorTest.checkNoneIsLeader(leaderApps);
        for (RestApp follower : followerApps) {
            follower.stop();
        }
    }

    /**
     * Verifies schema registration in a deployment made of four follower apps and four
     * leader-eligible apps.
     *
     * <p>The scenario, in order:
     * <ol>
     *   <li>with only followers running, registration through a follower must fail;</li>
     *   <li>once leader-eligible apps are running, registration must succeed both through a
     *       leader-eligible app and through a follower;</li>
     *   <li>every returned id must resolve to the same schema on a follower and on a
     *       leader-eligible app;</li>
     *   <li>after all leader-eligible apps are stopped, registration through a follower must
     *       fail again, while reads of existing ids and versions must still work.</li>
     * </ol>
     *
     * @throws Exception if any REST call or app lifecycle operation fails unexpectedly
     */
    @Test
    public void testRegistrationOnLeaderFollowerClusters() throws Exception {
        int numFollowers = 4;
        int numLeaders = 4;
        int numSchemas = 5;
        String subject = "testSubject";
        List<String> schemas = TestUtils.getRandomCanonicalAvroString(numSchemas);
        List<Integer> ids = new ArrayList<Integer>();

        Set<RestApp> followerApps = new HashSet<RestApp>();
        RestApp aFollower = null;
        for (int i = 0; i < numFollowers; ++i) {
            RestApp follower = new RestApp(this.choosePort(), this.zkConnect(), this.bootstrapServers(), "_schemas", CompatibilityLevel.NONE.name, false, null);
            followerApps.add(follower);
            follower.start();
            aFollower = follower;
        }
        Assertions.assertNotNull(aFollower);

        // No leader-eligible app is running yet, so a write through a follower must be rejected.
        boolean successfullyRegistered = false;
        try {
            aFollower.restClient.registerSchema(schemas.get(0), subject);
            successfullyRegistered = true;
        }
        catch (RestClientException ignored) {
            // Expected: the rejection is verified through the flag below.
        }
        Assertions.assertFalse(successfullyRegistered, "Should not be possible to register with no leaders present.");

        Set<RestApp> leaderApps = new HashSet<RestApp>();
        RestApp aLeader = null;
        for (int i = 0; i < numLeaders; ++i) {
            RestApp leader = new RestApp(this.choosePort(), this.zkConnect(), this.bootstrapServers(), "_schemas", CompatibilityLevel.NONE.name, true, null);
            leaderApps.add(leader);
            leader.start();
            aLeader = leader;
        }
        Assertions.assertNotNull(aLeader);

        try {
            for (String schema : schemas) {
                ids.add(aLeader.restClient.registerSchema(schema, subject));
            }
        }
        catch (RestClientException e) {
            Assertions.fail("It should be possible to register schemas when a leader cluster is present.", e);
        }

        String anotherSchema = TestUtils.getRandomCanonicalAvroString(1).get(0);
        try {
            ids.add(aFollower.restClient.registerSchema(anotherSchema, subject));
        }
        catch (RestClientException e) {
            Assertions.fail("Should be possible register a schema through follower cluster.", e);
        }

        // Every id must eventually be readable from both kinds of node, with identical content.
        try {
            for (int id : ids) {
                this.waitUntilIdExists(aFollower.restClient, id, String.format("Should be possible to fetch id %d from this follower.", id));
                this.waitUntilIdExists(aLeader.restClient, id, String.format("Should be possible to fetch id %d from this leader.", id));
                SchemaString followerResponse = aFollower.restClient.getId(id);
                SchemaString leaderResponse = aLeader.restClient.getId(id);
                Assertions.assertEquals(followerResponse.getSchemaString(), leaderResponse.getSchemaString(), "Leader and follower responded with different schemas when queried with the same id.");
            }
        }
        catch (RestClientException e) {
            Assertions.fail("Expected ids were not found in the schema registry.", e);
        }

        // Stop the leader-eligible apps one at a time, always taking down the current leader.
        while (!leaderApps.isEmpty()) {
            RestApp leader = LeaderElectorTest.findLeader(leaderApps);
            Assertions.assertNotNull(leader, "Expected one of the remaining leader-eligible apps to be the leader.");
            leaderApps.remove(leader);
            leader.stop();
            this.waitUntilLeaderElectionCompletes(leaderApps);
        }

        // Writes must be rejected again now that no leader-eligible app is left.
        anotherSchema = TestUtils.getRandomCanonicalAvroString(1).get(0);
        successfullyRegistered = false;
        try {
            aFollower.restClient.registerSchema(anotherSchema, subject);
            successfullyRegistered = true;
        }
        catch (RestClientException ignored) {
            // Expected: the rejection is verified through the flag below.
        }
        Assertions.assertFalse(successfullyRegistered, "Should not be possible to register with no leaders present.");

        // Reads must keep working without a leader.
        try {
            for (int id : ids) {
                // Only the absence of an exception matters; the response itself is not inspected.
                aFollower.restClient.getId(id);
            }
            int versionCount = aFollower.restClient.getAllVersions(subject).size();
            Assertions.assertEquals(ids.size(), versionCount, "Number of ids should match number of versions.");
        }
        catch (RestClientException e) {
            Assertions.fail("Should be possible to fetch registered schemas even with no leaders present.", e);
        }

        for (RestApp follower : followerApps) {
            follower.stop();
        }
    }

    /**
     * Verifies registration with caller-supplied versions and ids in a deployment made of four
     * follower apps and four leader-eligible apps, all started with {@code mode.mutability=true}.
     *
     * <p>The scenario, in order:
     * <ol>
     *   <li>with only followers running, registration through a follower must fail;</li>
     *   <li>once leader-eligible apps are running, the mode is switched to
     *       {@link Mode#IMPORT} and registration with explicit version and id must succeed
     *       both through a leader-eligible app and through a follower;</li>
     *   <li>every returned id must resolve to the same schema on a follower and on a
     *       leader-eligible app;</li>
     *   <li>after all leader-eligible apps are stopped, registration through a follower must
     *       fail again, while reads of existing ids and versions must still work.</li>
     * </ol>
     *
     * @throws Exception if any REST call or app lifecycle operation fails unexpectedly
     */
    @Test
    public void testImportOnLeaderFollowerClusters() throws Exception {
        int numFollowers = 4;
        int numLeaders = 4;
        int numSchemas = 5;
        String subject = "testSubject";
        List<String> schemas = TestUtils.getRandomCanonicalAvroString(numSchemas);
        List<Integer> ids = new ArrayList<Integer>();
        Properties props = new Properties();
        props.setProperty("mode.mutability", "true");

        // Explicit id and version counters; each registration attempt consumes one of each,
        // including the attempts that are expected to fail.
        int newId = 100000;
        int newVersion = 100;

        Set<RestApp> followerApps = new HashSet<RestApp>();
        RestApp aFollower = null;
        for (int i = 0; i < numFollowers; ++i) {
            RestApp follower = new RestApp(this.choosePort(), this.zkConnect(), this.bootstrapServers(), "_schemas", CompatibilityLevel.NONE.name, false, props);
            followerApps.add(follower);
            follower.start();
            aFollower = follower;
        }
        Assertions.assertNotNull(aFollower);

        // No leader-eligible app is running yet, so a write through a follower must be rejected.
        boolean successfullyRegistered = false;
        try {
            aFollower.restClient.registerSchema(schemas.get(0), subject, newVersion++, newId++);
            successfullyRegistered = true;
        }
        catch (RestClientException ignored) {
            // Expected: the rejection is verified through the flag below.
        }
        Assertions.assertFalse(successfullyRegistered, "Should not be possible to register with no leaders present.");

        Set<RestApp> leaderApps = new HashSet<RestApp>();
        RestApp aLeader = null;
        for (int i = 0; i < numLeaders; ++i) {
            RestApp leader = new RestApp(this.choosePort(), this.zkConnect(), this.bootstrapServers(), "_schemas", CompatibilityLevel.NONE.name, true, props);
            leaderApps.add(leader);
            leader.start();
            aLeader = leader;
        }
        Assertions.assertNotNull(aLeader);

        try {
            aLeader.restClient.setMode(Mode.IMPORT.toString());
        }
        catch (RestClientException e) {
            Assertions.fail("It should be possible to set mode when a leader cluster is present.", e);
        }

        try {
            for (String schema : schemas) {
                ids.add(aLeader.restClient.registerSchema(schema, subject, newVersion++, newId++));
            }
        }
        catch (RestClientException e) {
            Assertions.fail("It should be possible to register schemas when a leader cluster is present. Error: " + e.getMessage(), e);
        }

        String anotherSchema = TestUtils.getRandomCanonicalAvroString(1).get(0);
        try {
            ids.add(aFollower.restClient.registerSchema(anotherSchema, subject, newVersion++, newId++));
        }
        catch (RestClientException e) {
            Assertions.fail("Should be possible register a schema through follower cluster.", e);
        }

        // Every id must eventually be readable from both kinds of node, with identical content.
        try {
            for (int id : ids) {
                this.waitUntilIdExists(aFollower.restClient, id, String.format("Should be possible to fetch id %d from this follower.", id));
                this.waitUntilIdExists(aLeader.restClient, id, String.format("Should be possible to fetch id %d from this leader.", id));
                SchemaString followerResponse = aFollower.restClient.getId(id);
                SchemaString leaderResponse = aLeader.restClient.getId(id);
                Assertions.assertEquals(followerResponse.getSchemaString(), leaderResponse.getSchemaString(), "Leader and follower responded with different schemas when queried with the same id.");
            }
        }
        catch (RestClientException e) {
            Assertions.fail("Expected ids were not found in the schema registry.", e);
        }

        // Stop the leader-eligible apps one at a time, always taking down the current leader.
        while (!leaderApps.isEmpty()) {
            RestApp leader = LeaderElectorTest.findLeader(leaderApps);
            Assertions.assertNotNull(leader, "Expected one of the remaining leader-eligible apps to be the leader.");
            leaderApps.remove(leader);
            leader.stop();
            this.waitUntilLeaderElectionCompletes(leaderApps);
        }

        // Writes must be rejected again now that no leader-eligible app is left.
        anotherSchema = TestUtils.getRandomCanonicalAvroString(1).get(0);
        successfullyRegistered = false;
        try {
            aFollower.restClient.registerSchema(anotherSchema, subject, newVersion++, newId++);
            successfullyRegistered = true;
        }
        catch (RestClientException ignored) {
            // Expected: the rejection is verified through the flag below.
        }
        Assertions.assertFalse(successfullyRegistered, "Should not be possible to register with no leaders present.");

        // Reads must keep working without a leader.
        try {
            for (int id : ids) {
                // Only the absence of an exception matters; the response itself is not inspected.
                aFollower.restClient.getId(id);
            }
            int versionCount = aFollower.restClient.getAllVersions(subject).size();
            Assertions.assertEquals(ids.size(), versionCount, "Number of ids should match number of versions.");
        }
        catch (RestClientException e) {
            Assertions.fail("Should be possible to fetch registered schemas even with no leaders present.", e);
        }

        for (RestApp follower : followerApps) {
            follower.stop();
        }
    }

    /**
     * Looks for a node that currently reports itself as leader.
     *
     * @param cluster the nodes to inspect, in iteration order
     * @return the first node whose {@code isLeader()} is true, or {@code null} when none is
     */
    private static RestApp findLeader(Collection<RestApp> cluster) {
        RestApp found = null;
        Iterator<RestApp> nodes = cluster.iterator();
        while (found == null && nodes.hasNext()) {
            RestApp candidate = nodes.next();
            if (candidate.isLeader()) {
                found = candidate;
            }
        }
        return found;
    }

    /**
     * Asserts that no node in the given cluster reports itself as leader.
     *
     * @param cluster the nodes to inspect
     */
    private static void checkNoneIsLeader(Collection<RestApp> cluster) {
        RestApp selfReportedLeader = LeaderElectorTest.findLeader(cluster);
        Assertions.assertNull(selfReportedLeader, "Expected none of the nodes in this cluster to report itself as leader.");
    }

    /**
     * Asserts that every node which knows a leader agrees it is {@code expectedLeaderIdentity}.
     *
     * <p>For each node the leader identity is polled up to three times, sleeping one second
     * after each {@code null} reading. As soon as a non-null identity is seen it is compared
     * with the expected one and polling for that node stops. A node that still reports
     * {@code null} after the last attempt is tolerated and produces no failure.
     *
     * @param cluster the nodes to inspect
     * @param expectedLeaderIdentity the identity every node is expected to report as leader
     */
    private static void checkLeaderIdentity(Collection<RestApp> cluster, SchemaRegistryIdentity expectedLeaderIdentity) {
        for (RestApp restApp : cluster) {
            for (int attempt = 0; attempt < 3; ++attempt) {
                SchemaRegistryIdentity leaderIdentity = restApp.leaderIdentity();
                if (leaderIdentity != null) {
                    Assertions.assertEquals(expectedLeaderIdentity, leaderIdentity, "Each leader identity should be " + String.valueOf(expectedLeaderIdentity));
                    // One successful comparison is enough; no need to repeat it.
                    break;
                }
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException e) {
                    // Keep the interrupt visible to callers instead of silently dropping it.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Collects the distinct, non-null leader identities reported by the given nodes.
     *
     * <p>Each node's {@code leaderIdentity()} is read exactly once, so a value that changes
     * between two reads can never put {@code null} into the result.
     *
     * @param cluster the nodes to query; {@code null} elements are skipped
     * @return the set of leader identities currently reported, possibly empty
     */
    private static Set<SchemaRegistryIdentity> getLeaderIdentities(Collection<RestApp> cluster) {
        Set<SchemaRegistryIdentity> leaderIdentities = new HashSet<SchemaRegistryIdentity>();
        for (RestApp app : cluster) {
            if (app == null) {
                continue;
            }
            SchemaRegistryIdentity reported = app.leaderIdentity();
            if (reported != null) {
                leaderIdentities.add(reported);
            }
        }
        return leaderIdentities;
    }

    /**
     * Asserts that exactly one node in the cluster reports itself as leader and returns it.
     *
     * @param cluster the nodes to inspect
     * @return the single node whose {@code isLeader()} is true
     */
    private static RestApp checkOneLeader(Collection<RestApp> cluster) {
        RestApp lastSeenLeader = null;
        int found = 0;
        for (RestApp candidate : cluster) {
            if (candidate.isLeader()) {
                found++;
                lastSeenLeader = candidate;
            }
        }
        Assertions.assertEquals(1, found, "Expected one leader but found " + found);
        return lastSeenLeader;
    }

    /**
     * Blocks until the cluster has settled on a leader: some node reports itself as leader and
     * all nodes that report a leader identity report the same one.
     *
     * <p>Returns immediately for a {@code null} or empty cluster. Otherwise the condition is
     * handed to {@code TestUtils.waitUntilTrue} together with the value {@code 15000L}.
     *
     * @param cluster the nodes taking part in the election
     */
    // NOTE(review): 15000L is presumably a timeout in milliseconds — confirm in TestUtils.
    private void waitUntilLeaderElectionCompletes(final Collection<RestApp> cluster) {
        boolean nothingToElect = cluster == null || cluster.isEmpty();
        if (nothingToElect) {
            return;
        }
        TestUtils.waitUntilTrue(new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                RestApp selfReportedLeader = LeaderElectorTest.findLeader(cluster);
                int distinctIdentities = LeaderElectorTest.getLeaderIdentities(cluster).size();
                return selfReportedLeader != null && distinctIdentities == 1;
            }
        }, 15000L, "A node should have been elected leader by now.");
    }

    /**
     * Blocks until the given schema id can be fetched through {@code restService}.
     *
     * <p>A {@link RestClientException} from the lookup counts as "not there yet"; any other
     * exception propagates out of the polled condition. The condition is handed to
     * {@code TestUtils.waitUntilTrue} together with the value {@code 15000L}.
     *
     * @param restService the client used for the lookup
     * @param expectedId the schema id to wait for
     * @param errorMsg the message passed on to {@code TestUtils.waitUntilTrue}
     */
    // NOTE(review): 15000L is presumably a timeout in milliseconds — confirm in TestUtils.
    private void waitUntilIdExists(final RestService restService, final int expectedId, String errorMsg) {
        Callable<Boolean> idIsFetchable = new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                boolean fetched;
                try {
                    restService.getId(expectedId);
                    fetched = true;
                }
                catch (RestClientException notThereYet) {
                    fetched = false;
                }
                return fetched;
            }
        };
        TestUtils.waitUntilTrue(idIsFetchable, 15000L, errorMsg);
    }

    /**
     * Blocks until the compatibility level reported for {@code subject} through
     * {@code restService} equals {@code expectedCompatibilityLevel}.
     *
     * <p>A {@link RestClientException} from the lookup, or a {@code null} level in the
     * response, counts as "not set yet" rather than as an error. The condition is handed to
     * {@code TestUtils.waitUntilTrue} together with the value {@code 15000L}.
     *
     * @param restService the client used to read the config
     * @param subject the subject whose config is read
     * @param expectedCompatibilityLevel the level to wait for; must not be {@code null}
     * @param errorMsg the message passed on to {@code TestUtils.waitUntilTrue}
     */
    // NOTE(review): 15000L is presumably a timeout in milliseconds — confirm in TestUtils.
    private void waitUntilCompatibilityLevelSet(final RestService restService, final String subject, final String expectedCompatibilityLevel, String errorMsg) {
        Callable<Boolean> compatibilityLevelMatches = new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                try {
                    String actualCompatibilityLevel = restService.getConfig(subject).getCompatibilityLevel();
                    // equals() tolerates a null actual value, where compareTo() would throw.
                    return expectedCompatibilityLevel.equals(actualCompatibilityLevel);
                }
                catch (RestClientException e) {
                    return false;
                }
            }
        };
        TestUtils.waitUntilTrue(compatibilityLevelMatches, 15000L, errorMsg);
    }

    /**
     * Waits for the given id to become fetchable through {@code restService} and then asserts
     * that it resolves to the expected schema string.
     *
     * <p>An {@link IOException} or {@link RestClientException} raised by the final lookup is
     * reported as a test failure carrying {@code errMsg}.
     *
     * @param restService the client used for the lookup
     * @param expectedId the schema id to fetch
     * @param expectedSchemaString the schema string the id must resolve to
     * @param errMsg the message used for the wait, for lookup failures and for a mismatch
     */
    private void verifyIdAndSchema(RestService restService, int expectedId, String expectedSchemaString, String errMsg) {
        this.waitUntilIdExists(restService, expectedId, errMsg);
        String fetchedSchema = null;
        try {
            SchemaString response = restService.getId(expectedId);
            fetchedSchema = response.getSchemaString();
        }
        catch (IOException | RestClientException e) {
            Assertions.fail(errMsg);
        }
        Assertions.assertEquals(expectedSchemaString, fetchedSchema, errMsg);
    }
}

