/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.controller.ClusterControlManager;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.ProducerIdsBlock;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineLong;
import org.apache.kafka.timeline.TimelineObject;

/**
 * Tracks the next block of producer IDs to hand out and translates between that
 * state and {@link ProducerIdsRecord} metadata records.
 *
 * <p>State is held in timeline data structures registered with the supplied
 * {@link SnapshotRegistry}: the next block to allocate and the broker epoch taken
 * from the most recently replayed record.
 */
public class ProducerIdControlManager {
    /**
     * Number of producer IDs in every allocated block.
     *
     * <p>NOTE(review): this presumably mirrors a shared block-size constant on
     * {@link ProducerIdsBlock}; if one is exposed, prefer it over this local copy.
     */
    private static final int PRODUCER_ID_BLOCK_SIZE = 1000;

    /** Version used for every {@link ProducerIdsRecord} emitted by this class. */
    private static final short PRODUCER_IDS_RECORD_VERSION = (short) 0;

    private final ClusterControlManager clusterControlManager;
    private final TimelineObject<ProducerIdsBlock> nextProducerBlock;
    private final TimelineLong brokerEpoch;

    /**
     * Creates a manager whose initial next block is {@link ProducerIdsBlock#EMPTY}.
     *
     * @param clusterControlManager used to check broker epochs on allocation requests
     * @param snapshotRegistry      registry backing the timeline state of this manager
     */
    ProducerIdControlManager(ClusterControlManager clusterControlManager, SnapshotRegistry snapshotRegistry) {
        this.clusterControlManager = clusterControlManager;
        this.nextProducerBlock = new TimelineObject<>(snapshotRegistry, ProducerIdsBlock.EMPTY);
        this.brokerEpoch = new TimelineLong(snapshotRegistry);
    }

    /**
     * Allocates the next producer ID block for a broker.
     *
     * <p>This method does not modify the manager's state itself; it returns the
     * allocated block together with the single {@link ProducerIdsRecord} that
     * describes the new "next producer ID". The state only advances when that
     * record is passed to {@link #replay(ProducerIdsRecord)}.
     *
     * @param brokerId    the broker requesting the block
     * @param brokerEpoch the epoch supplied by the requesting broker; it is handed to
     *                    {@code ClusterControlManager.checkBrokerEpoch} before anything
     *                    else happens (presumably rejecting stale epochs — confirm)
     * @return the record to commit plus the block handed to the broker
     * @throws UnknownServerException if a full block no longer fits below {@link Long#MAX_VALUE}
     */
    ControllerResult<ProducerIdsBlock> generateNextProducerId(int brokerId, long brokerEpoch) {
        clusterControlManager.checkBrokerEpoch(brokerId, brokerEpoch);

        long firstProducerIdInBlock = nextProducerBlock.get().firstProducerId();
        // Written as a subtraction so the bounds check itself cannot overflow.
        if (firstProducerIdInBlock > Long.MAX_VALUE - PRODUCER_ID_BLOCK_SIZE) {
            throw new UnknownServerException("Exhausted all producerIds as the next block's end producerId "
                + "has exceeded the int64 type limit");
        }

        ProducerIdsBlock block = new ProducerIdsBlock(brokerId, firstProducerIdInBlock, PRODUCER_ID_BLOCK_SIZE);
        ProducerIdsRecord record = new ProducerIdsRecord()
            .setNextProducerId(block.nextBlockFirstId())
            .setBrokerId(brokerId)
            .setBrokerEpoch(brokerEpoch);
        return ControllerResult.of(
            Collections.singletonList(new ApiMessageAndVersion(record, PRODUCER_IDS_RECORD_VERSION)),
            block);
    }

    /**
     * Applies a {@link ProducerIdsRecord} to this manager's state.
     *
     * <p>The record's next producer ID must be strictly greater than the first ID of
     * the currently tracked block. On success the tracked block is replaced by one
     * starting at the record's next producer ID and the stored broker epoch is
     * overwritten with the record's broker epoch.
     *
     * @param record the record to apply
     * @throws RuntimeException if the record does not advance the next producer ID
     */
    void replay(ProducerIdsRecord record) {
        long currentNextProducerId = nextProducerBlock.get().firstProducerId();
        long replayedNextProducerId = record.nextProducerId();
        if (replayedNextProducerId <= currentNextProducerId) {
            throw new RuntimeException("Next Producer ID from replayed record (" + replayedNextProducerId
                + ") is not greater than current next Producer ID (" + currentNextProducerId + ")");
        }
        nextProducerBlock.set(
            new ProducerIdsBlock(record.brokerId(), replayedNextProducerId, PRODUCER_ID_BLOCK_SIZE));
        brokerEpoch.set(record.brokerEpoch());
    }

    /**
     * Builds the records describing this manager's state as read at the given epoch.
     *
     * <p>The returned iterator always yields exactly one batch. That batch holds a
     * single {@link ProducerIdsRecord} when the block read at {@code epoch} has a first
     * producer ID greater than zero, and is empty otherwise.
     *
     * @param epoch the epoch passed to the timeline structures' {@code get(epoch)} reads
     * @return an iterator over one batch containing zero or one record
     */
    Iterator<List<ApiMessageAndVersion>> iterator(long epoch) {
        List<ApiMessageAndVersion> records = new ArrayList<>(1);

        ProducerIdsBlock producerIdBlock = nextProducerBlock.get(epoch);
        if (producerIdBlock.firstProducerId() > 0L) {
            ProducerIdsRecord record = new ProducerIdsRecord()
                .setNextProducerId(producerIdBlock.firstProducerId())
                .setBrokerId(producerIdBlock.assignedBrokerId())
                .setBrokerEpoch(brokerEpoch.get(epoch));
            records.add(new ApiMessageAndVersion(record, PRODUCER_IDS_RECORD_VERSION));
        }
        return Collections.singleton(records).iterator();
    }
}

