package org.apache.kafka.streams.kstream.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.class */
/**
 * Registers sources, table processors and state stores on an {@link InternalTopologyBuilder}
 * and hands out sequentially numbered processor and store names.
 *
 * <p>{@link #newProcessorName(String)} and {@link #newStoreName(String)} draw from one shared
 * counter, so every generated name consumes one index regardless of its kind.
 *
 * <p>Only {@code addStateStore} and the {@code addGlobalStore} overloads are declared
 * {@code synchronized}; the remaining methods are not.
 *
 * <p>NOTE(review): several constructions below ({@code KStreamImpl}, {@code KTableImpl},
 * {@code KTableSource}, {@code KeyValueStoreMaterializer}, ...) use raw types. The generic
 * declarations of those classes are not visible from this file, so type arguments were
 * deliberately not added here.
 */
public class InternalStreamsBuilder implements InternalNameProvider {
    /** Name prefix used for every source node this class registers. */
    private static final String KSTREAM_SOURCE_PREFIX = "KSTREAM-SOURCE-";
    /** Name prefix used for every table-source processor this class registers. */
    private static final String KTABLE_SOURCE_PREFIX = "KTABLE-SOURCE-";

    final InternalTopologyBuilder internalTopologyBuilder;
    /** Next index to embed in a generated processor or store name. */
    private final AtomicInteger index = new AtomicInteger(0);

    /**
     * @param internalTopologyBuilder the topology builder every method of this class delegates to
     */
    public InternalStreamsBuilder(InternalTopologyBuilder internalTopologyBuilder) {
        this.internalTopologyBuilder = internalTopologyBuilder;
    }

    /**
     * Registers one source node for the given collection of names and wraps it in a stream.
     *
     * @param collection       the strings handed to {@code addSource} as an array
     *                         (presumably topic names; confirm against InternalTopologyBuilder)
     * @param consumedInternal supplies the offset reset policy, timestamp extractor and deserializers
     * @return a stream whose name and single source node are the newly generated source name
     * @throws NullPointerException if either argument is {@code null}
     */
    public <K, V> KStream<K, V> stream(Collection<String> collection, ConsumedInternal<K, V> consumedInternal) {
        // Validate before generating a name so a rejected call does not advance the counter.
        Objects.requireNonNull(collection, "topics can't be null");
        Objects.requireNonNull(consumedInternal, "consumed can't be null");

        String sourceName = newProcessorName(KSTREAM_SOURCE_PREFIX);
        this.internalTopologyBuilder.addSource(
                consumedInternal.offsetResetPolicy(),
                sourceName,
                consumedInternal.timestampExtractor(),
                consumedInternal.keyDeserializer(),
                consumedInternal.valueDeserializer(),
                collection.toArray(new String[0]));
        return new KStreamImpl(this, sourceName, Collections.singleton(sourceName), false);
    }

    /**
     * Registers one source node for the given pattern and wraps it in a stream.
     *
     * @param pattern          the pattern handed to {@code addSource}
     * @param consumedInternal supplies the offset reset policy, timestamp extractor and deserializers
     * @return a stream whose name and single source node are the newly generated source name
     * @throws NullPointerException if either argument is {@code null}
     */
    public <K, V> KStream<K, V> stream(Pattern pattern, ConsumedInternal<K, V> consumedInternal) {
        // Validate before generating a name so a rejected call does not advance the counter.
        Objects.requireNonNull(pattern, "topicPattern can't be null");
        Objects.requireNonNull(consumedInternal, "consumed can't be null");

        String sourceName = newProcessorName(KSTREAM_SOURCE_PREFIX);
        this.internalTopologyBuilder.addSource(
                consumedInternal.offsetResetPolicy(),
                sourceName,
                consumedInternal.timestampExtractor(),
                consumedInternal.keyDeserializer(),
                consumedInternal.valueDeserializer(),
                pattern);
        return new KStreamImpl(this, sourceName, Collections.singleton(sourceName), false);
    }

    /**
     * Builds a table backed by a materialized store: registers a source node and a table-source
     * processor, attaches the store to that processor, and marks the store/topic pair on the
     * topology builder.
     *
     * @param str                  the topic the source node reads and the store is marked against
     * @param consumedInternal     supplies the offset reset policy, timestamp extractor,
     *                             deserializers and serdes
     * @param materializedInternal describes the store to materialize and whether it is queryable
     * @return the newly created table
     * @throws NullPointerException if {@code consumedInternal} or {@code materializedInternal} is {@code null}
     */
    public <K, V> KTable<K, V> table(String str, ConsumedInternal<K, V> consumedInternal, MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal) {
        // Same up-front checks as globalTable; also keeps a rejected call from consuming two indices.
        Objects.requireNonNull(consumedInternal, "consumed can't be null");
        Objects.requireNonNull(materializedInternal, "materialized can't be null");

        StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer(materializedInternal).materialize();
        // The source name must be generated before the processor name to keep the numbering order.
        String sourceName = newProcessorName(KSTREAM_SOURCE_PREFIX);
        String processorName = newProcessorName(KTABLE_SOURCE_PREFIX);
        KTable<K, V> table = createKTable(
                consumedInternal,
                str,
                storeBuilder.name(),
                materializedInternal.isQueryable(),
                sourceName,
                processorName);
        this.internalTopologyBuilder.addStateStore(storeBuilder, processorName);
        this.internalTopologyBuilder.markSourceStoreAndTopic(storeBuilder, str);
        return table;
    }

    /**
     * Registers the source node and the table-source processor for a table and creates the
     * {@code KTableImpl} that represents it.
     */
    private <K, V> KTable<K, V> createKTable(ConsumedInternal<K, V> consumed, String topic, String storeName, boolean isQueryable, String sourceName, String processorName) {
        KTableSource tableSource = new KTableSource(storeName);
        this.internalTopologyBuilder.addSource(
                consumed.offsetResetPolicy(),
                sourceName,
                consumed.timestampExtractor(),
                consumed.keyDeserializer(),
                consumed.valueDeserializer(),
                topic);
        this.internalTopologyBuilder.addProcessor(processorName, tableSource, sourceName);
        return new KTableImpl(
                this,
                processorName,
                tableSource,
                consumed.keySerde(),
                consumed.valueSerde(),
                Collections.singleton(sourceName),
                storeName,
                isQueryable);
    }

    /**
     * Builds a global table: calls {@code withLoggingDisabled()} on the materialization,
     * materializes its store, and registers it as a global store together with a freshly named
     * source and table-source processor.
     *
     * @param str                  forwarded to {@code addGlobalStore}
     *                             (presumably the topic; confirm against InternalTopologyBuilder)
     * @param consumedInternal     supplies the timestamp extractor and deserializers
     * @param materializedInternal describes the store to materialize and whether it is queryable
     * @return the newly created global table
     * @throws NullPointerException if {@code consumedInternal} or {@code materializedInternal} is {@code null}
     */
    public <K, V> GlobalKTable<K, V> globalTable(String str, ConsumedInternal<K, V> consumedInternal, MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal) {
        Objects.requireNonNull(consumedInternal, "consumed can't be null");
        Objects.requireNonNull(materializedInternal, "materialized can't be null");

        materializedInternal.withLoggingDisabled();
        StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer(materializedInternal).materialize();
        // The source name must be generated before the processor name to keep the numbering order.
        String sourceName = newProcessorName(KSTREAM_SOURCE_PREFIX);
        String processorName = newProcessorName(KTABLE_SOURCE_PREFIX);
        this.internalTopologyBuilder.addGlobalStore(
                storeBuilder,
                sourceName,
                consumedInternal.timestampExtractor(),
                consumedInternal.keyDeserializer(),
                consumedInternal.valueDeserializer(),
                str,
                processorName,
                new KTableSource(storeBuilder.name()));
        return new GlobalKTableImpl(new KTableSourceValueGetterSupplier(storeBuilder.name()), materializedInternal.isQueryable());
    }

    /**
     * Returns {@code str} followed by the next counter value, zero-padded to ten digits.
     *
     * <p>The number is formatted with {@link Locale#ROOT} so the digits are always ASCII,
     * independent of the JVM's default locale.
     *
     * @param str the prefix to prepend
     * @return the generated name; each call consumes one index
     */
    @Override
    public String newProcessorName(String str) {
        return str + String.format(Locale.ROOT, "%010d", this.index.getAndIncrement());
    }

    /**
     * Returns {@code str} followed by {@code STATE-STORE-} and the next counter value,
     * zero-padded to ten digits.
     *
     * <p>The number is formatted with {@link Locale#ROOT} so the digits are always ASCII,
     * independent of the JVM's default locale.
     *
     * @param str the prefix to prepend
     * @return the generated name; each call consumes one index
     */
    @Override
    public String newStoreName(String str) {
        return str + String.format(Locale.ROOT, "STATE-STORE-%010d", this.index.getAndIncrement());
    }

    /**
     * Adds a state store to the topology builder without naming any processors.
     *
     * @param storeBuilder the store to add
     */
    public synchronized void addStateStore(StoreBuilder storeBuilder) {
        this.internalTopologyBuilder.addStateStore(storeBuilder, new String[0]);
    }

    /**
     * Calls {@code withLoggingDisabled()} on the store builder and registers it as a global store
     * under caller-supplied names.
     *
     * @param storeBuilder      the store to register
     * @param str               forwarded in the position where this class's own callers pass a
     *                          {@code KSTREAM-SOURCE-} name
     * @param str2              forwarded after the deserializers
     *                          (presumably the topic; confirm against InternalTopologyBuilder)
     * @param consumedInternal  supplies the timestamp extractor and deserializers
     * @param str3              forwarded in the position where this class's own callers pass a
     *                          {@code KTABLE-SOURCE-} name
     * @param processorSupplier forwarded unchanged to the topology builder
     * @throws NullPointerException if {@code storeBuilder} or {@code consumedInternal} is {@code null}
     */
    public synchronized void addGlobalStore(StoreBuilder<KeyValueStore> storeBuilder, String str, String str2, ConsumedInternal consumedInternal, String str3, ProcessorSupplier processorSupplier) {
        // Check both before touching the builder so a rejected call leaves it unmodified.
        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
        Objects.requireNonNull(consumedInternal, "consumed can't be null");

        storeBuilder.withLoggingDisabled();
        this.internalTopologyBuilder.addGlobalStore(
                storeBuilder,
                str,
                consumedInternal.timestampExtractor(),
                consumedInternal.keyDeserializer(),
                consumedInternal.valueDeserializer(),
                str2,
                str3,
                processorSupplier);
    }

    /**
     * Calls {@code withLoggingDisabled()} on the store builder and registers it as a global store,
     * generating the {@code KSTREAM-SOURCE-} and {@code KTABLE-SOURCE-} names itself.
     *
     * @param storeBuilder      the store to register
     * @param str               forwarded as {@code str2} of the six-argument overload
     * @param consumedInternal  supplies the timestamp extractor and deserializers
     * @param processorSupplier forwarded unchanged to the topology builder
     * @throws NullPointerException if {@code storeBuilder} or {@code consumedInternal} is {@code null}
     */
    public synchronized void addGlobalStore(StoreBuilder<KeyValueStore> storeBuilder, String str, ConsumedInternal consumedInternal, ProcessorSupplier processorSupplier) {
        // Check before generating names so a rejected call does not consume two indices.
        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
        Objects.requireNonNull(consumedInternal, "consumed can't be null");

        storeBuilder.withLoggingDisabled();
        // The source name must be generated before the processor name to keep the numbering order.
        String sourceName = newProcessorName(KSTREAM_SOURCE_PREFIX);
        String processorName = newProcessorName(KTABLE_SOURCE_PREFIX);
        addGlobalStore(storeBuilder, sourceName, str, consumedInternal, processorName, processorSupplier);
    }
}
