/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(value=MockitoJUnitRunner.class)
@Category(value={ReplicationTests.class, LargeTests.class})
public class TestWALEntryStream {
    private static HBaseTestingUtility TEST_UTIL;
    private static Configuration conf;
    private static FileSystem fs;
    private static MiniDFSCluster cluster;
    private static final TableName tableName;
    private static final byte[] family;
    private static final byte[] qualifier;
    private static final HRegionInfo info;
    private static final HTableDescriptor htd;
    private static NavigableMap<byte[], Integer> scopes;
    private WAL log;
    PriorityBlockingQueue<Path> walQueue;
    private PathWatcher pathWatcher;
    @Rule
    public TestName tn = new TestName();
    private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        TEST_UTIL = new HBaseTestingUtility();
        conf = TEST_UTIL.getConfiguration();
        TEST_UTIL.startMiniDFSCluster(3);
        cluster = TEST_UTIL.getDFSCluster();
        fs = cluster.getFileSystem();
        scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
        for (byte[] fam : htd.getFamiliesKeys()) {
            scopes.put(fam, 0);
        }
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        TEST_UTIL.shutdownMiniCluster();
    }

    @Before
    public void setUp() throws Exception {
        this.walQueue = new PriorityBlockingQueue();
        ArrayList<PathWatcher> listeners = new ArrayList<PathWatcher>();
        this.pathWatcher = new PathWatcher();
        listeners.add(this.pathWatcher);
        WALFactory wals = new WALFactory(conf, listeners, this.tn.getMethodName());
        this.log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
    }

    @After
    public void tearDown() throws Exception {
        this.log.close();
    }

    @Test
    public void testDifferentCounts() throws Exception {
        int[] NB_ROWS = new int[]{1500, 60000};
        int[] NB_KVS = new int[]{1, 100};
        Boolean[] BOOL_VALS = new Boolean[]{false, true};
        for (int nbRows : NB_ROWS) {
            for (int walEditKVs : NB_KVS) {
                Boolean[] arr$ = BOOL_VALS;
                int len$ = arr$.length;
                for (int i$ = 0; i$ < len$; ++i$) {
                    boolean isCompressionEnabled = arr$[i$];
                    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.wal.enablecompression", isCompressionEnabled);
                    this.mvcc.advanceTo(1L);
                    for (int i = 0; i < nbRows; ++i) {
                        this.appendToLogPlus(walEditKVs);
                    }
                    this.log.rollWriter();
                    try (WALEntryStream entryStream = new WALEntryStream(this.walQueue, fs, conf, new MetricsSource("1"));){
                        int i = 0;
                        for (WAL.Entry e : entryStream) {
                            Assert.assertNotNull((Object)e);
                            ++i;
                        }
                        Assert.assertEquals((long)nbRows, (long)i);
                        Assert.assertFalse((boolean)entryStream.hasNext());
                    }
                    this.log.close();
                    this.setUp();
                }
            }
        }
    }

    @Test
    public void testAppendsWithRolls() throws Exception {
        long oldPos;
        WAL.Entry entry;
        this.appendToLog();
        try (WALEntryStream entryStream = new WALEntryStream(this.walQueue, fs, conf, new MetricsSource("1"));){
            Assert.assertTrue((boolean)entryStream.hasNext());
            entry = entryStream.next();
            Assert.assertNotNull((Object)entry);
            Assert.assertFalse((boolean)entryStream.hasNext());
            try {
                entry = entryStream.next();
                Assert.fail();
            }
            catch (NoSuchElementException noSuchElementException) {
                // empty catch block
            }
            oldPos = entryStream.getPosition();
        }
        this.appendToLog();
        entryStream = new WALEntryStream(this.walQueue, fs, conf, oldPos, new MetricsSource("1"));
        var4_2 = null;
        try {
            entry = entryStream.next();
            Assert.assertNotEquals((long)oldPos, (long)entryStream.getPosition());
            Assert.assertNotNull((Object)entry);
            oldPos = entryStream.getPosition();
        }
        catch (Throwable x2) {
            var4_2 = x2;
            throw x2;
        }
        finally {
            if (entryStream != null) {
                if (var4_2 != null) {
                    try {
                        entryStream.close();
                    }
                    catch (Throwable x2) {
                        var4_2.addSuppressed(x2);
                    }
                } else {
                    entryStream.close();
                }
            }
        }
        this.appendToLog();
        this.log.rollWriter();
        this.appendToLog();
        entryStream = new WALEntryStream(this.walQueue, fs, conf, oldPos, new MetricsSource("1"));
        var4_2 = null;
        try {
            entry = entryStream.next();
            Assert.assertNotEquals((long)oldPos, (long)entryStream.getPosition());
            Assert.assertNotNull((Object)entry);
            entry = entryStream.next();
            Assert.assertNotEquals((long)oldPos, (long)entryStream.getPosition());
            Assert.assertNotNull((Object)entry);
            Assert.assertFalse((boolean)entryStream.hasNext());
            oldPos = entryStream.getPosition();
        }
        catch (Throwable throwable) {
            var4_2 = throwable;
            throw throwable;
        }
        finally {
            if (entryStream != null) {
                if (var4_2 != null) {
                    try {
                        entryStream.close();
                    }
                    catch (Throwable x2) {
                        var4_2.addSuppressed(x2);
                    }
                } else {
                    entryStream.close();
                }
            }
        }
    }

    @Test
    public void testLogrollWhileStreaming() throws Exception {
        this.appendToLog("1");
        this.appendToLog("2");
        try (WALEntryStream entryStream = new WALEntryStream(this.walQueue, fs, conf, new MetricsSource("1"));){
            Assert.assertEquals((Object)"1", (Object)this.getRow(entryStream.next()));
            this.appendToLog("3");
            this.log.rollWriter();
            this.appendToLog("4");
            Assert.assertEquals((Object)"2", (Object)this.getRow(entryStream.next()));
            Assert.assertEquals((long)2L, (long)this.walQueue.size());
            Assert.assertEquals((Object)"3", (Object)this.getRow(entryStream.next()));
            Assert.assertEquals((Object)"4", (Object)this.getRow(entryStream.next()));
            Assert.assertEquals((long)1L, (long)this.walQueue.size());
            Assert.assertFalse((boolean)entryStream.hasNext());
        }
    }

    @Test
    public void testNewEntriesWhileStreaming() throws Exception {
        this.appendToLog("1");
        try (WALEntryStream entryStream = new WALEntryStream(this.walQueue, fs, conf, 0L, new MetricsSource("1"));){
            entryStream.next();
            this.appendToLog("2");
            this.appendToLog("3");
            Assert.assertFalse((boolean)entryStream.hasNext());
            entryStream.reset();
            Assert.assertEquals((Object)"2", (Object)this.getRow(entryStream.next()));
            Assert.assertEquals((Object)"3", (Object)this.getRow(entryStream.next()));
            Assert.assertFalse((boolean)entryStream.hasNext());
        }
    }

    @Test
    public void testResumeStreamingFromPosition() throws Exception {
        long lastPosition = 0L;
        this.appendToLog("1");
        try (WALEntryStream entryStream = new WALEntryStream(this.walQueue, fs, conf, 0L, new MetricsSource("1"));){
            entryStream.next();
            this.appendToLog("2");
            this.appendToLog("3");
            lastPosition = entryStream.getPosition();
        }
        entryStream = new WALEntryStream(this.walQueue, fs, conf, lastPosition, new MetricsSource("1"));
        var4_3 = null;
        try {
            Assert.assertEquals((Object)"2", (Object)this.getRow(entryStream.next()));
            Assert.assertEquals((Object)"3", (Object)this.getRow(entryStream.next()));
            Assert.assertFalse((boolean)entryStream.hasNext());
            Assert.assertEquals((long)1L, (long)this.walQueue.size());
        }
        catch (Throwable throwable) {
            var4_3 = throwable;
            throw throwable;
        }
        finally {
            if (entryStream != null) {
                if (var4_3 != null) {
                    try {
                        entryStream.close();
                    }
                    catch (Throwable x2) {
                        var4_3.addSuppressed(x2);
                    }
                } else {
                    entryStream.close();
                }
            }
        }
    }

    @Test
    public void testPosition() throws Exception {
        long lastPosition = 0L;
        this.appendEntriesToLog(3);
        try (WALEntryStream entryStream = new WALEntryStream(this.walQueue, fs, conf, lastPosition, new MetricsSource("1"));){
            entryStream.next();
            lastPosition = entryStream.getPosition();
        }
        entryStream = new WALEntryStream(this.walQueue, fs, conf, lastPosition, new MetricsSource("1"));
        var4_3 = null;
        try {
            Assert.assertNotNull((Object)entryStream.next());
            Assert.assertNotNull((Object)entryStream.next());
            Assert.assertFalse((boolean)entryStream.hasNext());
        }
        catch (Throwable throwable) {
            var4_3 = throwable;
            throw throwable;
        }
        finally {
            if (entryStream != null) {
                if (var4_3 != null) {
                    try {
                        entryStream.close();
                    }
                    catch (Throwable x2) {
                        var4_3.addSuppressed(x2);
                    }
                } else {
                    entryStream.close();
                }
            }
        }
    }

    @Test
    public void testEmptyStream() throws Exception {
        try (WALEntryStream entryStream = new WALEntryStream(this.walQueue, fs, conf, 0L, new MetricsSource("1"));){
            Assert.assertFalse((boolean)entryStream.hasNext());
        }
    }

    @Test
    public void testReplicationSourceWALReaderThread() throws Exception {
        long position;
        this.appendEntriesToLog(3);
        try (WALEntryStream entryStream = new WALEntryStream(this.walQueue, fs, conf, new MetricsSource("1"));){
            entryStream.next();
            entryStream.next();
            entryStream.next();
            position = entryStream.getPosition();
        }
        ReplicationSourceManager mockSourceManager = (ReplicationSourceManager)Mockito.mock(ReplicationSourceManager.class);
        ReplicationSourceWALReaderThread batcher = new ReplicationSourceWALReaderThread(mockSourceManager, this.getQueueInfo(), this.walQueue, 0L, fs, conf, this.getDummyFilter(), new MetricsSource("1"));
        Path walPath = this.walQueue.peek();
        batcher.start();
        ReplicationSourceWALReaderThread.WALEntryBatch entryBatch = batcher.take();
        Assert.assertNotNull((Object)entryBatch);
        Assert.assertEquals((long)3L, (long)entryBatch.getWalEntries().size());
        Assert.assertEquals((long)position, (long)entryBatch.getLastWalPosition());
        Assert.assertEquals((Object)walPath, (Object)entryBatch.getLastWalPath());
        Assert.assertEquals((long)3L, (long)entryBatch.getNbRowKeys());
        this.appendToLog("foo");
        entryBatch = batcher.take();
        Assert.assertEquals((long)1L, (long)entryBatch.getNbEntries());
        Assert.assertEquals((Object)this.getRow((WAL.Entry)entryBatch.getWalEntries().get(0)), (Object)"foo");
    }

    private String getRow(WAL.Entry entry) {
        Cell cell = (Cell)entry.getEdit().getCells().get(0);
        return Bytes.toString((byte[])cell.getRowArray(), (int)cell.getRowOffset(), (int)cell.getRowLength());
    }

    private void appendToLog(String key) throws IOException {
        long txid = this.log.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), this.mvcc), this.getWALEdit(key), true);
        this.log.sync(txid);
    }

    private void appendEntriesToLog(int count) throws IOException {
        for (int i = 0; i < count; ++i) {
            this.appendToLog();
        }
    }

    private void appendToLog() throws IOException {
        this.appendToLogPlus(1);
    }

    private void appendToLogPlus(int count) throws IOException {
        long txid = this.log.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), this.mvcc), this.getWALEdits(count), true);
        this.log.sync(txid);
    }

    private WALEdit getWALEdits(int count) {
        WALEdit edit = new WALEdit();
        for (int i = 0; i < count; ++i) {
            edit.add((Cell)new KeyValue(Bytes.toBytes((long)System.currentTimeMillis()), family, qualifier, System.currentTimeMillis(), qualifier));
        }
        return edit;
    }

    private WALEdit getWALEdit(String row) {
        WALEdit edit = new WALEdit();
        edit.add((Cell)new KeyValue(Bytes.toBytes((String)row), family, qualifier, System.currentTimeMillis(), qualifier));
        return edit;
    }

    private WALEntryFilter getDummyFilter() {
        return new WALEntryFilter(){

            public WAL.Entry filter(WAL.Entry entry) {
                return entry;
            }
        };
    }

    private ReplicationQueueInfo getQueueInfo() {
        return new ReplicationQueueInfo("1");
    }

    static {
        tableName = TableName.valueOf((String)"tablename");
        family = Bytes.toBytes((String)"column");
        qualifier = Bytes.toBytes((String)"qualifier");
        info = new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
        htd = new HTableDescriptor(tableName);
    }

    class PathWatcher
    extends WALActionsListener.Base {
        Path currentPath;

        PathWatcher() {
        }

        public void preLogRoll(Path oldPath, Path newPath) throws IOException {
            TestWALEntryStream.this.walQueue.add(newPath);
            this.currentPath = newPath;
        }
    }
}

