/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Collections;
import java.util.Objects;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.ChangedDeserializer;
import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
import org.apache.kafka.streams.kstream.internals.KTableAggregate;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableReduce;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

/**
 * {@link KGroupedTable} implementation that builds aggregations over a grouped table.
 *
 * <p>Every aggregation is wired into the topology the same way (see
 * {@code doAggregate}): the records of this node are written to an internal
 * topic named {@code <name>-repartition}, read back from that topic, and fed
 * to an aggregating processor that is connected to a persistent state store
 * named {@code <name>}.
 *
 * @param <K> the key type of the grouped table
 * @param <V> the value type of the grouped table
 */
public class KGroupedTableImpl<K, V>
extends AbstractStream<K>
implements KGroupedTable<K, V> {
    private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
    private static final String REDUCE_NAME = "KTABLE-REDUCE-";
    private static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
    protected final Serde<K> keySerde;
    protected final Serde<V> valSerde;

    /**
     * Creates a grouped table view on top of an existing topology node.
     *
     * @param topology   the builder new nodes are added to
     * @param name       the name of this node; used as the parent of the repartition sink
     * @param sourceName the single source node this grouped table originates from
     * @param keySerde   serde for keys, or {@code null} to pass {@code null} (de)serializers on
     * @param valSerde   serde for values, or {@code null} to pass {@code null} (de)serializers on
     */
    public KGroupedTableImpl(KStreamBuilder topology, String name, String sourceName, Serde<K> keySerde, Serde<V> valSerde) {
        super(topology, name, Collections.singleton(sourceName));
        this.keySerde = keySerde;
        this.valSerde = valSerde;
    }

    /**
     * Aggregates the grouped values with an adder and a subtractor.
     *
     * @param initializer   supplies the initial aggregate; must not be {@code null}
     * @param adder         applied for added values; must not be {@code null}
     * @param subtractor    applied for removed values; must not be {@code null}
     * @param aggValueSerde serde for the aggregate value; may be {@code null}
     * @param name          name of the state store and prefix of the repartition topic;
     *                      must not be {@code null}
     * @return a table backed by the aggregating processor
     * @throws NullPointerException if a required argument is {@code null}
     */
    @Override
    public <T> KTable<K, T> aggregate(Initializer<T> initializer, Aggregator<K, V, T> adder, Aggregator<K, V, T> subtractor, Serde<T> aggValueSerde, String name) {
        // Fail fast: a null name would otherwise silently produce a "null-repartition" topic.
        Objects.requireNonNull(initializer, "initializer can't be null");
        Objects.requireNonNull(adder, "adder can't be null");
        Objects.requireNonNull(subtractor, "subtractor can't be null");
        Objects.requireNonNull(name, "name can't be null");
        ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<K, V, T>(name, initializer, adder, subtractor);
        return this.doAggregate(aggregateSupplier, aggValueSerde, AGGREGATE_NAME, name);
    }

    /**
     * Same as the five-argument {@code aggregate}, passing {@code null} as the
     * aggregate value serde.
     *
     * @throws NullPointerException if a required argument is {@code null}
     */
    @Override
    public <T> KTable<K, T> aggregate(Initializer<T> initializer, Aggregator<K, V, T> adder, Aggregator<K, V, T> substractor, String name) {
        return this.aggregate(initializer, adder, substractor, null, name);
    }

    /**
     * Adds the sink, internal topic, source, processor and state store for one aggregation.
     *
     * @param aggregateSupplier supplier of the aggregating processor
     * @param aggValueSerde     serde handed to the state store for aggregate values
     * @param functionName      prefix used to generate the processor node name
     * @param name              state store name and prefix of the repartition topic
     * @return a table whose node is the aggregating processor and whose source is the
     *         repartition source node
     */
    private <T> KTable<K, T> doAggregate(ProcessorSupplier<K, Change<V>> aggregateSupplier, Serde<T> aggValueSerde, String functionName, String name) {
        String sinkName = this.topology.newName("KSTREAM-SINK-");
        String sourceName = this.topology.newName("KSTREAM-SOURCE-");
        String funcName = this.topology.newName(functionName);
        String topic = name + REPARTITION_TOPIC_SUFFIX;

        // A missing serde is propagated as a null (de)serializer rather than rejected here.
        Serializer<K> keySerializer = this.keySerde == null ? null : this.keySerde.serializer();
        Deserializer<K> keyDeserializer = this.keySerde == null ? null : this.keySerde.deserializer();
        Serializer<V> valueSerializer = this.valSerde == null ? null : this.valSerde.serializer();
        Deserializer<V> valueDeserializer = this.valSerde == null ? null : this.valSerde.deserializer();

        // Values travel through the repartition topic wrapped by the Changed* (de)serializers.
        ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<V>(valueSerializer);
        ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<V>(valueDeserializer);

        StateStoreSupplier aggregateStore = Stores.create(name).withKeys(this.keySerde).withValues(aggValueSerde).persistent().build();

        // Write this node's records to the internal repartition topic ...
        this.topology.addInternalTopic(topic);
        this.topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name);

        // ... read them back from that topic ...
        this.topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);

        // ... and aggregate them with the processor connected to the state store.
        this.topology.addProcessor(funcName, aggregateSupplier, sourceName);
        this.topology.addStateStore(aggregateStore, funcName);

        return new KTableImpl<>(this.topology, funcName, aggregateSupplier, Collections.singleton(sourceName));
    }

    /**
     * Reduces the grouped values with an adder and a subtractor; the result uses
     * this table's value serde for its state store.
     *
     * @param adder      applied for added values; must not be {@code null}
     * @param subtractor applied for removed values; must not be {@code null}
     * @param name       name of the state store and prefix of the repartition topic;
     *                   must not be {@code null}
     * @return a table backed by the reducing processor
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public KTable<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor, String name) {
        Objects.requireNonNull(adder, "adder can't be null");
        Objects.requireNonNull(subtractor, "subtractor can't be null");
        Objects.requireNonNull(name, "name can't be null");
        ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<K, V>(name, adder, subtractor);
        return this.doAggregate(aggregateSupplier, this.valSerde, REDUCE_NAME, name);
    }

    /**
     * Counts values per key: the aggregate starts at {@code 0}, the adder adds one and
     * the subtractor removes one. The aggregate is stored with {@code Serdes.Long()}.
     *
     * @param name name of the state store and prefix of the repartition topic;
     *             must not be {@code null}
     * @return a table of counts per key
     * @throws NullPointerException if {@code name} is {@code null}
     */
    @Override
    public KTable<K, Long> count(String name) {
        return this.aggregate(new Initializer<Long>(){

            @Override
            public Long apply() {
                return 0L;
            }
        }, new Aggregator<K, V, Long>(){

            @Override
            public Long apply(K aggKey, V value, Long aggregate) {
                return aggregate + 1L;
            }
        }, new Aggregator<K, V, Long>(){

            @Override
            public Long apply(K aggKey, V value, Long aggregate) {
                return aggregate - 1L;
            }
        }, Serdes.Long(), name);
    }
}

