/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.chaining;

import java.io.IOException;
import java.util.List;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.operators.chaining.ChainedDriver;
import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
import org.apache.flink.runtime.operators.sort.InMemorySorter;
import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GroupCombineChainedDriver<IN, OUT>
extends ChainedDriver<IN, OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(GroupCombineChainedDriver.class);
    private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
    private InMemorySorter<IN> sorter;
    private GroupReduceFunction<IN, OUT> reducer;
    private TypeSerializer<IN> serializer;
    private TypeComparator<IN> groupingComparator;
    private AbstractInvokable parent;
    private QuickSort sortAlgo = new QuickSort();
    private List<MemorySegment> memory;
    private volatile boolean running = true;

    @Override
    public void setup(AbstractInvokable parent) {
        GroupReduceFunction combiner;
        this.parent = parent;
        this.reducer = combiner = BatchTask.instantiateUserCode(this.config, this.userCodeClassLoader, GroupReduceFunction.class);
        FunctionUtils.setFunctionRuntimeContext((Function)combiner, (RuntimeContext)this.getUdfRuntimeContext());
    }

    @Override
    public void openTask() throws Exception {
        Configuration stubConfig = this.config.getStubParameters();
        BatchTask.openUserCode(this.reducer, stubConfig);
        TypeSerializerFactory serializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
        TypeComparatorFactory sortingComparatorFactory = this.config.getDriverComparator(0, this.userCodeClassLoader);
        TypeComparatorFactory groupingComparatorFactory = this.config.getDriverComparator(1, this.userCodeClassLoader);
        this.serializer = serializerFactory.getSerializer();
        TypeComparator sortingComparator = sortingComparatorFactory.createComparator();
        this.groupingComparator = groupingComparatorFactory.createComparator();
        MemoryManager memManager = this.parent.getEnvironment().getMemoryManager();
        int numMemoryPages = memManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());
        this.memory = memManager.allocatePages(this.parent, numMemoryPages);
        this.sorter = sortingComparator.supportsSerializationWithKeyNormalization() && this.serializer.getLength() > 0 && this.serializer.getLength() <= 32 ? new FixedLengthRecordSorter<IN>(this.serializer, sortingComparator.duplicate(), this.memory) : new NormalizedKeySorter<IN>(this.serializer, sortingComparator.duplicate(), this.memory);
        if (LOG.isDebugEnabled()) {
            LOG.debug("SynchronousChainedCombineDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
        }
    }

    @Override
    public void closeTask() throws Exception {
        if (this.sorter != null) {
            this.sorter.dispose();
        }
        this.parent.getEnvironment().getMemoryManager().release(this.memory);
        if (this.running) {
            BatchTask.closeUserCode(this.reducer);
        }
    }

    @Override
    public void cancelTask() {
        this.running = false;
        if (this.sorter != null) {
            try {
                this.sorter.dispose();
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        this.parent.getEnvironment().getMemoryManager().release(this.memory);
    }

    @Override
    public Function getStub() {
        return this.reducer;
    }

    @Override
    public String getTaskName() {
        return this.taskName;
    }

    @Override
    public void collect(IN record) {
        this.numRecordsIn.inc();
        try {
            if (this.sorter.write(record)) {
                return;
            }
            this.sortAndReduce();
            this.sorter.reset();
            if (!this.sorter.write(record)) {
                throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
            }
        }
        catch (Exception e) {
            throw new ExceptionInChainedStubException(this.taskName, e);
        }
    }

    public void close() {
        try {
            this.sortAndReduce();
        }
        catch (Exception e) {
            throw new ExceptionInChainedStubException(this.taskName, e);
        }
        this.outputCollector.close();
    }

    private void sortAndReduce() throws Exception {
        block4: {
            InMemorySorter<IN> sorter;
            block3: {
                sorter = this.sorter;
                if (!this.objectReuseEnabled) break block3;
                if (sorter.isEmpty()) break block4;
                this.sortAlgo.sort(sorter);
                ReusingKeyGroupedIterator<IN> keyIter = new ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, this.groupingComparator);
                GroupReduceFunction<IN, OUT> stub = this.reducer;
                Collector output = this.outputCollector;
                while (this.running && keyIter.nextKey()) {
                    stub.reduce((Iterable)keyIter.getValues(), output);
                }
                break block4;
            }
            if (!sorter.isEmpty()) {
                this.sortAlgo.sort(sorter);
                NonReusingKeyGroupedIterator<IN> keyIter = new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
                GroupReduceFunction<IN, OUT> stub = this.reducer;
                Collector output = this.outputCollector;
                while (this.running && keyIter.nextKey()) {
                    stub.reduce((Iterable)keyIter.getValues(), output);
                }
            }
        }
    }
}

