/*
 * Decompiled with CFR 0.152.
 */
package kafka.catalog.event;

import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import io.confluent.protobuf.events.catalog.v1.MetadataEvent;
import io.confluent.protobuf.events.catalog.v1.TopicMetadata;
import java.util.Objects;
import java.util.Set;
import kafka.catalog.CatalogTopicConfigUtils;
import kafka.catalog.MetadataEventUtils;
import kafka.catalog.ZKMetadataCollector;
import kafka.catalog.ZKMetadataCollectorContext;
import kafka.catalog.event.MetadataCollectorEvent;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.storage.internals.log.LogConfig;

public class BrokerDefaultConfigChangeEvent
extends MetadataCollectorEvent {
    private final KafkaConfig oldConfig;
    private final KafkaConfig newConfig;

    public BrokerDefaultConfigChangeEvent(ZKMetadataCollector collector, KafkaConfig oldConfig, KafkaConfig newConfig, Time time) {
        super(collector, time);
        this.oldConfig = oldConfig;
        this.newConfig = newConfig;
    }

    public void run() throws Exception {
        ZKMetadataCollectorContext context = this.context();
        Timestamp eventTimestamp = Timestamps.fromMillis((long)this.eventObservedTimeMillis);
        for (String logicalCluster : context.localStore().logicalClusters()) {
            Set<String> topics = context.localStore().topics(logicalCluster);
            for (String topic : topics) {
                try {
                    TopicMetadata.Builder builder = TopicMetadata.newBuilder().mergeFrom(context.localStore().topicMetadataEvent(topic).getTopicMetadata()).setUpdateTime(eventTimestamp);
                    if (!BrokerDefaultConfigChangeEvent.propagateBrokerConfigChange(context, this.oldConfig, this.newConfig, topic, builder)) continue;
                    LOG.info("In BrokerDefaultConfigChangeEvent, propagating new KafkaConfig: {} for topic: '{}'", (Object)this.newConfig, (Object)topic);
                    MetadataEvent topicMetadataEvent = MetadataEvent.newBuilder().setTopicMetadata(builder.build()).build();
                    context.localStore().addTopicMetadataEvent(logicalCluster, topic, topicMetadataEvent);
                }
                catch (Exception e) {
                    LOG.error("Skip propagating new KafkaConfig: {} to topic '{}' due to", new Object[]{this.newConfig, topic, e});
                    context.catalogMetrics().collectorEventHandleErrorSensor.record();
                }
            }
        }
    }

    public static boolean propagateBrokerConfigChange(ZKMetadataCollectorContext context, KafkaConfig oldConfig, KafkaConfig newConfig, String topic, TopicMetadata.Builder builder) {
        Set<String> topicConfigOverrides = context.localStore().topicConfigOverrides(topic);
        LogConfig brokerDefaultConfig = new LogConfig(context.originalConfig().extractLogConfigMap());
        LogConfig newLogConfig = new LogConfig(newConfig.extractLogConfigMap());
        boolean shouldPropagate = false;
        Set<String> configs = context.config().fullConfigsEnable ? CatalogTopicConfigUtils.FULL_BROKER_DEFAULT_CONFIGS_TO_PROPAGATE : CatalogTopicConfigUtils.BROKER_DEFAULT_CONFIGS_TO_PROPAGATE;
        for (String configKey : configs) {
            String logConfigKey = CatalogTopicConfigUtils.SERVER_CONFIG_TO_TOPIC_CONFIG.get(configKey);
            Object newConfigValue = newConfig.get(configKey);
            if (Objects.equals(oldConfig.get(configKey), newConfigValue) || topicConfigOverrides.contains(logConfigKey)) continue;
            if (newConfigValue == null) {
                MetadataEventUtils.setField(builder, brokerDefaultConfig, logConfigKey);
            } else {
                MetadataEventUtils.setField(builder, newLogConfig, logConfigKey);
            }
            shouldPropagate = true;
        }
        return shouldPropagate;
    }

    public String toString() {
        return "BrokerDefaultConfigChangeEvent(oldConfig=" + this.oldConfig + ", newConfig=" + this.newConfig + ')';
    }
}

