/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.databalancer.persistence;

import com.google.protobuf.Message;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.SbkTopicUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import io.confluent.databalancer.persistence.BrokerRemovalStateRecord;
import io.confluent.databalancer.persistence.BrokerRemovalStateRecordProtoSerde;
import io.confluent.databalancer.persistence.EvenClusterLoadStateRecord;
import io.confluent.databalancer.persistence.EvenClusterLoadStateSerializer;
import io.confluent.databalancer.record.ApiStatus;
import io.confluent.databalancer.record.EvenClusterLoad;
import io.confluent.databalancer.record.FailedBrokers;
import io.confluent.databalancer.record.RemoveBroker;
import io.confluent.databalancer.startup.StartupCheckInterruptedException;
import io.confluent.databalancer.utils.ImmutableSet;
import io.confluent.serializers.ProtoSerde;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import kafka.log.LogConfig;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApiStatePersistenceStore
implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ApiStatePersistenceStore.class);
    /** Same value as the {@code -1} broker id the non-broker-specific {@code save(...)} overloads put in their keys. */
    private static final int API_STATUS_KEY_NO_BROKER_ID = -1;
    /** Same value as the partition count requested in {@code getTopicConfig}. */
    private static final int API_STATE_TOPIC_PARTITION_COUNT = 1;
    /** Cleanup policy requested for the API state topic (log compaction). */
    public static final String API_STATE_TOPIC_CLEANUP_POLICY = LogConfig.Compact();
    /** Same value as the timeout {@code flushProducer} waits on the read-to-end future. */
    private static final long READ_TO_END_TIMEOUT_MS = 30000L;
    /** Same value as the minimum retention time requested in {@code getTopicConfig}. */
    private static final int MIN_RETENTION_TIME_MS = -1;
    private KafkaBasedLog<ApiStatus.ApiStatusKey, ApiStatus.ApiStatusMessage> apiStatePersistenceLog;
    private String topic;
    /** Latest removal record per broker-id set; filled by the consume callback and {@code putRemovalRecord}. */
    private final Map<ImmutableSet<Integer>, BrokerRemovalStateRecord> brokerRemovalStateRecordMap = new ConcurrentHashMap<ImmutableSet<Integer>, BrokerRemovalStateRecord>();
    /** Replaced wholesale by the consume callback; volatile so readers see the newest map. */
    private volatile Map<Integer, Long> failedBrokers = new HashMap<Integer, Long>();
    /**
     * Replaced wholesale by the consume callback and read through
     * {@code getEvenClusterLoadStateRecord()}. Volatile for the same reason as
     * {@code failedBrokers}: the writer is the log's callback, which presumably runs on
     * a different thread than the readers (confirm against KafkaBasedLog).
     */
    private volatile EvenClusterLoadStateRecord evenClusterLoadStateRecord;
    private TopicAdmin topicAdmin;
    private Map<String, Object> baseClientProperties;

    /**
     * Creates the store with a freshly built {@link TopicAdmin} and starts it via
     * {@link #init(KafkaConfig, Time, Map, TopicAdmin)}.
     *
     * @param config broker configuration used to derive the topic name and client settings
     * @param time clock handed to the underlying {@link KafkaBasedLog}
     * @param clientProperties base client properties merged into every client config
     */
    public ApiStatePersistenceStore(KafkaConfig config, Time time, Map<String, Object> clientProperties) {
        TopicAdmin admin = new TopicAdmin(getAdminConfig(config, clientProperties));
        init(config, time, clientProperties, admin);
    }

    /**
     * Stores the collaborators, resolves the topic name, then builds and starts the
     * backing {@link KafkaBasedLog}.
     *
     * @param config broker configuration used for the topic name and client settings
     * @param time clock handed to the underlying log
     * @param clientProperties base client properties merged into producer/consumer configs
     * @param topicAdmin admin client supplied to the log; closed by {@link #close()}
     */
    public void init(KafkaConfig config, Time time, Map<String, Object> clientProperties, TopicAdmin topicAdmin) {
        // All three fields must be set before the log is built: the client configs read them.
        this.topicAdmin = topicAdmin;
        this.topic = apiStatePersistenceStoreTopicName(config);
        this.baseClientProperties = clientProperties;

        KafkaBasedLog<ApiStatus.ApiStatusKey, ApiStatus.ApiStatusMessage> log = setupAndCreateKafkaBasedLog(config, time);
        this.apiStatePersistenceLog = log;
        log.start();
        LOG.info("Started DataBalancer Api State Persistence Store");
    }

    /**
     * Builds the producer and consumer configurations (including the protobuf serdes for
     * key and value) and creates the {@link KafkaBasedLog} from them.
     *
     * @param config broker configuration the client settings are derived from
     * @param time clock handed to the log
     * @return the created, not yet started, log
     */
    KafkaBasedLog<ApiStatus.ApiStatusKey, ApiStatus.ApiStatusMessage> setupAndCreateKafkaBasedLog(KafkaConfig config, Time time) {
        String keySerde = SbkApiStatusKeySerde.class.getName();
        String valueSerde = SbkApiStatusMessageSerde.class.getName();

        Map<String, Object> producerSettings = getProducerConfig(config);
        producerSettings.put("key.serializer", keySerde);
        producerSettings.put("value.serializer", valueSerde);

        Map<String, Object> consumerSettings = getConsumerConfig(config);
        consumerSettings.put("key.deserializer", keySerde);
        consumerSettings.put("value.deserializer", valueSerde);

        return createKafkaBasedLog(producerSettings, consumerSettings, time);
    }

    /**
     * Instantiates the {@link KafkaBasedLog} for this store's topic, wiring in the shared
     * {@link TopicAdmin} and a new {@code ConsumeCallback} for consumed records.
     *
     * @param producerProps complete producer configuration
     * @param consumerProps complete consumer configuration
     * @param time clock handed to the log
     * @return the created log
     */
    KafkaBasedLog<ApiStatus.ApiStatusKey, ApiStatus.ApiStatusMessage> createKafkaBasedLog(Map<String, Object> producerProps, Map<String, Object> consumerProps, Time time) {
        return new KafkaBasedLog<>(
                this.topic,
                producerProps,
                consumerProps,
                () -> this.topicAdmin,
                new ConsumeCallback(),
                time,
                null);
    }

    /**
     * Stops the backing log and closes the topic admin client, both through the
     * "silently" helpers of {@code KafkaCruiseControlUtils}.
     */
    @Override
    public void close() {
        KafkaCruiseControlUtils.executeSilently(apiStatePersistenceLog, KafkaBasedLog::stop);
        KafkaCruiseControlUtils.closeSilently((AutoCloseable) topicAdmin);
    }

    /**
     * Persists a broker removal state record under a {@code REMOVE_BROKER_V2} key and
     * waits for the write to be read back.
     *
     * <p>After a successful write, the start and last-update times of the serialized
     * proto are copied back onto {@code removalStateRecord}.
     *
     * @param removalStateRecord record to persist; its timestamps are updated on success
     * @param isNew passed through to the proto serializer
     * @throws InterruptedException if interrupted while waiting for the write
     */
    public void save(BrokerRemovalStateRecord removalStateRecord, boolean isNew) throws InterruptedException {
        ApiStatus.ApiStatusKey recordKey = ApiStatus.ApiStatusKey.newBuilder()
                .addAllBrokerIds(removalStateRecord.brokerIds())
                .setConfigType(ApiStatus.ApiType.REMOVE_BROKER_V2)
                .build();
        RemoveBroker.BrokerRemovalStateRecordProto removalProto =
                BrokerRemovalStateRecordProtoSerde.serialize(removalStateRecord, isNew);
        ApiStatus.ApiStatusMessage recordValue = ApiStatus.ApiStatusMessage.newBuilder()
                .setRemoveBrokerStatus(removalProto)
                .build();

        apiStatePersistenceLog.send(recordKey, recordValue);
        flushProducer();

        // Only reached when the write went through: mirror the persisted timestamps.
        removalStateRecord.setStartTime(removalProto.getStartTime());
        removalStateRecord.setLastUpdateTime(removalProto.getLastUpdateTime());
    }

    /**
     * Serializes an exception with Java object serialization and Base64-encodes the bytes.
     *
     * @param ex exception to serialize
     * @return Base64 text that {@code deserializeException} can decode
     * @throws RuntimeException wrapping the {@link IOException} if serialization fails
     */
    static String serializeException(Exception ex) {
        try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream)) {
            objectOutputStream.writeObject(ex);
            // ObjectOutputStream buffers internally; flush before reading the backing bytes
            // so the result never depends on when the stream happens to drain.
            objectOutputStream.flush();
            return Base64.getEncoder().encodeToString(outputStream.toByteArray());
        }
        catch (IOException e) {
            LOG.error("Unable to serialize exception.", (Throwable)ex);
            throw new RuntimeException("Error while serializing exception: " + ex, e);
        }
    }

    /**
     * Decodes a Base64 string produced by {@code serializeException} back into an exception.
     *
     * <p>NOTE(review): this uses Java native deserialization on data read from a Kafka
     * topic. If that topic can be written by untrusted parties, consider an
     * {@code ObjectInputFilter} or a different encoding.
     *
     * @param serializedException Base64 text of a serialized exception
     * @return the decoded exception, or {@code null} if reading the object fails with an
     *         {@link IOException} or {@link ClassNotFoundException}
     * @throws IllegalArgumentException if the input is not valid Base64
     * @throws ClassCastException if the decoded object is not an {@link Exception}
     */
    static Exception deserializeException(String serializedException) {
        byte[] rawBytes = Base64.getDecoder().decode(serializedException);
        ByteArrayInputStream byteSource = new ByteArrayInputStream(rawBytes);
        try (ObjectInputStream objectSource = new ObjectInputStream(byteSource)) {
            return (Exception) objectSource.readObject();
        }
        catch (IOException | ClassNotFoundException e) {
            LOG.error("Unable to deserialize exception: " + serializedException, e);
            return null;
        }
    }

    /**
     * Looks up the cached removal record for a set of broker ids.
     *
     * @param brokerIds broker-id set the removal was requested for
     * @return the cached record, or {@code null} if none is stored for that set
     */
    public BrokerRemovalStateRecord getBrokerRemovalStateRecord(ImmutableSet<Integer> brokerIds) {
        BrokerRemovalStateRecord cached = brokerRemovalStateRecordMap.get(brokerIds);
        return cached;
    }

    /**
     * Returns a read-only view of all cached removal records, keyed by broker-id set.
     * The view is backed by the live map, so later updates are visible through it.
     *
     * @return unmodifiable view of the removal record cache
     */
    public Map<ImmutableSet<Integer>, BrokerRemovalStateRecord> getAllBrokerRemovalStateRecords() {
        Map<ImmutableSet<Integer>, BrokerRemovalStateRecord> readOnlyView =
                Collections.unmodifiableMap(brokerRemovalStateRecordMap);
        return readOnlyView;
    }

    /**
     * Caches a removal record under its broker-id set, replacing any previous record
     * for the same set. This only touches the in-memory cache; nothing is written to Kafka.
     *
     * @param stateRecord record to cache
     */
    public void putRemovalRecord(BrokerRemovalStateRecord stateRecord) {
        ImmutableSet<Integer> cacheKey = stateRecord.brokerIds();
        brokerRemovalStateRecordMap.put(cacheKey, stateRecord);
    }

    /**
     * Returns a read-only view of the most recently consumed failed-broker map
     * (broker id to failure time).
     *
     * @return unmodifiable view of the current failed-broker map
     */
    public Map<Integer, Long> getFailedBrokers() {
        Map<Integer, Long> current = failedBrokers;
        return Collections.unmodifiableMap(current);
    }

    /**
     * Persists the complete failed-broker list under the single
     * {@code FAILED_BROKER_LIST} key (broker id {@code -1}) and waits for the write to
     * be read back.
     *
     * @param failedBrokers broker id to failure time; entries must not contain nulls
     * @throws InterruptedException if interrupted while waiting for the write
     */
    public void save(Map<Integer, Long> failedBrokers) throws InterruptedException {
        ApiStatus.ApiStatusKey recordKey = ApiStatus.ApiStatusKey.newBuilder()
                .setBrokerId(-1)
                .setConfigType(ApiStatus.ApiType.FAILED_BROKER_LIST)
                .build();

        List<FailedBrokers.FailedBroker> brokerProtos = failedBrokers.entrySet().stream()
                .map(entry -> FailedBrokers.FailedBroker.newBuilder()
                        .setBrokerId(entry.getKey())
                        .setFailedAtTime(entry.getValue())
                        .build())
                .collect(Collectors.toList());

        FailedBrokers.FailedBrokersList listProto = FailedBrokers.FailedBrokersList.newBuilder()
                .addAllFailedBroker(brokerProtos)
                .build();
        ApiStatus.ApiStatusMessage recordValue = ApiStatus.ApiStatusMessage.newBuilder()
                .setFailedBrokersList(listProto)
                .build();

        apiStatePersistenceLog.send(recordKey, recordValue);
        flushProducer();
    }

    /**
     * Persists the even-cluster-load state under the single {@code EVEN_CLUSTER_LOAD}
     * key (broker id {@code -1}) and waits for the write to be read back.
     *
     * <p>Current and previous exceptions are stored as Base64-serialized strings; an
     * absent exception is stored as the empty string.
     *
     * @param stateRecord state to persist
     * @throws InterruptedException if interrupted while waiting for the write
     */
    public void save(EvenClusterLoadStateRecord stateRecord) throws InterruptedException {
        ApiStatus.ApiStatusKey recordKey = ApiStatus.ApiStatusKey.newBuilder()
                .setBrokerId(-1)
                .setConfigType(ApiStatus.ApiType.EVEN_CLUSTER_LOAD)
                .build();

        String encodedCurrentError = stateRecord.currentStateException() != null
                ? serializeException(stateRecord.currentStateException())
                : "";
        String encodedPreviousError = stateRecord.previousStateException() != null
                ? serializeException(stateRecord.previousStateException())
                : "";

        EvenClusterLoad.EvenClusterLoadStateRecordProto.Builder protoBuilder =
                EvenClusterLoad.EvenClusterLoadStateRecordProto.newBuilder()
                        .setVersion(1)
                        .setCurrentState(EvenClusterLoadStateSerializer.serialize(stateRecord.currentState()))
                        .setCurrentStateCreatedAt(stateRecord.currentStateCreatedAt())
                        .setCurrentStateLastUpdatedAt(stateRecord.currentStateLastUpdatedAt())
                        .setCurrentStateError(encodedCurrentError)
                        .setPreviousState(EvenClusterLoadStateSerializer.serialize(stateRecord.previousState()))
                        .setPreviousStateCreatedAt(stateRecord.previousStateCreatedAt())
                        .setPreviousStateLastUpdatedAt(stateRecord.previousStateLastUpdatedAt())
                        .setPreviousStatusError(encodedPreviousError);

        ApiStatus.ApiStatusMessage recordValue = ApiStatus.ApiStatusMessage.newBuilder()
                .setEvenClusterLoad(protoBuilder.build())
                .build();

        apiStatePersistenceLog.send(recordKey, recordValue);
        flushProducer();
    }

    /**
     * Returns the most recently consumed even-cluster-load state.
     *
     * @return the latest state record, or {@code null} if none has been consumed yet
     */
    public EvenClusterLoadStateRecord getEvenClusterLoadStateRecord() {
        EvenClusterLoadStateRecord latest = evenClusterLoadStateRecord;
        return latest;
    }

    /**
     * Blocks for up to 30 seconds (the value of {@code READ_TO_END_TIMEOUT_MS}) until the
     * log's read-to-end future completes.
     *
     * <p>NOTE(review): despite the name, this only calls {@code readToEnd()}; presumably
     * that flushes pending producer writes first (confirm against KafkaBasedLog).
     *
     * @throws InterruptedException if interrupted while waiting
     * @throws RuntimeException if the future fails or the wait times out
     */
    private void flushProducer() throws InterruptedException {
        final String failureMessage = "Error when writing api status to Kafka.";
        try {
            apiStatePersistenceLog.readToEnd().get(30000L, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException | TimeoutException failure) {
            LOG.error(failureMessage, failure);
            throw new RuntimeException(failureMessage, failure);
        }
    }

    /**
     * Blocks until the API state topic exists (creating it if needed), retrying with an
     * exponential back-off that starts at 1 second and is capped at 60 seconds.
     *
     * <p>The wait between attempts is done by trying to acquire {@code abortStartupCheck};
     * if a permit becomes available, or the thread is interrupted, the check is aborted.
     *
     * @param config cruise control configuration providing the merged config values
     * @param abortStartupCheck semaphore that is released to abort the check
     * @throws StartupCheckInterruptedException if the check is aborted or interrupted
     */
    public static void checkStartupCondition(KafkaCruiseControlConfig config, Semaphore abortStartupCheck) {
        final long maxBackoffSec = 60L;
        Map<String, Object> mergedConfig = config.mergedConfigValues();
        String topicName = ApiStatePersistenceStore.apiStatePersistenceStoreTopicName(mergedConfig);
        SbkTopicUtils.SbkTopicConfig topicConfig = ApiStatePersistenceStore.getTopicConfig(topicName, mergedConfig);

        for (long backoffSec = 1L;
             !ApiStatePersistenceStore.maybeCreateTopics(mergedConfig, topicConfig);
             backoffSec = Math.min(2L * backoffSec, maxBackoffSec)) {
            LOG.info("Waiting for {} seconds to ensure that api persistent store topic is created/exists.", (Object)backoffSec);
            boolean abortRequested;
            try {
                // Doubles as the back-off sleep: returns early only when an abort permit appears.
                abortRequested = abortStartupCheck.tryAcquire(backoffSec, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                throw new StartupCheckInterruptedException(e);
            }
            if (abortRequested) {
                throw new StartupCheckInterruptedException();
            }
        }
        LOG.info("Confirmed that topic {} exists.", (Object)topicName);
    }

    /**
     * Checks the API state topic and creates it if needed, treating any failure as
     * "not ready yet".
     *
     * @param config client configuration used for the check
     * @param topicConfig desired topic configuration
     * @return the result of {@code SbkTopicUtils.checkTopicPropertiesMaybeCreate}, or
     *         {@code false} if that call throws
     */
    static boolean maybeCreateTopics(Map<String, ?> config, SbkTopicUtils.SbkTopicConfig topicConfig) {
        boolean topicReady;
        try {
            topicReady = SbkTopicUtils.checkTopicPropertiesMaybeCreate(topicConfig, config);
        }
        catch (Exception ex) {
            // Deliberately best-effort: the caller retries with a back-off.
            LOG.error("Error when checking for api state topics.", (Throwable)ex);
            topicReady = false;
        }
        return topicReady;
    }

    /**
     * Builds the desired configuration of the API state topic: compacted, one partition,
     * minimum retention {@code -1}, and a replication factor read from
     * {@code topic.replication.factor} with the balancer default as fallback.
     *
     * @param topic topic name
     * @param config configuration the replication factor is read from
     * @return the topic configuration
     */
    static SbkTopicUtils.SbkTopicConfig getTopicConfig(String topic, Map<String, ?> config) {
        SbkTopicUtils.SbkTopicConfigBuilder builder = new SbkTopicUtils.SbkTopicConfigBuilder();
        return builder
                .setTopic(topic)
                .setReplicationFactor(config, "topic.replication.factor", ConfluentConfigs.BALANCER_TOPICS_REPLICATION_FACTOR_DEFAULT)
                .setCleanupPolicy(API_STATE_TOPIC_CLEANUP_POLICY)
                .setPartitionCount(1)
                .setMinRetentionTimeMs(-1L)
                .build();
    }

    /**
     * Builds the admin client configuration and filters it down with
     * {@code KafkaCruiseControlUtils.filterAdminClientConfigs}.
     *
     * @param config broker configuration
     * @param baseClientProperties base properties merged in before the admin settings
     * @return filtered admin client configuration
     */
    private Map<String, Object> getAdminConfig(KafkaConfig config, Map<String, Object> baseClientProperties) {
        return KafkaCruiseControlUtils.filterAdminClientConfigs(
                getClientConfig(config, ConfluentConfigs.ClientType.ADMIN, baseClientProperties));
    }

    /**
     * Builds the producer configuration with idempotence explicitly disabled and filters
     * it down with {@code KafkaCruiseControlUtils.filterProducerConfigs}.
     *
     * @param config broker configuration
     * @return filtered producer configuration
     */
    private Map<String, Object> getProducerConfig(KafkaConfig config) {
        Map<String, Object> unfiltered = getClientConfig(config, ConfluentConfigs.ClientType.PRODUCER, baseClientProperties);
        unfiltered.put("enable.idempotence", "false");
        Map<String, Object> filtered = KafkaCruiseControlUtils.filterProducerConfigs(unfiltered);
        return filtered;
    }

    /**
     * Builds the consumer configuration and filters it down with
     * {@code KafkaCruiseControlUtils.filterConsumerConfigs}.
     *
     * @param config broker configuration
     * @return filtered consumer configuration
     */
    private Map<String, Object> getConsumerConfig(KafkaConfig config) {
        return KafkaCruiseControlUtils.filterConsumerConfigs(
                getClientConfig(config, ConfluentConfigs.ClientType.CONSUMER, baseClientProperties));
    }

    /**
     * Merges the base client properties with the {@code confluent.balancer.}-prefixed
     * client settings for the given client type; the prefixed settings win on conflict.
     *
     * <p>NOTE(review): when reached from the constructor (via {@code getAdminConfig}),
     * {@code this.topic} has not been assigned yet and is passed as {@code null} —
     * confirm that {@code ConfluentConfigs.clientConfigs} tolerates that.
     *
     * @param config broker configuration
     * @param clientType kind of client the settings are for
     * @param baseClientProperties properties copied in first
     * @return a new mutable map with the merged settings
     */
    private Map<String, Object> getClientConfig(KafkaConfig config, ConfluentConfigs.ClientType clientType, Map<String, Object> baseClientProperties) {
        Map<String, Object> merged = new HashMap<>(baseClientProperties);
        String brokerId = String.valueOf(config.brokerId());
        merged.putAll(ConfluentConfigs.clientConfigs(config, "confluent.balancer.", clientType, this.topic, brokerId));
        return merged;
    }

    /**
     * Resolves the API state topic name from a config map, using the
     * {@code confluent.balancer.api.state.topic} entry and falling back to the default
     * name when it is missing or empty.
     *
     * @param config configuration map
     * @return the topic name to use
     */
    static String apiStatePersistenceStoreTopicName(Map<String, ?> config) {
        Object configured = config.get("confluent.balancer.api.state.topic");
        return ApiStatePersistenceStore.apiStatePersistenceStoreTopicName((String) configured);
    }

    /**
     * Resolves the API state topic name from a broker config, using the
     * {@code confluent.balancer.api.state.topic} setting and falling back to the default
     * name when it is missing or empty.
     *
     * @param config broker configuration
     * @return the topic name to use
     */
    static String apiStatePersistenceStoreTopicName(KafkaConfig config) {
        Object configured = config.get("confluent.balancer.api.state.topic");
        return ApiStatePersistenceStore.apiStatePersistenceStoreTopicName((String) configured);
    }

    /**
     * Applies the default topic name when no name is configured.
     *
     * @param topicFromConfig configured topic name, possibly {@code null} or empty
     * @return {@code topicFromConfig} if it is non-empty, otherwise
     *         {@code _confluent_balancer_api_state}
     */
    private static String apiStatePersistenceStoreTopicName(String topicFromConfig) {
        if (topicFromConfig != null && !topicFromConfig.isEmpty()) {
            return topicFromConfig;
        }
        return "_confluent_balancer_api_state";
    }

    /**
     * Tells whether a consumed record carries broker removal state, i.e. its key type is
     * {@code REMOVE_BROKER} or {@code REMOVE_BROKER_V2}.
     *
     * @param record consumed record; its key must not be {@code null}
     * @return {@code true} for either removal key type
     */
    private boolean isRemovalRecord(ConsumerRecord<ApiStatus.ApiStatusKey, ApiStatus.ApiStatusMessage> record) {
        ApiStatus.ApiType keyType = record.key().getConfigType();
        if (keyType == ApiStatus.ApiType.REMOVE_BROKER) {
            return true;
        }
        return keyType == ApiStatus.ApiType.REMOVE_BROKER_V2;
    }

    private class ConsumeCallback
    implements Callback<ConsumerRecord<ApiStatus.ApiStatusKey, ApiStatus.ApiStatusMessage>> {
        private ConsumeCallback() {
        }

        public void onCompletion(Throwable error, ConsumerRecord<ApiStatus.ApiStatusKey, ApiStatus.ApiStatusMessage> record) {
            if (error != null) {
                this.handleConsumeError(error, record, "saving/loading");
                return;
            }
            try {
                if (ApiStatePersistenceStore.this.isRemovalRecord((ConsumerRecord<ApiStatus.ApiStatusKey, ApiStatus.ApiStatusMessage>)record)) {
                    RemoveBroker.BrokerRemovalStateRecordProto removeBrokerRecord = ((ApiStatus.ApiStatusMessage)record.value()).getRemoveBrokerStatus();
                    BrokerRemovalStateRecord status = BrokerRemovalStateRecordProtoSerde.deserialize(removeBrokerRecord);
                    if (status != null) {
                        ApiStatePersistenceStore.this.putRemovalRecord(status);
                    }
                } else if (((ApiStatus.ApiStatusKey)record.key()).getConfigType() == ApiStatus.ApiType.FAILED_BROKER_LIST) {
                    ApiStatePersistenceStore.this.failedBrokers = ((ApiStatus.ApiStatusMessage)record.value()).getFailedBrokersList().getFailedBrokerList().stream().collect(Collectors.toMap(FailedBrokers.FailedBroker::getBrokerId, FailedBrokers.FailedBroker::getFailedAtTime));
                } else if (((ApiStatus.ApiStatusKey)record.key()).getConfigType() == ApiStatus.ApiType.EVEN_CLUSTER_LOAD) {
                    EvenClusterLoad.EvenClusterLoadStateRecordProto stateRecordProto = ((ApiStatus.ApiStatusMessage)record.value()).getEvenClusterLoad();
                    Exception currentStateException = null;
                    String serializedCurrentStatusException = stateRecordProto.getCurrentStateError();
                    if (!serializedCurrentStatusException.isEmpty()) {
                        currentStateException = ApiStatePersistenceStore.deserializeException(serializedCurrentStatusException);
                    }
                    Exception previousStateException = null;
                    String serializedPreviousStatusException = stateRecordProto.getPreviousStatusError();
                    if (!serializedPreviousStatusException.isEmpty()) {
                        previousStateException = ApiStatePersistenceStore.deserializeException(serializedPreviousStatusException);
                    }
                    ApiStatePersistenceStore.this.evenClusterLoadStateRecord = new EvenClusterLoadStateRecord(EvenClusterLoadStateSerializer.deserialize(stateRecordProto.getCurrentState()), stateRecordProto.getCurrentStateCreatedAt(), stateRecordProto.getCurrentStateLastUpdatedAt(), currentStateException, EvenClusterLoadStateSerializer.deserialize(stateRecordProto.getPreviousState()), stateRecordProto.getPreviousStateCreatedAt(), stateRecordProto.getPreviousStateLastUpdatedAt(), previousStateException);
                } else {
                    LOG.error("Invalid ApiType: {}", (Object)((ApiStatus.ApiStatusKey)record.key()).getConfigType());
                }
            }
            catch (Exception e) {
                this.handleConsumeError(e, record, "decoding");
            }
        }

        private void handleConsumeError(Throwable error, ConsumerRecord<ApiStatus.ApiStatusKey, ApiStatus.ApiStatusMessage> record, String action) {
            LOG.error("Error when {} record. API type: {} ", (Object)action, (Object)((ApiStatus.ApiStatusKey)record.key()).getConfigType());
            if (ApiStatePersistenceStore.this.isRemovalRecord((ConsumerRecord<ApiStatus.ApiStatusKey, ApiStatus.ApiStatusMessage>)record)) {
                LOG.error("broker IDs: {} (singleton broker ID: {}), state: {}, start time: {}, last update time: {}", new Object[]{((ApiStatus.ApiStatusKey)record.key()).getBrokerIdsList(), ((ApiStatus.ApiStatusKey)record.key()).getBrokerId(), ((ApiStatus.ApiStatusMessage)record.value()).getRemoveBrokerStatus().getRemovalState(), ((ApiStatus.ApiStatusMessage)record.value()).getRemoveBrokerStatus().getStartTime(), ((ApiStatus.ApiStatusMessage)record.value()).getRemoveBrokerStatus().getLastUpdateTime()});
            } else if (((ApiStatus.ApiStatusKey)record.key()).getConfigType() == ApiStatus.ApiType.ADD_BROKER) {
                LOG.error("broker ID: {}, start time: {}, last update time: {}", new Object[]{((ApiStatus.ApiStatusKey)record.key()).getBrokerId(), ((ApiStatus.ApiStatusMessage)record.value()).getAddBrokerStatus().getStartTime(), ((ApiStatus.ApiStatusMessage)record.value()).getAddBrokerStatus().getLastUpdateTime()});
            } else if (((ApiStatus.ApiStatusKey)record.key()).getConfigType() == ApiStatus.ApiType.EVEN_CLUSTER_LOAD) {
                LOG.error("current state: {}, current state last update time: {},previous state: {}, previous state last update time: {}", new Object[]{((ApiStatus.ApiStatusMessage)record.value()).getEvenClusterLoad().getCurrentState(), ((ApiStatus.ApiStatusMessage)record.value()).getEvenClusterLoad().getCurrentStateLastUpdatedAt(), ((ApiStatus.ApiStatusMessage)record.value()).getEvenClusterLoad().getPreviousState(), ((ApiStatus.ApiStatusMessage)record.value()).getEvenClusterLoad().getPreviousStateLastUpdatedAt()});
            }
            LOG.error("Unexpected error in consumer callback for ApiStatePersistenceStore: ", error);
        }
    }

    /**
     * Protobuf serde for {@link ApiStatus.ApiStatusMessage} record values. Its class name
     * is configured as both {@code value.serializer} and {@code value.deserializer} of
     * the store's clients, hence the public no-arg constructor.
     */
    public static class SbkApiStatusMessageSerde
    extends ProtoSerde<ApiStatus.ApiStatusMessage> {
        public SbkApiStatusMessageSerde() {
            // The default instance is handed to ProtoSerde as the message prototype.
            super((Message)ApiStatus.ApiStatusMessage.getDefaultInstance());
        }
    }

    /**
     * Protobuf serde for {@link ApiStatus.ApiStatusKey} record keys. Its class name is
     * configured as both {@code key.serializer} and {@code key.deserializer} of the
     * store's clients, hence the public no-arg constructor.
     */
    public static class SbkApiStatusKeySerde
    extends ProtoSerde<ApiStatus.ApiStatusKey> {
        public SbkApiStatusKeySerde() {
            // The default instance is handed to ProtoSerde as the message prototype.
            super((Message)ApiStatus.ApiStatusKey.getDefaultInstance());
        }
    }
}

