package org.infinispan.statetransfer;

import jakarta.transaction.SystemException;
import jakarta.transaction.TransactionManager;
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.Cache;
import org.infinispan.commons.configuration.Combine;
import org.infinispan.commons.test.Exceptions;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder;
import org.infinispan.protostream.annotations.ProtoField;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.AbstractCacheTest;
import org.infinispan.test.Mocks;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.data.DelayedMarshallingPojo;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.test.fwk.TransportFlags;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.LocalTopologyManager;
import org.infinispan.transaction.LockingMode;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "statetransfer.StateTransferFunctionalTest")
/* loaded from: input_file:org/infinispan/statetransfer/StateTransferFunctionalTest.class */
public class StateTransferFunctionalTest extends MultipleCacheManagersTest {
    public static final String A_B_NAME = "a_b_name";
    public static final String A_C_NAME = "a_c_name";
    public static final String A_D_NAME = "a_d_age";
    public static final String A_B_AGE = "a_b_age";
    public static final String A_C_AGE = "a_c_age";
    public static final String A_D_AGE = "a_d_age";
    public static final String JOE = "JOE";
    public static final String BOB = "BOB";
    public static final String JANE = "JANE";
    protected SerializationContextInitializer sci;
    protected ConfigurationBuilder configurationBuilder;
    protected final String cacheName;
    private volatile int testCount;
    public static final Integer TWENTY = 20;
    public static final Integer FORTY = 40;
    private static final Log log = LogFactory.getLog(StateTransferFunctionalTest.class);

    /* loaded from: input_file:org/infinispan/statetransfer/StateTransferFunctionalTest$DelayTransfer.class */
    public static class DelayTransfer {
        volatile boolean doDelay = false;

        void enableDelay() {
            this.doDelay = true;
        }

        @ProtoField(number = 1, defaultValue = "false")
        public boolean isIgnore() {
            if (!this.doDelay) {
                return false;
            }
            TestingUtil.sleepThread(1000L);
            return false;
        }

        public void setIgnore(boolean z) {
        }
    }

    @AutoProtoSchemaBuilder(includeClasses = {DelayedMarshallingPojo.class, DelayTransfer.class}, schemaFileName = "test.core.StateTransferFunctionalTest.proto", schemaFilePath = "proto/generated", schemaPackageName = "org.infinispan.test.core.StateTransferFunctionalTest", service = false)
    /* loaded from: input_file:org/infinispan/statetransfer/StateTransferFunctionalTest$StateTransferFunctionalSCI.class */
    interface StateTransferFunctionalSCI extends SerializationContextInitializer {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/statetransfer/StateTransferFunctionalTest$WritingTask.class */
    public static class WritingTask implements Callable<Integer> {
        private final Cache<Object, Object> cache;
        private final boolean tx;
        private volatile boolean stop;
        private TransactionManager tm;

        WritingTask(Cache<Object, Object> cache, boolean z) {
            this.cache = cache;
            this.tx = z;
            if (z) {
                this.tm = TestingUtil.getTransactionManager(cache);
            }
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Integer call() throws Exception {
            int i = 0;
            while (!this.stop) {
                try {
                    try {
                        if (this.tx) {
                            this.tm.begin();
                        }
                        this.cache.put("test" + i, Integer.valueOf(i));
                        if (this.tx) {
                            this.tm.commit();
                        }
                        i++;
                        Thread.sleep(1L);
                        if (this.tx && 1 == 0) {
                            try {
                                this.tm.rollback();
                            } catch (SystemException e) {
                                StateTransferFunctionalTest.log.error(e);
                            }
                        }
                    } catch (Exception e2) {
                        StateTransferFunctionalTest.log.errorf(e2, "Error writing key test%s", Integer.valueOf(i));
                        stop();
                        if (this.tx && 0 == 0) {
                            try {
                                this.tm.rollback();
                            } catch (SystemException e3) {
                                StateTransferFunctionalTest.log.error(e3);
                            }
                        }
                    }
                } catch (Throwable th) {
                    if (this.tx && 0 == 0) {
                        try {
                            this.tm.rollback();
                        } catch (SystemException e4) {
                            StateTransferFunctionalTest.log.error(e4);
                        }
                    }
                    throw th;
                }
            }
            return Integer.valueOf(i);
        }

        public void stop() {
            this.stop = true;
        }
    }

    public StateTransferFunctionalTest() {
        this("nbst");
    }

    public StateTransferFunctionalTest(String str) {
        this.testCount = 0;
        this.cacheName = str;
        this.cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.infinispan.test.MultipleCacheManagersTest
    public void createCacheManagers() throws Throwable {
        this.sci = new StateTransferFunctionalSCIImpl();
        this.configurationBuilder = getDefaultClusteredCacheConfig(CacheMode.REPL_SYNC, true);
        this.configurationBuilder.transaction().lockingMode(LockingMode.PESSIMISTIC).useSynchronization(false).recovery().disable();
        this.configurationBuilder.clustering().remoteTimeout(30000L);
        this.configurationBuilder.clustering().stateTransfer().chunkSize(20);
        this.configurationBuilder.locking().useLockStriping(false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public EmbeddedCacheManager createCacheManager(String str) {
        EmbeddedCacheManager addClusterEnabledCacheManager = addClusterEnabledCacheManager(this.sci, this.configurationBuilder, new TransportFlags().withMerge(true));
        addClusterEnabledCacheManager.defineConfiguration(str, this.configurationBuilder.build());
        return addClusterEnabledCacheManager;
    }

    public void testInitialStateTransfer(Method method) throws Exception {
        this.testCount++;
        logTestStart(method);
        Cache<Object, Object> cache = createCacheManager(this.cacheName).getCache(this.cacheName);
        writeInitialData(cache);
        Cache<Object, Object> cache2 = createCacheManager(this.cacheName).getCache(this.cacheName);
        TestingUtil.waitForNoRebalance(cache, cache2);
        verifyInitialData(cache2);
        logTestEnd(method);
    }

    public void testInitialStateTransferCacheNotPresent(Method method) throws Exception {
        this.testCount++;
        logTestStart(method);
        EmbeddedCacheManager createCacheManager = createCacheManager(this.cacheName);
        Cache<Object, Object> cache = createCacheManager.getCache(this.cacheName);
        writeInitialData(cache);
        Cache<Object, Object> cache2 = createCacheManager(this.cacheName).getCache(this.cacheName);
        TestingUtil.waitForNoRebalance(cache, cache2);
        verifyInitialData(cache2);
        createCacheManager.defineConfiguration("otherCache", this.configurationBuilder.build());
        createCacheManager.getCache("otherCache");
        logTestEnd(method);
    }

    public void testConcurrentStateTransfer(Method method) throws Exception {
        this.testCount++;
        logTestStart(method);
        Cache<Object, Object> cache = createCacheManager(this.cacheName).getCache(this.cacheName);
        writeInitialData(cache);
        Cache<Object, Object> cache2 = createCacheManager(this.cacheName).getCache(this.cacheName);
        cache.put("delay", new DelayTransfer());
        TestingUtil.waitForNoRebalance(cache, cache2);
        verifyInitialData(cache2);
        EmbeddedCacheManager createCacheManager = createCacheManager(this.cacheName);
        EmbeddedCacheManager createCacheManager2 = createCacheManager(this.cacheName);
        Future fork = fork(() -> {
            return createCacheManager.getCache(this.cacheName);
        });
        Future fork2 = fork(() -> {
            return createCacheManager2.getCache(this.cacheName);
        });
        fork.get(30L, TimeUnit.SECONDS);
        fork2.get(30L, TimeUnit.SECONDS);
        Cache<Object, Object> cache3 = createCacheManager.getCache(this.cacheName);
        Cache<Object, Object> cache4 = createCacheManager2.getCache(this.cacheName);
        TestingUtil.waitForNoRebalance(cache, cache2, cache3, cache4);
        TestingUtil.waitForNoRebalance(cache, cache2, cache3, cache4);
        verifyInitialData(cache3);
        verifyInitialData(cache4);
        logTestEnd(method);
    }

    public void testSTWithThirdWritingNonTxCache(Method method) throws Exception {
        this.testCount++;
        logTestStart(method);
        thirdWritingCacheTest(false);
        logTestEnd(method);
    }

    public void testSTWithThirdWritingTxCache(Method method) throws Exception {
        this.testCount++;
        logTestStart(method);
        thirdWritingCacheTest(true);
        logTestEnd(method);
    }

    public void testSTWithWritingNonTxThread(Method method) throws Exception {
        this.testCount++;
        logTestStart(method);
        writingThreadTest(false);
        logTestEnd(method);
    }

    public void testSTWithWritingTxThread(Method method) throws Exception {
        this.testCount++;
        logTestStart(method);
        writingThreadTest(true);
        logTestEnd(method);
    }

    public void testInitialStateTransferAfterRestart(Method method) throws Exception {
        this.testCount++;
        logTestStart(method);
        Cache<Object, Object> cache = createCacheManager(this.cacheName).getCache(this.cacheName);
        writeInitialData(cache);
        Cache<Object, Object> cache2 = createCacheManager(this.cacheName).getCache(this.cacheName);
        TestingUtil.waitForNoRebalance(cache, cache2);
        verifyInitialData(cache2);
        cache2.stop();
        cache2.start();
        verifyInitialData(cache2);
        logTestEnd(method);
    }

    public void testStateTransferException(Method method) throws InterruptedException, TimeoutException, ExecutionException {
        this.testCount++;
        logTestStart(method);
        Cache<Object, Object> cache = createCacheManager(this.cacheName).getCache(this.cacheName);
        writeInitialData(cache);
        GlobalConfigurationBuilder defaultGlobalConfigurationBuilder = defaultGlobalConfigurationBuilder();
        if (this.sci != null) {
            defaultGlobalConfigurationBuilder.serialization().addContextInitializer(this.sci);
        }
        EmbeddedCacheManager createClusteredCacheManager = TestCacheManagerFactory.createClusteredCacheManager(false, defaultGlobalConfigurationBuilder, new ConfigurationBuilder(), new TransportFlags().withMerge(true));
        amendCacheManagerBeforeStart(createClusteredCacheManager);
        createClusteredCacheManager.start();
        CheckPoint checkPoint = new CheckPoint();
        blockRebalanceStart(createClusteredCacheManager, checkPoint, 2);
        ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
        configurationBuilder.read(this.configurationBuilder.build(), Combine.DEFAULT).clustering().remoteTimeout(1L, TimeUnit.NANOSECONDS).stateTransfer().timeout(1L, TimeUnit.NANOSECONDS);
        AssertJUnit.assertEquals(1, cache.getAdvancedCache().getDistributionManager().getCacheTopology().getMembers().size());
        createClusteredCacheManager.defineConfiguration(this.cacheName, configurationBuilder.build());
        Future fork = fork(() -> {
            return createClusteredCacheManager.getCache(this.cacheName);
        });
        checkPoint.awaitStrict("rebalance_begin", 10L, TimeUnit.SECONDS);
        Exceptions.expectException(ExecutionException.class, org.infinispan.commons.TimeoutException.class, () -> {
            fork.get(10L, TimeUnit.SECONDS);
        });
        checkPoint.triggerForever("merge");
        eventuallyEquals(1, () -> {
            return Integer.valueOf(cache.getAdvancedCache().getDistributionManager().getCacheTopology().getMembers().size());
        });
    }

    protected void blockRebalanceStart(EmbeddedCacheManager embeddedCacheManager, CheckPoint checkPoint, int i) {
        LocalTopologyManager localTopologyManager = (LocalTopologyManager) Mockito.spy((LocalTopologyManager) TestingUtil.extractGlobalComponent(embeddedCacheManager, LocalTopologyManager.class));
        ((LocalTopologyManager) Mockito.doAnswer(invocationOnMock -> {
            List members = ((CacheTopology) invocationOnMock.getArguments()[1]).getMembers();
            checkPoint.trigger("rebalance_begin");
            if (members.size() != i) {
                return invocationOnMock.callRealMethod();
            }
            log.debugf("Blocking the REBALANCE_START command with members %s on %s", members, embeddedCacheManager.getAddress());
            return TestingUtil.sequence(checkPoint.future("merge", 30L, TimeUnit.SECONDS, testExecutor()), () -> {
                return (CompletionStage) Mocks.callRealMethod(invocationOnMock);
            });
        }).when(localTopologyManager)).handleRebalance((String) ArgumentMatchers.eq(this.cacheName), (CacheTopology) ArgumentMatchers.any(CacheTopology.class), ArgumentMatchers.anyInt(), (Address) ArgumentMatchers.any(Address.class));
        TestingUtil.replaceComponent((CacheContainer) embeddedCacheManager, (Class<LocalTopologyManager>) LocalTopologyManager.class, localTopologyManager, true);
    }

    private void logTestStart(Method method) {
        logTestLifecycle(method, "start");
    }

    private void logTestEnd(Method method) {
        logTestLifecycle(method, "end");
    }

    private void logTestLifecycle(Method method, String str) {
        log.infof("%s %s - %s", method.getName(), str, Integer.valueOf(this.testCount));
    }

    private void thirdWritingCacheTest(boolean z) throws Exception {
        Cache<Object, Object> cache = createCacheManager(this.cacheName).getCache(this.cacheName);
        Cache cache2 = createCacheManager(this.cacheName).getCache(this.cacheName);
        TestingUtil.blockUntilViewsReceived(60000L, (Cache<?, ?>[]) new Cache[]{cache, cache2});
        writeInitialData(cache);
        DelayTransfer delayTransfer = new DelayTransfer();
        cache.put("delay", delayTransfer);
        delayTransfer.enableDelay();
        WritingTask writingTask = new WritingTask(cache2, z);
        Future fork = fork(writingTask);
        Cache<Object, Object> cache3 = createCacheManager(this.cacheName).getCache(this.cacheName);
        TestingUtil.waitForNoRebalance(cache, cache3, cache2);
        writingTask.stop();
        int intValue = ((Integer) fork.get(60L, TimeUnit.SECONDS)).intValue();
        verifyInitialData(cache3);
        for (int i = 0; i < intValue; i++) {
            AssertJUnit.assertEquals(Integer.valueOf(i), cache3.get("test" + i));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void verifyInitialData(Cache<Object, Object> cache) {
        log.debugf("Checking values on cache " + String.valueOf(cache.getAdvancedCache().getRpcManager().getAddress()), new Object[0]);
        AssertJUnit.assertEquals("Incorrect value for key a_b_name", "JOE", cache.get("a_b_name"));
        AssertJUnit.assertEquals("Incorrect value for key a_b_age", TWENTY, cache.get("a_b_age"));
        AssertJUnit.assertEquals("Incorrect value for key a_c_name", "BOB", cache.get("a_c_name"));
        AssertJUnit.assertEquals("Incorrect value for key a_c_age", FORTY, cache.get("a_c_age"));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void writeInitialData(Cache<Object, Object> cache) {
        cache.put("a_b_name", "JOE");
        cache.put("a_b_age", TWENTY);
        cache.put("a_c_name", "BOB");
        cache.put("a_c_age", FORTY);
    }

    private void writingThreadTest(boolean z) throws Exception {
        Cache<Object, Object> cache = createCacheManager(this.cacheName).getCache(this.cacheName);
        AssertJUnit.assertEquals(0, cache.getAdvancedCache().getDataContainer().size());
        writeInitialData(cache);
        DelayTransfer delayTransfer = new DelayTransfer();
        cache.put("delay", delayTransfer);
        delayTransfer.enableDelay();
        WritingTask writingTask = new WritingTask(cache, z);
        Future fork = fork(writingTask);
        verifyInitialData(cache);
        Cache<Object, Object> cache2 = createCacheManager(this.cacheName).getCache(this.cacheName);
        TestingUtil.waitForNoRebalance(cache, cache2);
        writingTask.stop();
        int intValue = ((Integer) fork.get(60L, TimeUnit.SECONDS)).intValue();
        verifyInitialData(cache);
        verifyInitialData(cache2);
        for (int i = 0; i < intValue; i++) {
            AssertJUnit.assertEquals(Integer.valueOf(i), cache2.get("test" + i));
        }
    }
}
