/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import io.confluent.kafka.link.ClusterLinkUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AlterMirrorTopicsRequestData;
import org.apache.kafka.common.message.AlterMirrorTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.MirrorTopicChangeRecord;
import org.apache.kafka.common.metadata.MirrorTopicRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.metadata.MirrorTopic;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.slf4j.Logger;

/**
 * Controller-side bookkeeping for mirror topics and the cluster links they are attached to.
 *
 * <p>Two timeline collections, both registered with the supplied {@link SnapshotRegistry}, hold
 * the state: a map from topic ID to its {@link MirrorTopic}, and a map from cluster link ID to
 * the IDs of the mirror topics using that link. Request handlers in this class validate input
 * and emit metadata records through a {@code Consumer}; the {@code replay} methods apply such
 * records to the in-memory state.
 */
public class MirrorTopicControlManager {
    private final SnapshotRegistry snapshotRegistry;
    private final Logger log;
    private final Time time;
    private final Function<String, Optional<Uuid>> clusterLinkResolver;
    private final Function<String, Optional<Uuid>> topicIdResolver;
    /** Cluster link ID to the IDs of the mirror topics attached to that link. */
    private final TimelineHashMap<Uuid, TimelineHashSet<Uuid>> linksToMirrorTopics;
    /** Topic ID to the mirror topic metadata of that topic. */
    private final TimelineHashMap<Uuid, MirrorTopic> mirrorTopics;

    /**
     * Creates an empty manager.
     *
     * @param snapshotRegistry    registry backing the timeline collections of this manager
     * @param logContext          source of the logger used by this manager
     * @param time                clock used to timestamp state changes generated here
     * @param topicIdResolver     maps a topic name to its topic ID, empty if the topic is unknown
     * @param clusterLinkResolver maps a cluster link name to its link ID, empty if the link is unknown
     */
    public MirrorTopicControlManager(SnapshotRegistry snapshotRegistry, LogContext logContext, Time time, Function<String, Optional<Uuid>> topicIdResolver, Function<String, Optional<Uuid>> clusterLinkResolver) {
        this.snapshotRegistry = snapshotRegistry;
        this.time = time;
        this.topicIdResolver = topicIdResolver;
        this.clusterLinkResolver = clusterLinkResolver;
        this.log = logContext.logger(MirrorTopicControlManager.class);
        this.linksToMirrorTopics = new TimelineHashMap<>(snapshotRegistry, 0);
        this.mirrorTopics = new TimelineHashMap<>(snapshotRegistry, 0);
    }

    /**
     * @param topicId the topic ID to look up
     * @return {@code true} if mirror metadata is registered for the topic ID
     */
    boolean isMirrorTopic(Uuid topicId) {
        return mirrorTopics.containsKey(topicId);
    }

    /**
     * @param topicId the topic ID to look up
     * @return the mirror metadata registered for the topic ID, or empty if there is none
     */
    Optional<MirrorTopic> mirrorTopic(Uuid topicId) {
        return Optional.ofNullable(mirrorTopics.get(topicId));
    }

    /**
     * @param topicName the topic name to resolve and look up
     * @return {@code true} if the name resolves to a topic ID that has mirror metadata
     */
    boolean isMirrorTopic(String topicName) {
        return topicIdResolver.apply(topicName).filter(this::isMirrorTopic).isPresent();
    }

    /**
     * @param topicId the topic ID to look up
     * @return the ID of the cluster link the mirror topic uses, or empty if the topic is not a mirror topic
     */
    Optional<Uuid> clusterLinkIdForTopicId(Uuid topicId) {
        return mirrorTopic(topicId).map(MirrorTopic::linkId);
    }

    /**
     * Returns the IDs of the mirror topics attached to a cluster link.
     *
     * <p>When {@code excludeStopped} is {@code false} the returned set is the set stored in this
     * manager, not a copy; when it is {@code true} a newly collected set is returned.
     *
     * @param linkId         the cluster link ID
     * @param excludeStopped whether to leave out topics whose state is {@code STOPPED}
     * @return the matching topic IDs, or an empty set if no topics are recorded for the link
     */
    Set<Uuid> topicIdsForClusterLinkId(Uuid linkId, boolean excludeStopped) {
        Set<Uuid> topicIds = linksToMirrorTopics.get(linkId);
        if (topicIds == null) {
            return Collections.emptySet();
        }
        if (!excludeStopped) {
            return topicIds;
        }
        return topicIds.stream()
            .filter(topicId -> mirrorTopics.get(topicId).mirrorState() != MirrorTopic.State.STOPPED)
            .collect(Collectors.toSet());
    }

    /**
     * Returns the IDs of the mirror topics on a cluster link that are neither {@code STOPPED}
     * nor {@code PENDING_STOPPED}.
     *
     * @param linkId the cluster link ID
     * @return a newly collected set of topic IDs, or an empty set if no topics are recorded for the link
     */
    Set<Uuid> topicIdsInUse(Uuid linkId) {
        Set<Uuid> topicIds = linksToMirrorTopics.get(linkId);
        if (topicIds == null) {
            return Collections.emptySet();
        }
        return topicIds.stream()
            .filter(topicId -> {
                MirrorTopic.State state = mirrorTopics.get(topicId).mirrorState();
                return state != MirrorTopic.State.STOPPED && state != MirrorTopic.State.PENDING_STOPPED;
            })
            .collect(Collectors.toSet());
    }

    /**
     * Handles an alter-mirror-topics request, producing one result entry per requested topic.
     *
     * <p>Every entry is validated even when {@code validateOnly} is set; in that case the
     * generated records are discarded and only the response is returned.
     *
     * @param request the request to process
     * @return the response together with the records to commit (none for a validate-only request)
     */
    ControllerResult<AlterMirrorTopicsResponseData> alterMirrorTopics(AlterMirrorTopicsRequestData request) {
        int requestedCount = request.alterMirrorTopics().size();
        ArrayList<ApiMessageAndVersion> records = new ArrayList<>(requestedCount);
        AlterMirrorTopicsResponseData response = new AlterMirrorTopicsResponseData();
        response.setAlterMirrorResults(new ArrayList<>(requestedCount));
        for (AlterMirrorTopicsRequestData.AlterMirrorTopic alterMirrorTopic : request.alterMirrorTopics()) {
            ApiError error = alterMirrorState(alterMirrorTopic, records::add);
            AlterMirrorTopicsResponseData.AlterMirrorResult result = new AlterMirrorTopicsResponseData.AlterMirrorResult();
            // Echo the topic from the request entry so the caller can match results to requests.
            result.setTopic(alterMirrorTopic.topic());
            result.setErrorCode(error.error().code());
            result.setErrorMessage(error.message());
            response.alterMirrorResults().add(result);
        }
        response.setErrorCode(Errors.NONE.code());
        if (request.validateOnly()) {
            return ControllerResult.of(Collections.emptyList(), response);
        }
        return ControllerResult.of(records, response);
    }

    /**
     * Validates a single requested state change and, if it is legal, emits the corresponding
     * change record. The in-memory state is not modified here; that happens on replay.
     *
     * @param request        the requested change for one topic
     * @param recordConsumer receives the change record when the request is valid
     * @return {@link ApiError#NONE} on success, otherwise the reason the request was rejected
     */
    ApiError alterMirrorState(AlterMirrorTopicsRequestData.AlterMirrorTopic request, Consumer<ApiMessageAndVersion> recordConsumer) {
        Optional<Uuid> topicId = topicIdResolver.apply(request.topic());
        if (!topicId.isPresent()) {
            return new ApiError(Errors.UNKNOWN_TOPIC_ID, "No such topic '" + request.topic() + "'.");
        }
        MirrorTopic mirrorTopic = mirrorTopics.get(topicId.get());
        if (mirrorTopic == null) {
            return new ApiError(Errors.INVALID_REQUEST, "Topic '" + request.topic() + "' is not a mirror topic.");
        }
        MirrorTopic.State newState;
        try {
            newState = MirrorTopic.State.fromStateName(request.mirrorTopicState());
        } catch (IllegalArgumentException e) {
            return new ApiError(Errors.INVALID_REQUEST, "Unknown mirror topic state " + request.mirrorTopicState() + " for mirror topic " + mirrorTopic.topicId());
        }
        if (!isAllowedStateChange(mirrorTopic.mirrorState(), newState)) {
            return new ApiError(Errors.INVALID_REQUEST, "Illegal state transition " + mirrorTopic.mirrorState() + " to " + newState + " for mirror topic " + mirrorTopic.topicId());
        }
        long timeMs = time.milliseconds();
        MirrorTopic newMirrorTopic;
        switch (newState) {
            case MIRROR:
                newMirrorTopic = MirrorTopic.mirror(mirrorTopic, timeMs);
                break;
            case PAUSED:
                // The state held before this request is passed along with the pause flags.
                newMirrorTopic = MirrorTopic.paused(mirrorTopic, timeMs, request.topicLevelPause(), request.linkLevelPause(), mirrorTopic.mirrorState());
                break;
            case FAILED:
                newMirrorTopic = MirrorTopic.failed(mirrorTopic, timeMs);
                break;
            case PENDING_STOPPED:
                newMirrorTopic = MirrorTopic.pendingStopped(mirrorTopic, timeMs, request.promoted());
                break;
            case STOPPED:
                newMirrorTopic = MirrorTopic.stopped(mirrorTopic, timeMs, request.stoppedLogEndOffsets());
                break;
            default:
                return new ApiError(Errors.INVALID_REQUEST, "Cannot transition mirror topic " + mirrorTopic.topicId() + " from " + mirrorTopic.mirrorState() + " to unknown state " + request.mirrorTopicState());
        }
        recordConsumer.accept(changeRecordFor(newMirrorTopic));
        return ApiError.NONE;
    }

    /**
     * Decides whether a mirror topic may move from one state to another. Staying in the same
     * state is always allowed, and nothing may leave {@code STOPPED}.
     *
     * @param currentState  the state the topic is in now
     * @param proposedState the requested state
     * @return {@code true} if the transition is permitted
     */
    private boolean isAllowedStateChange(MirrorTopic.State currentState, MirrorTopic.State proposedState) {
        if (currentState == proposedState) {
            return true;
        }
        switch (currentState) {
            case MIRROR:
                return proposedState == MirrorTopic.State.PAUSED
                    || proposedState == MirrorTopic.State.PENDING_STOPPED
                    || proposedState == MirrorTopic.State.FAILED;
            case PAUSED:
                return proposedState == MirrorTopic.State.MIRROR
                    || proposedState == MirrorTopic.State.PENDING_STOPPED
                    || proposedState == MirrorTopic.State.FAILED;
            case FAILED:
                return proposedState == MirrorTopic.State.PAUSED
                    || proposedState == MirrorTopic.State.PENDING_STOPPED;
            case PENDING_STOPPED:
                return proposedState == MirrorTopic.State.STOPPED;
            case STOPPED:
                return false;
        }
        // Reached only for a current state that has no case above.
        log.error("Unhandled current mirror topic state '{}'", currentState);
        return false;
    }

    /**
     * Emits a {@link MirrorTopicRecord} for a topic being created, if the creation request asks
     * for a mirror topic.
     *
     * @param topic          the topic creation request entry
     * @param topicId        the ID assigned to the topic being created
     * @param recordConsumer receives the mirror topic record when one is generated
     * @return {@link ApiError#NONE} if the topic is not a mirror topic or the record was emitted,
     *         otherwise the validation error
     */
    ApiError maybeAddMirrorTopicRecord(CreateTopicsRequestData.CreatableTopic topic, Uuid topicId, Consumer<ApiMessageAndVersion> recordConsumer) {
        boolean hasLinkName = topic.linkName() != null;
        boolean hasSourceTopic = topic.mirrorTopic() != null;
        if (!hasLinkName && !hasSourceTopic) {
            // Ordinary (non-mirror) topic: nothing to record.
            return ApiError.NONE;
        }
        if (!hasLinkName || !hasSourceTopic) {
            return new ApiError(Errors.INVALID_REQUEST, "Link name and mirror topic name must be provided together");
        }
        try {
            ClusterLinkUtils.validateLinkNameOrThrow(topic.linkName());
        } catch (Throwable t) {
            return ApiError.fromThrowable(t);
        }
        Optional<Uuid> linkId = clusterLinkResolver.apply(topic.linkName());
        if (!linkId.isPresent()) {
            return new ApiError(Errors.CLUSTER_LINK_NOT_FOUND, "Cluster link " + topic.linkName() + " was not found.");
        }
        MirrorTopicRecord record = new MirrorTopicRecord()
            .setMirrorTopicState(MirrorTopic.State.MIRROR.stateName())
            .setTopicName(topic.name())
            .setTopicId(topicId)
            .setClusterLinkId(linkId.get())
            .setClusterLinkName(topic.linkName())
            .setSourceTopicId(topic.sourceTopicId())
            .setSourceTopicName(topic.mirrorTopic());
        recordConsumer.accept(new ApiMessageAndVersion(record, MetadataRecordType.MIRROR_TOPIC_RECORD.highestSupportedVersion()));
        return ApiError.NONE;
    }

    /**
     * Emits a change record moving a mirror topic to {@code FAILED}, but only if it is currently
     * in the {@code MIRROR} state. Unknown topics and topics in any other state are logged and
     * left alone.
     *
     * @param mirrorTopicId  the ID of the mirror topic to fail
     * @param recordConsumer receives the change record when one is generated
     */
    void failMirrorTopic(Uuid mirrorTopicId, Consumer<ApiMessageAndVersion> recordConsumer) {
        MirrorTopic mirrorTopic = mirrorTopics.get(mirrorTopicId);
        if (mirrorTopic == null) {
            log.error("Could not fail mirror topic {} since it does not exist", mirrorTopicId);
            return;
        }
        if (mirrorTopic.mirrorState() != MirrorTopic.State.MIRROR) {
            log.info("Not failing mirror topic {} since it is not currently active", mirrorTopicId);
            return;
        }
        // Use the injected clock, as alterMirrorState does, rather than the system clock.
        MirrorTopic failed = MirrorTopic.failed(mirrorTopic, time.milliseconds());
        recordConsumer.accept(changeRecordFor(failed));
    }

    /**
     * Drops the in-memory mirror metadata of every topic attached to a deleted cluster link.
     * This updates the maps directly and emits no records.
     *
     * @param deletedLinkId   the ID of the deleted link
     * @param deletedLinkName the name of the deleted link, used for logging only
     */
    void unLinkMirrorTopics(Uuid deletedLinkId, String deletedLinkName) {
        Set<Uuid> mirrorTopicIds = linksToMirrorTopics.remove(deletedLinkId);
        if (mirrorTopicIds == null) {
            return;
        }
        log.info("Removing mirror topic metadata for {} topics due to deleted cluster link {} with ID {}.", mirrorTopicIds.size(), deletedLinkName, deletedLinkId);
        mirrorTopicIds.forEach(mirrorTopics::remove);
        log.debug("Removed mirror topic metadata for topic IDs {}.", mirrorTopicIds);
    }

    /**
     * Drops the in-memory mirror metadata of a single topic, and removes the topic from the set
     * kept for its cluster link. Does nothing if the topic has no mirror metadata.
     *
     * @param topicIdToRemove the ID of the topic
     * @param topicName       the name of the topic, used for logging only
     */
    void deleteMirrorTopic(Uuid topicIdToRemove, String topicName) {
        MirrorTopic mirrorTopic = mirrorTopics.remove(topicIdToRemove);
        if (mirrorTopic == null) {
            return;
        }
        log.info("Removing mirror state from topic {} with ID {}", topicName, topicIdToRemove);
        Set<Uuid> mirrorTopicsUsingLink = linksToMirrorTopics.get(mirrorTopic.linkId());
        if (mirrorTopicsUsingLink != null) {
            mirrorTopicsUsingLink.remove(topicIdToRemove);
        }
    }

    /**
     * Applies a {@link MirrorTopicRecord}: registers the topic under its cluster link and stores
     * its mirror metadata, replacing any metadata already stored for the topic ID.
     *
     * @param record the record to apply
     */
    public void replay(MirrorTopicRecord record) {
        linksToMirrorTopics
            .computeIfAbsent(record.clusterLinkId(), ignored -> new TimelineHashSet<Uuid>(snapshotRegistry, 0))
            .add(record.topicId());
        mirrorTopics.put(record.topicId(), MirrorTopic.fromRecord(record));
        log.info("Created mirror topic {} with topic ID {}, link name {}, and link ID {}.", record.topicName(), record.topicId(), record.clusterLinkName(), record.clusterLinkId());
    }

    /**
     * Applies a {@link MirrorTopicChangeRecord}, replacing the stored mirror metadata of the
     * topic with metadata in the recorded state.
     *
     * @param record the record to apply
     * @throws IllegalStateException if the record refers to a topic with no mirror metadata
     * @throws RuntimeException      if the recorded state has no case in this method
     */
    public void replay(MirrorTopicChangeRecord record) {
        MirrorTopic.State state = MirrorTopic.State.fromStateName(record.mirrorTopicState());
        MirrorTopic currentMirrorTopic = mirrorTopics.get(record.topicId());
        if (currentMirrorTopic == null) {
            throw new IllegalStateException("Attempting to update unknown mirror topic " + record.topicId());
        }
        MirrorTopic newMirrorTopic;
        switch (state) {
            case MIRROR:
                newMirrorTopic = MirrorTopic.mirror(currentMirrorTopic, record.timeMs());
                break;
            case PAUSED:
                newMirrorTopic = MirrorTopic.paused(currentMirrorTopic, record.timeMs(), record.topicLevelPause(), record.linkLevelPause(), MirrorTopic.State.fromStateName(record.previousToPausedState()));
                break;
            case FAILED:
                newMirrorTopic = MirrorTopic.failed(currentMirrorTopic, record.timeMs());
                break;
            case PENDING_STOPPED:
                newMirrorTopic = MirrorTopic.pendingStopped(currentMirrorTopic, record.timeMs(), record.promoted());
                break;
            case STOPPED:
                newMirrorTopic = MirrorTopic.stopped(currentMirrorTopic, record.timeMs(), record.stoppedLogEndOffsets());
                break;
            default:
                throw new RuntimeException("Cannot update mirror topic state for topic ID " + record.topicId() + ", unknown mirror state " + record.mirrorTopicState());
        }
        log.info("Updating mirror topic {} from {} to {}", newMirrorTopic.topicId(), currentMirrorTopic.mirrorState(), state);
        mirrorTopics.put(record.topicId(), newMirrorTopic);
    }

    /**
     * Emits a snapshot record for a mirror topic as it existed at the given epoch. Nothing is
     * emitted if the topic had no mirror metadata at that epoch.
     *
     * @param mirrorTopicId  the ID of the topic
     * @param topicName      the topic name to place in the snapshot record
     * @param epoch          the timeline epoch to read the metadata at
     * @param recordConsumer receives the snapshot record when one is generated
     */
    void snapshotRecord(Uuid mirrorTopicId, String topicName, long epoch, Consumer<ApiMessageAndVersion> recordConsumer) {
        MirrorTopic mirrorTopic = mirrorTopics.get(mirrorTopicId, epoch);
        if (mirrorTopic == null) {
            return;
        }
        recordConsumer.accept(new ApiMessageAndVersion(MirrorTopic.toSnapshotRecord(mirrorTopic, topicName), MetadataRecordType.MIRROR_TOPIC_RECORD.highestSupportedVersion()));
    }

    /** Wraps the change record of a mirror topic at the highest supported record version. */
    private static ApiMessageAndVersion changeRecordFor(MirrorTopic mirrorTopic) {
        return new ApiMessageAndVersion(MirrorTopic.toChangeRecord(mirrorTopic), MetadataRecordType.MIRROR_TOPIC_CHANGE_RECORD.highestSupportedVersion());
    }
}

