/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker;

import com.google.common.collect.ImmutableList;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.activemq.broker.BrokerRestartTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.IOHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class RecoveryStatsBrokerTest
extends BrokerRestartTestSupport {
    private RestartType restartType;

    @Override
    protected void configureBroker(BrokerService broker) throws Exception {
        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
        persistenceAdapter.setJournalMaxFileLength(20480);
        persistenceAdapter.setDirectory(broker.getBrokerDataDirectory());
        broker.setPersistenceAdapter((PersistenceAdapter)persistenceAdapter);
        broker.setDestinationPolicy(this.policyMap);
    }

    protected void restartBroker(RestartType restartType) throws Exception {
        if (restartType == RestartType.FULL_RECOVERY) {
            this.stopBroker();
            KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter)this.broker.getPersistenceAdapter();
            File dir = kahaDBPersistenceAdapter.getDirectory();
            if (dir != null) {
                IOHelper.deleteFile((File)new File(dir, "db.data"));
            }
            this.broker.start();
        } else if (restartType == RestartType.UNCLEAN_SHUTDOWN) {
            File dir = this.broker.getBrokerDataDirectory();
            File backUpDir = new File(dir, "bk");
            IOHelper.mkdirs((File)new File(dir, "bk"));
            for (File f : dir.listFiles()) {
                if (f.isDirectory()) continue;
                IOHelper.copyFile((File)f, (File)new File(backUpDir, f.getName()));
            }
            this.stopBroker();
            for (File f : backUpDir.listFiles()) {
                if (f.isDirectory()) continue;
                IOHelper.copyFile((File)f, (File)new File(dir, f.getName()));
            }
            this.broker.start();
        } else {
            this.restartBroker();
        }
    }

    @Parameterized.Parameters(name="{0}")
    public static Collection<Object[]> getTestParameters() {
        return Arrays.asList({RestartType.NORMAL}, {RestartType.FULL_RECOVERY}, {RestartType.UNCLEAN_SHUTDOWN});
    }

    public RecoveryStatsBrokerTest(RestartType restartType) {
        this.restartType = restartType;
    }

    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
    }

    @Override
    @After
    public void tearDown() throws Exception {
        super.tearDown();
    }

    @Test(timeout=60000L)
    public void testStaticsRecovery() throws Exception {
        ImmutableList destinations = ImmutableList.of((Object)new ActiveMQQueue("TEST.A"), (Object)new ActiveMQQueue("TEST.B"));
        Random random = new Random();
        HashMap<ActiveMQDestination, Integer> consumedMessages = new HashMap<ActiveMQDestination, Integer>();
        destinations.forEach(destination -> consumedMessages.put((ActiveMQDestination)destination, 0));
        int numberOfMessages = 400;
        StubConnection connection = this.createConnection();
        ConnectionInfo connectionInfo = this.createConnectionInfo();
        SessionInfo sessionInfo = this.createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = this.createProducerInfo(sessionInfo);
        connection.send((Command)connectionInfo);
        connection.send((Command)sessionInfo);
        connection.send((Command)producerInfo);
        for (int i = 0; i < numberOfMessages; ++i) {
            for (ActiveMQDestination destination2 : destinations) {
                Message message = this.createMessage(producerInfo, destination2);
                message.setPersistent(true);
                message.setProducerId(message.getMessageId().getProducerId());
                connection.request((Command)message);
            }
        }
        Map<ActiveMQDestination, MessageStoreStatistics> originalStatistics = this.getCurrentStatistics((List<ActiveMQDestination>)destinations);
        this.checkStatistics((List<ActiveMQDestination>)destinations, originalStatistics);
        this.restartBroker(this.restartType);
        this.checkStatistics((List<ActiveMQDestination>)destinations, originalStatistics);
        for (ActiveMQDestination destination2 : destinations) {
            this.consume(destination2, 100, false);
        }
        this.checkStatistics((List<ActiveMQDestination>)destinations, originalStatistics);
        this.restartBroker(this.restartType);
        this.checkStatistics((List<ActiveMQDestination>)destinations, originalStatistics);
        for (ActiveMQDestination destination2 : destinations) {
            int messagesToConsume = random.nextInt(numberOfMessages);
            this.consume(destination2, messagesToConsume, true);
            consumedMessages.compute(destination2, (key, value) -> {
                value = value + messagesToConsume;
                return value;
            });
        }
        originalStatistics = this.getCurrentStatistics((List<ActiveMQDestination>)destinations);
        for (ActiveMQDestination destination2 : destinations) {
            int consumedCount = (Integer)consumedMessages.get(destination2);
            RecoveryStatsBrokerTest.assertEquals((String)"", (long)(numberOfMessages - consumedCount), (long)originalStatistics.get(destination2).getMessageCount().getCount());
        }
        this.checkStatistics((List<ActiveMQDestination>)destinations, originalStatistics);
        this.restartBroker(this.restartType);
        this.checkStatistics((List<ActiveMQDestination>)destinations, originalStatistics);
    }

    private Map<ActiveMQDestination, MessageStoreStatistics> getCurrentStatistics(List<ActiveMQDestination> destinations) {
        return destinations.stream().map(destination -> this.getDestination(this.broker, (ActiveMQDestination)destination)).collect(Collectors.toMap(destination -> new ActiveMQQueue(destination.getName()), destination2 -> destination2.getMessageStore().getMessageStoreStatistics()));
    }

    private void consume(ActiveMQDestination destination, int numberOfMessages, boolean shouldAck) throws Exception {
        StubConnection connection = this.createConnection();
        ConnectionInfo connectionInfo = this.createConnectionInfo();
        SessionInfo sessionInfo = this.createSessionInfo(connectionInfo);
        connection.send((Command)connectionInfo);
        connection.send((Command)sessionInfo);
        ConsumerInfo consumerInfo = this.createConsumerInfo(sessionInfo, destination);
        connection.send((Command)consumerInfo);
        for (int i = 0; i < numberOfMessages; ++i) {
            Message m2 = this.receiveMessage(connection);
            RecoveryStatsBrokerTest.assertNotNull((Object)m2);
            if (!shouldAck) continue;
            MessageAck ack = this.createAck(consumerInfo, m2, 1, (byte)2);
            connection.request((Command)ack);
        }
        connection.request((Command)this.closeConnectionInfo(connectionInfo));
    }

    private void checkStatistics(List<ActiveMQDestination> destinations, Map<ActiveMQDestination, MessageStoreStatistics> originalStatistics) {
        for (ActiveMQDestination destination : destinations) {
            MessageStoreStatistics original = originalStatistics.get(destination);
            MessageStoreStatistics actual = this.getDestination(this.broker, destination).getMessageStore().getMessageStoreStatistics();
            RecoveryStatsBrokerTest.assertEquals((String)"Have Same Count", (long)original.getMessageCount().getCount(), (long)actual.getMessageCount().getCount());
            RecoveryStatsBrokerTest.assertEquals((String)"Have Same TotalSize", (long)original.getMessageSize().getTotalSize(), (long)this.getDestination(this.broker, destination).getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize());
        }
    }

    protected Destination getDestination(BrokerService target, ActiveMQDestination destination) {
        RegionBroker regionBroker = (RegionBroker)target.getRegionBroker();
        if (destination.isTemporary()) {
            return destination.isQueue() ? (Destination)regionBroker.getTempQueueRegion().getDestinationMap().get(destination) : (Destination)regionBroker.getTempTopicRegion().getDestinationMap().get(destination);
        }
        return destination.isQueue() ? (Destination)regionBroker.getQueueRegion().getDestinationMap().get(destination) : (Destination)regionBroker.getTopicRegion().getDestinationMap().get(destination);
    }

    static enum RestartType {
        NORMAL,
        FULL_RECOVERY,
        UNCLEAN_SHUTDOWN;

    }
}

