/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounterTestBase;
import org.apache.flink.runtime.checkpoint.DefaultLastStateConnectionStateListener;
import org.apache.flink.runtime.checkpoint.LastStateConnectionStateListener;
import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class ZooKeeperCheckpointIDCounterITCase
extends CheckpointIDCounterTestBase {
    private static ZooKeeperTestEnvironment zookeeper;

    ZooKeeperCheckpointIDCounterITCase() {
    }

    @BeforeEach
    void setup() {
        zookeeper = new ZooKeeperTestEnvironment(1);
    }

    @AfterEach
    void tearDown() throws Exception {
        this.cleanAndStopZooKeeperIfRunning();
    }

    private void cleanAndStopZooKeeperIfRunning() throws Exception {
        if (zookeeper.getClient().isStarted()) {
            zookeeper.deleteAll();
            zookeeper.shutdown();
        }
    }

    @Test
    public void testShutdownRemovesState() throws Exception {
        ZooKeeperCheckpointIDCounter counter = this.createCheckpointIdCounter();
        counter.start();
        CuratorFramework client = zookeeper.getClient();
        Assertions.assertThat((Object)client.checkExists().forPath(counter.getPath())).isNotNull();
        counter.shutdown(JobStatus.FINISHED).join();
        Assertions.assertThat((Object)client.checkExists().forPath(counter.getPath())).isNull();
    }

    @Test
    public void testIdempotentShutdown() throws Exception {
        ZooKeeperCheckpointIDCounter counter = this.createCheckpointIdCounter();
        counter.start();
        CuratorFramework client = zookeeper.getClient();
        counter.shutdown(JobStatus.FINISHED).join();
        counter.shutdown(JobStatus.FINISHED).join();
        Assertions.assertThat((Object)client.checkExists().forPath(counter.getPath())).isNull();
    }

    @Test
    public void testShutdownWithFailureDueToMissingConnection() throws Exception {
        ZooKeeperCheckpointIDCounter counter = this.createCheckpointIdCounter();
        counter.start();
        this.cleanAndStopZooKeeperIfRunning();
        ((AbstractThrowableAssert)((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> {
            Void cfr_ignored_0 = (Void)counter.shutdown(JobStatus.FINISHED).get();
        }).as("The shutdown should fail because of the client connection being dropped.", new Object[0])).isInstanceOf(ExecutionException.class)).hasCauseExactlyInstanceOf(IllegalStateException.class);
    }

    @Test
    public void testShutdownWithFailureDueToExistingChildNodes() throws Exception {
        ZooKeeperCheckpointIDCounter counter = this.createCheckpointIdCounter();
        counter.start();
        CuratorFramework client = ZooKeeperUtils.useNamespaceAndEnsurePath((CuratorFramework)zookeeper.getClient(), (String)"/");
        String counterNodePath = ZooKeeperUtils.generateZookeeperPath((String[])new String[]{counter.getPath()});
        String childNodePath = ZooKeeperUtils.generateZookeeperPath((String[])new String[]{counterNodePath, "unexpected-child-node-causing-a-failure"});
        client.create().forPath(childNodePath);
        String namespacedCounterNodePath = ZooKeeperUtils.generateZookeeperPath((String[])new String[]{client.getNamespace(), counterNodePath});
        KeeperException expectedRootCause = KeeperException.create((KeeperException.Code)KeeperException.Code.NOTEMPTY, (String)namespacedCounterNodePath);
        ((ListAssert)((AbstractThrowableAssert)((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> {
            Void cfr_ignored_0 = (Void)counter.shutdown(JobStatus.FINISHED).get();
        }).as("The shutdown should fail because of a child node being present and the shutdown not performing an explicit recursive deletion.", new Object[0])).isInstanceOf(ExecutionException.class)).extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)).anySatisfy(arg_0 -> ZooKeeperCheckpointIDCounterITCase.lambda$testShutdownWithFailureDueToExistingChildNodes$2((Throwable)expectedRootCause, arg_0));
        client.delete().forPath(childNodePath);
        counter.shutdown(JobStatus.FINISHED).join();
        ((ObjectAssert)Assertions.assertThat((Object)client.checkExists().forPath(counterNodePath)).as("A retry of the shutdown should have worked now after the root cause was resolved.", new Object[0])).isNull();
    }

    @Test
    public void testSuspendKeepsState() throws Exception {
        ZooKeeperCheckpointIDCounter counter = this.createCheckpointIdCounter();
        counter.start();
        CuratorFramework client = zookeeper.getClient();
        Assertions.assertThat((Object)client.checkExists().forPath(counter.getPath())).isNotNull();
        counter.shutdown(JobStatus.SUSPENDED).join();
        Assertions.assertThat((Object)client.checkExists().forPath(counter.getPath())).isNotNull();
    }

    protected ZooKeeperCheckpointIDCounter createCheckpointIdCounter() throws Exception {
        return new ZooKeeperCheckpointIDCounter(ZooKeeperUtils.useNamespaceAndEnsurePath((CuratorFramework)zookeeper.getClient(), (String)"/"), (LastStateConnectionStateListener)new DefaultLastStateConnectionStateListener());
    }

    private static /* synthetic */ void lambda$testShutdownWithFailureDueToExistingChildNodes$2(Throwable expectedRootCause, Throwable cause) throws Throwable {
        ((AbstractThrowableAssert)Assertions.assertThat((Throwable)cause).isInstanceOf(expectedRootCause.getClass())).hasMessage(expectedRootCause.getMessage());
    }
}

