package org.apache.kafka.connect.runtime.rest.resources;

import com.fasterxml.jackson.core.type.TypeReference;
import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletContext;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Produces({"application/json"})
@Path("/connectors")
@Consumes({"application/json"})
/* loaded from: input_file:org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.class */
public class ConnectorsResource {
    private static final Logger log = LoggerFactory.getLogger(ConnectorsResource.class);
    private static final long REQUEST_TIMEOUT_MS = 90000;
    private final Herder herder;

    @Context
    private ServletContext context;

    /* loaded from: input_file:org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource$CreatedConnectorInfoTranslator.class */
    private class CreatedConnectorInfoTranslator implements Translator<Herder.Created<ConnectorInfo>, ConnectorInfo> {
        private CreatedConnectorInfoTranslator() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.Translator
        public Herder.Created<ConnectorInfo> translate(RestServer.HttpResponse<ConnectorInfo> httpResponse) {
            return new Herder.Created<>(httpResponse.status() == 201, httpResponse.body());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource$IdentityTranslator.class */
    public class IdentityTranslator<T> implements Translator<T, T> {
        private IdentityTranslator() {
        }

        @Override // org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.Translator
        public T translate(RestServer.HttpResponse<T> httpResponse) {
            return httpResponse.body();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource$Translator.class */
    public interface Translator<T, U> {
        T translate(RestServer.HttpResponse<U> httpResponse);
    }

    public ConnectorsResource(Herder herder) {
        this.herder = herder;
    }

    @GET
    @Path("/")
    public Collection<String> listConnectors(@QueryParam("forward") Boolean bool) throws Throwable {
        FutureCallback futureCallback = new FutureCallback();
        this.herder.connectors(futureCallback);
        return (Collection) completeOrForwardRequest(futureCallback, "/connectors", "GET", null, new TypeReference<Collection<String>>() { // from class: org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.1
        }, bool);
    }

    @POST
    @Path("/")
    public Response createConnector(@QueryParam("forward") Boolean bool, CreateConnectorRequest createConnectorRequest) throws Throwable {
        String name = createConnectorRequest.name();
        if (name.contains("/")) {
            throw new BadRequestException("connector name should not contain '/'");
        }
        Map<String, String> config = createConnectorRequest.config();
        if (!config.containsKey("name")) {
            config.put("name", name);
        }
        FutureCallback futureCallback = new FutureCallback();
        this.herder.putConnectorConfig(name, config, false, futureCallback);
        return Response.created(URI.create("/connectors/" + name)).entity(((Herder.Created) completeOrForwardRequest(futureCallback, "/connectors", "POST", createConnectorRequest, new TypeReference<ConnectorInfo>() { // from class: org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.2
        }, new CreatedConnectorInfoTranslator(), bool)).result()).build();
    }

    @GET
    @Path("/{connector}")
    public ConnectorInfo getConnector(@PathParam("connector") String str, @QueryParam("forward") Boolean bool) throws Throwable {
        FutureCallback futureCallback = new FutureCallback();
        this.herder.connectorInfo(str, futureCallback);
        return (ConnectorInfo) completeOrForwardRequest(futureCallback, "/connectors/" + str, "GET", null, bool);
    }

    @GET
    @Path("/{connector}/config")
    public Map<String, String> getConnectorConfig(@PathParam("connector") String str, @QueryParam("forward") Boolean bool) throws Throwable {
        FutureCallback futureCallback = new FutureCallback();
        this.herder.connectorConfig(str, futureCallback);
        return (Map) completeOrForwardRequest(futureCallback, "/connectors/" + str + "/config", "GET", null, bool);
    }

    @GET
    @Path("/{connector}/status")
    public ConnectorStateInfo getConnectorStatus(@PathParam("connector") String str) throws Throwable {
        return this.herder.connectorStatus(str);
    }

    @Path("/{connector}/config")
    @PUT
    public Response putConnectorConfig(@PathParam("connector") String str, @QueryParam("forward") Boolean bool, Map<String, String> map) throws Throwable {
        FutureCallback futureCallback = new FutureCallback();
        String str2 = map.get("name");
        if (str2 == null) {
            map.put("name", str);
        } else if (!str2.equals(str)) {
            throw new BadRequestException("Connector name configuration (" + str2 + ") doesn't match connector name in the URL (" + str + ")");
        }
        this.herder.putConnectorConfig(str, map, true, futureCallback);
        Herder.Created created = (Herder.Created) completeOrForwardRequest(futureCallback, "/connectors/" + str + "/config", "PUT", map, new TypeReference<ConnectorInfo>() { // from class: org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.3
        }, new CreatedConnectorInfoTranslator(), bool);
        return (created.created() ? Response.created(URI.create("/connectors/" + str)) : Response.ok()).entity(created.result()).build();
    }

    @POST
    @Path("/{connector}/restart")
    public void restartConnector(@PathParam("connector") String str, @QueryParam("forward") Boolean bool) throws Throwable {
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartConnector(str, futureCallback);
        completeOrForwardRequest(futureCallback, "/connectors/" + str + "/restart", "POST", null, bool);
    }

    @Path("/{connector}/pause")
    @PUT
    public Response pauseConnector(@PathParam("connector") String str) {
        this.herder.pauseConnector(str);
        return Response.accepted().build();
    }

    @Path("/{connector}/resume")
    @PUT
    public Response resumeConnector(@PathParam("connector") String str) {
        this.herder.resumeConnector(str);
        return Response.accepted().build();
    }

    @GET
    @Path("/{connector}/tasks")
    public List<TaskInfo> getTaskConfigs(@PathParam("connector") String str, @QueryParam("forward") Boolean bool) throws Throwable {
        FutureCallback futureCallback = new FutureCallback();
        this.herder.taskConfigs(str, futureCallback);
        return (List) completeOrForwardRequest(futureCallback, "/connectors/" + str + "/tasks", "GET", null, new TypeReference<List<TaskInfo>>() { // from class: org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.4
        }, bool);
    }

    @POST
    @Path("/{connector}/tasks")
    public void putTaskConfigs(@PathParam("connector") String str, @QueryParam("forward") Boolean bool, List<Map<String, String>> list) throws Throwable {
        FutureCallback futureCallback = new FutureCallback();
        this.herder.putTaskConfigs(str, list, futureCallback);
        completeOrForwardRequest(futureCallback, "/connectors/" + str + "/tasks", "POST", list, bool);
    }

    @GET
    @Path("/{connector}/tasks/{task}/status")
    public ConnectorStateInfo.TaskState getTaskStatus(@PathParam("connector") String str, @PathParam("task") Integer num) throws Throwable {
        return this.herder.taskStatus(new ConnectorTaskId(str, num.intValue()));
    }

    @POST
    @Path("/{connector}/tasks/{task}/restart")
    public void restartTask(@PathParam("connector") String str, @PathParam("task") Integer num, @QueryParam("forward") Boolean bool) throws Throwable {
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartTask(new ConnectorTaskId(str, num.intValue()), futureCallback);
        completeOrForwardRequest(futureCallback, "/connectors/" + str + "/tasks/" + num + "/restart", "POST", null, bool);
    }

    @Path("/{connector}")
    @DELETE
    public void destroyConnector(@PathParam("connector") String str, @QueryParam("forward") Boolean bool) throws Throwable {
        FutureCallback futureCallback = new FutureCallback();
        this.herder.deleteConnectorConfig(str, futureCallback);
        completeOrForwardRequest(futureCallback, "/connectors/" + str, "DELETE", null, bool);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <T, U> T completeOrForwardRequest(FutureCallback<T> futureCallback, String str, String str2, Object obj, TypeReference<U> typeReference, Translator<T, U> translator, Boolean bool) throws Throwable {
        try {
            return futureCallback.get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted");
        } catch (ExecutionException e2) {
            Throwable cause = e2.getCause();
            if (!(cause instanceof RequestTargetException)) {
                if (cause instanceof RebalanceNeededException) {
                    throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), "Cannot complete request momentarily due to stale configuration (typically caused by a concurrent config change)");
                }
                throw cause;
            }
            if (bool != null && !bool.booleanValue()) {
                throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), "Cannot complete request because of a conflicting operation (e.g. worker rebalance)");
            }
            String uri = UriBuilder.fromUri(((RequestTargetException) cause).forwardUrl()).path(str).queryParam("forward", new Object[]{Boolean.valueOf(bool == null)}).build(new Object[0]).toString();
            log.debug("Forwarding request {} {} {}", new Object[]{uri, str2, obj});
            return translator.translate(RestServer.httpRequest(uri, str2, obj, typeReference));
        } catch (TimeoutException e3) {
            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out");
        }
    }

    private <T> T completeOrForwardRequest(FutureCallback<T> futureCallback, String str, String str2, Object obj, TypeReference<T> typeReference, Boolean bool) throws Throwable {
        return (T) completeOrForwardRequest(futureCallback, str, str2, obj, typeReference, new IdentityTranslator(), bool);
    }

    private <T> T completeOrForwardRequest(FutureCallback<T> futureCallback, String str, String str2, Object obj, Boolean bool) throws Throwable {
        return (T) completeOrForwardRequest(futureCallback, str, str2, obj, null, new IdentityTranslator(), bool);
    }
}
