/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.filesystem;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import java.util.UUID;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalDataOutputStream;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.fs.local.LocalRecoverableWriter;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.state.filesystem.FileBasedStateOutputStream;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionWithException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

@RunWith(value=Parameterized.class)
public class CheckpointStateOutputStreamTest
extends TestLogger {
    @Rule
    public final TemporaryFolder tmp = new TemporaryFolder();
    @Parameterized.Parameter
    public CheckpointStateOutputStreamType stateOutputStreamType;

    @Parameterized.Parameters
    public static Collection<CheckpointStateOutputStreamType> getCheckpointStateOutputStreamType() {
        return Arrays.asList(CheckpointStateOutputStreamType.values());
    }

    @Test
    public void testEmptyState() throws Exception {
        FileStateHandle handle;
        FileSystem fs = FileSystem.getLocalFileSystem();
        Path folder = this.baseFolder();
        String fileName = "myFileName";
        Path filePath = new Path(folder, "myFileName");
        try (FSDataOutputStream stream = this.createTestStream(fs, folder, "myFileName");){
            handle = this.closeAndGetResult(stream);
        }
        Assert.assertNotNull((Object)handle);
        Assert.assertEquals((Object)filePath, (Object)handle.getFilePath());
        Assert.assertTrue((boolean)fs.exists(handle.getFilePath()));
        Assert.assertFalse((boolean)fs.getFileStatus(filePath).isDir());
        var7_6 = null;
        try (FSDataInputStream in = handle.openInputStream();){
            Assert.assertEquals((long)-1L, (long)in.read());
        }
        catch (Throwable throwable) {
            var7_6 = throwable;
            throw throwable;
        }
    }

    @Test
    public void testWriteAndRead() throws Exception {
        byte[] buffer2;
        FileStateHandle handle;
        FileSystem fs = FileSystem.getLocalFileSystem();
        Path folder = this.baseFolder();
        String fileName = "fooBarName";
        Random rnd = new Random();
        byte[] data = new byte[1694523];
        try (FSDataOutputStream stream = this.createTestStream(fs, folder, "fooBarName");){
            int i = 0;
            while (i < data.length) {
                if (rnd.nextBoolean()) {
                    stream.write((int)data[i++]);
                    continue;
                }
                int len = rnd.nextInt(Math.min(data.length - i, 32));
                stream.write(data, i, len);
                i += len;
            }
            handle = this.closeAndGetResult(stream);
        }
        var8_7 = null;
        try (FSDataInputStream in = handle.openInputStream();){
            buffer2 = new byte[data.length];
            CheckpointStateOutputStreamTest.readFully((InputStream)in, buffer2);
            Assert.assertArrayEquals((byte[])data, (byte[])buffer2);
        }
        catch (Throwable buffer2) {
            var8_7 = buffer2;
            throw buffer2;
        }
        in = fs.open(handle.getFilePath());
        var8_7 = null;
        try {
            buffer2 = new byte[data.length];
            CheckpointStateOutputStreamTest.readFully((InputStream)in, buffer2);
            Assert.assertArrayEquals((byte[])data, (byte[])buffer2);
        }
        catch (Throwable throwable) {
            var8_7 = throwable;
            throw throwable;
        }
        finally {
            if (in != null) {
                if (var8_7 != null) {
                    try {
                        in.close();
                    }
                    catch (Throwable throwable) {
                        var8_7.addSuppressed(throwable);
                    }
                } else {
                    in.close();
                }
            }
        }
    }

    @Test
    public void testCleanupWhenClosingStream() throws IOException {
        FileSystem fs = FileSystem.getLocalFileSystem();
        Path folder = new Path(this.tmp.newFolder().toURI());
        String fileName = "nonCreativeTestFileName";
        Path path = new Path(folder, "nonCreativeTestFileName");
        try (FSDataOutputStream stream = this.createTestStream(fs, folder, "nonCreativeTestFileName");){
            Random rnd = new Random();
            for (int i = 0; i < rnd.nextInt(1000); ++i) {
                stream.write(rnd.nextInt(100));
            }
        }
        Assert.assertFalse((boolean)fs.exists(path));
    }

    @Test
    public void testCleanupWhenFailingCloseAndGetHandle() throws IOException {
        Path folder = new Path(this.tmp.newFolder().toURI());
        String fileName = "test_name";
        Path filePath = new Path(folder, "test_name");
        FileSystem fs = (FileSystem)Mockito.spy((Object)((Object)new FsWithoutRecoverableWriter((FunctionWithException<Path, FSDataOutputStream, IOException>)((FunctionWithException)path -> new FailingCloseStream(new File(path.getPath()))))));
        FSDataOutputStream stream = this.createTestStream(fs, folder, "test_name");
        stream.write(new byte[]{1, 2, 3, 4, 5});
        try {
            this.closeAndGetResult(stream);
            Assert.fail((String)"Expected IOException");
        }
        catch (IOException iOException) {
            // empty catch block
        }
        ((FileSystem)Mockito.verify((Object)fs)).delete(filePath, false);
    }

    @Test
    public void testCloseDoesNotLock() throws Exception {
        Path folder = new Path(this.tmp.newFolder().toURI());
        String fileName = "this-is-ignored-anyways.file";
        FileSystem fileSystem = (FileSystem)Mockito.spy((Object)((Object)new FsWithoutRecoverableWriter((FunctionWithException<Path, FSDataOutputStream, IOException>)((FunctionWithException)path -> new BlockerStream()))));
        final FSDataOutputStream checkpointStream = this.createTestStream(fileSystem, folder, "this-is-ignored-anyways.file");
        final OneShotLatch sync = new OneShotLatch();
        CheckedThread thread = new CheckedThread(){

            public void go() throws Exception {
                sync.trigger();
                CheckpointStateOutputStreamTest.this.closeAndGetResult(checkpointStream);
            }
        };
        thread.start();
        sync.await();
        checkpointStream.close();
        try {
            thread.sync();
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    private FSDataOutputStream createTestStream(FileSystem fs, Path dir, String fileName) throws IOException {
        switch (this.stateOutputStreamType) {
            case FileBasedState: {
                return new FileBasedStateOutputStream(fs, new Path(dir, fileName));
            }
            case FsCheckpointMetaData: {
                Path fullPath = new Path(dir, fileName);
                return new FsCheckpointMetadataOutputStream(fs, fullPath, dir);
            }
        }
        throw new IllegalStateException("Unsupported checkpoint stream output type.");
    }

    private FileStateHandle closeAndGetResult(FSDataOutputStream stream) throws IOException {
        switch (this.stateOutputStreamType) {
            case FileBasedState: {
                return ((FileBasedStateOutputStream)stream).closeAndGetHandle();
            }
            case FsCheckpointMetaData: {
                return ((FsCheckpointMetadataOutputStream)stream).closeAndFinalizeCheckpoint().getMetadataHandle();
            }
        }
        throw new IllegalStateException("Unsupported checkpoint stream output type.");
    }

    private Path baseFolder() throws Exception {
        return new Path(new File(this.tmp.newFolder(), UUID.randomUUID().toString()).toURI());
    }

    private static void readFully(InputStream in, byte[] buffer) throws IOException {
        int read;
        int pos = 0;
        for (int remaining = buffer.length; remaining > 0; remaining -= read) {
            read = in.read(buffer, pos, remaining);
            if (read == -1) {
                throw new EOFException();
            }
            pos += read;
        }
    }

    private static class FsWithoutRecoverableWriter
    extends LocalFileSystem {
        private final FunctionWithException<Path, FSDataOutputStream, IOException> streamFactory;

        FsWithoutRecoverableWriter(FunctionWithException<Path, FSDataOutputStream, IOException> streamFactory) {
            this.streamFactory = streamFactory;
        }

        public FSDataOutputStream create(Path filePath, FileSystem.WriteMode overwrite) throws IOException {
            return (FSDataOutputStream)this.streamFactory.apply((Object)filePath);
        }

        public LocalRecoverableWriter createRecoverableWriter() throws IOException {
            throw new UnsupportedOperationException("This file system does not support recoverable writers.");
        }
    }

    private static class FailingCloseStream
    extends LocalDataOutputStream {
        FailingCloseStream(File file) throws IOException {
            super(file);
        }

        public void close() throws IOException {
            throw new IOException();
        }
    }

    private static class BlockerStream
    extends FSDataOutputStream {
        private final OneShotLatch blocker = new OneShotLatch();

        private BlockerStream() {
        }

        public long getPos() throws IOException {
            this.block();
            return 0L;
        }

        public void write(int b) throws IOException {
            this.block();
        }

        public void flush() throws IOException {
            this.block();
        }

        public void sync() throws IOException {
            this.block();
        }

        public void close() throws IOException {
            this.blocker.trigger();
        }

        private void block() throws IOException {
            try {
                this.blocker.await();
            }
            catch (InterruptedException e) {
                throw new IOException("interrupted");
            }
            throw new IOException("closed");
        }
    }

    public static enum CheckpointStateOutputStreamType {
        FileBasedState,
        FsCheckpointMetaData;

    }
}

