/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline;

import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.aggregate.AggregateOperation2;
import com.hazelcast.jet.aggregate.AggregateOperation3;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.WindowResultFunction;
import com.hazelcast.jet.impl.pipeline.ComputeStageImplBase;
import com.hazelcast.jet.impl.pipeline.JetEventFunctionAdapter;
import com.hazelcast.jet.impl.pipeline.StageWithKeyAndWindowImpl;
import com.hazelcast.jet.impl.pipeline.StreamStageImpl;
import com.hazelcast.jet.impl.pipeline.transform.WindowAggregateTransform;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.pipeline.StageWithKeyAndWindow;
import com.hazelcast.jet.pipeline.StageWithWindow;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.WindowDefinition;
import java.util.Arrays;
import java.util.Collections;
import javax.annotation.Nonnull;

public class StageWithWindowImpl<T>
implements StageWithWindow<T> {
    private final StreamStageImpl<T> streamStage;
    private final WindowDefinition wDef;

    StageWithWindowImpl(@Nonnull StreamStageImpl<T> streamStage, @Nonnull WindowDefinition wDef) {
        this.streamStage = streamStage;
        this.wDef = wDef;
    }

    @Override
    @Nonnull
    public WindowDefinition windowDefinition() {
        return this.wDef;
    }

    @Override
    @Nonnull
    public StreamStage<T> streamStage() {
        return this.streamStage;
    }

    @Override
    @Nonnull
    public <K> StageWithKeyAndWindow<T, K> groupingKey(@Nonnull DistributedFunction<? super T, ? extends K> keyFn) {
        return new StageWithKeyAndWindowImpl<T, K>(this.streamStage, keyFn, this.wDef);
    }

    @Override
    @Nonnull
    public <R, OUT> StreamStage<OUT> aggregate(@Nonnull AggregateOperation1<? super T, ?, ? extends R> aggrOp, @Nonnull WindowResultFunction<? super R, ? extends OUT> mapToOutputFn) {
        ComputeStageImplBase.ensureJetEvents(this.streamStage, "This pipeline stage");
        Util.checkSerializable(mapToOutputFn, "mapToOutputFn");
        return this.attachAggregate(aggrOp, mapToOutputFn);
    }

    private <A, R, OUT> StreamStage<OUT> attachAggregate(@Nonnull AggregateOperation1<? super T, A, R> aggrOp, @Nonnull WindowResultFunction<? super R, ? extends OUT> mapToOutputFn) {
        JetEventFunctionAdapter fnAdapter = ComputeStageImplBase.ADAPT_TO_JET_EVENT;
        return (StreamStage)this.streamStage.attach(new WindowAggregateTransform(Collections.singletonList(this.streamStage.transform), this.wDef, fnAdapter.adaptAggregateOperation1(aggrOp), fnAdapter.adaptWindowResultFn(mapToOutputFn)), fnAdapter);
    }

    @Override
    @Nonnull
    public <T1, R, OUT> StreamStage<OUT> aggregate2(@Nonnull StreamStage<T1> stage1, @Nonnull AggregateOperation2<? super T, ? super T1, ?, R> aggrOp, @Nonnull WindowResultFunction<? super R, ? extends OUT> mapToOutputFn) {
        ComputeStageImplBase.ensureJetEvents(this.streamStage, "This pipeline stage");
        ComputeStageImplBase.ensureJetEvents((ComputeStageImplBase)((Object)stage1), "stage1");
        Util.checkSerializable(mapToOutputFn, "mapToOutputFn");
        return this.attachAggregate2(stage1, aggrOp, mapToOutputFn);
    }

    private <T1, A, R, OUT> StreamStage<OUT> attachAggregate2(@Nonnull StreamStage<T1> stage1, @Nonnull AggregateOperation2<? super T, ? super T1, A, R> aggrOp, @Nonnull WindowResultFunction<? super R, ? extends OUT> mapToOutputFn) {
        JetEventFunctionAdapter fnAdapter = ComputeStageImplBase.ADAPT_TO_JET_EVENT;
        return (StreamStage)this.streamStage.attach(new WindowAggregateTransform(Arrays.asList(this.streamStage.transform, ((StreamStageImpl)stage1).transform), this.wDef, JetEventFunctionAdapter.adaptAggregateOperation2(aggrOp), fnAdapter.adaptWindowResultFn(mapToOutputFn)), fnAdapter);
    }

    @Override
    @Nonnull
    public <T1, T2, R, OUT> StreamStage<OUT> aggregate3(@Nonnull StreamStage<T1> stage1, @Nonnull StreamStage<T2> stage2, @Nonnull AggregateOperation3<? super T, ? super T1, ? super T2, ?, R> aggrOp, @Nonnull WindowResultFunction<? super R, ? extends OUT> mapToOutputFn) {
        ComputeStageImplBase stageImpl1 = (ComputeStageImplBase)((Object)stage1);
        ComputeStageImplBase stageImpl2 = (ComputeStageImplBase)((Object)stage2);
        ComputeStageImplBase.ensureJetEvents(this.streamStage, "This pipeline stage");
        ComputeStageImplBase.ensureJetEvents(stageImpl1, "stage1");
        ComputeStageImplBase.ensureJetEvents(stageImpl2, "stage2");
        Util.checkSerializable(mapToOutputFn, "mapToOutputFn");
        return this.attachAggregate3(stage1, stage2, aggrOp, mapToOutputFn);
    }

    private <T1, T2, A, R, OUT> StreamStage<OUT> attachAggregate3(@Nonnull StreamStage<T1> stage1, @Nonnull StreamStage<T2> stage2, @Nonnull AggregateOperation3<? super T, ? super T1, ? super T2, A, R> aggrOp, @Nonnull WindowResultFunction<? super R, ? extends OUT> mapToOutputFn) {
        JetEventFunctionAdapter fnAdapter = ComputeStageImplBase.ADAPT_TO_JET_EVENT;
        return (StreamStage)this.streamStage.attach(new WindowAggregateTransform(Arrays.asList(this.streamStage.transform, ((StreamStageImpl)stage1).transform, ((StreamStageImpl)stage2).transform), this.wDef, JetEventFunctionAdapter.adaptAggregateOperation3(aggrOp), fnAdapter.adaptWindowResultFn(mapToOutputFn)), fnAdapter);
    }
}

