/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.ListSourceContext;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.types.Value;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class FromElementsFunctionTest {
    private static final String[] STRING_ARRAY_DATA = new String[]{"Oh", "boy", "what", "a", "show", "!"};
    private static final List<String> STRING_LIST_DATA = Arrays.asList(STRING_ARRAY_DATA);
    @Rule
    public final ExpectedException thrown = ExpectedException.none();

    private static <T> List<T> runSource(FromElementsFunction<T> source) throws Exception {
        ArrayList result = new ArrayList();
        FromElementsFunction clonedSource = (FromElementsFunction)InstantiationUtil.clone(source);
        clonedSource.run(new ListSourceContext(result));
        return result;
    }

    @Test
    public void testStrings() {
        try {
            Object[] data = new String[]{"Oh", "boy", "what", "a", "show", "!"};
            FromElementsFunction source = new FromElementsFunction(BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), data);
            ArrayList result = new ArrayList();
            source.run(new ListSourceContext(result));
            Assert.assertEquals(Arrays.asList(data), result);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testNullElement() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("contains a null element");
        new FromElementsFunction((Object[])new String[]{"a", null, "b"});
    }

    @Test
    public void testSetOutputTypeWithNoSerializer() throws Exception {
        FromElementsFunction source = new FromElementsFunction((Object[])STRING_ARRAY_DATA);
        Assert.assertNull((Object)source.getSerializer());
        source.setOutputType((TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
        Assert.assertNotNull((Object)source.getSerializer());
        Assert.assertEquals((Object)BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), (Object)source.getSerializer());
        List result = FromElementsFunctionTest.runSource(source);
        Assert.assertEquals(STRING_LIST_DATA, result);
    }

    @Test
    public void testSetOutputTypeWithSameSerializer() throws Exception {
        FromElementsFunction source = new FromElementsFunction(BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), STRING_LIST_DATA);
        TypeSerializer existingSerializer = source.getSerializer();
        source.setOutputType((TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
        TypeSerializer newSerializer = source.getSerializer();
        Assert.assertEquals((Object)existingSerializer, (Object)newSerializer);
        List result = FromElementsFunctionTest.runSource(source);
        Assert.assertEquals(STRING_LIST_DATA, result);
    }

    @Test
    public void testSetOutputTypeWithIncompatibleType() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("not all subclasses of java.lang.Integer");
        FromElementsFunction source = new FromElementsFunction(STRING_LIST_DATA);
        source.setOutputType((TypeInformation)BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
    }

    @Test
    public void testSetOutputTypeWithExistingBrokenSerializer() throws Exception {
        ValueTypeInfo info = new ValueTypeInfo(DeserializeTooMuchType.class);
        FromElementsFunction source = new FromElementsFunction(info.createSerializer(new ExecutionConfig()), (Object[])new DeserializeTooMuchType[]{new DeserializeTooMuchType()});
        TypeSerializer existingSerializer = source.getSerializer();
        source.setOutputType((TypeInformation)new GenericTypeInfo(DeserializeTooMuchType.class), new ExecutionConfig());
        TypeSerializer newSerializer = source.getSerializer();
        Assert.assertNotEquals((Object)existingSerializer, (Object)newSerializer);
        List result = FromElementsFunctionTest.runSource(source);
        Assert.assertThat(result, (Matcher)Matchers.hasSize((int)1));
        Assert.assertThat(result.get(0), (Matcher)Matchers.instanceOf(DeserializeTooMuchType.class));
    }

    @Test
    public void testSetOutputTypeAfterTransferred() throws Exception {
        this.thrown.expect(IllegalStateException.class);
        this.thrown.expectMessage("The output type should've been specified before shipping the graph to the cluster");
        FromElementsFunction source = (FromElementsFunction)InstantiationUtil.clone((Serializable)new FromElementsFunction(STRING_LIST_DATA));
        source.setOutputType((TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
    }

    @Test
    public void testNoSerializer() throws Exception {
        this.thrown.expect(IllegalStateException.class);
        this.thrown.expectMessage("serializer not configured");
        FromElementsFunction source = new FromElementsFunction(STRING_LIST_DATA);
        FromElementsFunctionTest.runSource(source);
    }

    @Test
    public void testNonJavaSerializableType() {
        try {
            Object[] data = new MyPojo[]{new MyPojo(1L, 2), new MyPojo(3L, 4), new MyPojo(5L, 6)};
            FromElementsFunction source = new FromElementsFunction(TypeExtractor.getForClass(MyPojo.class).createSerializer(new ExecutionConfig()), data);
            List result = FromElementsFunctionTest.runSource(source);
            Assert.assertEquals(Arrays.asList(data), result);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testNonJavaSerializableTypeWithSetOutputType() throws Exception {
        Object[] data = new MyPojo[]{new MyPojo(1L, 2), new MyPojo(3L, 4), new MyPojo(5L, 6)};
        FromElementsFunction source = new FromElementsFunction(data);
        source.setOutputType(TypeExtractor.getForClass(MyPojo.class), new ExecutionConfig());
        List result = FromElementsFunctionTest.runSource(source);
        Assert.assertEquals(Arrays.asList(data), result);
    }

    @Test
    public void testSerializationError() {
        try {
            ValueTypeInfo info = new ValueTypeInfo(SerializationErrorType.class);
            try {
                new FromElementsFunction(info.createSerializer(new ExecutionConfig()), (Object[])new SerializationErrorType[]{new SerializationErrorType()});
                Assert.fail((String)"should fail with an exception");
            }
            catch (IOException e) {
                Assert.assertTrue((boolean)ExceptionUtils.stringifyException((Throwable)e).contains("test exception"));
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testDeSerializationError() {
        try {
            ValueTypeInfo info = new ValueTypeInfo(DeserializeTooMuchType.class);
            FromElementsFunction source = new FromElementsFunction(info.createSerializer(new ExecutionConfig()), (Object[])new DeserializeTooMuchType[]{new DeserializeTooMuchType()});
            try {
                source.run(new ListSourceContext(new ArrayList()));
                Assert.fail((String)"should fail with an exception");
            }
            catch (IOException e) {
                Assert.assertTrue((boolean)ExceptionUtils.stringifyException((Throwable)e).contains("user-defined serialization"));
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCheckpointAndRestore() {
        try {
            int numElements = 10000;
            ArrayList<Integer> data = new ArrayList<Integer>(10000);
            ArrayList result = new ArrayList(10000);
            for (int i = 0; i < 10000; ++i) {
                data.add(i);
            }
            final FromElementsFunction source = new FromElementsFunction((TypeSerializer)IntSerializer.INSTANCE, data);
            StreamSource src = new StreamSource((SourceFunction)source);
            AbstractStreamOperatorTestHarness testHarness = new AbstractStreamOperatorTestHarness(src, 1, 1, 0);
            testHarness.open();
            final ListSourceContext ctx = new ListSourceContext(result, 2L);
            final Throwable[] error = new Throwable[1];
            Thread runner = new Thread(){

                @Override
                public void run() {
                    try {
                        source.run(ctx);
                    }
                    catch (Throwable t) {
                        error[0] = t;
                    }
                }
            };
            runner.start();
            Thread.sleep(1000L);
            ArrayList checkpointData = new ArrayList(10000);
            OperatorSubtaskState handles = null;
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                handles = testHarness.snapshot(566L, System.currentTimeMillis());
                checkpointData.addAll(result);
            }
            source.cancel();
            runner.join();
            if (error[0] != null) {
                System.err.println("Error in asynchronous source runner");
                error[0].printStackTrace();
                Assert.fail((String)"Error in asynchronous source runner");
            }
            FromElementsFunction sourceCopy = new FromElementsFunction((TypeSerializer)IntSerializer.INSTANCE, data);
            StreamSource srcCopy = new StreamSource((SourceFunction)sourceCopy);
            AbstractStreamOperatorTestHarness testHarnessCopy = new AbstractStreamOperatorTestHarness(srcCopy, 1, 1, 0);
            testHarnessCopy.setup();
            testHarnessCopy.initializeState(handles);
            testHarnessCopy.open();
            ListSourceContext newCtx = new ListSourceContext(checkpointData);
            sourceCopy.run(newCtx);
            Assert.assertEquals(data, checkpointData);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    private static class DeserializeTooMuchType
    implements Value {
        private static final long serialVersionUID = -6037206294939421807L;

        private DeserializeTooMuchType() {
        }

        public void write(DataOutputView out) throws IOException {
            out.writeInt(42);
        }

        public void read(DataInputView in) throws IOException {
            in.readLong();
        }
    }

    private static class SerializationErrorType
    implements Value {
        private static final long serialVersionUID = -6037206294939421807L;

        private SerializationErrorType() {
        }

        public void write(DataOutputView out) throws IOException {
            throw new IOException("test exception");
        }

        public void read(DataInputView in) throws IOException {
            throw new IOException("test exception");
        }
    }

    private static class MyPojo {
        public long val1;
        public int val2;

        public MyPojo() {
        }

        public MyPojo(long val1, int val2) {
            this.val1 = val1;
            this.val2 = val2;
        }

        public int hashCode() {
            return this.val2;
        }

        public boolean equals(Object obj) {
            if (obj instanceof MyPojo) {
                MyPojo that = (MyPojo)obj;
                return this.val1 == that.val1 && this.val2 == that.val2;
            }
            return false;
        }
    }
}

