/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side entry point that owns a fixed set of {@link StreamThread}s built from a
 * {@link TopologyBuilder} and a {@link StreamsConfig}.
 *
 * <p>An instance moves through three states: {@code CREATED} (after construction),
 * {@code RUNNING} (after {@link #start()}) and {@code STOPPED} (after {@link #close()}).
 * It cannot be restarted once stopped.
 *
 * <p>{@link #start()} and {@link #close()} are serialized on the instance monitor. The
 * lifecycle state is additionally {@code volatile} so that the unsynchronized readers
 * ({@link #cleanUp()} and the metadata/store query methods) observe the latest value.
 */
@InterfaceStability.Unstable
public class KafkaStreams {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);
    /** Source of the numeric suffix used when no explicit client id is configured. */
    private static final AtomicInteger STREAM_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    private static final String JMX_PREFIX = "kafka.streams";

    // Lifecycle states; transitions are CREATED -> RUNNING -> STOPPED or CREATED -> STOPPED.
    private static final int CREATED = 0;
    private static final int RUNNING = 1;
    private static final int STOPPED = 2;

    // Written only inside synchronized start()/close(); volatile because it is also read
    // by methods that do not take the monitor.
    private volatile int state = CREATED;

    private final StreamThread[] threads;
    private final Metrics metrics;
    private final QueryableStoreProvider queryableStoreProvider;
    private final UUID processId;
    private final StreamsMetadataState streamsMetadataState;
    private final StreamsConfig config;

    /**
     * Creates an instance from raw properties using a {@link DefaultKafkaClientSupplier}.
     *
     * @param builder the topology to run
     * @param props   properties from which a {@link StreamsConfig} is built
     */
    public KafkaStreams(TopologyBuilder builder, Properties props) {
        this(builder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
    }

    /**
     * Creates an instance using a {@link DefaultKafkaClientSupplier}.
     *
     * @param builder the topology to run
     * @param config  the streams configuration
     */
    public KafkaStreams(TopologyBuilder builder, StreamsConfig config) {
        this(builder, config, new DefaultKafkaClientSupplier());
    }

    /**
     * Creates an instance and all of its stream threads; the threads are constructed here
     * but not started until {@link #start()} is called.
     *
     * <p>The configured {@code application.id} is set on {@code builder}. If the configured
     * {@code client.id} is empty, a client id of the form {@code <application.id>-<n>} is
     * generated from a process-wide counter. A {@link JmxReporter} with prefix
     * {@code kafka.streams} is always added to the configured metric reporters.
     *
     * @param builder        the topology to run
     * @param config         the streams configuration
     * @param clientSupplier supplier handed to every {@link StreamThread}
     */
    public KafkaStreams(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier) {
        Time time = new SystemTime();
        this.processId = UUID.randomUUID();
        this.config = config;

        String applicationId = config.getString("application.id");
        builder.setApplicationId(applicationId);

        String clientId = config.getString("client.id");
        if (clientId.isEmpty()) {
            clientId = applicationId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
        }

        List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));
        MetricConfig metricConfig = new MetricConfig()
                .samples(config.getInt("metrics.num.samples"))
                .timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS);
        this.metrics = new Metrics(metricConfig, reporters, time);

        this.threads = new StreamThread[config.getInt("num.stream.threads")];
        List<StateStoreProvider> storeProviders = new ArrayList<StateStoreProvider>();
        this.streamsMetadataState = new StreamsMetadataState(builder);
        for (int i = 0; i < this.threads.length; ++i) {
            this.threads[i] = new StreamThread(builder, config, clientSupplier, applicationId, clientId,
                    this.processId, this.metrics, time, this.streamsMetadataState);
            // One store provider per thread so stores can be looked up across all threads.
            storeProviders.add(new StreamThreadStateStoreProvider(this.threads[i]));
        }
        this.queryableStoreProvider = new QueryableStoreProvider(storeProviders);
    }

    /**
     * Starts every stream thread and moves this instance to the running state.
     *
     * @throws IllegalStateException if this instance was already started or already closed
     */
    public synchronized void start() {
        log.debug("Starting Kafka Stream process");
        if (this.state == RUNNING) {
            throw new IllegalStateException("This process was already started.");
        }
        if (this.state != CREATED) {
            throw new IllegalStateException("Cannot restart after closing.");
        }
        for (StreamThread thread : this.threads) {
            thread.start();
        }
        this.state = RUNNING;
        log.info("Started Kafka Stream process");
    }

    /**
     * Stops this instance. If it is running, every stream thread is asked to close and is
     * then joined. Unless already stopped, the metrics registry is closed and the state
     * becomes stopped. Calling this on an already stopped instance does nothing beyond logging.
     *
     * <p>If the calling thread is interrupted while joining, the remaining threads are
     * still joined and the caller's interrupt status is restored before this method returns.
     */
    public synchronized void close() {
        log.debug("Stopping Kafka Stream process");
        boolean interrupted = false;
        try {
            if (this.state == RUNNING) {
                // Signal all threads first so they shut down concurrently, then wait for each.
                for (StreamThread thread : this.threads) {
                    thread.close();
                }
                for (StreamThread thread : this.threads) {
                    try {
                        thread.join();
                    } catch (InterruptedException ex) {
                        // Remember the interrupt instead of re-asserting it immediately:
                        // setting the flag now would make every following join() fail at once.
                        interrupted = true;
                    }
                }
            }
            if (this.state != STOPPED) {
                this.metrics.close();
                this.state = STOPPED;
                log.info("Stopped Kafka Stream process");
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the process id followed by the string form of each stream thread.
     *
     * @return a multi-line description of this instance
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("KafkaStreams processID:" + this.processId + "\n");
        for (StreamThread thread : this.threads) {
            sb.append("\t").append(thread.toString());
        }
        sb.append("\n");
        return sb.toString();
    }

    /**
     * Cleans the local state directory of this application by delegating to
     * {@link StateDirectory#cleanRemovedTasks()} for the configured {@code state.dir}
     * and {@code application.id}.
     *
     * @throws IllegalStateException if this instance is currently running
     */
    public void cleanUp() {
        if (this.state == RUNNING) {
            throw new IllegalStateException("Cannot clean up while running.");
        }
        String appId = this.config.getString("application.id");
        String stateDir = this.config.getString("state.dir");
        // Built only for the log message below.
        String localApplicationDir = stateDir + File.separator + appId;
        log.debug("Removing local Kafka Streams application data in {} for application {}", localApplicationDir, appId);
        StateDirectory stateDirectory = new StateDirectory(appId, stateDir);
        stateDirectory.cleanRemovedTasks();
    }

    /**
     * Installs the given handler on every stream thread.
     *
     * @param eh the handler to set on each thread
     */
    public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh) {
        for (StreamThread thread : this.threads) {
            thread.setUncaughtExceptionHandler(eh);
        }
    }

    /**
     * Returns all metadata held by the internal {@link StreamsMetadataState}.
     *
     * @return the metadata collection reported by {@code getAllMetadata()}
     * @throws IllegalStateException if this instance is not running
     */
    public Collection<StreamsMetadata> allMetadata() {
        this.validateIsRunning();
        return this.streamsMetadataState.getAllMetadata();
    }

    /**
     * Returns the metadata associated with the given store name.
     *
     * @param storeName name of the store to look up
     * @return the metadata collection reported by {@code getAllMetadataForStore(storeName)}
     * @throws IllegalStateException if this instance is not running
     */
    public Collection<StreamsMetadata> allMetadataForStore(String storeName) {
        this.validateIsRunning();
        return this.streamsMetadataState.getAllMetadataForStore(storeName);
    }

    /**
     * Looks up the metadata for a key of the given store, using a key serializer.
     *
     * @param storeName     name of the store to look up
     * @param key           the key to locate
     * @param keySerializer serializer passed through to the metadata lookup
     * @param <K>           key type
     * @return the result of {@code getMetadataWithKey(storeName, key, keySerializer)}
     * @throws IllegalStateException if this instance is not running
     */
    public <K> StreamsMetadata metadataForKey(String storeName, K key, Serializer<K> keySerializer) {
        this.validateIsRunning();
        return this.streamsMetadataState.getMetadataWithKey(storeName, key, keySerializer);
    }

    /**
     * Looks up the metadata for a key of the given store, using a custom partitioner.
     *
     * @param storeName   name of the store to look up
     * @param key         the key to locate
     * @param partitioner partitioner passed through to the metadata lookup
     * @param <K>         key type
     * @return the result of {@code getMetadataWithKey(storeName, key, partitioner)}
     * @throws IllegalStateException if this instance is not running
     */
    public <K> StreamsMetadata metadataForKey(String storeName, K key, StreamPartitioner<K, ?> partitioner) {
        this.validateIsRunning();
        return this.streamsMetadataState.getMetadataWithKey(storeName, key, partitioner);
    }

    /**
     * Obtains a queryable view of the named store from the internal
     * {@link QueryableStoreProvider}.
     *
     * @param storeName          name of the store to fetch
     * @param queryableStoreType the kind of store view requested
     * @param <T>                type of the returned store view
     * @return the result of {@code getStore(storeName, queryableStoreType)}
     * @throws IllegalStateException if this instance is not running
     */
    public <T> T store(String storeName, QueryableStoreType<T> queryableStoreType) {
        this.validateIsRunning();
        return this.queryableStoreProvider.getStore(storeName, queryableStoreType);
    }

    /**
     * Guards the query methods, which are only permitted while running.
     *
     * @throws IllegalStateException if the current state is not running
     */
    private void validateIsRunning() {
        if (this.state != RUNNING) {
            throw new IllegalStateException("KafkaStreams is not running");
        }
    }
}

