package org.apache.kafka.streams.processor.internals.namedtopology;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.LagInfo;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.errors.UnknownTopologyException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.slf4j.Logger;

@InterfaceStability.Unstable
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.class */
public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
    private final Logger log;

    /**
     * Creates a wrapper from raw properties, using a {@link DefaultKafkaClientSupplier}
     * to supply the Kafka clients.
     *
     * @param properties configuration from which the {@link StreamsConfig} is built
     */
    public KafkaStreamsNamedTopologyWrapper(Properties properties) {
        this(new StreamsConfig(properties), new DefaultKafkaClientSupplier());
    }

    /**
     * Creates a wrapper from raw properties with a caller-provided client supplier.
     *
     * @param properties          configuration from which the {@link StreamsConfig} is built
     * @param kafkaClientSupplier supplier handed through to the {@link KafkaStreams} constructor
     */
    public KafkaStreamsNamedTopologyWrapper(Properties properties, KafkaClientSupplier kafkaClientSupplier) {
        this(new StreamsConfig(properties), kafkaClientSupplier);
    }

    /**
     * Shared constructor used by the public ones. Builds a {@link TopologyMetadata} backed by
     * a new, empty {@link ConcurrentSkipListMap} and creates the client-prefixed logger.
     *
     * @param streamsConfig       parsed Streams configuration
     * @param kafkaClientSupplier supplier handed through to the {@link KafkaStreams} constructor
     */
    private KafkaStreamsNamedTopologyWrapper(StreamsConfig streamsConfig, KafkaClientSupplier kafkaClientSupplier) {
        // Diamond instead of the raw type so the map's key/value types are inferred from the
        // TopologyMetadata constructor and no unchecked conversion is needed.
        super(new TopologyMetadata(new ConcurrentSkipListMap<>(), streamsConfig), streamsConfig, kafkaClientSupplier);
        this.log = new LogContext(String.format("stream-client [%s] ", this.clientId)).logger(getClass());
    }

    /**
     * Starts the application with a single initial topology.
     *
     * @param namedTopology the only topology to register before starting
     */
    public void start(NamedTopology namedTopology) {
        final Collection<NamedTopology> initialTopologies = Collections.singleton(namedTopology);
        start(initialTopologies);
    }

    /**
     * Registers each of the given topologies, in iteration order, and then starts the
     * underlying {@link KafkaStreams} instance.
     *
     * <p>If adding any topology completes exceptionally, the failure is logged and rethrown
     * immediately; the remaining topologies are not added and {@code super.start()} is not called.
     *
     * @param collection the topologies to register before starting
     */
    public synchronized void start(Collection<NamedTopology> collection) {
        this.log.info("Starting Streams with topologies: {}", collection);
        for (NamedTopology topology : collection) {
            final AddNamedTopologyResult addResult = addNamedTopology(topology);
            if (!addResult.all().isCompletedExceptionally()) {
                continue;
            }
            final KafkaException failure = addResult.exceptionNow();
            this.log.error("Failed to start Streams when adding topology " + topology.name() + " due to", failure);
            throw failure;
        }
        super.start();
    }

    /**
     * Creates a builder for a new named topology.
     *
     * @param str        the topology name; must not contain {@link TaskId#NAMED_TOPOLOGY_DELIMITER}
     * @param properties topology-level properties passed to the builder together with the
     *                   application configs
     * @return a new {@link NamedTopologyBuilder} for the given name
     * @throws IllegalArgumentException if the name contains the reserved delimiter
     */
    public NamedTopologyBuilder newNamedTopologyBuilder(String str, Properties properties) {
        final boolean usesReservedDelimiter = str.contains(TaskId.NAMED_TOPOLOGY_DELIMITER);
        if (usesReservedDelimiter) {
            throw new IllegalArgumentException("The character sequence '__' is not allowed in a NamedTopology, please select a new name");
        }
        final NamedTopologyBuilder builder = new NamedTopologyBuilder(str, this.applicationConfigs, properties);
        return builder;
    }

    /**
     * Creates a builder for a new named topology with an empty set of topology-level properties.
     *
     * @param str the topology name
     * @return a new {@link NamedTopologyBuilder} for the given name
     */
    public NamedTopologyBuilder newNamedTopologyBuilder(String str) {
        final Properties noOverrides = new Properties();
        return newNamedTopologyBuilder(str, noOverrides);
    }

    /**
     * Looks up a registered topology by name.
     *
     * @param str the topology name
     * @return the topology, or an empty {@link Optional} when no builder is registered under
     *         that name (or the builder yields no topology)
     */
    public synchronized Optional<NamedTopology> getTopologyByName(String str) {
        final InternalTopologyBuilder builder = this.topologyMetadata.lookupBuilderForNamedTopology(str);
        return Optional.ofNullable(builder).map(InternalTopologyBuilder::namedTopology);
    }

    /**
     * Returns the named topologies reported by the topology metadata.
     *
     * @return the collection returned by {@code topologyMetadata.getAllNamedTopologies()}
     */
    public Collection<NamedTopology> getAllTopologies() {
        final Collection<NamedTopology> topologies = topologyMetadata.getAllNamedTopologies();
        return topologies;
    }

    /**
     * Adds a new topology to this application.
     *
     * <p>The returned result wraps a future that is completed exceptionally when the
     * application has started or finished shutting down ({@link IllegalStateException}) or when
     * a topology of the same name already exists ({@link IllegalArgumentException}). Otherwise
     * the topology is registered and, if the application is still in {@code CREATED}, the
     * future is completed right away.
     *
     * @param namedTopology the topology to add
     * @return the result wrapping the future that tracks the addition
     */
    public AddNamedTopologyResult addNamedTopology(NamedTopology namedTopology) {
        this.log.info("Adding new NamedTopology: {}", namedTopology.name());
        final KafkaFutureImpl<Void> addTopologyFuture = new KafkaFutureImpl<>();

        if (hasStartedOrFinishedShuttingDown()) {
            addTopologyFuture.completeExceptionally(new IllegalStateException("Cannot add a NamedTopology while the state is " + this.state));
            return new AddNamedTopologyResult(addTopologyFuture);
        }

        if (getTopologyByName(namedTopology.name()).isPresent()) {
            addTopologyFuture.completeExceptionally(new IllegalArgumentException("Unable to add the new NamedTopology " + namedTopology.name() + " as another of the same name already exists"));
            return new AddNamedTopologyResult(addTopologyFuture);
        }

        this.topologyMetadata.registerAndBuildNewTopology(addTopologyFuture, namedTopology.internalTopologyBuilder());
        maybeCompleteFutureIfStillInCREATED(addTopologyFuture, "adding topology " + namedTopology.name());
        return new AddNamedTopologyResult(addTopologyFuture);
    }

    /**
     * Removes a topology from this application and optionally resets the committed offsets
     * of its locally active source partitions.
     *
     * <p>When the application has started or finished shutting down, or the topology is
     * unknown, the returned future is completed exceptionally and nothing is unregistered.
     * Otherwise the topology is unregistered. A result carrying an offset-reset action is
     * returned only when {@code z} is true, the future was not completed early because the
     * application is still in {@code CREATED}, and at least one locally active partition
     * belongs to the topology's source topics.
     *
     * @param str the name of the topology to remove
     * @param z   whether offsets of the removed topology's input partitions should be reset
     * @return the result wrapping the removal future (and, when applicable, the reset action)
     */
    public RemoveNamedTopologyResult removeNamedTopology(String str, boolean z) {
        this.log.info("Informed to remove topology {} with resetOffsets={} ", str, Boolean.valueOf(z));
        final KafkaFutureImpl<Void> removeTopologyFuture = new KafkaFutureImpl<>();

        // A failed precondition must stop the removal: previously the future was failed but the
        // method still fell through and tried to unregister the topology anyway.
        if (hasStartedOrFinishedShuttingDown()) {
            this.log.error("Attempted to remove topology {} while the Kafka Streams was in state {}, topologies cannot be modified if the application has begun or completed shutting down.", str, this.state);
            removeTopologyFuture.completeExceptionally(new IllegalStateException("Cannot remove a NamedTopology while the state is " + this.state));
            return new RemoveNamedTopologyResult(removeTopologyFuture);
        }
        if (!getTopologyByName(str).isPresent()) {
            this.log.error("Attempted to remove unknown topology {}. This application currently contains the following topologies: {}.", str, this.topologyMetadata.namedTopologiesView());
            removeTopologyFuture.completeExceptionally(new UnknownTopologyException("Unable to remove topology", str));
            return new RemoveNamedTopologyResult(removeTopologyFuture);
        }

        // Collect the partitions before unregistering, while the topology's source topics can
        // still be looked up. Each thread's active tasks are copied into a HashSet first, as
        // in the original code.
        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads().stream()
            .flatMap(threadMetadata -> new HashSet<>(threadMetadata.activeTasks()).stream())
            .flatMap(taskMetadata -> taskMetadata.topicPartitions().stream())
            .filter(topicPartition -> this.topologyMetadata.sourceTopicsForTopology(str).contains(topicPartition.topic()))
            .collect(Collectors.toSet());

        this.topologyMetadata.unregisterTopology(removeTopologyFuture, str);
        final boolean completedBeforeStart = maybeCompleteFutureIfStillInCREATED(removeTopologyFuture, "removing topology " + str);

        if (!z || completedBeforeStart || partitionsToReset.isEmpty()) {
            return new RemoveNamedTopologyResult(removeTopologyFuture);
        }

        this.log.info(
            "Resetting offsets for the following partitions of {} removed NamedTopology {}: {}",
            removeTopologyFuture.isCompletedExceptionally() ? "unsuccessfully" : "successfully",
            str,
            partitionsToReset
        );
        return new RemoveNamedTopologyResult(removeTopologyFuture, str, () -> resetOffsets(partitionsToReset));
    }

    /**
     * Pauses the given topology by delegating to {@code topologyMetadata.pauseTopology}.
     *
     * @param str the name of the topology to pause
     */
    public void pauseNamedTopology(String str) {
        topologyMetadata.pauseTopology(str);
    }

    /**
     * Reports whether the given topology is currently paused.
     *
     * @param str the name of the topology to check
     * @return the value of {@code topologyMetadata.isPaused(str)}
     */
    public boolean isNamedTopologyPaused(String str) {
        final boolean paused = topologyMetadata.isPaused(str);
        return paused;
    }

    /**
     * Resumes the given topology and then calls {@code signalResume()} on every entry of
     * {@code threads}.
     *
     * @param str the name of the topology to resume
     */
    public void resumeNamedTopology(String str) {
        topologyMetadata.resumeTopology(str);
        threads.forEach(thread -> thread.signalResume());
    }

    /**
     * Completes the given future immediately when the application is still in
     * {@code CREATED} and the future is not already done.
     *
     * @param kafkaFutureImpl the future tracking a topology add/remove operation
     * @param str             description of the operation, used only in the log message
     * @return {@code true} if this call completed the future, {@code false} otherwise
     */
    private boolean maybeCompleteFutureIfStillInCREATED(KafkaFutureImpl<Void> kafkaFutureImpl, String str) {
        if (this.state != KafkaStreams.State.CREATED || kafkaFutureImpl.isDone()) {
            return false;
        }
        // The future is typed Void, so the only valid completion value is a plain null;
        // an (Object) cast here does not type-check against complete(Void).
        kafkaFutureImpl.complete(null);
        this.log.info("Completed {} since application has not been started", str);
        return true;
    }

    /**
     * Deletes the committed offsets of the given partitions for this application's consumer
     * group, retrying on failure.
     *
     * <p>Retry policy, as implemented below:
     * <ul>
     *   <li>{@link GroupSubscribedToTopicException} with the "actively subscribed" message:
     *       retried without consuming the retry budget;</li>
     *   <li>{@link GroupIdNotFoundException}: treated as success, returns immediately;</li>
     *   <li>any other failure: consumes one of the 100 retries; once exhausted a
     *       {@link StreamsException} wrapping the cause is thrown.</li>
     * </ul>
     * The method waits 100 ms between attempts. If the calling thread is interrupted, either
     * while waiting for the admin result or during the back-off sleep, the interrupt flag is
     * restored and a {@link StreamsException} is thrown.
     *
     * @param set the partitions whose committed offsets should be deleted
     * @throws StreamsException if the retries are exhausted or the thread is interrupted
     */
    private void resetOffsets(Set<TopicPartition> set) throws StreamsException {
        final String activelySubscribedMessage = "Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.";
        final long retryBackoffMs = 100L;
        int retriesRemaining = 100;

        while (true) {
            try {
                this.adminClient.deleteConsumerGroupOffsets(this.applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), set).all().get();
                this.log.info("Successfully completed resetting offsets.");
                return;
            } catch (InterruptedException e) {
                // Restore the flag so callers further up the stack can still observe the interrupt.
                Thread.currentThread().interrupt();
                this.log.error("Offset reset failed.", e);
                throw new StreamsException(e);
            } catch (ExecutionException e) {
                final Throwable cause = e.getCause() != null ? e.getCause() : e;
                // Constant-first equals: the exception message may be null.
                if (cause instanceof GroupSubscribedToTopicException && activelySubscribedMessage.equals(cause.getMessage())) {
                    this.log.debug("Offset reset failed, there may be other nodes which have not yet finished removing this topology", cause);
                } else if (cause instanceof GroupIdNotFoundException) {
                    this.log.info("The offsets have been reset by another client or the group has been deleted, no need to retry further.");
                    return;
                } else {
                    retriesRemaining--;
                    if (retriesRemaining <= 0) {
                        this.log.error("Offset reset failed, no retries remaining.", cause);
                        throw new StreamsException(cause);
                    }
                    this.log.error("Offset reset failed, retries remaining: " + retriesRemaining, cause);
                }
            }

            try {
                Thread.sleep(retryBackoffMs);
            } catch (InterruptedException e) {
                // Treat an interrupt during back-off the same way as one during the admin call,
                // instead of swallowing it and continuing to retry.
                Thread.currentThread().interrupt();
                this.log.error("Offset reset was interrupted while waiting to retry.", e);
                throw new StreamsException(e);
            }
        }
    }

    /**
     * Removes a topology without resetting any offsets.
     *
     * @param str the name of the topology to remove
     * @return the result of {@code removeNamedTopology(str, false)}
     */
    public RemoveNamedTopologyResult removeNamedTopology(String str) {
        final boolean resetOffsets = false;
        return removeNamedTopology(str, resetOffsets);
    }

    /**
     * Clears the local state of a topology that is no longer registered with this application.
     *
     * @param str the name of the topology whose local state should be cleared
     * @throws IllegalStateException if a topology with that name is still registered
     */
    public void cleanUpNamedTopology(String str) {
        final boolean stillRegistered = getTopologyByName(str).isPresent();
        if (stillRegistered) {
            throw new IllegalStateException("Can't clean up local state for an active NamedTopology: " + str);
        }
        stateDirectory.clearLocalStateForNamedTopology(str);
    }

    /**
     * Returns the topology description string provided by the topology metadata.
     *
     * @return the value of {@code topologyMetadata.topologyDescriptionString()}
     */
    public String getFullTopologyDescription() {
        final String description = topologyMetadata.topologyDescriptionString();
        return description;
    }

    /**
     * Verifies that the named topology exists and contains the given state store.
     *
     * @param str  the topology name
     * @param str2 the state store name
     * @throws UnknownTopologyException   if no builder is registered for the topology name
     * @throws UnknownStateStoreException if the topology's builder does not have the store
     */
    private void verifyTopologyStateStore(String str, String str2) {
        final InternalTopologyBuilder builder = topologyMetadata.lookupBuilderForNamedTopology(str);
        if (builder == null) {
            throw new UnknownTopologyException("Cannot get state store " + str2, str);
        }
        if (builder.hasStore(str2)) {
            return;
        }
        throw new UnknownStateStoreException("Cannot get state store " + str2 + " from NamedTopology " + str + " because no such state store exists in this topology.");
    }

    /**
     * Returns the queryable store described by the given parameters, after verifying that the
     * topology exists and contains the store.
     *
     * @param namedTopologyStoreQueryParameters the topology name, store name and query options
     * @param <T> the store type requested by the parameters
     * @return the store returned by {@link KafkaStreams#store(StoreQueryParameters)}
     */
    public <T> T store(NamedTopologyStoreQueryParameters<T> namedTopologyStoreQueryParameters) {
        final String topologyName = namedTopologyStoreQueryParameters.topologyName;
        final String storeName = namedTopologyStoreQueryParameters.storeName();
        verifyTopologyStateStore(topologyName, storeName);
        final StoreQueryParameters rawParameters = (StoreQueryParameters) namedTopologyStoreQueryParameters;
        return (T) super.store(rawParameters);
    }

    /**
     * Returns the metadata of the clients hosting the given store of the given topology.
     *
     * <p>The topology/store pair is verified first, then the application state is validated.
     *
     * @param str  the state store name
     * @param str2 the topology name
     * @return the value of {@code streamsMetadataState.getAllMetadataForStore(str, str2)}
     */
    public Collection<StreamsMetadata> streamsMetadataForStore(String str, String str2) {
        // verifyTopologyStateStore takes (topologyName, storeName), hence the swapped order.
        verifyTopologyStateStore(str2, str);
        validateIsRunningOrRebalancing();
        final Collection<StreamsMetadata> metadata = streamsMetadataState.getAllMetadataForStore(str, str2);
        return metadata;
    }

    /**
     * Returns the client metadata for the given topology, after validating the application state.
     *
     * @param str the topology name
     * @return the value of {@code streamsMetadataState.getAllMetadataForTopology(str)}
     */
    public Collection<StreamsMetadata> allStreamsClientsMetadataForTopology(String str) {
        validateIsRunningOrRebalancing();
        final Collection<StreamsMetadata> metadata = streamsMetadataState.getAllMetadataForTopology(str);
        return metadata;
    }

    /**
     * Finds the metadata for the given key in the given store of the given topology.
     *
     * <p>The topology/store pair is verified first, then the application state is validated.
     *
     * @param str        the state store name
     * @param k          the key to look up
     * @param serializer serializer for the key
     * @param str2       the topology name
     * @param <K>        the key type
     * @return the value of {@code streamsMetadataState.getKeyQueryMetadataForKey(...)}
     */
    public <K> KeyQueryMetadata queryMetadataForKey(String str, K k, Serializer<K> serializer, String str2) {
        // verifyTopologyStateStore takes (topologyName, storeName), hence the swapped order.
        verifyTopologyStateStore(str2, str);
        validateIsRunningOrRebalancing();
        // Pass the key and serializer with their declared generic types. Casting the key to
        // String would throw ClassCastException for every non-String key type.
        return this.streamsMetadataState.getKeyQueryMetadataForKey(str, k, serializer, str2);
    }

    /**
     * Returns the local store partition lags restricted to the tasks of one topology.
     *
     * <p>Tasks are gathered from every stream thread, keeping those whose task id carries the
     * given topology name, and then passed to {@code allLocalStorePartitionLags}.
     *
     * @param str the topology name
     * @return the value of {@code allLocalStorePartitionLags} for the topology's tasks
     * @throws UnknownTopologyException if no topology with that name is registered
     */
    public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLagsForTopology(String str) {
        final boolean topologyKnown = getTopologyByName(str).isPresent();
        if (!topologyKnown) {
            this.log.error("Can't get local store partition lags since topology {} does not exist in this application", str);
            throw new UnknownTopologyException("Can't get local store partition lags", str);
        }

        // Left as a raw list, as in the original: the task element type is not imported here.
        final ArrayList tasksOfTopology = new ArrayList();
        processStreamThread(thread ->
            thread.readyOnlyAllTasks().stream()
                .filter(task -> str.equals(task.id().topologyName()))
                .forEachOrdered(tasksOfTopology::add));
        return allLocalStorePartitionLags(tasksOfTopology);
    }
}
