/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.metadata.ConfluentKeyValueStreamChangeRecord;
import org.apache.kafka.common.metadata.ConfluentKeyValueStreamEntryRecord;
import org.apache.kafka.common.metadata.ConfluentKeyValueStreamTopicRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.metadata.InternalTopicType;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;

public class ConfluentKeyValueStreamsControlManager {
    private final Logger log;
    private final SnapshotRegistry snapshotRegistry;
    private final TimelineHashMap<Short, TimelineHashMap<Short, Long>> perTopicOffsets;
    private final CompletableFuture<Map<Short, Map<Short, Long>>> offsetInitialLoadFutureMap;

    public ConfluentKeyValueStreamsControlManager(LogContext parentLogContext, SnapshotRegistry snapshotRegistry, int nodeId) {
        LogContext lgCtx = new LogContext(parentLogContext.logPrefix() + "[ConfluentKeyValueStreams id=" + nodeId + "]");
        this.log = lgCtx.logger(ConfluentKeyValueStreamsControlManager.class);
        this.snapshotRegistry = snapshotRegistry;
        this.perTopicOffsets = new TimelineHashMap(snapshotRegistry, 0);
        this.offsetInitialLoadFutureMap = new CompletableFuture();
    }

    public void maybeConfigInitialOffsets() {
        if (!this.offsetInitialLoadFutureMap.isDone()) {
            this.log.info("finish configuring initial offsets, current offsets: " + String.valueOf(this.getCurrentOffsets()));
            this.offsetInitialLoadFutureMap.complete(this.getCurrentOffsets());
        } else {
            this.log.info("already configured initial offsets, current offsets: " + String.valueOf(this.getCurrentOffsets()));
        }
    }

    ControllerResult<Void> updateConfluentKeyValueStreamChangeRecord(InternalTopicType recordType, String key, String value, short partitionId, long offset) {
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        ConfluentKeyValueStreamChangeRecord record = new ConfluentKeyValueStreamChangeRecord().setType(recordType.value()).setPartition(partitionId).setOffset(offset).setEntryKey(key).setEntryValue(value);
        records.add(new ApiMessageAndVersion((ApiMessage)record, 0));
        return new ControllerResult<Object>(records, null, false);
    }

    public void replay(ConfluentKeyValueStreamChangeRecord record) {
        TimelineHashMap offsets = this.perTopicOffsets.containsKey((Object)record.type()) ? (TimelineHashMap)this.perTopicOffsets.get((Object)record.type()) : new TimelineHashMap(this.snapshotRegistry, 0);
        offsets.put((Object)record.partition(), (Object)record.offset());
        this.perTopicOffsets.put((Object)record.type(), (Object)offsets);
        this.log.info("Replayed ConfluentKeyValueStreamChangeRecord. TopicType:" + String.valueOf((Object)InternalTopicType.from(record.type())) + " Partition: " + record.partition() + " Offset: " + record.offset());
    }

    public void replay(ConfluentKeyValueStreamTopicRecord record) {
        TimelineHashMap offsets = this.perTopicOffsets.containsKey((Object)record.type()) ? (TimelineHashMap)this.perTopicOffsets.get((Object)record.type()) : new TimelineHashMap(this.snapshotRegistry, 0);
        for (ConfluentKeyValueStreamTopicRecord.Partition p : record.partitions()) {
            offsets.put((Object)p.id(), (Object)p.offset());
        }
        this.perTopicOffsets.put((Object)record.type(), (Object)offsets);
        this.log.info("Replayed ConfluentKeyValueStreamChangeRecord. TopicType:" + String.valueOf((Object)InternalTopicType.from(record.type())) + " Partitions: " + String.valueOf(record.partitions()));
    }

    public void replay(ConfluentKeyValueStreamEntryRecord record) {
        this.log.info("Replayed ConfluentKeyValueStreamEntryRecord");
    }

    public boolean isInitialOffsetLoadFinished() {
        return this.offsetInitialLoadFutureMap.isDone();
    }

    public CompletableFuture<Map<Short, Map<Short, Long>>> getInitialOffsets() {
        return this.offsetInitialLoadFutureMap;
    }

    public Map<Short, Map<Short, Long>> getCurrentOffsets() {
        HashMap<Short, Map<Short, Long>> results = new HashMap<Short, Map<Short, Long>>();
        for (Map.Entry topicEntry : this.perTopicOffsets.entrySet()) {
            HashMap offsets = new HashMap((Map)topicEntry.getValue());
            results.put((Short)topicEntry.getKey(), offsets);
        }
        return results;
    }
}

