/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.memory.SimpleMemoryPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.EchoServer;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.PlaintextChannelBuilder;
import org.apache.kafka.common.network.PlaintextSender;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.easymock.IMocksControl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SelectorTest {
    protected static final int BUFFER_SIZE = 4096;
    protected EchoServer server;
    protected Time time;
    protected Selector selector;
    protected ChannelBuilder channelBuilder;
    protected Metrics metrics;

    @Before
    public void setUp() throws Exception {
        HashMap configs = new HashMap();
        this.server = new EchoServer(SecurityProtocol.PLAINTEXT, configs);
        this.server.start();
        this.time = new MockTime();
        this.channelBuilder = new PlaintextChannelBuilder();
        this.channelBuilder.configure(configs);
        this.metrics = new Metrics();
        this.selector = new Selector(5000L, this.metrics, this.time, "MetricGroup", this.channelBuilder, new LogContext());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @After
    public void tearDown() throws Exception {
        try {
            this.verifySelectorEmpty();
        }
        finally {
            this.selector.close();
            this.server.close();
            this.metrics.close();
        }
    }

    public SecurityProtocol securityProtocol() {
        return SecurityProtocol.PLAINTEXT;
    }

    @Test
    public void testServerDisconnect() throws Exception {
        String node = "0";
        this.blockingConnect("0");
        Assert.assertEquals((Object)"hello", (Object)this.blockingRequest("0", "hello"));
        KafkaChannel channel = this.selector.channel("0");
        this.server.closeConnections();
        TestUtils.waitForCondition(new TestCondition(){

            @Override
            public boolean conditionMet() {
                try {
                    SelectorTest.this.selector.poll(1000L);
                    return SelectorTest.this.selector.disconnected().containsKey("0");
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }, 5000L, "Failed to observe disconnected node in disconnected set");
        Assert.assertNull((Object)channel.selectionKey().attachment());
        this.blockingConnect("0");
        Assert.assertEquals((Object)"hello", (Object)this.blockingRequest("0", "hello"));
    }

    @Test
    public void testCantSendWithInProgress() throws Exception {
        String node = "0";
        this.blockingConnect(node);
        this.selector.send((Send)this.createSend(node, "test1"));
        try {
            this.selector.send((Send)this.createSend(node, "test2"));
            Assert.fail((String)"IllegalStateException not thrown when sending a request with one in flight");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
        this.selector.poll(0L);
        Assert.assertTrue((String)"Channel not closed", (boolean)this.selector.disconnected().containsKey(node));
        Assert.assertEquals((Object)ChannelState.FAILED_SEND, this.selector.disconnected().get(node));
    }

    @Test(expected=IllegalStateException.class)
    public void testCantSendWithoutConnecting() throws Exception {
        this.selector.send((Send)this.createSend("0", "test"));
        this.selector.poll(1000L);
    }

    @Test(expected=IOException.class)
    public void testNoRouteToHost() throws Exception {
        this.selector.connect("0", new InetSocketAddress("some.invalid.hostname.foo.bar.local", this.server.port), 4096, 4096);
    }

    @Test
    public void testConnectionRefused() throws Exception {
        String node = "0";
        ServerSocket nonListeningSocket = new ServerSocket(0);
        int nonListeningPort = nonListeningSocket.getLocalPort();
        this.selector.connect(node, new InetSocketAddress("localhost", nonListeningPort), 4096, 4096);
        while (this.selector.disconnected().containsKey(node)) {
            Assert.assertEquals((Object)ChannelState.NOT_CONNECTED, this.selector.disconnected().get(node));
            this.selector.poll(1000L);
        }
        nonListeningSocket.close();
    }

    @Test
    public void testNormalOperation() throws Exception {
        int conns = 5;
        int reqs = 500;
        InetSocketAddress addr = new InetSocketAddress("localhost", this.server.port);
        for (int i = 0; i < conns; ++i) {
            this.connect(Integer.toString(i), addr);
        }
        HashMap<String, Integer> requests = new HashMap<String, Integer>();
        HashMap<String, Integer> responses = new HashMap<String, Integer>();
        int responseCount = 0;
        for (int i = 0; i < conns; ++i) {
            String node = Integer.toString(i);
            this.selector.send((Send)this.createSend(node, node + "-0"));
        }
        while (responseCount < conns * reqs) {
            this.selector.poll(0L);
            Assert.assertEquals((String)"No disconnects should have occurred.", (long)0L, (long)this.selector.disconnected().size());
            for (NetworkReceive receive : this.selector.completedReceives()) {
                String[] pieces = this.asString(receive).split("-");
                Assert.assertEquals((String)"Should be in the form 'conn-counter'", (long)2L, (long)pieces.length);
                Assert.assertEquals((String)"Check the source", (Object)receive.source(), (Object)pieces[0]);
                Assert.assertEquals((String)"Check that the receive has kindly been rewound", (long)0L, (long)receive.payload().position());
                if (responses.containsKey(receive.source())) {
                    Assert.assertEquals((String)"Check the request counter", (long)((Integer)responses.get(receive.source())).intValue(), (long)Integer.parseInt(pieces[1]));
                    responses.put(receive.source(), (Integer)responses.get(receive.source()) + 1);
                } else {
                    Assert.assertEquals((String)"Check the request counter", (long)0L, (long)Integer.parseInt(pieces[1]));
                    responses.put(receive.source(), 1);
                }
                ++responseCount;
            }
            for (Send send : this.selector.completedSends()) {
                String dest = send.destination();
                if (requests.containsKey(dest)) {
                    requests.put(dest, (Integer)requests.get(dest) + 1);
                } else {
                    requests.put(dest, 1);
                }
                if ((Integer)requests.get(dest) >= reqs) continue;
                this.selector.send((Send)this.createSend(dest, dest + "-" + requests.get(dest)));
            }
        }
    }

    @Test
    public void testSendLargeRequest() throws Exception {
        String node = "0";
        this.blockingConnect(node);
        String big = TestUtils.randomString(40960);
        Assert.assertEquals((Object)big, (Object)this.blockingRequest(node, big));
    }

    @Test
    public void testLargeMessageSequence() throws Exception {
        int bufferSize = 524288;
        String node = "0";
        int reqs = 50;
        InetSocketAddress addr = new InetSocketAddress("localhost", this.server.port);
        this.connect(node, addr);
        String requestPrefix = TestUtils.randomString(bufferSize);
        this.sendAndReceive(node, requestPrefix, 0, reqs);
    }

    @Test
    public void testEmptyRequest() throws Exception {
        String node = "0";
        this.blockingConnect(node);
        Assert.assertEquals((Object)"", (Object)this.blockingRequest(node, ""));
    }

    @Test(expected=IllegalStateException.class)
    public void testExistingConnectionId() throws IOException {
        this.blockingConnect("0");
        this.blockingConnect("0");
    }

    @Test
    public void testMute() throws Exception {
        this.blockingConnect("0");
        this.blockingConnect("1");
        this.selector.send((Send)this.createSend("0", "hello"));
        this.selector.send((Send)this.createSend("1", "hi"));
        this.selector.mute("1");
        while (this.selector.completedReceives().isEmpty()) {
            this.selector.poll(5L);
        }
        Assert.assertEquals((String)"We should have only one response", (long)1L, (long)this.selector.completedReceives().size());
        Assert.assertEquals((String)"The response should not be from the muted node", (Object)"0", (Object)((NetworkReceive)this.selector.completedReceives().get(0)).source());
        this.selector.unmute("1");
        do {
            this.selector.poll(5L);
        } while (this.selector.completedReceives().isEmpty());
        Assert.assertEquals((String)"We should have only one response", (long)1L, (long)this.selector.completedReceives().size());
        Assert.assertEquals((String)"The response should be from the previously muted node", (Object)"1", (Object)((NetworkReceive)this.selector.completedReceives().get(0)).source());
    }

    @Test
    public void registerFailure() throws Exception {
        PlaintextChannelBuilder channelBuilder = new PlaintextChannelBuilder(){

            public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
                throw new RuntimeException("Test exception");
            }

            public void close() {
            }
        };
        Selector selector = new Selector(5000L, new Metrics(), (Time)new MockTime(), "MetricGroup", (ChannelBuilder)channelBuilder, new LogContext());
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        try {
            selector.register("1", socketChannel);
            Assert.fail((String)"Register did not fail");
        }
        catch (IOException e) {
            Assert.assertTrue((String)("Unexpected exception: " + e), (boolean)e.getCause().getMessage().contains("Test exception"));
            Assert.assertFalse((String)"Socket not closed", (boolean)socketChannel.isOpen());
        }
        selector.close();
    }

    @Test
    public void testCloseConnectionInClosingState() throws Exception {
        KafkaChannel channel = this.createConnectionWithStagedReceives(5);
        String id = channel.id();
        this.selector.mute(id);
        this.time.sleep(6000L);
        this.selector.poll(0L);
        Assert.assertNull((String)"Channel not expired", (Object)this.selector.channel(id));
        Assert.assertEquals((Object)channel, (Object)this.selector.closingChannel(id));
        Assert.assertEquals((Object)ChannelState.EXPIRED, (Object)channel.state());
        this.selector.close(id);
        Assert.assertNull((String)"Channel not removed from channels", (Object)this.selector.channel(id));
        Assert.assertNull((String)"Channel not removed from closingChannels", (Object)this.selector.closingChannel(id));
        Assert.assertTrue((String)"Unexpected disconnect notification", (boolean)this.selector.disconnected().isEmpty());
        Assert.assertEquals((Object)ChannelState.EXPIRED, (Object)channel.state());
        Assert.assertNull((Object)channel.selectionKey().attachment());
        this.selector.poll(0L);
        Assert.assertTrue((String)"Unexpected disconnect notification", (boolean)this.selector.disconnected().isEmpty());
    }

    @Test
    public void testCloseOldestConnection() throws Exception {
        String id = "0";
        this.blockingConnect(id);
        this.time.sleep(6000L);
        this.selector.poll(0L);
        Assert.assertTrue((String)"The idle connection should have been closed", (boolean)this.selector.disconnected().containsKey(id));
        Assert.assertEquals((Object)ChannelState.EXPIRED, this.selector.disconnected().get(id));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testImmediatelyConnectedCleaned() throws Exception {
        Metrics metrics = new Metrics();
        Selector selector = new Selector(5000L, metrics, this.time, "MetricGroup", this.channelBuilder, new LogContext()){

            protected boolean doConnect(SocketChannel channel, InetSocketAddress address) throws IOException {
                channel.configureBlocking(true);
                boolean connected = super.doConnect(channel, address);
                channel.configureBlocking(false);
                return connected;
            }
        };
        try {
            this.testImmediatelyConnectedCleaned(selector, true);
            this.testImmediatelyConnectedCleaned(selector, false);
        }
        finally {
            selector.close();
            metrics.close();
        }
    }

    private void testImmediatelyConnectedCleaned(Selector selector, boolean closeAfterFirstPoll) throws Exception {
        String id = "0";
        selector.connect(id, new InetSocketAddress("localhost", this.server.port), 4096, 4096);
        this.verifyNonEmptyImmediatelyConnectedKeys(selector);
        if (closeAfterFirstPoll) {
            selector.poll(0L);
            this.verifyEmptyImmediatelyConnectedKeys(selector);
        }
        selector.close(id);
        this.verifySelectorEmpty(selector);
    }

    @Test
    public void testCloseOldestConnectionWithOneStagedReceive() throws Exception {
        this.verifyCloseOldestConnectionWithStagedReceives(1);
    }

    @Test
    public void testCloseOldestConnectionWithMultipleStagedReceives() throws Exception {
        this.verifyCloseOldestConnectionWithStagedReceives(5);
    }

    private KafkaChannel createConnectionWithStagedReceives(int maxStagedReceives) throws Exception {
        String id = "0";
        this.blockingConnect(id);
        KafkaChannel channel = this.selector.channel(id);
        int retries = 100;
        do {
            this.selector.mute(id);
            for (int i = 0; i <= maxStagedReceives; ++i) {
                this.selector.send((Send)this.createSend(id, String.valueOf(i)));
                this.selector.poll(1000L);
            }
            this.selector.unmute(id);
            do {
                this.selector.poll(1000L);
            } while (this.selector.completedReceives().isEmpty());
        } while (this.selector.numStagedReceives(channel) == 0 && --retries > 0);
        Assert.assertTrue((String)"No staged receives after 100 attempts", (this.selector.numStagedReceives(channel) > 0 ? 1 : 0) != 0);
        return channel;
    }

    private void verifyCloseOldestConnectionWithStagedReceives(int maxStagedReceives) throws Exception {
        KafkaChannel channel = this.createConnectionWithStagedReceives(maxStagedReceives);
        String id = channel.id();
        int stagedReceives = this.selector.numStagedReceives(channel);
        int completedReceives = 0;
        while (this.selector.disconnected().isEmpty()) {
            this.time.sleep(6000L);
            this.selector.poll(0L);
            int newStaged = this.selector.numStagedReceives(channel) - (stagedReceives - (completedReceives += this.selector.completedReceives().size()));
            if (newStaged > 0) {
                stagedReceives += newStaged;
                Assert.assertNotNull((String)"Channel should not have been expired", (Object)this.selector.channel(id));
                Assert.assertFalse((String)"Channel should not have been disconnected", (boolean)this.selector.disconnected().containsKey(id));
                continue;
            }
            if (this.selector.completedReceives().isEmpty()) continue;
            Assert.assertEquals((long)1L, (long)this.selector.completedReceives().size());
            Assert.assertTrue((String)"Channel not found", (this.selector.closingChannel(id) != null || this.selector.channel(id) != null ? 1 : 0) != 0);
            Assert.assertFalse((String)"Disconnect notified too early", (boolean)this.selector.disconnected().containsKey(id));
        }
        Assert.assertEquals((long)stagedReceives, (long)completedReceives);
        Assert.assertNull((String)"Channel not removed", (Object)this.selector.channel(id));
        Assert.assertNull((String)"Channel not removed", (Object)this.selector.closingChannel(id));
        Assert.assertTrue((String)"Disconnect not notified", (boolean)this.selector.disconnected().containsKey(id));
        Assert.assertTrue((String)"Unexpected receive", (boolean)this.selector.completedReceives().isEmpty());
    }

    @Test
    public void testMuteOnOOM() throws Exception {
        this.selector.close();
        SimpleMemoryPool pool = new SimpleMemoryPool(900L, 900, false, null);
        this.selector = new Selector(-1, 5000L, this.metrics, this.time, "MetricGroup", new HashMap(), true, false, this.channelBuilder, (MemoryPool)pool, new LogContext());
        try (ServerSocketChannel ss = ServerSocketChannel.open();){
            ss.bind(new InetSocketAddress(0));
            InetSocketAddress serverAddress = (InetSocketAddress)ss.getLocalAddress();
            Thread sender1 = this.createSender(serverAddress, this.randomPayload(900));
            Thread sender2 = this.createSender(serverAddress, this.randomPayload(900));
            sender1.start();
            sender2.start();
            sender1.join(5000L);
            sender2.join(5000L);
            SocketChannel channelX = ss.accept();
            channelX.configureBlocking(false);
            SocketChannel channelY = ss.accept();
            channelY.configureBlocking(false);
            this.selector.register("clientX", channelX);
            this.selector.register("clientY", channelY);
            List completed = Collections.emptyList();
            long deadline = System.currentTimeMillis() + 5000L;
            while (System.currentTimeMillis() < deadline && completed.isEmpty()) {
                this.selector.poll(1000L);
                completed = this.selector.completedReceives();
            }
            Assert.assertEquals((String)"could not read a single request within timeout", (long)1L, (long)completed.size());
            NetworkReceive firstReceive = (NetworkReceive)completed.get(0);
            Assert.assertEquals((long)0L, (long)pool.availableMemory());
            Assert.assertTrue((boolean)this.selector.isOutOfMemory());
            this.selector.poll(10L);
            Assert.assertTrue((boolean)this.selector.completedReceives().isEmpty());
            Assert.assertEquals((long)0L, (long)pool.availableMemory());
            Assert.assertTrue((boolean)this.selector.isOutOfMemory());
            firstReceive.close();
            Assert.assertEquals((long)900L, (long)pool.availableMemory());
            completed = Collections.emptyList();
            deadline = System.currentTimeMillis() + 5000L;
            while (System.currentTimeMillis() < deadline && completed.isEmpty()) {
                this.selector.poll(1000L);
                completed = this.selector.completedReceives();
            }
            Assert.assertEquals((String)"could not read a single request within timeout", (long)1L, (long)this.selector.completedReceives().size());
            Assert.assertEquals((long)0L, (long)pool.availableMemory());
            Assert.assertFalse((boolean)this.selector.isOutOfMemory());
        }
    }

    private Thread createSender(InetSocketAddress serverAddress, byte[] payload) {
        return new PlaintextSender(serverAddress, payload);
    }

    protected byte[] randomPayload(int sizeBytes) throws Exception {
        Random random = new Random();
        byte[] payload = new byte[sizeBytes + 4];
        random.nextBytes(payload);
        ByteArrayOutputStream prefixOs = new ByteArrayOutputStream();
        DataOutputStream prefixDos = new DataOutputStream(prefixOs);
        prefixDos.writeInt(sizeBytes);
        prefixDos.flush();
        prefixDos.close();
        prefixOs.flush();
        prefixOs.close();
        byte[] prefix = prefixOs.toByteArray();
        System.arraycopy(prefix, 0, payload, 0, prefix.length);
        return payload;
    }

    @Test
    public void testConnectDisconnectDuringInSinglePoll() throws Exception {
        IMocksControl control = EasyMock.createControl();
        KafkaChannel kafkaChannel = (KafkaChannel)control.createMock(KafkaChannel.class);
        EasyMock.expect((Object)kafkaChannel.id()).andStubReturn((Object)"1");
        EasyMock.expect((Object)kafkaChannel.socketDescription()).andStubReturn((Object)"");
        EasyMock.expect((Object)kafkaChannel.state()).andStubReturn((Object)ChannelState.NOT_CONNECTED);
        EasyMock.expect((Object)kafkaChannel.finishConnect()).andReturn((Object)true);
        EasyMock.expect((Object)kafkaChannel.isConnected()).andStubReturn((Object)true);
        kafkaChannel.disconnect();
        kafkaChannel.close();
        EasyMock.expect((Object)kafkaChannel.ready()).andReturn((Object)false).anyTimes();
        kafkaChannel.prepare();
        EasyMock.expectLastCall().andThrow((Throwable)new IOException());
        SelectionKey selectionKey = (SelectionKey)control.createMock(SelectionKey.class);
        EasyMock.expect((Object)kafkaChannel.selectionKey()).andStubReturn((Object)selectionKey);
        EasyMock.expect((Object)selectionKey.channel()).andReturn((Object)SocketChannel.open());
        EasyMock.expect((Object)selectionKey.readyOps()).andStubReturn((Object)8);
        selectionKey.cancel();
        EasyMock.expectLastCall();
        control.replay();
        selectionKey.attach(kafkaChannel);
        Set selectionKeys = Utils.mkSet((Object[])new SelectionKey[]{selectionKey});
        this.selector.pollSelectionKeys(selectionKeys, false, System.nanoTime());
        Assert.assertFalse((boolean)this.selector.connected().contains(kafkaChannel.id()));
        Assert.assertTrue((boolean)this.selector.disconnected().containsKey(kafkaChannel.id()));
        Assert.assertNull((Object)selectionKey.attachment());
        control.verify();
    }

    private String blockingRequest(String node, String s) throws IOException {
        NetworkReceive receive;
        this.selector.send((Send)this.createSend(node, s));
        this.selector.poll(1000L);
        block0: while (true) {
            this.selector.poll(1000L);
            Iterator i$ = this.selector.completedReceives().iterator();
            do {
                if (!i$.hasNext()) continue block0;
            } while (!(receive = (NetworkReceive)i$.next()).source().equals(node));
            break;
        }
        return this.asString(receive);
    }

    protected void connect(String node, InetSocketAddress serverAddr) throws IOException {
        this.selector.connect(node, serverAddr, 4096, 4096);
    }

    private void blockingConnect(String node) throws IOException {
        this.blockingConnect(node, new InetSocketAddress("localhost", this.server.port));
    }

    protected void blockingConnect(String node, InetSocketAddress serverAddr) throws IOException {
        this.selector.connect(node, serverAddr, 4096, 4096);
        while (!this.selector.connected().contains(node)) {
            this.selector.poll(10000L);
        }
        while (!this.selector.isChannelReady(node)) {
            this.selector.poll(10000L);
        }
    }

    protected NetworkSend createSend(String node, String s) {
        return new NetworkSend(node, ByteBuffer.wrap(s.getBytes()));
    }

    protected String asString(NetworkReceive receive) {
        return new String(Utils.toArray((ByteBuffer)receive.payload()));
    }

    private void sendAndReceive(String node, String requestPrefix, int startIndex, int endIndex) throws Exception {
        int requests = startIndex;
        int responses = startIndex;
        this.selector.send((Send)this.createSend(node, requestPrefix + "-" + startIndex));
        ++requests;
        while (responses < endIndex) {
            this.selector.poll(0L);
            Assert.assertEquals((String)"No disconnects should have occurred.", (long)0L, (long)this.selector.disconnected().size());
            for (NetworkReceive receive : this.selector.completedReceives()) {
                Assert.assertEquals((Object)(requestPrefix + "-" + responses), (Object)this.asString(receive));
                ++responses;
            }
            for (int i = 0; i < this.selector.completedSends().size() && requests < endIndex; ++i, ++requests) {
                this.selector.send((Send)this.createSend(node, requestPrefix + "-" + requests));
            }
        }
    }

    private void verifyNonEmptyImmediatelyConnectedKeys(Selector selector) throws Exception {
        Field field = Selector.class.getDeclaredField("immediatelyConnectedKeys");
        field.setAccessible(true);
        Collection immediatelyConnectedKeys = (Collection)field.get(selector);
        Assert.assertFalse((boolean)immediatelyConnectedKeys.isEmpty());
    }

    private void verifyEmptyImmediatelyConnectedKeys(Selector selector) throws Exception {
        Field field = Selector.class.getDeclaredField("immediatelyConnectedKeys");
        this.ensureEmptySelectorField(selector, field);
    }

    protected void verifySelectorEmpty() throws Exception {
        this.verifySelectorEmpty(this.selector);
    }

    private void verifySelectorEmpty(Selector selector) throws Exception {
        for (KafkaChannel channel : selector.channels()) {
            selector.close(channel.id());
            Assert.assertNull((Object)channel.selectionKey().attachment());
        }
        selector.poll(0L);
        selector.poll(0L);
        for (Field field : Selector.class.getDeclaredFields()) {
            this.ensureEmptySelectorField(selector, field);
        }
    }

    private void ensureEmptySelectorField(Selector selector, Field field) throws Exception {
        field.setAccessible(true);
        Object obj = field.get(selector);
        if (obj instanceof Collection) {
            Assert.assertTrue((String)("Field not empty: " + field + " " + obj), (boolean)((Collection)obj).isEmpty());
        } else if (obj instanceof Map) {
            Assert.assertTrue((String)("Field not empty: " + field + " " + obj), (boolean)((Map)obj).isEmpty());
        }
    }
}

