/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link SystemProcessingTimeService}: fixed-rate scheduling, quiescing, the different
 * shutdown variants, and cancellation of the futures handed out by the service.
 */
public class SystemProcessingTimeServiceTest extends TestLogger {

    /**
     * Schedules a fixed-rate timer and waits until its callback has run {@code countDown} times.
     * The JUnit timeout bounds the otherwise unbounded wait on the latch.
     */
    @Test(timeout = 10000L)
    public void testScheduleAtFixedRate() throws Exception {
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final long period = 10L;
        final int countDown = 3;

        final SystemProcessingTimeService timer = createSystemProcessingTimeService(errorRef);
        final CountDownLatch countDownLatch = new CountDownLatch(countDown);

        try {
            timer.scheduleAtFixedRate(timestamp -> countDownLatch.countDown(), 0L, period);

            countDownLatch.await();

            throwIfAsyncErrorReported(errorRef);
        } finally {
            timer.shutdownService();
        }
    }

    /**
     * Checks that waiting for {@code quiesce()} to complete leaves a previously scheduled
     * fixed-rate future cancelled, and that a timer scheduled afterwards still yields a non-null
     * future but is not counted by {@code getNumTasksScheduled()}.
     */
    @Test
    public void testQuiesceAndAwaitingCancelsScheduledAtFixRateFuture() throws Exception {
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final long period = 10L;

        final SystemProcessingTimeService timer = createSystemProcessingTimeService(errorRef);

        try {
            ScheduledFuture<?> scheduledFuture =
                    timer.scheduleAtFixedRate(timestamp -> {}, 0L, period);
            Assert.assertFalse(scheduledFuture.isDone());

            // Block until the quiesce future completes.
            timer.quiesce().get();

            try {
                scheduledFuture.get();
                Assert.fail("scheduled future is not cancelled");
            } catch (CancellationException expected) {
                // expected: the future must report cancellation
            }

            // Scheduling after quiesce: the callback would throw if it were ever invoked.
            scheduledFuture =
                    timer.scheduleAtFixedRate(
                            timestamp -> {
                                throw new Exception("Test exception.");
                            },
                            0L,
                            100L);
            Assert.assertNotNull(scheduledFuture);
            Assert.assertEquals(0L, timer.getNumTasksScheduled());

            throwIfAsyncErrorReported(errorRef);
        } finally {
            timer.shutdownService();
        }
    }

    /**
     * Shuts the service down while a timer callback is sleeping and checks that the service is
     * terminated, reports no scheduled tasks, rejects new timers with {@link
     * IllegalStateException}, and passes an {@link InterruptedException} to the error handler.
     */
    @Test
    public void testImmediateShutdown() throws Exception {
        final CompletableFuture<Throwable> errorFuture = new CompletableFuture<>();
        final SystemProcessingTimeService timer = createSystemProcessingTimeService(errorFuture);

        try {
            Assert.assertFalse(timer.isTerminated());

            final OneShotLatch latch = new OneShotLatch();

            // The callback signals that it is running and then sleeps (practically) forever.
            timer.registerTimer(
                    System.currentTimeMillis(),
                    timestamp -> {
                        latch.trigger();
                        Thread.sleep(100000000L);
                    });

            latch.await();
            timer.shutdownService();

            Assert.assertTrue(timer.isTerminated());
            Assert.assertEquals(0L, timer.getNumTasksScheduled());

            try {
                timer.registerTimer(
                        System.currentTimeMillis() + 1000L,
                        timestamp -> Assert.fail("should not be called"));
                Assert.fail("should result in an exception");
            } catch (IllegalStateException expected) {
                // expected: registration after shutdown is rejected
            }

            try {
                timer.scheduleAtFixedRate(
                        timestamp -> Assert.fail("should not be called"), 0L, 100L);
                Assert.fail("should result in an exception");
            } catch (IllegalStateException expected) {
                // expected: scheduling after shutdown is rejected
            }

            // The sleeping callback is expected to surface its interruption via the handler.
            Assert.assertThat(
                    errorFuture.get(30L, TimeUnit.SECONDS),
                    Matchers.instanceOf(InterruptedException.class));
        } finally {
            timer.shutdownService();
        }
    }

    /**
     * Quiesces the service while a timer callback is in flight and checks that the callback has
     * released its lock once the quiesce future completes, and that a timer registered afterwards
     * yields a non-null future without being counted as scheduled.
     */
    @Test
    public void testQuiescing() throws Exception {
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final SystemProcessingTimeService timer = createSystemProcessingTimeService(errorRef);

        try {
            final OneShotLatch latch = new OneShotLatch();
            final ReentrantLock scopeLock = new ReentrantLock();

            // The callback holds scopeLock for its whole duration.
            timer.registerTimer(
                    timer.getCurrentProcessingTime() + 20L,
                    timestamp -> {
                        scopeLock.lock();
                        try {
                            latch.trigger();
                            Thread.sleep(5L);
                        } finally {
                            scopeLock.unlock();
                        }
                    });

            // Wait until the callback is running, then quiesce and wait for completion.
            latch.await();
            timer.quiesce().get();

            // The lock must be free, i.e. the callback has left its critical section.
            Assert.assertTrue(scopeLock.tryLock());

            // Registering after quiesce: the callback would throw if it were ever invoked.
            final ScheduledFuture<?> future =
                    timer.registerTimer(
                            timer.getCurrentProcessingTime() - 5L,
                            timestamp -> {
                                throw new Exception("test");
                            });
            Assert.assertNotNull(future);
            Assert.assertEquals(0L, timer.getNumTasksScheduled());

            throwIfAsyncErrorReported(errorRef);
        } finally {
            timer.shutdownService();
        }
    }

    /**
     * Checks that cancelling a one-shot timer future and a fixed-rate timer future each brings
     * {@code getNumTasksScheduled()} back from one to zero.
     */
    @Test
    public void testFutureCancellation() throws Exception {
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final SystemProcessingTimeService timer = createSystemProcessingTimeService(errorRef);

        try {
            Assert.assertEquals(0L, timer.getNumTasksScheduled());

            // One-shot timer far in the future.
            ScheduledFuture<?> future =
                    timer.registerTimer(System.currentTimeMillis() + 100000000L, timestamp -> {});
            Assert.assertEquals(1L, timer.getNumTasksScheduled());

            future.cancel(false);
            Assert.assertEquals(0L, timer.getNumTasksScheduled());

            // Fixed-rate timer with a very long initial delay.
            future = timer.scheduleAtFixedRate(timestamp -> {}, 10000000000L, 50L);
            Assert.assertEquals(1L, timer.getNumTasksScheduled());

            future.cancel(false);
            Assert.assertEquals(0L, timer.getNumTasksScheduled());

            throwIfAsyncErrorReported(errorRef);
        } finally {
            timer.shutdownService();
        }
    }

    /**
     * Checks that {@code shutdownAndAwaitPending} returns {@code false} while a timer callback is
     * still blocked and {@code true} once the callback has been released and has finished.
     */
    @Test
    public void testShutdownAndWaitPending() throws Exception {
        final OneShotLatch blockUntilTriggered = new OneShotLatch();
        final AtomicBoolean timerExecutionFinished = new AtomicBoolean(false);

        final SystemProcessingTimeService timeService =
                createBlockingSystemProcessingTimeService(
                        blockUntilTriggered, timerExecutionFinished);

        try {
            Assert.assertFalse(timeService.isTerminated());

            // The callback is still blocked, so the pending work cannot finish in time.
            Assert.assertFalse(timeService.shutdownAndAwaitPending(1L, TimeUnit.SECONDS));

            // Release the callback; now the shutdown must complete.
            blockUntilTriggered.trigger();
            Assert.assertTrue(timeService.shutdownAndAwaitPending(60L, TimeUnit.SECONDS));

            Assert.assertTrue(timerExecutionFinished.get());
            Assert.assertTrue(timeService.isTerminated());
        } finally {
            // On a failed assertion the callback may still be blocked: release it and make sure
            // the service is shut down so no thread outlives the test.
            blockUntilTriggered.trigger();
            timeService.shutdownService();
        }
    }

    /**
     * Checks that {@code shutdownServiceUninterruptible} waits for its full timeout even while
     * the calling thread is interrupted continuously, returns {@code false} while the timer
     * callback is blocked, and returns {@code true} after the callback has been released.
     */
    @Test
    public void testShutdownServiceUninterruptible() {
        final OneShotLatch blockUntilTriggered = new OneShotLatch();
        final AtomicBoolean timerFinished = new AtomicBoolean(false);

        final SystemProcessingTimeService timeService =
                createBlockingSystemProcessingTimeService(blockUntilTriggered, timerFinished);

        try {
            Assert.assertFalse(timeService.isTerminated());

            final Thread interruptTarget = Thread.currentThread();
            final AtomicBoolean runInterrupts = new AtomicBoolean(true);

            // Helper thread that keeps interrupting the test thread until told to stop.
            final Thread interruptCallerThread =
                    new Thread(
                            () -> {
                                while (runInterrupts.get()) {
                                    interruptTarget.interrupt();
                                    try {
                                        Thread.sleep(1L);
                                    } catch (InterruptedException ignored) {
                                        // nobody interrupts this thread on purpose; keep looping
                                    }
                                }
                            });
            interruptCallerThread.start();

            final long timeoutMs = 50L;
            final long startTime = System.nanoTime();

            try {
                Assert.assertFalse(timeService.isTerminated());
                // Shutdown must not finish in time (callback blocked) despite the interrupts.
                Assert.assertFalse(timeService.shutdownServiceUninterruptible(timeoutMs));
                // The service is nevertheless marked as terminated.
                Assert.assertTrue(timeService.isTerminated());
                // The blocked callback is still in flight.
                Assert.assertFalse(timerFinished.get());
                // The call waited for at least the full timeout.
                Assert.assertTrue(
                        System.nanoTime() - startTime >= TimeUnit.MILLISECONDS.toNanos(timeoutMs));
            } finally {
                // Always stop the interrupter, even when an assertion above failed; otherwise it
                // would keep interrupting the test-runner thread for the rest of the JVM's life.
                runInterrupts.set(false);
                do {
                    try {
                        interruptCallerThread.join();
                    } catch (InterruptedException ignored) {
                        // a late interrupt from the helper thread; retry the join
                    }
                } while (interruptCallerThread.isAlive());

                // Clear an interrupt flag that may still be pending on the test thread.
                Thread.interrupted();
            }

            blockUntilTriggered.trigger();
            Assert.assertTrue(timeService.shutdownServiceUninterruptible(timeoutMs));
            Assert.assertTrue(timerFinished.get());
        } finally {
            // On a failed assertion the callback may still be blocked: release it and make sure
            // the service is shut down so no thread outlives the test.
            blockUntilTriggered.trigger();
            timeService.shutdownService();
        }
    }

    /**
     * Rethrows, wrapped in an {@link Exception}, the first error that a timer service created via
     * {@link #createSystemProcessingTimeService(AtomicReference)} stored in {@code errorRef}.
     *
     * @param errorRef holder filled by the service's error handler
     * @throws Exception if an error has been recorded
     */
    private static void throwIfAsyncErrorReported(AtomicReference<Throwable> errorRef)
            throws Exception {
        final Throwable error = errorRef.get();
        if (error != null) {
            throw new Exception(error);
        }
    }

    /**
     * Creates a timer service whose error handler completes the given future.
     *
     * @param errorFuture future to complete with the reported error; must not be done yet
     * @return the new timer service
     */
    private static SystemProcessingTimeService createSystemProcessingTimeService(
            CompletableFuture<Throwable> errorFuture) {
        Preconditions.checkArgument(!errorFuture.isDone());

        return new SystemProcessingTimeService(errorFuture::complete);
    }

    /**
     * Creates a timer service whose error handler stores the first reported error in the given
     * reference (later errors do not overwrite it).
     *
     * @param errorRef holder for the first reported error; must currently hold {@code null}
     * @return the new timer service
     */
    private static SystemProcessingTimeService createSystemProcessingTimeService(
            AtomicReference<Throwable> errorRef) {
        Preconditions.checkArgument(errorRef.get() == null);

        return new SystemProcessingTimeService(ex -> errorRef.compareAndSet(null, ex));
    }

    /**
     * Creates a timer service with one fixed-rate callback that blocks until {@code
     * blockUntilTriggered} fires and then sets {@code check} to {@code true}. The method returns
     * only after the callback has started running.
     *
     * @param blockUntilTriggered latch that releases the blocked callback
     * @param check flag set by the callback once it has been released; must be {@code false}
     * @return the timer service with the blocked callback in flight
     * @throws AssertionError if the calling thread is interrupted while waiting for the callback
     */
    private static SystemProcessingTimeService createBlockingSystemProcessingTimeService(
            final OneShotLatch blockUntilTriggered, final AtomicBoolean check) {

        final OneShotLatch waitUntilTimerStarted = new OneShotLatch();

        Preconditions.checkState(!check.get());

        final SystemProcessingTimeService timeService =
                new SystemProcessingTimeService(exception -> {});

        timeService.scheduleAtFixedRate(
                timestamp -> {
                    waitUntilTimerStarted.trigger();

                    // Deliberately ignore interrupts: only the latch may release this callback.
                    boolean unblocked = false;
                    while (!unblocked) {
                        try {
                            blockUntilTriggered.await();
                            unblocked = true;
                        } catch (InterruptedException ignored) {
                            // retry the wait
                        }
                    }

                    check.set(true);
                },
                0L,
                10L);

        try {
            waitUntilTimerStarted.await();
        } catch (InterruptedException e) {
            // Do not hand back a half-started service; keep the interruption as the cause.
            timeService.shutdownService();
            throw new AssertionError("Problem while starting up service.", e);
        }

        return timeService;
    }
}

