package org.apache.activemq.network;

import com.google.common.collect.Lists;
import java.net.URI;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.network.DynamicNetworkTestSupport;
import org.apache.activemq.perf.NetworkedSyncTest;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/network/NetworkDurableRecreationTest.class */
public class NetworkDurableRecreationTest extends DynamicNetworkTestSupport {
    protected NetworkConnector connector;

    @Test(timeout = 30000)
    public void testDurableConsumer() throws Exception {
        testReceive(this.remoteBroker, this.remoteSession, this.localBroker, this.localSession, new DynamicNetworkTestSupport.ConsumerCreator() { // from class: org.apache.activemq.network.NetworkDurableRecreationTest.1
            @Override // org.apache.activemq.network.DynamicNetworkTestSupport.ConsumerCreator
            public MessageConsumer createConsumer() throws JMSException {
                return NetworkDurableRecreationTest.this.remoteSession.createDurableSubscriber(NetworkDurableRecreationTest.this.included, NetworkDurableRecreationTest.this.subName);
            }
        });
    }

    @Test(timeout = 30000)
    public void testDurableConsumerReverse() throws Exception {
        testReceive(this.localBroker, this.localSession, this.remoteBroker, this.remoteSession, new DynamicNetworkTestSupport.ConsumerCreator() { // from class: org.apache.activemq.network.NetworkDurableRecreationTest.2
            @Override // org.apache.activemq.network.DynamicNetworkTestSupport.ConsumerCreator
            public MessageConsumer createConsumer() throws JMSException {
                return NetworkDurableRecreationTest.this.localSession.createDurableSubscriber(NetworkDurableRecreationTest.this.included, NetworkDurableRecreationTest.this.subName);
            }
        });
    }

    @Test(timeout = 30000)
    public void testDurableAndTopicConsumer() throws Exception {
        testReceive(this.remoteBroker, this.remoteSession, this.localBroker, this.localSession, new DynamicNetworkTestSupport.ConsumerCreator() { // from class: org.apache.activemq.network.NetworkDurableRecreationTest.3
            @Override // org.apache.activemq.network.DynamicNetworkTestSupport.ConsumerCreator
            public MessageConsumer createConsumer() throws JMSException {
                return NetworkDurableRecreationTest.this.remoteSession.createConsumer(NetworkDurableRecreationTest.this.included);
            }
        });
    }

    @Test(timeout = 30000)
    public void testDurableAndTopicConsumerReverse() throws Exception {
        testReceive(this.localBroker, this.localSession, this.remoteBroker, this.remoteSession, new DynamicNetworkTestSupport.ConsumerCreator() { // from class: org.apache.activemq.network.NetworkDurableRecreationTest.4
            @Override // org.apache.activemq.network.DynamicNetworkTestSupport.ConsumerCreator
            public MessageConsumer createConsumer() throws JMSException {
                return NetworkDurableRecreationTest.this.localSession.createConsumer(NetworkDurableRecreationTest.this.included);
            }
        });
    }

    protected void testReceive(BrokerService brokerService, Session session, BrokerService brokerService2, Session session2, DynamicNetworkTestSupport.ConsumerCreator consumerCreator) throws Exception {
        NetworkBridge findDuplexBridge;
        DestinationStatistics destinationStatistics = brokerService2.getDestination(this.included).getDestinationStatistics();
        MessageProducer createProducer = session2.createProducer(this.included);
        TopicSubscriber createDurableSubscriber = session.createDurableSubscriber(this.included, this.subName);
        waitForConsumerCount(destinationStatistics, 1);
        if (brokerService2.getNetworkConnectors().size() > 0) {
            Wait.waitFor(() -> {
                return ((NetworkConnector) brokerService2.getNetworkConnectors().get(0)).activeBridges().size() == 1;
            }, DurableSubProcessWithRestartTest.BROKER_RESTART, 500L);
            findDuplexBridge = (NetworkBridge) ((NetworkConnector) brokerService2.getNetworkConnectors().get(0)).activeBridges().iterator().next();
        } else {
            findDuplexBridge = findDuplexBridge(brokerService2.getTransportConnectorByScheme("tcp"));
        }
        assertSubscriptionMapCounts(findDuplexBridge, 2);
        ConnectionContext connectionContext = new ConnectionContext();
        RemoveSubscriptionInfo removeSubscriptionInfo = getRemoveSubscriptionInfo(connectionContext, brokerService);
        createDurableSubscriber.close();
        Thread.sleep(1000L);
        brokerService.getBroker().removeSubscription(connectionContext, removeSubscriptionInfo);
        waitForConsumerCount(destinationStatistics, 0);
        assertSubscriptionMapCounts(findDuplexBridge, 1);
        MessageConsumer createConsumer = consumerCreator.createConsumer();
        waitForConsumerCount(destinationStatistics, 1);
        createProducer.send(session2.createTextMessage("test"));
        Assert.assertNotNull(createConsumer.receive(5000L));
    }

    @Before
    public void setUp() throws Exception {
        doSetUp(true);
    }

    @After
    public void tearDown() throws Exception {
        doTearDown();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.network.DynamicNetworkTestSupport
    public void doTearDown() throws Exception {
        if (this.localConnection != null) {
            this.localConnection.close();
        }
        if (this.remoteConnection != null) {
            this.remoteConnection.close();
        }
        if (this.localBroker != null) {
            this.localBroker.stop();
        }
        if (this.remoteBroker != null) {
            this.remoteBroker.stop();
        }
    }

    protected void doSetUp(boolean z) throws Exception {
        this.remoteBroker = createRemoteBroker();
        this.remoteBroker.setDeleteAllMessagesOnStartup(z);
        this.remoteBroker.start();
        this.remoteBroker.waitUntilStarted();
        this.localBroker = createLocalBroker();
        this.localBroker.setDeleteAllMessagesOnStartup(z);
        this.localBroker.start();
        this.localBroker.waitUntilStarted();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(this.localBroker.getVmConnectorURI());
        activeMQConnectionFactory.setAlwaysSyncSend(true);
        activeMQConnectionFactory.setDispatchAsync(false);
        this.localConnection = activeMQConnectionFactory.createConnection();
        this.localConnection.setClientID(this.clientId);
        this.localConnection.start();
        this.remoteConnection = new ActiveMQConnectionFactory(this.remoteBroker.getVmConnectorURI()).createConnection();
        this.remoteConnection.setClientID(this.clientId);
        this.remoteConnection.start();
        this.included = new ActiveMQTopic(this.testTopicName);
        this.localSession = this.localConnection.createSession(false, 1);
        this.remoteSession = this.remoteConnection.createSession(false, 1);
    }

    protected BrokerService createLocalBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setMonitorConnectionSplits(true);
        brokerService.setDataDirectoryFile(this.tempFolder.newFolder());
        brokerService.setBrokerName("localBroker");
        this.connector = new DiscoveryNetworkConnector(new URI("static:(tcp://localhost:61617)"));
        this.connector.setName("networkConnector");
        this.connector.setDecreaseNetworkConsumerPriority(false);
        this.connector.setConduitSubscriptions(true);
        this.connector.setDuplex(true);
        this.connector.setDynamicallyIncludedDestinations(Lists.newArrayList(new ActiveMQDestination[]{new ActiveMQTopic(this.testTopicName)}));
        this.connector.setExcludedDestinations(Lists.newArrayList(new ActiveMQDestination[]{new ActiveMQTopic(this.excludeTopicName)}));
        brokerService.addNetworkConnector(this.connector);
        brokerService.addConnector(NetworkedSyncTest.broker1URL);
        return brokerService;
    }

    protected BrokerService createRemoteBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName("remoteBroker");
        brokerService.setUseJmx(false);
        brokerService.setDataDirectoryFile(this.tempFolder.newFolder());
        brokerService.addConnector("tcp://localhost:61617");
        return brokerService;
    }
}
