package org.apache.kafka.connect.integration;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest.class */
public class BlockingConnectorTest {
    private static final int NUM_WORKERS = 1;
    private static final String BLOCKING_CONNECTOR_NAME = "blocking-connector";
    private static final String NORMAL_CONNECTOR_NAME = "normal-connector";
    private static final String TEST_TOPIC = "normal-topic";
    private static final int NUM_RECORDS_PRODUCED = 100;
    private EmbeddedConnectCluster connect;
    private ConnectorHandle normalConnectorHandle;
    private static final Logger log = LoggerFactory.getLogger(BlockingConnectorTest.class);
    private static final long CONNECT_WORKER_STARTUP_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
    private static final long RECORD_TRANSFER_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
    private static final long REST_REQUEST_TIMEOUT = Worker.CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS * 2;

    /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$BlockingConnector.class */
    public static class BlockingConnector extends SourceConnector {
        private static CountDownLatch blockLatch;
        private String block;
        public static final String INITIALIZE = "initialize";
        public static final String INITIALIZE_WITH_TASK_CONFIGS = "initializeWithTaskConfigs";
        public static final String START = "start";
        public static final String RECONFIGURE = "reconfigure";
        public static final String TASK_CLASS = "taskClass";
        public static final String TASK_CONFIGS = "taskConfigs";
        public static final String STOP = "stop";
        public static final String VALIDATE = "validate";
        public static final String CONFIG = "config";
        public static final String VERSION = "version";
        public static final String BLOCK_CONFIG = "block";
        private static final ConfigDef CONFIG_DEF = new ConfigDef().define(BLOCK_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, "Where to block indefinitely, e.g., 'start', 'initialize', 'taskConfigs', 'version'");

        /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$BlockingConnector$BlockingTask.class */
        public static class BlockingTask extends SourceTask {
            public void start(Map<String, String> map) {
            }

            public List<SourceRecord> poll() {
                return null;
            }

            public void stop() {
            }

            public String version() {
                return "0.0.0";
            }
        }

        public BlockingConnector() {
            this(null);
        }

        protected BlockingConnector(String str) {
            this.block = str;
            synchronized (BlockingConnector.class) {
                if (blockLatch != null) {
                    blockLatch.countDown();
                }
                blockLatch = new CountDownLatch(1);
            }
        }

        public static void waitForBlock() throws InterruptedException {
            synchronized (BlockingConnector.class) {
                if (blockLatch == null) {
                    throw new IllegalArgumentException("No connector has been created yet");
                }
            }
            BlockingConnectorTest.log.debug("Waiting for connector to block");
            blockLatch.await();
            BlockingConnectorTest.log.debug("Connector should now be blocked");
        }

        public static void resetBlockLatch() {
            synchronized (BlockingConnector.class) {
                if (blockLatch != null) {
                    blockLatch.countDown();
                    blockLatch = null;
                }
            }
        }

        public void initialize(ConnectorContext connectorContext) {
            maybeBlockOn(INITIALIZE);
            super.initialize(connectorContext);
        }

        public void initialize(ConnectorContext connectorContext, List<Map<String, String>> list) {
            maybeBlockOn(INITIALIZE_WITH_TASK_CONFIGS);
            super.initialize(connectorContext, list);
        }

        public void start(Map<String, String> map) {
            this.block = new AbstractConfig(CONFIG_DEF, map).getString(BLOCK_CONFIG);
            maybeBlockOn(START);
        }

        public void reconfigure(Map<String, String> map) {
            super.reconfigure(map);
            maybeBlockOn(RECONFIGURE);
        }

        public Class<? extends Task> taskClass() {
            maybeBlockOn(TASK_CLASS);
            return BlockingTask.class;
        }

        public List<Map<String, String>> taskConfigs(int i) {
            maybeBlockOn(TASK_CONFIGS);
            return Collections.singletonList(Collections.emptyMap());
        }

        public void stop() {
            maybeBlockOn(STOP);
        }

        public Config validate(Map<String, String> map) {
            maybeBlockOn(VALIDATE);
            return super.validate(map);
        }

        public ConfigDef config() {
            maybeBlockOn(CONFIG);
            return CONFIG_DEF;
        }

        public String version() {
            maybeBlockOn(VERSION);
            return "0.0.0";
        }

        protected void maybeBlockOn(String str) {
            if (!str.equals(this.block)) {
                BlockingConnectorTest.log.debug("Will not block on {}", str);
            } else {
                BlockingConnectorTest.log.info("Will block on {}", str);
                blockLatch.countDown();
                while (true) {
                    try {
                        Thread.sleep(Long.MAX_VALUE);
                    } catch (InterruptedException e) {
                    }
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$ConfigBlockingConnector.class */
    public static class ConfigBlockingConnector extends BlockingConnector {
        public ConfigBlockingConnector() {
            super(BlockingConnector.CONFIG);
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$InitializeBlockingConnector.class */
    public static class InitializeBlockingConnector extends BlockingConnector {
        public InitializeBlockingConnector() {
            super(BlockingConnector.INITIALIZE);
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$ValidateBlockingConnector.class */
    public static class ValidateBlockingConnector extends BlockingConnector {
        public ValidateBlockingConnector() {
            super(BlockingConnector.VALIDATE);
        }
    }

    @Before
    public void setup() throws Exception {
        ConnectorsResource.setRequestTimeout(REST_REQUEST_TIMEOUT);
        this.connect = new EmbeddedConnectCluster.Builder().name("connect-cluster").numWorkers(1).numBrokers(1).workerProps(new HashMap()).brokerProps(new Properties()).build();
        this.connect.start();
        TestUtils.waitForCondition(() -> {
            return this.connect.requestGet(this.connect.endpointForResource("connectors/nonexistent")).getStatus() == 404;
        }, CONNECT_WORKER_STARTUP_TIMEOUT, "Worker did not complete startup in time");
    }

    @After
    public void close() {
        this.connect.stop();
        ConnectorsResource.resetRequestTimeout();
        BlockingConnector.resetBlockLatch();
    }

    @Test
    public void testBlockInConnectorValidate() throws Exception {
        log.info("Starting test testBlockInConnectorValidate");
        Assert.assertThrows(ConnectRestException.class, () -> {
            createConnectorWithBlock(ValidateBlockingConnector.class);
        });
        BlockingConnector.waitForBlock();
        createNormalConnector();
        verifyNormalConnector();
    }

    @Test
    public void testBlockInConnectorConfig() throws Exception {
        log.info("Starting test testBlockInConnectorConfig");
        Assert.assertThrows(ConnectRestException.class, () -> {
            createConnectorWithBlock(ConfigBlockingConnector.class);
        });
        BlockingConnector.waitForBlock();
        createNormalConnector();
        verifyNormalConnector();
    }

    @Test
    public void testBlockInConnectorInitialize() throws Exception {
        log.info("Starting test testBlockInConnectorInitialize");
        createConnectorWithBlock(InitializeBlockingConnector.class);
        BlockingConnector.waitForBlock();
        createNormalConnector();
        verifyNormalConnector();
    }

    @Test
    public void testBlockInConnectorStart() throws Exception {
        log.info("Starting test testBlockInConnectorStart");
        createConnectorWithBlock(BlockingConnector.START);
        BlockingConnector.waitForBlock();
        createNormalConnector();
        verifyNormalConnector();
    }

    @Test
    public void testBlockInConnectorStop() throws Exception {
        log.info("Starting test testBlockInConnectorStop");
        createConnectorWithBlock(BlockingConnector.STOP);
        waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
        this.connect.deleteConnector(BLOCKING_CONNECTOR_NAME);
        BlockingConnector.waitForBlock();
        createNormalConnector();
        verifyNormalConnector();
    }

    @Test
    public void testWorkerRestartWithBlockInConnectorStart() throws Exception {
        log.info("Starting test testWorkerRestartWithBlockInConnectorStart");
        createConnectorWithBlock(BlockingConnector.START);
        BlockingConnector.waitForBlock();
        createNormalConnector();
        this.connect.removeWorker();
        this.connect.addWorker();
        BlockingConnector.waitForBlock();
        verifyNormalConnector();
    }

    @Test
    public void testWorkerRestartWithBlockInConnectorStop() throws Exception {
        log.info("Starting test testWorkerRestartWithBlockInConnectorStop");
        createConnectorWithBlock(BlockingConnector.STOP);
        waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
        createNormalConnector();
        waitForConnectorStart(NORMAL_CONNECTOR_NAME);
        this.connect.removeWorker();
        BlockingConnector.waitForBlock();
        this.connect.addWorker();
        waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
        verifyNormalConnector();
    }

    private void createConnectorWithBlock(String str) {
        Map<String, String> baseBlockingConnectorProps = baseBlockingConnectorProps();
        baseBlockingConnectorProps.put(BlockingConnector.BLOCK_CONFIG, str);
        log.info("Creating connector with block during {}", str);
        try {
            this.connect.configureConnector(BLOCKING_CONNECTOR_NAME, baseBlockingConnectorProps);
        } catch (RuntimeException e) {
            log.info("Failed to create connector", e);
            throw e;
        }
    }

    private void createConnectorWithBlock(Class<? extends BlockingConnector> cls) {
        Map<String, String> baseBlockingConnectorProps = baseBlockingConnectorProps();
        baseBlockingConnectorProps.put("connector.class", cls.getName());
        log.info("Creating blocking connector of type {}", cls.getSimpleName());
        try {
            this.connect.configureConnector(BLOCKING_CONNECTOR_NAME, baseBlockingConnectorProps);
        } catch (RuntimeException e) {
            log.info("Failed to create connector", e);
            throw e;
        }
    }

    private Map<String, String> baseBlockingConnectorProps() {
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", BlockingConnector.class.getName());
        hashMap.put("tasks.max", "1");
        return hashMap;
    }

    private void createNormalConnector() {
        this.connect.kafka().createTopic(TEST_TOPIC, 3);
        this.normalConnectorHandle = RuntimeHandles.get().connectorHandle(NORMAL_CONNECTOR_NAME);
        this.normalConnectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
        this.normalConnectorHandle.expectedCommits(NUM_RECORDS_PRODUCED);
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", MonitorableSourceConnector.class.getName());
        hashMap.put("tasks.max", "1");
        hashMap.put(MonitorableSourceConnector.TOPIC_CONFIG, TEST_TOPIC);
        log.info("Creating normal connector");
        try {
            this.connect.configureConnector(NORMAL_CONNECTOR_NAME, hashMap);
        } catch (RuntimeException e) {
            log.info("Failed to create connector", e);
            throw e;
        }
    }

    private void waitForConnectorStart(String str) throws InterruptedException {
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(str, 0, String.format("Failed to observe transition to 'RUNNING' state for connector '%s' in time", str));
    }

    private void verifyNormalConnector() throws InterruptedException {
        waitForConnectorStart(NORMAL_CONNECTOR_NAME);
        this.normalConnectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
        this.normalConnectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
    }
}
