package io.confluent.connect.replicator;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/replicator/ReadAclCheck.class */
public class ReadAclCheck extends ConfigurationCheck {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ReadAclCheck.class);
    private Map<String, Object> srcAdminClientConfig;
    private Admin srcAdminClient = null;
    private Set<String> topics;
    private ConsumerProvider consumerProvider;

    @Override // io.confluent.connect.replicator.ConfigurationCheck
    public boolean performCheck() {
        boolean z = true;
        try {
            try {
                if (this.srcAdminClient == null) {
                    this.srcAdminClient = AdminClient.create(this.srcAdminClientConfig);
                }
                for (String str : this.topics) {
                    log.debug("Retrieved description {} for topic {}", this.srcAdminClient.describeTopics(new HashSet(Arrays.asList(str))).all().get(), str);
                }
                closeAdminClient();
            } catch (Throwable th) {
                closeAdminClient();
                throw th;
            }
        } catch (Exception e) {
            if ((e instanceof TopicAuthorizationException) || (e.getCause() instanceof TopicAuthorizationException)) {
                log.error("Not Authorized to describe topic on source cluster!", (Throwable) e);
            } else {
                log.error("Describe call for topic failed ", (Throwable) e);
            }
            z = false;
            closeAdminClient();
        }
        boolean z2 = true;
        for (String str2 : this.topics) {
            Consumer<byte[], byte[]> consumer = this.consumerProvider.getConsumer();
            consumer.assign(Utils.getAssignment(str2));
            consumer.seekToEnd(Utils.getAssignment(str2));
            try {
                try {
                    log.debug("Polling with consumer for {}", str2);
                    consumer.poll(Duration.ofMillis(100L));
                    closeConsumer(consumer);
                } catch (Throwable th2) {
                    closeConsumer(consumer);
                    throw th2;
                }
            } catch (AuthorizationException e2) {
                log.error("Unable to read from source topic " + str2, (Throwable) e2);
                z2 = false;
                closeConsumer(consumer);
            } catch (Exception e3) {
                log.error("Encountered error: ", (Throwable) e3);
                z2 = false;
                closeConsumer(consumer);
            }
        }
        return z && z2;
    }

    private void closeConsumer(Consumer<byte[], byte[]> consumer) {
        if (consumer != null) {
            consumer.close();
        }
    }

    private void closeAdminClient() {
        if (this.srcAdminClient != null) {
            this.srcAdminClient.close();
        }
    }

    @Override // io.confluent.connect.replicator.ConfigurationCheck
    public String helpText() {
        return "This check sanity checks ACLs on the source cluster that Replicator relies on.\n1. DESCRIBE permissions on the cluster\n2. DESCRIBE permissions on each topic that is expected to be replicated\n3. READ permissions on the group that replicator will run as\nUse `kafka-acls` to set these permissions, refer to https://docs.confluent.io/current/multi-dc-deployments/replicator/index.html";
    }

    @Override // io.confluent.connect.replicator.ConfigurationCheck
    public String getName() {
        return "Source topic ACL check";
    }

    public ReadAclCheck setAdminClient(Admin admin) {
        this.srcAdminClient = admin;
        return this;
    }

    public ReadAclCheck setTopics(Set<String> set) {
        this.topics = set;
        return this;
    }

    public ReadAclCheck setConsumerProvider(ConsumerProvider consumerProvider) {
        this.consumerProvider = consumerProvider;
        return this;
    }

    public ReadAclCheck setAdminClientConfig(Map<String, Object> map) {
        this.srcAdminClientConfig = map;
        return this;
    }
}
