/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.WatermarkEmissionPolicy;
import com.hazelcast.jet.core.WatermarkGenerationParams;
import com.hazelcast.jet.core.WatermarkPolicies;
import com.hazelcast.jet.core.WatermarkPolicy;
import com.hazelcast.jet.function.DistributedBiFunction;
import com.hazelcast.jet.function.DistributedBiPredicate;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedPredicate;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.function.DistributedToLongFunction;
import com.hazelcast.jet.function.DistributedTriFunction;
import com.hazelcast.jet.impl.JetEvent;
import com.hazelcast.jet.impl.pipeline.AbstractStage;
import com.hazelcast.jet.impl.pipeline.FunctionAdapter;
import com.hazelcast.jet.impl.pipeline.JetEventFunctionAdapter;
import com.hazelcast.jet.impl.pipeline.PipelineImpl;
import com.hazelcast.jet.impl.pipeline.SinkImpl;
import com.hazelcast.jet.impl.pipeline.SinkStageImpl;
import com.hazelcast.jet.impl.pipeline.StreamStageImpl;
import com.hazelcast.jet.impl.pipeline.transform.AbstractTransform;
import com.hazelcast.jet.impl.pipeline.transform.FilterTransform;
import com.hazelcast.jet.impl.pipeline.transform.FlatMapTransform;
import com.hazelcast.jet.impl.pipeline.transform.HashJoinTransform;
import com.hazelcast.jet.impl.pipeline.transform.MapTransform;
import com.hazelcast.jet.impl.pipeline.transform.MergeTransform;
import com.hazelcast.jet.impl.pipeline.transform.PartitionedProcessorTransform;
import com.hazelcast.jet.impl.pipeline.transform.PeekTransform;
import com.hazelcast.jet.impl.pipeline.transform.ProcessorTransform;
import com.hazelcast.jet.impl.pipeline.transform.RollingAggregateTransform;
import com.hazelcast.jet.impl.pipeline.transform.SinkTransform;
import com.hazelcast.jet.impl.pipeline.transform.StreamSourceTransform;
import com.hazelcast.jet.impl.pipeline.transform.TimestampTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.ContextFactory;
import com.hazelcast.jet.pipeline.GeneralStage;
import com.hazelcast.jet.pipeline.JoinClause;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SinkStage;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.util.Preconditions;
import java.util.Arrays;
import java.util.Collections;
import javax.annotation.Nonnull;

public abstract class ComputeStageImplBase<T>
extends AbstractStage {
    static final FunctionAdapter DO_NOT_ADAPT = new FunctionAdapter();
    static final JetEventFunctionAdapter ADAPT_TO_JET_EVENT = new JetEventFunctionAdapter();
    @Nonnull
    public FunctionAdapter fnAdapter;

    ComputeStageImplBase(@Nonnull Transform transform, @Nonnull FunctionAdapter fnAdapter, @Nonnull PipelineImpl pipelineImpl, boolean acceptsDownstream) {
        super(transform, acceptsDownstream, pipelineImpl);
        this.fnAdapter = fnAdapter;
    }

    @Nonnull
    public StreamStage<T> addTimestamps() {
        return this.addTimestamps(o -> System.currentTimeMillis(), 0L);
    }

    @Nonnull
    public StreamStage<T> addTimestamps(@Nonnull DistributedToLongFunction<? super T> timestampFn, long allowedLateness) {
        Util.checkSerializable(timestampFn, "timestampFn");
        Preconditions.checkFalse(this.hasJetEvents(), "This stage already has timestamps assigned to it.");
        DistributedSupplier<WatermarkPolicy> wmPolicy = WatermarkPolicies.limitingLag(allowedLateness);
        WatermarkGenerationParams<Object> wmParams = WatermarkGenerationParams.wmGenParams(timestampFn, JetEvent::jetEvent, wmPolicy, WatermarkEmissionPolicy.NULL_EMIT_POLICY, 60000L);
        if (this.transform instanceof StreamSourceTransform) {
            ((StreamSourceTransform)this.transform).setWmGenerationParams(wmParams);
            this.fnAdapter = ADAPT_TO_JET_EVENT;
            return (StreamStage)((Object)this);
        }
        TimestampTransform<Object> tsTransform = new TimestampTransform<Object>(this.transform, wmParams);
        this.pipelineImpl.connect(this.transform, tsTransform);
        return new StreamStageImpl(tsTransform, ADAPT_TO_JET_EVENT, this.pipelineImpl);
    }

    @Nonnull
    <R, RET> RET attachMap(@Nonnull DistributedFunction<? super T, ? extends R> mapFn) {
        Util.checkSerializable(mapFn, "mapFn");
        return this.attach(new MapTransform(this.transform, this.fnAdapter.adaptMapFn(mapFn)), this.fnAdapter);
    }

    @Nonnull
    <RET> RET attachFilter(@Nonnull DistributedPredicate<T> filterFn) {
        Util.checkSerializable(filterFn, "filterFn");
        return this.attach(new FilterTransform(this.transform, this.fnAdapter.adaptFilterFn(filterFn)), this.fnAdapter);
    }

    @Nonnull
    <R, RET> RET attachFlatMap(@Nonnull DistributedFunction<? super T, ? extends Traverser<? extends R>> flatMapFn) {
        Util.checkSerializable(flatMapFn, "flatMapFn");
        return this.attach(new FlatMapTransform(this.transform, this.fnAdapter.adaptFlatMapFn(flatMapFn)), this.fnAdapter);
    }

    @Nonnull
    <C, R, RET> RET attachMapUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull DistributedBiFunction<? super C, ? super T, ? extends R> mapFn) {
        Util.checkSerializable(mapFn, "mapFn");
        DistributedBiFunction<? super C, ?, ?> adaptedMapFn = this.fnAdapter.adaptMapUsingContextFn(mapFn);
        return this.attach(ProcessorTransform.mapUsingContextTransform(this.transform, contextFactory, adaptedMapFn), this.fnAdapter);
    }

    @Nonnull
    <C, RET> RET attachFilterUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull DistributedBiPredicate<? super C, ? super T> filterFn) {
        Util.checkSerializable(filterFn, "filterFn");
        DistributedBiPredicate<? super C, ?> adaptedFilterFn = this.fnAdapter.adaptFilterUsingContextFn(filterFn);
        return this.attach(ProcessorTransform.filterUsingContextTransform(this.transform, contextFactory, adaptedFilterFn), this.fnAdapter);
    }

    @Nonnull
    <C, R, RET> RET attachFlatMapUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull DistributedBiFunction<? super C, ? super T, ? extends Traverser<? extends R>> flatMapFn) {
        Util.checkSerializable(flatMapFn, "flatMapFn");
        DistributedBiFunction<? super C, ?, Traverser<?>> adaptedFlatMapFn = this.fnAdapter.adaptFlatMapUsingContextFn(flatMapFn);
        return this.attach(ProcessorTransform.flatMapUsingContextTransform(this.transform, contextFactory, adaptedFlatMapFn), this.fnAdapter);
    }

    @Nonnull
    <C, K, R, RET> RET attachMapUsingPartitionedContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull DistributedFunction<? super T, ? extends K> partitionKeyFn, @Nonnull DistributedBiFunction<? super C, ? super T, ? extends R> mapFn) {
        Util.checkSerializable(mapFn, "mapFn");
        Util.checkSerializable(partitionKeyFn, "partitionKeyFn");
        DistributedBiFunction<? super C, ?, ?> adaptedMapFn = this.fnAdapter.adaptMapUsingContextFn(mapFn);
        DistributedFunction<?, ? extends K> adaptedPartitionKeyFn = this.fnAdapter.adaptKeyFn(partitionKeyFn);
        return this.attach(PartitionedProcessorTransform.mapUsingContextPartitionedTransform(this.transform, contextFactory, adaptedMapFn, adaptedPartitionKeyFn), this.fnAdapter);
    }

    @Nonnull
    <C, K, RET> RET attachFilterUsingPartitionedContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull DistributedFunction<? super T, ? extends K> partitionKeyFn, @Nonnull DistributedBiPredicate<? super C, ? super T> filterFn) {
        Util.checkSerializable(filterFn, "filterFn");
        Util.checkSerializable(partitionKeyFn, "partitionKeyFn");
        DistributedBiPredicate<? super C, ?> adaptedFilterFn = this.fnAdapter.adaptFilterUsingContextFn(filterFn);
        DistributedFunction<?, ? extends K> adaptedPartitionKeyFn = this.fnAdapter.adaptKeyFn(partitionKeyFn);
        return this.attach(PartitionedProcessorTransform.filterUsingPartitionedContextTransform(this.transform, contextFactory, adaptedFilterFn, adaptedPartitionKeyFn), this.fnAdapter);
    }

    @Nonnull
    <C, K, R, RET> RET attachFlatMapUsingPartitionedContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull DistributedFunction<? super T, ? extends K> partitionKeyFn, @Nonnull DistributedBiFunction<? super C, ? super T, ? extends Traverser<? extends R>> flatMapFn) {
        Util.checkSerializable(flatMapFn, "flatMapFn");
        Util.checkSerializable(partitionKeyFn, "partitionKeyFn");
        DistributedBiFunction<? super C, ?, Traverser<?>> adaptedFlatMapFn = this.fnAdapter.adaptFlatMapUsingContextFn(flatMapFn);
        DistributedFunction<?, ? extends K> adaptedPartitionKeyFn = this.fnAdapter.adaptKeyFn(partitionKeyFn);
        return this.attach(PartitionedProcessorTransform.flatMapUsingPartitionedContextTransform(this.transform, contextFactory, adaptedFlatMapFn, adaptedPartitionKeyFn), this.fnAdapter);
    }

    @Nonnull
    <K, R, OUT, RET> RET attachRollingAggregate(DistributedFunction<? super T, ? extends K> keyFn, @Nonnull AggregateOperation1<? super T, ?, ? extends R> aggrOp, @Nonnull DistributedBiFunction<? super K, ? super R, ? extends OUT> mapToOutputFn) {
        Util.checkSerializable(keyFn, "keyFn");
        Util.checkSerializable(mapToOutputFn, "mapToOutputFn");
        return this.attach(new RollingAggregateTransform(this.transform, this.fnAdapter.adaptKeyFn(keyFn), this.fnAdapter.adaptAggregateOperation1(aggrOp), this.fnAdapter.adaptRollingAggregateOutputFn(mapToOutputFn)), this.fnAdapter);
    }

    @Nonnull
    <RET> RET attachMerge(@Nonnull GeneralStage<? extends T> other) {
        return this.attach(new MergeTransform(this.transform, ((AbstractStage)((Object)other)).transform), this.fnAdapter);
    }

    @Nonnull
    <K1, T1_IN, T1, R, RET> RET attachHashJoin(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1, ? super T, ? super T1_IN, ? extends T1> joinClause, @Nonnull DistributedBiFunction<T, T1, R> mapToOutputFn) {
        Util.checkSerializable(mapToOutputFn, "mapToOutputFn");
        return this.attach(new HashJoinTransform(Arrays.asList(this.transform, ComputeStageImplBase.transformOf(stage1)), Collections.singletonList(this.fnAdapter.adaptJoinClause(joinClause)), Collections.emptyList(), this.fnAdapter.adaptHashJoinOutputFn(mapToOutputFn)), this.fnAdapter);
    }

    @Nonnull
    <K1, T1_IN, T1, K2, T2_IN, T2, R, TA, RET> RET attachHashJoin2(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1, ? super T, ? super T1_IN, ? extends T1> joinClause1, @Nonnull BatchStage<T2_IN> stage2, @Nonnull JoinClause<K2, ? super T, ? super T2_IN, ? extends T2> joinClause2, @Nonnull DistributedTriFunction<T, T1, T2, R> mapToOutputFn) {
        Util.checkSerializable(mapToOutputFn, "mapToOutputFn");
        return this.attach(new HashJoinTransform(Arrays.asList(this.transform, ComputeStageImplBase.transformOf(stage1), ComputeStageImplBase.transformOf(stage2)), Arrays.asList(this.fnAdapter.adaptJoinClause(joinClause1), this.fnAdapter.adaptJoinClause(joinClause2)), Collections.emptyList(), this.fnAdapter.adaptHashJoinOutputFn(mapToOutputFn)), this.fnAdapter);
    }

    @Nonnull
    <RET> RET attachPeek(@Nonnull DistributedPredicate<? super T> shouldLogFn, @Nonnull DistributedFunction<? super T, ? extends CharSequence> toStringFn) {
        Util.checkSerializable(shouldLogFn, "shouldLogFn");
        Util.checkSerializable(toStringFn, "toStringFn");
        return this.attach(new PeekTransform(this.transform, this.fnAdapter.adaptFilterFn(shouldLogFn), this.fnAdapter.adaptToStringFn(toStringFn)), this.fnAdapter);
    }

    @Nonnull
    <RET> RET attachCustomTransform(@Nonnull String stageName, @Nonnull DistributedSupplier<Processor> procSupplier) {
        return this.attach(ProcessorTransform.customProcessorTransform(stageName, this.transform, procSupplier), this.fnAdapter);
    }

    @Nonnull
    <K, RET> RET attachPartitionedCustomTransform(@Nonnull String stageName, @Nonnull ProcessorSupplier procSupplier, @Nonnull DistributedFunction<? super T, ? extends K> partitionKeyFn) {
        return this.attach(PartitionedProcessorTransform.partitionedCustomProcessorTransform(stageName, this.transform, procSupplier, partitionKeyFn), this.fnAdapter);
    }

    @Nonnull
    public SinkStage drainTo(@Nonnull Sink<? super T> sink) {
        SinkImpl sinkImpl = (SinkImpl)sink;
        SinkTransform sinkTransform = new SinkTransform(sinkImpl, this.transform, this.fnAdapter == ADAPT_TO_JET_EVENT);
        SinkStageImpl output = new SinkStageImpl(sinkTransform, this.pipelineImpl);
        sinkImpl.onAssignToStage();
        this.pipelineImpl.connect(this.transform, sinkTransform);
        return output;
    }

    @Nonnull
    abstract <RET> RET attach(@Nonnull AbstractTransform var1, @Nonnull FunctionAdapter var2);

    private boolean hasJetEvents() {
        return this.fnAdapter.equals(ADAPT_TO_JET_EVENT);
    }

    static void ensureJetEvents(@Nonnull ComputeStageImplBase stage, @Nonnull String name) {
        if (stage.fnAdapter != ADAPT_TO_JET_EVENT) {
            throw new IllegalStateException(name + " is missing a timestamp definition. Call one of the .addTimestamps() methods on it before performing the aggregation.");
        }
    }
}

