package org.infinispan.xsite.statetransfer;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.infinispan.Cache;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.configuration.cache.BackupConfigurationBuilder;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.AbstractCacheTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.util.ControlledTransport;
import org.infinispan.xsite.BackupReceiver;
import org.infinispan.xsite.BackupReceiverDelegator;
import org.infinispan.xsite.XSiteAdminOperations;
import org.infinispan.xsite.commands.XSiteBringOnlineCommand;
import org.infinispan.xsite.commands.XSiteStateTransferCancelSendCommand;
import org.infinispan.xsite.commands.XSiteStateTransferFinishReceiveCommand;
import org.infinispan.xsite.commands.XSiteStateTransferStartReceiveCommand;
import org.infinispan.xsite.commands.XSiteStateTransferStartSendCommand;
import org.infinispan.xsite.commands.XSiteStateTransferStatusRequestCommand;
import org.infinispan.xsite.commands.remote.XSiteStatePushRequest;
import org.infinispan.xsite.commands.remote.XSiteStateTransferControlRequest;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/* loaded from: input_file:org/infinispan/xsite/statetransfer/BaseStateTransferTest.class */
public abstract class BaseStateTransferTest extends AbstractStateTransferTest {
    // Shared test value. NOTE(review): nothing in this view references the constant; the tests use the literal "value" directly.
    private static final String VALUE = "value";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/xsite/statetransfer/BaseStateTransferTest$BackupListener.class */
    /**
     * Hooks invoked by {@link ListenableBackupReceiver} around remote commands and state chunks.
     * Every hook is a no-op unless a subclass overrides it.
     */
    public abstract static class BackupListener {

        /** Private: only code inside the enclosing top-level class can create subclasses. */
        private BackupListener() {
        }

        /**
         * Called before a remote command is handed to the delegate receiver.
         *
         * @param command the command about to be handled
         * @throws Exception to make the receiver fail the command
         */
        void beforeCommand(VisitableCommand command) throws Exception {
        }

        /**
         * Called once the delegate's handling of a remote command has completed.
         *
         * @param command the command that was handled
         */
        void afterCommand(VisitableCommand command) {
        }

        /**
         * Called before a state chunk is handed to the delegate receiver.
         *
         * @param chunk the state chunk about to be applied
         * @throws Exception to make the receiver fail the chunk
         */
        void beforeState(XSiteState[] chunk) throws Exception {
        }

        /**
         * Called once the delegate's handling of a state chunk has completed.
         *
         * @param chunk the state chunk that was applied
         */
        void afterState(XSiteState[] chunk) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/xsite/statetransfer/BaseStateTransferTest$ListenableBackupReceiver.class */
    /**
     * {@link BackupReceiver} decorator that notifies a {@link BackupListener} before and after each
     * remote command and each state-transfer chunk handled by the wrapped receiver.
     */
    public static class ListenableBackupReceiver extends BackupReceiverDelegator {
        private final BackupListener listener;

        /**
         * @param backupReceiver the receiver to delegate to
         * @param backupListener the listener to notify
         * @throws NullPointerException if {@code backupListener} is null
         */
        ListenableBackupReceiver(BackupReceiver backupReceiver, BackupListener backupListener) {
            super(backupReceiver);
            this.listener = Objects.requireNonNull(backupListener, "Listener must not be null.");
        }

        /**
         * Runs the before-hook, delegates, and runs the after-hook when the returned stage completes
         * (normally or exceptionally). An exception thrown by the before-hook, or synchronously by the
         * delegate, is reported as a failed stage instead of being propagated.
         */
        @Override // org.infinispan.xsite.BackupReceiverDelegator
        public <O> CompletionStage<O> handleRemoteCommand(VisitableCommand visitableCommand) {
            try {
                this.listener.beforeCommand(visitableCommand);
                // The explicit <O> witness is required: as the receiver of a chained call the generic
                // invocation is not a poly expression, so O would otherwise be inferred as Object.
                return super.<O>handleRemoteCommand(visitableCommand)
                        .whenComplete((result, throwable) -> this.listener.afterCommand(visitableCommand));
            } catch (Exception e) {
                return CompletableFuture.failedFuture(e);
            }
        }

        /**
         * Same contract as {@link #handleRemoteCommand(VisitableCommand)}, but for a state-transfer
         * chunk: before-hook, delegate, then after-hook on completion.
         */
        @Override // org.infinispan.xsite.BackupReceiverDelegator
        public CompletionStage<Void> handleStateTransferState(XSiteState[] xSiteStateArr, long j) {
            try {
                this.listener.beforeState(xSiteStateArr);
                return super.handleStateTransferState(xSiteStateArr, j)
                        .whenComplete((ignored, throwable) -> this.listener.afterState(xSiteStateArr));
            } catch (Exception e) {
                return CompletableFuture.failedFuture(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/xsite/statetransfer/BaseStateTransferTest$Operation.class */
    /**
     * Cache operations exercised by the tests. Each constant knows how to seed the key
     * ({@link #init}), how to run the operation asynchronously ({@link #perform}), whether it
     * reports itself as replicating ({@link #replicates}), and which values the tests compare
     * against ({@link #initialValue()} / {@link #finalValue()}).
     */
    public enum Operation {
        /** Seeds the key with the initial value, then overwrites it through {@code putAsync}. */
        PUT("v0", "v1") {
            @Override
            public <K> void init(Cache<K, Object> cache, K key) {
                cache.put(key, initialValue());
            }

            @Override
            public <K> Future<?> perform(Cache<K, Object> cache, K key) {
                return cache.putAsync(key, finalValue());
            }

            @Override
            public boolean replicates() {
                return true;
            }
        },

        /** No seeding; stores the final value through {@code putIfAbsentAsync}. */
        PUT_IF_ABSENT(null, "v1") {
            @Override
            public <K> void init(Cache<K, Object> cache, K key) {
                // nothing to seed
            }

            @Override
            public <K> Future<?> perform(Cache<K, Object> cache, K key) {
                return cache.putIfAbsentAsync(key, finalValue());
            }

            @Override
            public boolean replicates() {
                return true;
            }
        },

        /** Seeds the key, then calls {@code putIfAbsentAsync} with "v1"; reports no replication. */
        PUT_IF_ABSENT_FAIL("v0", "v0") {
            @Override
            public <K> void init(Cache<K, Object> cache, K key) {
                cache.put(key, initialValue());
            }

            @Override
            public <K> Future<?> perform(Cache<K, Object> cache, K key) {
                return cache.putIfAbsentAsync(key, "v1");
            }

            @Override
            public boolean replicates() {
                return false;
            }
        },

        /** Seeds the key, then replaces its value through {@code replaceAsync(key, value)}. */
        REPLACE("v0", "v1") {
            @Override
            public <K> void init(Cache<K, Object> cache, K key) {
                cache.put(key, initialValue());
            }

            @Override
            public <K> Future<?> perform(Cache<K, Object> cache, K key) {
                return cache.replaceAsync(key, finalValue());
            }

            @Override
            public boolean replicates() {
                return true;
            }
        },

        /** No seeding; calls {@code replaceAsync} with "v1"; reports no replication. */
        REPLACE_NON_EXISTING(null, null) {
            @Override
            public <K> void init(Cache<K, Object> cache, K key) {
                // nothing to seed
            }

            @Override
            public <K> Future<?> perform(Cache<K, Object> cache, K key) {
                return cache.replaceAsync(key, "v1");
            }

            @Override
            public boolean replicates() {
                return false;
            }
        },

        /** Seeds the key, then runs the conditional replace from the initial to the final value. */
        REPLACE_IF_MATCH("v0", "v1") {
            @Override
            public <K> void init(Cache<K, Object> cache, K key) {
                cache.put(key, initialValue());
            }

            @Override
            public <K> Future<?> perform(Cache<K, Object> cache, K key) {
                return cache.replaceAsync(key, initialValue(), finalValue());
            }

            @Override
            public boolean replicates() {
                return true;
            }
        },

        /** Seeds the key with "v0", then runs a conditional replace expecting "v1"; reports no replication. */
        REPLACE_IF_MATCH_FAIL("v0", "v0") {
            @Override
            public <K> void init(Cache<K, Object> cache, K key) {
                cache.put(key, initialValue());
            }

            @Override
            public <K> Future<?> perform(Cache<K, Object> cache, K key) {
                return cache.replaceAsync(key, "v1", "v1");
            }

            @Override
            public boolean replicates() {
                return false;
            }
        },

        /** No seeding; calls {@code removeAsync(key)}; reports replication. */
        REMOVE_NON_EXISTING(null, null) {
            @Override
            public <K> void init(Cache<K, Object> cache, K key) {
                // nothing to seed
            }

            @Override
            public <K> Future<?> perform(Cache<K, Object> cache, K key) {
                return cache.removeAsync(key);
            }

            @Override
            public boolean replicates() {
                return true;
            }
        },

        /** Seeds the key, then removes it through {@code removeAsync(key)}. */
        REMOVE("v0", null) {
            @Override
            public <K> void init(Cache<K, Object> cache, K key) {
                cache.put(key, initialValue());
            }

            @Override
            public <K> Future<?> perform(Cache<K, Object> cache, K key) {
                return cache.removeAsync(key);
            }

            @Override
            public boolean replicates() {
                return true;
            }
        },

        /** Seeds the key, then runs the conditional remove with the initial value. */
        REMOVE_IF_MATCH("v0", null) {
            @Override
            public <K> void init(Cache<K, Object> cache, K key) {
                cache.put(key, initialValue());
            }

            @Override
            public <K> Future<?> perform(Cache<K, Object> cache, K key) {
                return cache.removeAsync(key, initialValue());
            }

            @Override
            public boolean replicates() {
                return true;
            }
        },

        /** Seeds the key with "v0", then runs a conditional remove expecting "v1"; reports no replication. */
        REMOVE_IF_MATCH_FAIL("v0", "v0") {
            @Override
            public <K> void init(Cache<K, Object> cache, K key) {
                cache.put(key, initialValue());
            }

            @Override
            public <K> Future<?> perform(Cache<K, Object> cache, K key) {
                return cache.removeAsync(key, "v1");
            }

            @Override
            public boolean replicates() {
                return false;
            }
        },

        /** Seeds the key, then clears the cache through {@code clearAsync}. */
        CLEAR("v0", null) {
            @Override
            public <K> void init(Cache<K, Object> cache, K key) {
                cache.put(key, initialValue());
            }

            @Override
            public <K> Future<?> perform(Cache<K, Object> cache, K key) {
                return cache.clearAsync();
            }

            @Override
            public boolean replicates() {
                return true;
            }
        },

        /** Seeds the key, then writes the final value through {@code putAllAsync} with a one-entry map. */
        PUT_MAP("v0", "v1") {
            @Override
            public <K> void init(Cache<K, Object> cache, K key) {
                cache.put(key, initialValue());
            }

            @Override
            public <K> Future<?> perform(Cache<K, Object> cache, K key) {
                HashMap<K, Object> entries = new HashMap<>();
                entries.put(key, finalValue());
                return cache.putAllAsync(entries);
            }

            @Override
            public boolean replicates() {
                return true;
            }
        };

        private final Object initialValue;
        private final Object finalValue;

        /**
         * @param initialValue value the tests associate with the key before the operation (may be null)
         * @param finalValue   value the tests expect for the key after the operation (may be null)
         */
        Operation(Object initialValue, Object finalValue) {
            this.initialValue = initialValue;
            this.finalValue = finalValue;
        }

        /** @return the value given as first constructor argument */
        final Object initialValue() {
            return initialValue;
        }

        /** @return the value given as second constructor argument */
        final Object finalValue() {
            return finalValue;
        }

        /** Prepares {@code cache} for the operation, seeding {@code k} where the constant requires it. */
        protected abstract <K> void init(Cache<K, Object> cache, K k);

        /** Starts the operation on {@code cache} for {@code k} and returns the future tracking it. */
        protected abstract <K> Future<?> perform(Cache<K, Object> cache, K k);

        /** @return the constant's fixed answer on whether the operation replicates */
        protected abstract boolean replicates();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/xsite/statetransfer/BaseStateTransferTest$XSiteStateProviderControl.class */
    /**
     * {@link XSiteStateProvider} wrapper that parks {@code startStateTransfer} on a
     * {@link CheckPoint}, so a test can act between the request arriving and the delegate running.
     */
    public static class XSiteStateProviderControl extends XSiteProviderDelegator {
        private final CheckPoint checkPoint = new CheckPoint();

        private XSiteStateProviderControl(XSiteStateProvider xSiteStateProvider) {
            super(xSiteStateProvider);
        }

        /**
         * Signals "before-start", waits up to 30 seconds for "await-start", then forwards the
         * unchanged arguments to the delegate. A timeout or an interruption is rethrown wrapped in
         * a {@link RuntimeException}; on interruption the interrupt flag is restored first.
         */
        @Override // org.infinispan.xsite.statetransfer.XSiteProviderDelegator
        public void startStateTransfer(String str, Address address, int i) {
            checkPoint.trigger("before-start");
            try {
                checkPoint.awaitStrict("await-start", 30L, TimeUnit.SECONDS);
                super.startStateTransfer(str, address, i);
            } catch (TimeoutException timeout) {
                throw new RuntimeException(timeout);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(interrupted);
            }
        }

        /**
         * Wraps the {@link XSiteStateProvider} currently registered in {@code cache} and installs
         * the wrapper in its place.
         *
         * @return the installed wrapper
         */
        static XSiteStateProviderControl replaceInCache(Cache<?, ?> cache) {
            XSiteStateProvider current = (XSiteStateProvider) TestingUtil.extractComponent(cache, XSiteStateProvider.class);
            XSiteStateProviderControl control = new XSiteStateProviderControl(current);
            TestingUtil.replaceComponent(cache, (Class<? extends XSiteStateProviderControl>) XSiteStateProvider.class, control, true);
            return control;
        }

        /** Waits up to 30 seconds until {@code startStateTransfer} has signalled "before-start". */
        final void await() throws TimeoutException, InterruptedException {
            checkPoint.awaitStrict("before-start", 30L, TimeUnit.SECONDS);
        }

        /** Releases a {@code startStateTransfer} call waiting on "await-start". */
        final void trigger() {
            checkPoint.trigger("await-start");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Configures the test to use {@code DIST_SYNC} caches and the {@code AFTER_METHOD} cleanup phase. */
    public BaseStateTransferTest() {
        cacheMode = CacheMode.DIST_SYNC;
        cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
    }

    /**
     * Pushing state to an unknown site must return the "incorrect site name" message and must not
     * leave any state transfer running.
     */
    @Test(groups = {"xsite"})
    public void testStateTransferNonExistingSite() {
        XSiteAdminOperations admin = adminOperations();
        String pushResult = admin.pushState("NO_SITE");
        AssertJUnit.assertEquals("Unable to pushState to 'NO_SITE'. Incorrect site name: NO_SITE", pushResult);

        boolean nothingRunning = admin.getRunningStateTransfer().isEmpty();
        AssertJUnit.assertTrue(nothingRunning);
        assertNoStateTransferInSendingSite();
    }

    /**
     * Starts a state transfer, cancels it while the first {@link XSiteStatePushRequest} is blocked,
     * checks the status is "CANCELED", then runs a second transfer to completion and verifies every
     * inserted key reached NYC-2.
     */
    @Test(groups = {"xsite"})
    public void testCancelStateTransfer(Method method) throws InterruptedException {
        takeSiteOffline();
        assertOffline();
        assertNoStateTransferInReceivingSite(null);
        assertNoStateTransferInSendingSite();

        LocalizedCacheTopology topology = cache("LON-1", 0).getAdvancedCache().getDistributionManager().getCacheTopology();
        Address coordinator = cache("LON-1", 0).getCacheManager().getAddress();

        // Keep inserting until the first LON node is primary owner of at least chunkSize() keys.
        HashSet<String> ownedKeys = new HashSet<>();
        int inserted = 0;
        for (; ownedKeys.size() < chunkSize(); inserted++) {
            String key = TestingUtil.k(method, inserted);
            cache("LON-1", 0).put(key, "value");
            if (topology.getDistribution(key).primary().equals(coordinator)) {
                ownedKeys.add(key);
            }
        }
        final int keyCount = inserted;
        log.debugf("Coordinator %s is primary owner for %d keys: %s", coordinator, Integer.valueOf(ownedKeys.size()), ownedKeys);
        assertInSite("NYC-2", cache -> AssertJUnit.assertTrue(cache.isEmpty()));

        // Let control traffic and cache commands through; everything else on this node is intercepted.
        ControlledTransport transport = ControlledTransport.replace((Cache<?, ?>) cache("LON-1", 0));
        transport.excludeCommands(XSiteBringOnlineCommand.class, XSiteStateTransferStartReceiveCommand.class, XSiteStateTransferControlRequest.class, XSiteStateTransferStartSendCommand.class, XSiteStateTransferCancelSendCommand.class, XSiteStateTransferFinishReceiveCommand.class, XSiteStateTransferStatusRequestCommand.class);
        transport.excludeCacheCommands();

        // First attempt: cancel while the push request is held back.
        startStateTransfer();
        ControlledTransport.BlockedRequest cancelledPush = transport.expectCommand(XSiteStatePushRequest.class);
        AssertJUnit.assertEquals("ok", adminOperations().cancelPushState("NYC-2"));
        cancelledPush.send().receiveAll();
        assertEventuallyStateTransferNotRunning();
        assertEventuallyNoStateTransferInReceivingSite(null);
        assertEventuallyNoStateTransferInSendingSite();
        String statusAfterCancel = (String) adminOperations().getPushStateStatus().get("NYC-2");
        AssertJUnit.assertEquals("CANCELED", statusAfterCancel);

        // Second attempt: let it finish.
        startStateTransfer();
        ControlledTransport.BlockedRequest secondPush = transport.expectCommand(XSiteStatePushRequest.class);
        String statusWhileSending = (String) adminOperations().getPushStateStatus().get("NYC-2");
        AssertJUnit.assertEquals("SENDING", statusWhileSending);
        secondPush.send().receiveAll();
        assertEventuallyStateTransferNotRunning();
        assertEventuallyNoStateTransferInReceivingSite(null);
        assertEventuallyNoStateTransferInSendingSite();

        assertInSite("NYC-2", cache -> {
            for (int index = 0; index < keyCount; index++) {
                AssertJUnit.assertEquals("value", cache.get(TestingUtil.k(method, index)));
            }
        });
        transport.stopBlocking();
    }

    /**
     * Writes four chunks' worth of entries in LON while NYC-2 is offline, runs a state transfer
     * with no concurrent load, and checks that every entry is present in NYC-2 afterwards.
     */
    @Test(groups = {"xsite"})
    public void testStateTransferWithClusterIdle(Method method) {
        takeSiteOffline();
        assertOffline();
        assertNoStateTransferInReceivingSite(null);
        assertNoStateTransferInSendingSite();

        final int entries = chunkSize() * 4;
        int next = 0;
        while (next < entries) {
            cache("LON-1", 0).put(TestingUtil.k(method, next), "value");
            next++;
        }
        assertInSite("NYC-2", cache -> AssertJUnit.assertTrue(cache.isEmpty()));

        startStateTransfer();
        assertEventuallyStateTransferNotRunning();
        assertOnline("LON-1", "NYC-2");

        assertInSite("NYC-2", cache -> {
            for (int index = 0; index < entries; index++) {
                AssertJUnit.assertEquals("value", cache.get(TestingUtil.k(method, index)));
            }
        });
        assertEventuallyNoStateTransferInReceivingSite(null);
        assertEventuallyNoStateTransferInSendingSite();
    }

    // The tests below each delegate to testStateTransferWithConcurrentOperation with one Operation.
    // The boolean is true for the "BeforeState" variants and false for the "AfterState" variants.

    /** Runs the concurrent-operation scenario with {@code Operation.PUT} and the flag set to true. */
    @Test(groups = {"xsite"})
    public void testPutOperationBeforeState(Method method) throws Exception {
        testStateTransferWithConcurrentOperation(Operation.PUT, true, method);
    }

    /** Runs the concurrent-operation scenario with {@code Operation.PUT} and the flag set to false. */
    @Test(groups = {"xsite"})
    public void testPutOperationAfterState(Method method) throws Exception {
        testStateTransferWithConcurrentOperation(Operation.PUT, false, method);
    }

    /** Runs the concurrent-operation scenario with {@code Operation.REMOVE} and the flag set to true. */
    @Test(groups = {"xsite"})
    public void testRemoveOperationBeforeState(Method method) throws Exception {
        testStateTransferWithConcurrentOperation(Operation.REMOVE, true, method);
    }

    /** Runs the concurrent-operation scenario with {@code Operation.REMOVE} and the flag set to false. */
    @Test(groups = {"xsite"})
    public void testRemoveOperationAfterState(Method method) throws Exception {
        testStateTransferWithConcurrentOperation(Operation.REMOVE, false, method);
    }

    /** Runs the concurrent-operation scenario with {@code Operation.REMOVE_IF_MATCH} and the flag set to true. */
    @Test(groups = {"xsite"})
    public void testRemoveIfMatchOperationBeforeState(Method method) throws Exception {
        testStateTransferWithConcurrentOperation(Operation.REMOVE_IF_MATCH, true, method);
    }

    /** Runs the concurrent-operation scenario with {@code Operation.REMOVE_IF_MATCH} and the flag set to false. */
    @Test(groups = {"xsite"})
    public void testRemoveIfMatchOperationAfterState(Method method) throws Exception {
        testStateTransferWithConcurrentOperation(Operation.REMOVE_IF_MATCH, false, method);
    }

    /** Runs the concurrent-operation scenario with {@code Operation.REPLACE} and the flag set to true. */
    @Test(groups = {"xsite"})
    public void testReplaceOperationBeforeState(Method method) throws Exception {
        testStateTransferWithConcurrentOperation(Operation.REPLACE, true, method);
    }

    /** Runs the concurrent-operation scenario with {@code Operation.REPLACE} and the flag set to false. */
    @Test(groups = {"xsite"})
    public void testReplaceOperationAfterState(Method method) throws Exception {
        testStateTransferWithConcurrentOperation(Operation.REPLACE, false, method);
    }

    /** Runs the concurrent-operation scenario with {@code Operation.REPLACE_IF_MATCH} and the flag set to true. */
    @Test(groups = {"xsite"})
    public void testReplaceIfMatchOperationBeforeState(Method method) throws Exception {
        testStateTransferWithConcurrentOperation(Operation.REPLACE_IF_MATCH, true, method);
    }

    /** Runs the concurrent-operation scenario with {@code Operation.REPLACE_IF_MATCH} and the flag set to false. */
    @Test(groups = {"xsite"})
    public void testReplaceIfMatchOperationAfterState(Method method) throws Exception {
        testStateTransferWithConcurrentOperation(Operation.REPLACE_IF_MATCH, false, method);
    }

    /** Runs the concurrent-operation scenario with {@code Operation.CLEAR} and the flag set to true. */
    @Test(groups = {"xsite"})
    public void testClearOperationBeforeState(Method method) throws Exception {
        testStateTransferWithConcurrentOperation(Operation.CLEAR, true, method);
    }

    /** Runs the concurrent-operation scenario with {@code Operation.CLEAR} and the flag set to false. */
    @Test(groups = {"xsite"})
    public void testClearOperationAfterState(Method method) throws Exception {
        testStateTransferWithConcurrentOperation(Operation.CLEAR, false, method);
    }

    /** Runs the concurrent-operation scenario with {@code Operation.PUT_MAP} and the flag set to true. */
    @Test(groups = {"xsite"})
    public void testPutMapOperationBeforeState(Method method) throws Exception {
        testStateTransferWithConcurrentOperation(Operation.PUT_MAP, true, method);
    }

    /** Runs the concurrent-operation scenario with {@code Operation.PUT_MAP} and the flag set to false. */
    @Test(groups = {"xsite"})
    public void testPutMapOperationAfterState(Method method) throws Exception {
        testStateTransferWithConcurrentOperation(Operation.PUT_MAP, false, method);
    }

    // The tests below delegate to testStateTransferWithNoReplicatedOperation, which asserts that
    // the given Operation reports replicates() == false before running the scenario.

    /** Runs the non-replicated scenario with {@code Operation.PUT_IF_ABSENT_FAIL}. */
    @Test(groups = {"xsite"})
    public void testPutIfAbsentFail(Method method) throws Exception {
        testStateTransferWithNoReplicatedOperation(Operation.PUT_IF_ABSENT_FAIL, method);
    }

    /** Runs the non-replicated scenario with {@code Operation.REMOVE_IF_MATCH_FAIL}. */
    @Test(groups = {"xsite"})
    public void testRemoveIfMatchFail(Method method) throws Exception {
        testStateTransferWithNoReplicatedOperation(Operation.REMOVE_IF_MATCH_FAIL, method);
    }

    /** Runs the non-replicated scenario with {@code Operation.REPLACE_IF_MATCH_FAIL}. */
    @Test(groups = {"xsite"})
    public void testReplaceIfMatchFail(Method method) throws Exception {
        testStateTransferWithNoReplicatedOperation(Operation.REPLACE_IF_MATCH_FAIL, method);
    }

    // The tests below delegate to testConcurrentOperation, which asserts that the given Operation
    // has a null initial value and reports replicates() == true.

    /** Runs the concurrent scenario with {@code Operation.PUT_IF_ABSENT}. */
    @Test(groups = {"xsite"})
    public void testPutIfAbsent(Method method) throws Exception {
        testConcurrentOperation(Operation.PUT_IF_ABSENT, method);
    }

    /** Runs the concurrent scenario with {@code Operation.REMOVE_NON_EXISTING}. */
    @Test(groups = {"xsite"})
    public void testRemoveNonExisting(Method method) throws Exception {
        testConcurrentOperation(Operation.REMOVE_NON_EXISTING, method);
    }

    /** Sets the state-transfer chunk size to 2 and its timeout to 2000 on the given backup builder. */
    @Override // org.infinispan.xsite.AbstractTwoSitesTest
    protected void adaptLONConfiguration(BackupConfigurationBuilder backupConfigurationBuilder) {
        backupConfigurationBuilder
                .stateTransfer()
                .chunkSize(2)
                .timeout(2000L);
    }

    /**
     * Runs a replicating {@link Operation} on a seeded key while a state transfer is in progress
     * and checks that both sites end up with the operation's final value.
     *
     * <p>A {@link CheckPoint} orders the two events in NYC-2. When {@code performBeforeState} is
     * true, the chunk containing the key waits until the update command has been handled. When it
     * is false, the update command waits until that chunk has been handled.
     *
     * @param operation          the operation to run; must replicate and have a non-null initial value
     * @param performBeforeState whether the update must be handled before the state chunk
     * @param method             the test method, used to derive the key
     */
    private void testStateTransferWithConcurrentOperation(final Operation operation, final boolean performBeforeState, Method method) throws Exception {
        AssertJUnit.assertNotNull(operation);
        AssertJUnit.assertTrue(operation.replicates());
        takeSiteOffline();
        assertOffline();
        assertNoStateTransferInReceivingSite(null);
        assertNoStateTransferInSendingSite();

        final String key = TestingUtil.k(method, 0);
        final CheckPoint checkPoint = new CheckPoint();
        operation.init(cache("LON-1", 0), key);
        AssertJUnit.assertNotNull(operation.initialValue());

        BackupListener listener = new BackupListener() {
            @Override
            public void beforeCommand(VisitableCommand command) throws Exception {
                checkPoint.trigger("before-update");
                // "After state" mode: hold the update until the chunk with the key was handled.
                if (!performBeforeState && isUpdatingKeyWithValue(command, key, operation.finalValue())) {
                    checkPoint.awaitStrict("update-key", 30L, TimeUnit.SECONDS);
                }
            }

            @Override
            public void afterCommand(VisitableCommand command) {
                if (performBeforeState && isUpdatingKeyWithValue(command, key, operation.finalValue())) {
                    checkPoint.trigger("apply-state");
                }
            }

            @Override
            public void beforeState(XSiteState[] chunk) throws Exception {
                checkPoint.trigger("before-state");
                checkPoint.awaitStrict("before-update", 30L, TimeUnit.SECONDS);
                // "Before state" mode: hold the chunk with the key until the update was handled.
                if (performBeforeState && containsKey(chunk, key)) {
                    checkPoint.awaitStrict("apply-state", 30L, TimeUnit.SECONDS);
                }
            }

            @Override
            public void afterState(XSiteState[] chunk) {
                if (!performBeforeState && containsKey(chunk, key)) {
                    checkPoint.trigger("update-key");
                }
            }
        };

        for (Cache<?, ?> nycCache : caches("NYC-2")) {
            TestingUtil.wrapComponent(nycCache, BackupReceiver.class, current -> new ListenableBackupReceiver(current, listener));
        }

        startStateTransfer();
        assertOnline("LON-1", "NYC-2");
        checkPoint.awaitStrict("before-state", 30L, TimeUnit.SECONDS);
        operation.perform(cache("LON-1", 0), key).get();

        assertEventuallyStateTransferNotRunning();
        assertEventuallyNoStateTransferInReceivingSite(null);
        assertEventuallyNoStateTransferInSendingSite();
        assertInSite("NYC-2", cache -> AssertJUnit.assertEquals(operation.finalValue(), cache.get(key)));
        assertInSite("LON-1", cache -> AssertJUnit.assertEquals(operation.finalValue(), cache.get(key)));
    }

    /**
     * Runs a replicating {@link Operation} with a null initial value while the state provider in
     * the first LON node is parked before starting the transfer, then releases it and checks that
     * both sites hold the operation's final value.
     *
     * @param operation the operation to run; must replicate and have a null initial value
     * @param method    the test method, used to derive the key
     */
    private void testConcurrentOperation(Operation operation, Method method) throws Exception {
        AssertJUnit.assertNotNull(operation);
        AssertJUnit.assertTrue(operation.replicates());
        takeSiteOffline();
        assertOffline();
        assertNoStateTransferInReceivingSite(null);
        assertNoStateTransferInSendingSite();

        String key = TestingUtil.k(method, 0);
        operation.init(cache("LON-1", 0), key);
        AssertJUnit.assertNull(operation.initialValue());

        XSiteStateProviderControl providerControl = XSiteStateProviderControl.replaceInCache(cache("LON-1", 0));
        Future<Void> transferStart = fork(this::startStateTransfer);
        // Wait until the provider is parked, so the operation runs before the transfer starts.
        providerControl.await();
        assertOnline("LON-1", "NYC-2");
        operation.perform(cache("LON-1", 0), key).get();
        providerControl.trigger();
        transferStart.get(30L, TimeUnit.SECONDS);

        assertEventuallyStateTransferNotRunning();
        assertEventuallyNoStateTransferInReceivingSite(null);
        assertEventuallyNoStateTransferInSendingSite();
        assertInSite("NYC-2", cache -> AssertJUnit.assertEquals(operation.finalValue(), cache.get(key)));
        assertInSite("LON-1", cache -> AssertJUnit.assertEquals(operation.finalValue(), cache.get(key)));
    }

    /**
     * Runs a non-replicating {@link Operation} while state chunks are held back in NYC-2, asserts
     * that no remote command reached the NYC-2 receivers, then lets the transfer finish and checks
     * that both sites hold the operation's final value.
     *
     * @param operation the operation to run; must not replicate and must have a non-null initial value
     * @param method    the test method, used to derive the key
     */
    private void testStateTransferWithNoReplicatedOperation(Operation operation, Method method) throws Exception {
        AssertJUnit.assertNotNull(operation);
        AssertJUnit.assertFalse(operation.replicates());
        takeSiteOffline();
        assertOffline();
        assertNoStateTransferInReceivingSite(null);
        assertNoStateTransferInSendingSite();

        String key = TestingUtil.k(method, 0);
        final CheckPoint checkPoint = new CheckPoint();
        final AtomicBoolean commandSeen = new AtomicBoolean(false);
        operation.init(cache("LON-1", 0), key);
        AssertJUnit.assertNotNull(operation.initialValue());

        BackupListener listener = new BackupListener() {
            @Override
            public void beforeCommand(VisitableCommand command) {
                commandSeen.set(true);
            }

            @Override
            public void afterCommand(VisitableCommand command) {
                commandSeen.set(true);
            }

            @Override
            public void beforeState(XSiteState[] chunk) throws Exception {
                // Announce the chunk, then hold it until the test has run the operation.
                checkPoint.trigger("before-state");
                checkPoint.awaitStrict("before-update", 30L, TimeUnit.SECONDS);
            }
        };

        for (Cache<?, ?> nycCache : caches("NYC-2")) {
            TestingUtil.wrapComponent(nycCache, BackupReceiver.class, current -> new ListenableBackupReceiver(current, listener));
        }

        startStateTransfer();
        assertOnline("LON-1", "NYC-2");
        checkPoint.awaitStrict("before-state", 30L, TimeUnit.SECONDS);
        operation.perform(cache("LON-1", 0), key).get();
        AssertJUnit.assertFalse(commandSeen.get());
        checkPoint.trigger("before-update");

        assertEventuallyStateTransferNotRunning();
        assertEventuallyNoStateTransferInReceivingSite(null);
        assertEventuallyNoStateTransferInSendingSite();
        assertInSite("NYC-2", cache -> AssertJUnit.assertEquals(operation.finalValue(), cache.get(key)));
        assertInSite("LON-1", cache -> AssertJUnit.assertEquals(operation.finalValue(), cache.get(key)));
    }

    /**
     * Tells whether {@code visitableCommand} is the update the test is waiting for.
     *
     * <ul>
     *   <li>{@link PutKeyValueCommand}: key and value must both match.</li>
     *   <li>{@link RemoveCommand}: the key must match; the value is ignored.</li>
     *   <li>{@link ClearCommand}: always matches.</li>
     *   <li>{@link WriteOnlyManyEntriesCommand}: the arguments must hold an entry for the key whose
     *       wrapped value matches.</li>
     *   <li>{@link PrepareCommand}: matches if any of its modifications matches.</li>
     *   <li>Anything else: no match.</li>
     * </ul>
     *
     * All comparisons are null-safe, because several operations have a {@code null} expected
     * value and a multi-entry write may not contain the key at all.
     *
     * @param visitableCommand the command to inspect
     * @param obj              the key of interest
     * @param obj2             the expected value, possibly {@code null}
     * @return {@code true} if the command updates the key as described above
     */
    private boolean isUpdatingKeyWithValue(VisitableCommand visitableCommand, Object obj, Object obj2) {
        if (visitableCommand instanceof PutKeyValueCommand) {
            PutKeyValueCommand put = (PutKeyValueCommand) visitableCommand;
            return Objects.equals(obj, put.getKey()) && Objects.equals(obj2, put.getValue());
        }
        if (visitableCommand instanceof RemoveCommand) {
            return Objects.equals(obj, ((RemoveCommand) visitableCommand).getKey());
        }
        if (visitableCommand instanceof ClearCommand) {
            return true;
        }
        if (visitableCommand instanceof WriteOnlyManyEntriesCommand) {
            // The key may be missing from the arguments; treat that as "not updating".
            Object argument = ((WriteOnlyManyEntriesCommand) visitableCommand).getArguments().get(obj);
            return argument != null && Objects.equals(((InternalCacheValue) argument).getValue(), obj2);
        }
        if (!(visitableCommand instanceof PrepareCommand)) {
            return false;
        }
        for (WriteCommand modification : ((PrepareCommand) visitableCommand).getModifications()) {
            if (isUpdatingKeyWithValue(modification, obj, obj2)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Checks whether any element of the state chunk carries the given key.
     *
     * @param xSiteStateArr the state chunk; may be {@code null} or empty
     * @param obj           the key to look for; may be {@code null}
     * @return {@code true} if some element's key equals {@code obj}; {@code false} otherwise,
     *         including when the chunk is null or empty or the key is null
     */
    private boolean containsKey(XSiteState[] xSiteStateArr, Object obj) {
        boolean searchable = xSiteStateArr != null && xSiteStateArr.length != 0 && obj != null;
        if (!searchable) {
            return false;
        }
        for (int index = 0; index < xSiteStateArr.length; index++) {
            if (obj.equals(xSiteStateArr[index].key())) {
                return true;
            }
        }
        return false;
    }
}
