package org.apache.kafka.streams.processor.internals;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/InternalTopicManager.class */
public class InternalTopicManager {
    private static final String ZK_TOPIC_PATH = "/brokers/topics";
    private static final String ZK_BROKER_PATH = "/brokers/ids";
    private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
    private static final String ZK_ENTITY_CONFIG_PATH = "/config/topics";
    public static final String CLEANUP_POLICY_PROP = "cleanup.policy";
    public static final String RETENTION_MS = "retention.ms";
    private final ZkClient zkClient;
    private final int replicationFactor;
    private final long windowChangeLogAdditionalRetention;
    private static final Logger log = LoggerFactory.getLogger(InternalTopicManager.class);
    public static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = Long.valueOf(TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS));

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/InternalTopicManager$ZKStringSerializer.class */
    private class ZKStringSerializer implements ZkSerializer {
        private ZKStringSerializer() {
        }

        public byte[] serialize(Object obj) {
            try {
                return ((String) obj).getBytes("UTF-8");
            } catch (UnsupportedEncodingException e) {
                throw new AssertionError(e);
            }
        }

        public Object deserialize(byte[] bArr) {
            if (bArr == null) {
                return null;
            }
            try {
                return new String(bArr, "UTF-8");
            } catch (UnsupportedEncodingException e) {
                throw new AssertionError(e);
            }
        }
    }

    public InternalTopicManager() {
        this.zkClient = null;
        this.replicationFactor = 0;
        this.windowChangeLogAdditionalRetention = WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT.longValue();
    }

    public InternalTopicManager(String str, int i, long j) {
        this.zkClient = new ZkClient(str, 30000, 30000, new ZKStringSerializer());
        this.replicationFactor = i;
        this.windowChangeLogAdditionalRetention = j;
    }

    public void makeReady(InternalTopicConfig internalTopicConfig, int i) {
        boolean z = true;
        while (z) {
            Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(internalTopicConfig.name());
            if (topicMetadata == null) {
                try {
                    createTopic(internalTopicConfig, i, this.replicationFactor);
                } catch (ZkNodeExistsException e) {
                }
            } else if (topicMetadata.size() > i) {
                try {
                    deleteTopic(internalTopicConfig.name());
                } catch (ZkNodeExistsException e2) {
                }
            } else if (topicMetadata.size() < i) {
                try {
                    addPartitions(internalTopicConfig.name(), i - topicMetadata.size(), this.replicationFactor, topicMetadata);
                } catch (ZkNoNodeException e3) {
                }
            } else {
                z = false;
            }
        }
    }

    private List<Integer> getBrokers() {
        ArrayList arrayList = new ArrayList();
        Iterator it = this.zkClient.getChildren(ZK_BROKER_PATH).iterator();
        while (it.hasNext()) {
            arrayList.add(Integer.valueOf(Integer.parseInt((String) it.next())));
        }
        Collections.sort(arrayList);
        log.debug("Read brokers {} from ZK in partition assignor.", arrayList);
        return arrayList;
    }

    private Map<Integer, List<Integer>> getTopicMetadata(String str) {
        String str2 = (String) this.zkClient.readData("/brokers/topics/" + str, true);
        if (str2 == null) {
            return null;
        }
        try {
            Map<Integer, List<Integer>> map = (Map) ((Map) new ObjectMapper().readValue(str2, new TypeReference<Map<String, Object>>() { // from class: org.apache.kafka.streams.processor.internals.InternalTopicManager.1
            })).get("partitions");
            log.debug("Read partitions {} for topic {} from ZK in partition assignor.", map, str);
            return map;
        } catch (IOException e) {
            throw new StreamsException("Error while reading topic metadata from ZK for internal topic " + str, e);
        }
    }

    private void createTopic(InternalTopicConfig internalTopicConfig, int i, int i2) throws ZkNodeExistsException {
        log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", internalTopicConfig.name(), Integer.valueOf(i));
        ObjectMapper objectMapper = new ObjectMapper();
        List<Integer> brokers = getBrokers();
        int size = brokers.size();
        if (size < i2) {
            log.warn("Not enough brokers found. The replication factor is reduced from " + i2 + " to " + size);
            i2 = size;
        }
        HashMap hashMap = new HashMap();
        for (int i3 = 0; i3 < i; i3++) {
            ArrayList arrayList = new ArrayList();
            for (int i4 = 0; i4 < i2; i4++) {
                arrayList.add(brokers.get((i3 + ((i4 * size) / i2)) % size));
            }
            hashMap.put(Integer.valueOf(i3), arrayList);
        }
        try {
            HashMap hashMap2 = new HashMap();
            hashMap2.put("version", 1);
            hashMap2.put("config", internalTopicConfig.toProperties(this.windowChangeLogAdditionalRetention));
            this.zkClient.createPersistent("/config/topics/" + internalTopicConfig.name(), objectMapper.writeValueAsString(hashMap2), ZooDefs.Ids.OPEN_ACL_UNSAFE);
            try {
                HashMap hashMap3 = new HashMap();
                hashMap3.put("version", 1);
                hashMap3.put("partitions", hashMap);
                this.zkClient.createPersistent("/brokers/topics/" + internalTopicConfig.name(), objectMapper.writeValueAsString(hashMap3), ZooDefs.Ids.OPEN_ACL_UNSAFE);
            } catch (JsonProcessingException e) {
                throw new StreamsException("Error while creating topic metadata in ZK for internal topic " + internalTopicConfig, e);
            }
        } catch (JsonProcessingException e2) {
            throw new StreamsException("Error while creating topic config in ZK for internal topic " + internalTopicConfig, e2);
        }
    }

    private void deleteTopic(String str) throws ZkNodeExistsException {
        log.debug("Deleting topic {} from ZK in partition assignor.", str);
        this.zkClient.createPersistent("/admin/delete_topics/" + str, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
    }

    private void addPartitions(String str, int i, int i2, Map<Integer, List<Integer>> map) {
        log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", new Object[]{str, Integer.valueOf(i), map});
        List<Integer> brokers = getBrokers();
        int size = brokers.size();
        if (size < i2) {
            log.warn("Not enough brokers found. The replication factor is reduced from " + i2 + " to " + size);
            i2 = size;
        }
        int size2 = map.size();
        HashMap hashMap = new HashMap(map);
        for (int i3 = 0; i3 < i; i3++) {
            ArrayList arrayList = new ArrayList();
            for (int i4 = 0; i4 < i2; i4++) {
                arrayList.add(brokers.get((i3 + ((i4 * size) / i2)) % size));
            }
            hashMap.put(Integer.valueOf(i3 + size2), arrayList);
        }
        try {
            HashMap hashMap2 = new HashMap();
            hashMap2.put("version", 1);
            hashMap2.put("partitions", hashMap);
            this.zkClient.writeData("/brokers/topics/" + str, new ObjectMapper().writeValueAsString(hashMap2));
        } catch (JsonProcessingException e) {
            throw new StreamsException("Error while updating topic metadata in ZK for internal topic " + str, e);
        }
    }
}
