/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.collect;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.collect.AbstractCollectResultBuffer;
import org.apache.flink.streaming.api.operators.collect.CheckpointedCollectResultBuffer;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.UncheckpointedCollectResultBuffer;
import org.apache.flink.streaming.api.operators.collect.utils.AbstractTestCoordinationRequestHandler;
import org.apache.flink.streaming.api.operators.collect.utils.TestCheckpointedCoordinationRequestHandler;
import org.apache.flink.streaming.api.operators.collect.utils.TestJobClient;
import org.apache.flink.streaming.api.operators.collect.utils.TestUncheckpointedCoordinationRequestHandler;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

public class CollectResultIteratorTest
extends TestLogger {
    private final TypeSerializer<Integer> serializer = IntSerializer.INSTANCE;
    private static final OperatorID TEST_OPERATOR_ID = new OperatorID();
    private static final JobID TEST_JOB_ID = new JobID();
    private static final String ACCUMULATOR_NAME = "accumulatorName";

    @Test
    public void testUncheckpointedIterator() throws Exception {
        Random random = new Random();
        for (int testCount = 200; testCount > 0; --testCount) {
            ArrayList<Integer> expected = new ArrayList<Integer>();
            for (int i = 0; i < 200; ++i) {
                expected.add(i);
            }
            CollectResultIterator iterator = (CollectResultIterator)this.createIteratorAndJobClient((AbstractCollectResultBuffer<Integer>)new UncheckpointedCollectResultBuffer(this.serializer, (boolean)true), new TestUncheckpointedCoordinationRequestHandler<Integer>((int)random.nextInt((int)3), expected, this.serializer, (String)ACCUMULATOR_NAME)).f0;
            ArrayList<Object> actual = new ArrayList<Object>();
            while (iterator.hasNext()) {
                actual.add(iterator.next());
            }
            HashSet actualSet = new HashSet(actual);
            Iterator iterator2 = expected.iterator();
            while (iterator2.hasNext()) {
                int expectedValue = (Integer)iterator2.next();
                Assert.assertTrue((boolean)actualSet.contains(expectedValue));
            }
            iterator.close();
        }
    }

    @Test
    public void testCheckpointedIterator() throws Exception {
        for (int testCount = 200; testCount > 0; --testCount) {
            ArrayList<Integer> expected = new ArrayList<Integer>();
            for (int i = 0; i < 200; ++i) {
                expected.add(i);
            }
            CollectResultIterator iterator = (CollectResultIterator)this.createIteratorAndJobClient((AbstractCollectResultBuffer<Integer>)new CheckpointedCollectResultBuffer(this.serializer), new TestCheckpointedCoordinationRequestHandler<Integer>(expected, this.serializer, (String)ACCUMULATOR_NAME)).f0;
            ArrayList<Object> actual = new ArrayList<Object>();
            while (iterator.hasNext()) {
                actual.add(iterator.next());
            }
            Assert.assertEquals((long)expected.size(), (long)actual.size());
            Collections.sort(expected);
            Collections.sort(actual);
            Assert.assertArrayEquals((Object[])expected.toArray(new Integer[0]), (Object[])actual.toArray(new Integer[0]));
            iterator.close();
        }
    }

    @Test
    public void testEarlyClose() throws Exception {
        ArrayList<Integer> expected = new ArrayList<Integer>();
        for (int i = 0; i < 200; ++i) {
            expected.add(i);
        }
        Tuple2<CollectResultIterator<Integer>, JobClient> tuple2 = this.createIteratorAndJobClient((AbstractCollectResultBuffer<Integer>)new CheckpointedCollectResultBuffer(this.serializer), new TestCheckpointedCoordinationRequestHandler<Integer>(expected, this.serializer, ACCUMULATOR_NAME));
        CollectResultIterator iterator = (CollectResultIterator)tuple2.f0;
        JobClient jobClient = (JobClient)tuple2.f1;
        for (int i = 0; i < 100; ++i) {
            Assert.assertTrue((boolean)iterator.hasNext());
            Assert.assertNotNull((Object)iterator.next());
        }
        Assert.assertTrue((boolean)iterator.hasNext());
        iterator.close();
        Assert.assertEquals((Object)JobStatus.CANCELED, jobClient.getJobStatus().get());
    }

    private Tuple2<CollectResultIterator<Integer>, JobClient> createIteratorAndJobClient(AbstractCollectResultBuffer<Integer> buffer, final AbstractTestCoordinationRequestHandler<Integer> handler) {
        CollectResultIterator iterator = new CollectResultIterator(buffer, CompletableFuture.completedFuture(TEST_OPERATOR_ID), ACCUMULATOR_NAME, 0);
        TestJobClient.JobInfoProvider infoProvider = new TestJobClient.JobInfoProvider(){

            @Override
            public boolean isJobFinished() {
                return handler.isClosed();
            }

            @Override
            public Map<String, OptionalFailure<Object>> getAccumulatorResults() {
                return handler.getAccumulatorResults();
            }
        };
        TestJobClient jobClient = new TestJobClient(TEST_JOB_ID, TEST_OPERATOR_ID, handler, infoProvider);
        iterator.setJobClient((JobClient)jobClient);
        return Tuple2.of((Object)iterator, (Object)jobClient);
    }
}

