/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.security.auth.client.acl;

import io.confluent.security.authorizer.ConfluentAuthorizerConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.CreateAclsOptions;
import org.apache.kafka.clients.admin.DeleteAclsOptions;
import org.apache.kafka.clients.admin.DeleteAclsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclUpdateListener;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.server.authorizer.Authorizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MdsAclMigration {
    private static final Logger log = LoggerFactory.getLogger((String)"kafka.authorizer.logger");
    private final String clusterId;
    private final Supplier<Integer> brokerIdSupplier;
    private int writerBrokerId;

    public MdsAclMigration(String clusterId, Supplier<Integer> brokerIdSupplier) {
        this.clusterId = clusterId;
        this.brokerIdSupplier = brokerIdSupplier;
    }

    public void migrate(Map<String, ?> configs, Authorizer aclAuthorizer, ConfluentAdmin mdsAdminClient) {
        try {
            while (true) {
                Integer brokerId = this.brokerIdSupplier.get();
                while (brokerId == null) {
                    Thread.sleep(10L);
                    brokerId = this.brokerIdSupplier.get();
                }
                this.writerBrokerId = brokerId;
                try {
                    this.tryMigrate(configs, aclAuthorizer, mdsAdminClient);
                }
                catch (RetriableException e) {
                    log.warn("Migration failed, retring", (Throwable)e);
                    continue;
                }
                break;
            }
        }
        catch (InterruptedException e) {
            throw new InterruptException(e);
        }
    }

    private void tryMigrate(Map<String, ?> configs, Authorizer aclAuthorizer, ConfluentAdmin mdsAdminClient) {
        ConfluentAuthorizerConfig confluentAuthorizerConfig = new ConfluentAuthorizerConfig(configs);
        log.info("Starting Acl migration from ZK to metadata service");
        this.addUpdateListener(aclAuthorizer, mdsAdminClient);
        Iterable aclBindings = aclAuthorizer.acls(AclBindingFilter.ANY);
        int batchSize = confluentAuthorizerConfig.getInt("confluent.authorizer.acl.migration.batch.size");
        ArrayList<AclBinding> aclBatch = new ArrayList<AclBinding>(batchSize);
        int count = 0;
        for (AclBinding aclBinding : aclBindings) {
            aclBatch.add(aclBinding);
            if (++count != batchSize) continue;
            this.migrateBindings(mdsAdminClient, aclBatch);
            count = 0;
            aclBatch.clear();
        }
        if (!aclBatch.isEmpty()) {
            this.migrateBindings(mdsAdminClient, aclBatch);
        }
        log.info("Completed Acl migration from ZK to metadata service");
    }

    private void addUpdateListener(Authorizer aclAuthorizer, final ConfluentAdmin mdsAdminClient) {
        aclAuthorizer.registerAclUpdateListener(new AclUpdateListener(){

            public void handleUpdate(ResourcePattern resourcePattern, Set<AccessControlEntry> aclBindings) {
                log.info("handling ACL updates during migration for resource {}, bindings {}", (Object)resourcePattern, aclBindings);
                try {
                    List<AclBinding> aclBatch;
                    Map<AclBinding, ApiError> createResults;
                    Map<AclBinding, ApiError> failedCreateResults;
                    AclBindingFilter aclBindingFilter = new AclBindingFilter(resourcePattern.toFilter(), AccessControlEntryFilter.ANY);
                    try {
                        MdsAclMigration.this.ensureNoRebalance();
                        DeleteAclsResult.FilterResults deleteResults = (DeleteAclsResult.FilterResults)((KafkaFuture)mdsAdminClient.deleteCentralizedAcls(Collections.singleton(aclBindingFilter), new DeleteAclsOptions(), MdsAclMigration.this.clusterId, MdsAclMigration.this.writerBrokerId).values().get(aclBindingFilter)).get();
                        Map<AclBinding, ApiException> failedDeleteResults = MdsAclMigration.this.failedDeleteResults(deleteResults);
                        if (!failedDeleteResults.isEmpty()) {
                            log.error("Failed to update delete ACLs bindings from metadata service: failed list {}", failedDeleteResults);
                        }
                    }
                    catch (Throwable t) {
                        log.error("Failed to delete ACLs bindings from metadata service: failed filter {}", (Object)aclBindingFilter);
                    }
                    if (!aclBindings.isEmpty() && !(failedCreateResults = MdsAclMigration.this.failedCreateResults(createResults = MdsAclMigration.this.createAcls(mdsAdminClient, aclBatch = aclBindings.stream().map(a -> new AclBinding(resourcePattern, a)).collect(Collectors.toList())))).isEmpty()) {
                        log.error("Failed to update ACls to metadata service: failed list {}", failedCreateResults);
                    }
                }
                catch (Exception e) {
                    log.error("Error while handling ACL updates", (Throwable)e);
                }
            }
        });
    }

    private void migrateBindings(ConfluentAdmin mdsAdminClient, List<AclBinding> aclBatch) {
        log.info("Starting migrating Acls of batch size {}", (Object)aclBatch.size());
        Map<AclBinding, ApiError> createResults = this.createAcls(mdsAdminClient, aclBatch);
        Map<AclBinding, ApiError> failedBindings = this.failedCreateResults(createResults);
        if (!failedBindings.isEmpty()) {
            log.error("Failed to migrate Acls from ZK to metadata service: failed list {}", failedBindings);
            throw new RuntimeException("Failed to migrate Acls from ZK to metadata service.");
        }
        log.info("Completed migrating Acls of batch size {}", (Object)aclBatch.size());
    }

    private Map<AclBinding, ApiError> createAcls(ConfluentAdmin mdsAdminClient, List<AclBinding> aclBatch) {
        this.ensureNoRebalance();
        return mdsAdminClient.createCentralizedAcls(aclBatch, new CreateAclsOptions(), this.clusterId, this.writerBrokerId).values().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> {
            try {
                ((KafkaFuture)e.getValue()).get();
                return ApiError.NONE;
            }
            catch (Throwable t) {
                return ApiError.fromThrowable((Throwable)t);
            }
        }));
    }

    private Map<AclBinding, ApiError> failedCreateResults(Map<AclBinding, ApiError> createAclResults) {
        return createAclResults.entrySet().stream().filter(x -> x.getValue() != ApiError.NONE).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    private Map<AclBinding, ApiException> failedDeleteResults(DeleteAclsResult.FilterResults deleteAclsResult) {
        return deleteAclsResult.values().stream().filter(x -> x.exception() != null).collect(Collectors.toMap(DeleteAclsResult.FilterResult::binding, DeleteAclsResult.FilterResult::exception));
    }

    private void ensureNoRebalance() {
        if (this.writerBrokerId != this.brokerIdSupplier.get()) {
            throw new RebalanceInProgressException("Writer broker changed, restarting ACL migration");
        }
    }
}

