package io.confluent.ksql.rest.server;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.BindException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.cli.ConnectDistributed;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.Connect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/ConnectExecutable.class */
/**
 * An {@link Executable} that starts a Kafka Connect worker through
 * {@link ConnectDistributed} using a fixed set of worker properties.
 *
 * <p>Termination is signalled through a single-count latch:
 * {@link #awaitTerminated()} blocks until {@link #notifyTerminated()} is called.
 */
public final class ConnectExecutable implements Executable {
    private static final Logger LOG = LoggerFactory.getLogger(ConnectExecutable.class);

    private final Map<String, String> workerProps;
    private final ConnectDistributed connectDistributed = new ConnectDistributed();
    private final CountDownLatch terminateLatch = new CountDownLatch(1);

    /** The running worker; stays {@code null} until {@link #startAsync()} succeeds. */
    private Connect connect;

    /**
     * Creates an executable from a worker properties file.
     *
     * @param configFile name of the properties file handed to {@code Utils.loadProps};
     *                   an empty string means "no file" and yields empty worker properties
     * @return a new, not yet started, executable
     * @throws IOException if the properties file cannot be loaded
     * @throws NullPointerException if {@code configFile} is {@code null}
     */
    public static ConnectExecutable of(String configFile) throws IOException {
        final Map<String, String> props;
        if (configFile.isEmpty()) {
            props = Collections.emptyMap();
        } else {
            props = Utils.propsToStringMap(Utils.loadProps(configFile));
        }
        return new ConnectExecutable(props);
    }

    /**
     * @param workerProps properties passed to the Connect worker on start; must not be
     *                    {@code null}
     * @throws NullPointerException if {@code workerProps} is {@code null}
     */
    @VisibleForTesting
    ConnectExecutable(Map<String, String> workerProps) {
        // requireNonNull is generic, so no (raw) cast is needed to keep the type arguments.
        this.workerProps = Objects.requireNonNull(workerProps, "workerProps");
    }

    /**
     * Starts the Connect worker with the configured worker properties.
     *
     * <p>A {@link ConnectException} caused by an {@link IOException} that is itself caused
     * by a {@link BindException} is logged as a warning and otherwise ignored; in that case
     * no worker is held by this instance and {@link #shutdown()} does nothing.
     *
     * @throws ConnectException for any start-up failure other than the bind failure
     *                          described above
     */
    @Override
    public void startAsync() {
        try {
            this.connect = this.connectDistributed.startConnect(this.workerProps);
        } catch (ConnectException e) {
            final Throwable cause = e.getCause();
            // instanceof is false for null, so cause.getCause() is only reached when cause is set.
            final boolean bindFailure =
                    cause instanceof IOException && cause.getCause() instanceof BindException;
            if (!bindFailure) {
                throw e;
            }
            LOG.warn("Cannot start a local connect instance because connect is running locally!", e);
        }
    }

    /** Stops the Connect worker if one was started by {@link #startAsync()}. */
    @Override
    public void shutdown() {
        if (this.connect != null) {
            this.connect.stop();
        }
    }

    /** Releases every thread blocked in {@link #awaitTerminated()}. */
    @Override
    public void notifyTerminated() {
        this.terminateLatch.countDown();
    }

    /**
     * Blocks until {@link #notifyTerminated()} has been called.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void awaitTerminated() throws InterruptedException {
        this.terminateLatch.await();
    }
}
