/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import com.google.common.base.Supplier;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClientFaultInjector;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestClientProtocolForPipelineRecovery {
    private static final Logger LOG = LoggerFactory.getLogger(TestClientProtocolForPipelineRecovery.class);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testGetNewStamp() throws IOException {
        int numDataNodes = 1;
        HdfsConfiguration conf = new HdfsConfiguration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(numDataNodes).build();
        try {
            cluster.waitActive();
            DistributedFileSystem fileSys = cluster.getFileSystem();
            NamenodeProtocols namenode = cluster.getNameNodeRpc();
            Path file = new Path("dataprotocol.dat");
            DFSTestUtil.createFile((FileSystem)fileSys, file, 1L, (short)numDataNodes, 0L);
            ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock((FileSystem)fileSys, file);
            try {
                namenode.updateBlockForPipeline(firstBlock, "");
                Assert.fail((String)"Can not get a new GS from a finalized block");
            }
            catch (IOException e) {
                Assert.assertTrue((boolean)e.getMessage().contains("not " + HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION));
            }
            try {
                long newBlockId = firstBlock.getBlockId() + 1L;
                ExtendedBlock newBlock = new ExtendedBlock(firstBlock.getBlockPoolId(), newBlockId, 0L, firstBlock.getGenerationStamp());
                namenode.updateBlockForPipeline(newBlock, "");
                Assert.fail((String)"Cannot get a new GS from a non-existent block");
            }
            catch (IOException e) {
                Assert.assertTrue((boolean)e.getMessage().contains("does not exist"));
            }
            DFSOutputStream out = null;
            try {
                out = (DFSOutputStream)fileSys.append(file).getWrappedStream();
                out.write(1);
                out.hflush();
                FSDataInputStream in = null;
                try {
                    in = fileSys.open(file);
                    firstBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
                }
                finally {
                    IOUtils.closeStream((Closeable)in);
                }
                DFSClient dfs = fileSys.dfs;
                try {
                    namenode.updateBlockForPipeline(firstBlock, "test" + dfs.clientName);
                    Assert.fail((String)"Cannot get a new GS for a non lease holder");
                }
                catch (LeaseExpiredException e) {
                    Assert.assertTrue((boolean)e.getMessage().startsWith("Lease mismatch"));
                }
                try {
                    namenode.updateBlockForPipeline(firstBlock, null);
                    Assert.fail((String)"Cannot get a new GS for a null lease holder");
                }
                catch (LeaseExpiredException e) {
                    Assert.assertTrue((boolean)e.getMessage().startsWith("Lease mismatch"));
                }
                namenode.updateBlockForPipeline(firstBlock, dfs.clientName);
            }
            catch (Throwable throwable) {
                IOUtils.closeStream(out);
                throw throwable;
            }
            IOUtils.closeStream((Closeable)out);
        }
        finally {
            cluster.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPipelineRecoveryForLastBlock() throws IOException {
        DFSClientFaultInjector faultInjector = (DFSClientFaultInjector)Mockito.mock(DFSClientFaultInjector.class);
        DFSClientFaultInjector oldInjector = DFSClientFaultInjector.get();
        DFSClientFaultInjector.set((DFSClientFaultInjector)faultInjector);
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setInt("dfs.client.block.write.locateFollowingBlock.retries", 3);
        MiniDFSCluster cluster = null;
        try {
            int numDataNodes = 3;
            cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(numDataNodes).build();
            cluster.waitActive();
            DistributedFileSystem fileSys = cluster.getFileSystem();
            Path file = new Path("dataprotocol1.dat");
            Mockito.when((Object)faultInjector.failPacket()).thenReturn((Object)true);
            DFSTestUtil.createFile((FileSystem)fileSys, file, 68000000L, (short)numDataNodes, 0L);
            FSDataInputStream in = fileSys.open(file);
            try {
                in.read();
            }
            catch (BlockMissingException bme) {
                Assert.fail((String)"Block is missing because the file was closed with corrupt replicas.");
            }
        }
        finally {
            DFSClientFaultInjector.set((DFSClientFaultInjector)oldInjector);
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPacketTransmissionDelay() throws Exception {
        DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector(){

            public boolean dropHeartbeatPacket() {
                return true;
            }
        };
        DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
        DataNodeFaultInjector.set((DataNodeFaultInjector)dnFaultInjector);
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.set("dfs.client.socket-timeout", "3000");
        MiniDFSCluster cluster = null;
        try {
            int numDataNodes = 2;
            cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(numDataNodes).build();
            cluster.waitActive();
            DistributedFileSystem fs = cluster.getFileSystem();
            FSDataOutputStream out = fs.create(new Path("noheartbeat.dat"), (short)2);
            out.write(49);
            out.hflush();
            DFSOutputStream dfsOut = (DFSOutputStream)out.getWrappedStream();
            DatanodeInfo[] orgNodes = dfsOut.getPipeline();
            Thread.sleep(3500L);
            out.write(50);
            out.hflush();
            DatanodeInfo[] newNodes = dfsOut.getPipeline();
            out.close();
            boolean contains = false;
            for (int i = 0; i < newNodes.length; ++i) {
                if (orgNodes[0].getXferAddr().equals(newNodes[i].getXferAddr())) {
                    throw new IOException("The first datanode should have been replaced.");
                }
                if (!orgNodes[1].getXferAddr().equals(newNodes[i].getXferAddr())) continue;
                contains = true;
            }
            Assert.assertTrue((boolean)contains);
        }
        finally {
            DataNodeFaultInjector.set((DataNodeFaultInjector)oldDnInjector);
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPipelineRecoveryOnOOB() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.set("dfs.client.datanode-restart.timeout", "15");
        MiniDFSCluster cluster = null;
        try {
            int numDataNodes = 1;
            cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(numDataNodes).build();
            cluster.waitActive();
            DistributedFileSystem fileSys = cluster.getFileSystem();
            Path file = new Path("dataprotocol2.dat");
            DFSTestUtil.createFile((FileSystem)fileSys, file, 10240L, (short)1, 0L);
            DFSOutputStream out = (DFSOutputStream)fileSys.append(file).getWrappedStream();
            out.write(1);
            out.hflush();
            DFSAdmin dfsadmin = new DFSAdmin((Configuration)conf);
            DataNode dn = cluster.getDataNodes().get(0);
            String dnAddr = dn.getDatanodeId().getIpcAddr(false);
            String[] args1 = new String[]{"-shutdownDatanode", dnAddr, "upgrade"};
            Assert.assertEquals((long)0L, (long)dfsadmin.run(args1));
            GenericTestUtils.waitForThreadTermination((String)"Async datanode shutdown thread", (int)100, (int)10000);
            cluster.restartDataNode(0, true);
            out.close();
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testEvictWriter() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        MiniDFSCluster cluster = null;
        try {
            cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(3).build();
            cluster.waitActive();
            DistributedFileSystem fs = cluster.getFileSystem();
            Path file = new Path("testEvictWriter.dat");
            FSDataOutputStream out = fs.create(file, (short)2);
            out.write(49);
            out.hflush();
            DFSOutputStream dfsOut = (DFSOutputStream)out.getWrappedStream();
            DatanodeInfo[] nodes = dfsOut.getPipeline();
            Assert.assertEquals((long)2L, (long)nodes.length);
            String dnAddr = nodes[1].getIpcAddr(false);
            DFSAdmin dfsadmin = new DFSAdmin((Configuration)conf);
            String[] args1 = new String[]{"-evictWriters", dnAddr};
            Assert.assertEquals((long)0L, (long)dfsadmin.run(args1));
            out.write(49);
            out.hflush();
            nodes = dfsOut.getPipeline();
            try {
                Assert.assertTrue((nodes.length > 0 ? 1 : 0) != 0);
                for (int i = 0; i < nodes.length; ++i) {
                    Assert.assertFalse((boolean)dnAddr.equals(nodes[i].getIpcAddr(false)));
                }
            }
            finally {
                out.close();
            }
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPipelineRecoveryOnRestartFailure() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.set("dfs.client.datanode-restart.timeout", "5");
        MiniDFSCluster cluster = null;
        try {
            int numDataNodes = 2;
            cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(numDataNodes).build();
            cluster.waitActive();
            DistributedFileSystem fileSys = cluster.getFileSystem();
            Path file = new Path("dataprotocol3.dat");
            DFSTestUtil.createFile((FileSystem)fileSys, file, 10240L, (short)2, 0L);
            DFSOutputStream out = (DFSOutputStream)fileSys.append(file).getWrappedStream();
            out.write(1);
            out.hflush();
            DFSAdmin dfsadmin = new DFSAdmin((Configuration)conf);
            DataNode dn = cluster.getDataNodes().get(0);
            String dnAddr1 = dn.getDatanodeId().getIpcAddr(false);
            String[] args1 = new String[]{"-shutdownDatanode", dnAddr1, "upgrade"};
            Assert.assertEquals((long)0L, (long)dfsadmin.run(args1));
            GenericTestUtils.waitForThreadTermination((String)"Async datanode shutdown thread", (int)100, (int)10000);
            out.close();
            out = (DFSOutputStream)fileSys.append(file).getWrappedStream();
            out.write(1);
            out.hflush();
            dn = cluster.getDataNodes().get(1);
            String dnAddr2 = dn.getDatanodeId().getIpcAddr(false);
            String[] args2 = new String[]{"-shutdownDatanode", dnAddr2, "upgrade"};
            Assert.assertEquals((long)0L, (long)dfsadmin.run(args2));
            GenericTestUtils.waitForThreadTermination((String)"Async datanode shutdown thread", (int)100, (int)10000);
            try {
                out.close();
                assert (false);
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testPipelineRecoveryOnDatanodeUpgrade() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        MiniDFSCluster cluster = null;
        try {
            cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(2).build();
            cluster.waitActive();
            DistributedFileSystem fileSys = cluster.getFileSystem();
            Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
            DFSTestUtil.createFile((FileSystem)fileSys, file, 10240L, (short)2, 0L);
            final DFSOutputStream out = (DFSOutputStream)fileSys.append(file).getWrappedStream();
            out.write(1);
            out.hflush();
            final long oldGs = out.getBlock().getGenerationStamp();
            MiniDFSCluster.DataNodeProperties dnProps = cluster.stopDataNodeForUpgrade(0);
            GenericTestUtils.waitForThreadTermination((String)"Async datanode shutdown thread", (int)100, (int)10000);
            cluster.restartDataNode(dnProps, true);
            cluster.waitActive();
            GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

                public Boolean get() {
                    return out.getBlock().getGenerationStamp() > oldGs;
                }
            }, (int)100, (int)10000);
            Assert.assertEquals((String)"The pipeline recovery count shouldn't increase", (long)0L, (long)out.getStreamer().getPipelineRecoveryCount());
            out.write(1);
            out.close();
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPipelineRecoveryOnRemoteDatanodeUpgrade() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.best-effort", true);
        MiniDFSCluster cluster = null;
        DFSClientFaultInjector old = DFSClientFaultInjector.get();
        try {
            DatanodeInfo[] pipeline;
            cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(3).build();
            cluster.waitActive();
            DistributedFileSystem fileSys = cluster.getFileSystem();
            Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
            DFSTestUtil.createFile((FileSystem)fileSys, file, 10240L, (short)3, 0L);
            DFSClientFaultInjector.set((DFSClientFaultInjector)new DFSClientFaultInjector(){

                public boolean skipRollingRestartWait() {
                    return true;
                }
            });
            final DFSOutputStream out = (DFSOutputStream)fileSys.append(file).getWrappedStream();
            final AtomicBoolean running = new AtomicBoolean(true);
            final AtomicBoolean failed = new AtomicBoolean(false);
            Thread t = new Thread(){

                @Override
                public void run() {
                    while (running.get()) {
                        try {
                            out.write("test".getBytes());
                            out.hflush();
                            Thread.sleep(1000L);
                        }
                        catch (IOException | InterruptedException e) {
                            LOG.error("Exception during write", (Throwable)e);
                            failed.set(true);
                            break;
                        }
                    }
                    running.set(false);
                }
            };
            t.start();
            Thread.sleep(1000L);
            for (DatanodeInfo node : pipeline = out.getPipeline()) {
                Assert.assertFalse((String)"Write should be going on", (boolean)failed.get());
                ArrayList<DataNode> dataNodes = cluster.getDataNodes();
                int indexToShutdown = 0;
                for (int i = 0; i < dataNodes.size(); ++i) {
                    if (dataNodes.get(i).getIpcPort() != node.getIpcPort()) continue;
                    indexToShutdown = i;
                    break;
                }
                final long oldGs = out.getBlock().getGenerationStamp();
                MiniDFSCluster.DataNodeProperties dnProps = cluster.stopDataNodeForUpgrade(indexToShutdown);
                GenericTestUtils.waitForThreadTermination((String)"Async datanode shutdown thread", (int)100, (int)10000);
                cluster.restartDataNode(dnProps, true);
                cluster.waitActive();
                GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

                    public Boolean get() {
                        return out.getBlock().getGenerationStamp() > oldGs;
                    }
                }, (int)100, (int)10000);
                Assert.assertEquals((String)"The pipeline recovery count shouldn't increase", (long)0L, (long)out.getStreamer().getPipelineRecoveryCount());
            }
            Assert.assertFalse((String)"Write should be going on", (boolean)failed.get());
            running.set(false);
            t.join();
            out.write("testagain".getBytes());
            Assert.assertTrue((String)"There should be atleast 2 nodes in pipeline still", (out.getPipeline().length >= 2 ? 1 : 0) != 0);
            out.close();
        }
        finally {
            DFSClientFaultInjector.set((DFSClientFaultInjector)old);
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testZeroByteBlockRecovery() throws Exception {
        DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector(){
            int tries = 1;

            public void stopSendingPacketDownstream() throws IOException {
                if (this.tries > 0) {
                    --this.tries;
                    try {
                        Thread.sleep(60000L);
                    }
                    catch (InterruptedException ie) {
                        throw new IOException("Interrupted while sleeping. Bailing out.");
                    }
                }
            }
        };
        DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
        DataNodeFaultInjector.set((DataNodeFaultInjector)dnFaultInjector);
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.set("dfs.client.socket-timeout", "1000");
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "ALWAYS");
        MiniDFSCluster cluster = null;
        try {
            cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(3).build();
            cluster.waitActive();
            DistributedFileSystem fs = cluster.getFileSystem();
            FSDataOutputStream out = fs.create(new Path("noheartbeat.dat"), (short)2);
            out.write(49);
            out.hflush();
            out.close();
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
            DataNodeFaultInjector.set((DataNodeFaultInjector)oldDnInjector);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPipelineRecoveryWithTransferBlock() throws Exception {
        int chunkSize = 512;
        int oneWriteSize = 5000;
        int totalSize = 0x100000;
        int errorInjectionPos = 512;
        HdfsConfiguration conf = new HdfsConfiguration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(4).build();
        DataNodeFaultInjector old = DataNodeFaultInjector.get();
        try {
            DistributedFileSystem fs = cluster.getFileSystem();
            Path fileName = new Path("/f");
            FSDataOutputStream o = fs.create(fileName);
            o.writeBytes("hello");
            o.hflush();
            DFSOutputStream dfsO = (DFSOutputStream)o.getWrappedStream();
            DatanodeInfo[] pipeline = dfsO.getStreamer().getNodes();
            final String lastDn = pipeline[2].getXferAddr(false);
            final AtomicBoolean failed = new AtomicBoolean(false);
            DataNodeFaultInjector.set((DataNodeFaultInjector)new DataNodeFaultInjector(){

                public void failPipeline(ReplicaInPipelineInterface replicaInfo, String mirror) throws IOException {
                    if (!lastDn.equals(mirror)) {
                        return;
                    }
                    if (!failed.get() && replicaInfo.getBytesAcked() > 512L && replicaInfo.getBytesAcked() % 512L != 0L) {
                        for (int count = 0; count < 10; ++count) {
                            if (replicaInfo.getBytesOnDisk() / 512L - replicaInfo.getBytesAcked() / 512L >= 1L) {
                                failed.set(true);
                                throw new IOException("Failing Pipeline " + replicaInfo.getBytesAcked() + " : " + replicaInfo.getBytesOnDisk());
                            }
                            try {
                                Thread.sleep(200L);
                                continue;
                            }
                            catch (InterruptedException interruptedException) {
                                // empty catch block
                            }
                        }
                    }
                }
            });
            Random r = new Random();
            byte[] b = new byte[5000];
            for (int count = 0; count < 0x100000; count += 5000) {
                r.nextBytes(b);
                o.write(b);
                o.hflush();
            }
            Assert.assertTrue((String)"Expected a failure in the pipeline", (boolean)failed.get());
            DatanodeInfo[] newNodes = dfsO.getStreamer().getNodes();
            o.close();
            for (DataNode d : cluster.getDataNodes()) {
                DataNodeTestUtils.triggerBlockReport(d);
            }
            List<DatanodeInfo> pipelineList = Arrays.asList(pipeline);
            DatanodeInfo newNode = null;
            for (DatanodeInfo node : newNodes) {
                if (pipelineList.contains(node)) continue;
                newNode = node;
                break;
            }
            LOG.info("Number of nodes in pipeline: {} newNode {}", (Object)newNodes.length, (Object)newNode.getName());
            for (int i = 0; i < newNodes.length; ++i) {
                if (newNodes[i].getName().equals(newNode.getName())) continue;
                LOG.info("shutdown {}", (Object)newNodes[i].getName());
                cluster.stopDataNode(newNodes[i].getName());
            }
            DFSTestUtil.readFile((FileSystem)fs, fileName);
        }
        finally {
            DataNodeFaultInjector.set((DataNodeFaultInjector)old);
            cluster.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testUpdatePipeLineAfterDNReg() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        MiniDFSCluster cluster = null;
        try {
            cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(2).build();
            cluster.waitActive();
            DistributedFileSystem fileSys = cluster.getFileSystem();
            Path file = new Path("/testUpdatePipeLineAfterDNReg");
            FSDataOutputStream out = fileSys.create(file);
            out.write(1);
            out.hflush();
            DFSOutputStream dfsOut = (DFSOutputStream)out.getWrappedStream();
            DatanodeInfo[] pipeline = dfsOut.getPipeline();
            DataNode dn1 = cluster.getDataNode(pipeline[0].getIpcPort());
            DataNodeTestUtils.setHeartbeatsDisabledForTests(dn1, true);
            DatanodeDescriptor dn1Desc = cluster.getNamesystem(0).getBlockManager().getDatanodeManager().getDatanode(dn1.getDatanodeId());
            cluster.setDataNodeDead((DatanodeID)dn1Desc);
            DatanodeProtocolClientSideTranslatorPB dnp = new DatanodeProtocolClientSideTranslatorPB(cluster.getNameNode().getNameNodeAddress(), (Configuration)conf);
            dnp.registerDatanode(dn1.getDNRegistrationForBP(cluster.getNamesystem().getBlockPoolId()));
            DFSOutputStream dfsO = (DFSOutputStream)out.getWrappedStream();
            String clientName = fileSys.getClient().getClientName();
            NamenodeProtocols namenode = cluster.getNameNodeRpc();
            LocatedBlock newBlock = namenode.updateBlockForPipeline(dfsO.getBlock(), clientName);
            dfsO.getStreamer().updatePipeline(newBlock.getBlock().getGenerationStamp());
            newBlock = namenode.updateBlockForPipeline(dfsO.getBlock(), clientName);
            dfsO.getStreamer().updatePipeline(newBlock.getBlock().getGenerationStamp());
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }
}

