/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ByteBufferSend;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NioEchoServer;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.audit.AuditLogProvider;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;

public class NetworkTestUtils {
    private static final String PROXY_TCP4_TEMPLATE = "PROXY TCP4 %s %s %s %s\r\n";

    public static NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig serverConfig, CredentialCache credentialCache, Time time) throws Exception {
        return NetworkTestUtils.createEchoServer(listenerName, securityProtocol, serverConfig, credentialCache, 100, time);
    }

    public static NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig serverConfig, CredentialCache credentialCache, int failedAuthenticationDelayMs, Time time) throws Exception {
        NioEchoServer server = new NioEchoServer(listenerName, securityProtocol, serverConfig, "localhost", null, credentialCache, failedAuthenticationDelayMs, time);
        server.start();
        return server;
    }

    public static NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig serverConfig, CredentialCache credentialCache, int failedAuthenticationDelayMs, Time time, DelegationTokenCache tokenCache) throws Exception {
        NioEchoServer server = new NioEchoServer(listenerName, securityProtocol, serverConfig, "localhost", null, credentialCache, failedAuthenticationDelayMs, time, tokenCache, Optional.empty());
        server.start();
        return server;
    }

    public static NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig serverConfig, CredentialCache credentialCache, Time time, Optional<AuditLogProvider> auditLogProvider) throws Exception {
        NioEchoServer server = new NioEchoServer(listenerName, securityProtocol, serverConfig, "localhost", null, credentialCache, time, auditLogProvider);
        server.start();
        return server;
    }

    public static Selector createSelector(ChannelBuilder channelBuilder, Time time) {
        return new Selector(5000L, new Metrics(), time, "MetricGroup", channelBuilder, new LogContext());
    }

    public static void checkClientConnection(Selector selector, String node, int minMessageSize, int messageCount) throws Exception {
        NetworkTestUtils.checkClientConnection(selector, node, minMessageSize, messageCount, "");
    }

    public static void checkClientConnection(Selector selector, String node, int minMessageSize, int messageCount, String proxyHeader) throws Exception {
        NetworkTestUtils.waitForChannelReady(selector, node);
        String prefix = TestUtils.randomString(minMessageSize);
        int requests = 0;
        int responses = 0;
        if (proxyHeader.isEmpty()) {
            selector.send(new NetworkSend(node, (Send)ByteBufferSend.sizePrefixed((ByteBuffer)ByteBuffer.wrap((prefix + "-0").getBytes(StandardCharsets.UTF_8)))));
        } else {
            ByteBuffer header = ByteBuffer.wrap(proxyHeader.getBytes(StandardCharsets.US_ASCII));
            ByteBuffer firstMsg = ByteBuffer.wrap((prefix + "-0").getBytes(StandardCharsets.UTF_8));
            ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
            sizeBuffer.putInt(0, firstMsg.remaining());
            selector.send(new NetworkSend(node, (Send)new ByteBufferSend(new ByteBuffer[]{header, sizeBuffer, firstMsg})));
        }
        ++requests;
        while (responses < messageCount) {
            selector.poll(0L);
            Assertions.assertEquals((int)0, (int)selector.disconnected().size(), (String)("No disconnects should have occurred ." + selector.disconnected()));
            for (NetworkReceive receive : selector.completedReceives()) {
                Assertions.assertEquals((Object)(prefix + "-" + responses), (Object)new String(Utils.toArray((ByteBuffer)receive.payload()), StandardCharsets.UTF_8));
                ++responses;
            }
            for (int i = 0; i < selector.completedSends().size() && requests < messageCount && selector.isChannelReady(node); ++i, ++requests) {
                selector.send(new NetworkSend(node, (Send)ByteBufferSend.sizePrefixed((ByteBuffer)ByteBuffer.wrap((prefix + "-" + requests).getBytes()))));
            }
        }
    }

    public static String toTcp4ProxyHeader(String clientIp, String proxyIp, int clientPort, int proxyPort) {
        return String.format(PROXY_TCP4_TEMPLATE, clientIp, proxyIp, clientPort, proxyPort);
    }

    public static void waitForChannelConnected(Selector selector, String node) throws IOException {
        int secondsLeft = 30;
        while (selector.channel(node) != null && !selector.channel(node).isConnected() && secondsLeft-- > 0) {
            selector.poll(1000L);
        }
        Assertions.assertNotNull((Object)selector.channel(node));
        Assertions.assertTrue((boolean)selector.channel(node).isConnected(), (String)String.format("Channel %s was not connected after 30 seconds", node));
    }

    public static void waitForChannelReady(Selector selector, String node) throws IOException {
        int secondsLeft = 30;
        while (!selector.isChannelReady(node) && secondsLeft-- > 0) {
            selector.poll(1000L);
        }
        Assertions.assertTrue((boolean)selector.isChannelReady(node), (String)String.format("Channel %s was not ready after 30 seconds", node));
    }

    public static ChannelState waitForChannelClose(Selector selector, String node, ChannelState.State channelState) throws IOException {
        return NetworkTestUtils.waitForChannelClose(selector, node, channelState, 0);
    }

    public static ChannelState waitForChannelClose(Selector selector, String node, ChannelState.State channelState, int delayBetweenPollMs) throws IOException {
        boolean closed = false;
        for (int i = 0; i < 300; ++i) {
            if (delayBetweenPollMs > 0) {
                Utils.sleep((long)delayBetweenPollMs);
            }
            selector.poll(100L);
            if (selector.channel(node) != null || selector.closingChannel(node) != null) continue;
            closed = true;
            break;
        }
        Assertions.assertTrue((boolean)closed, (String)"Channel was not closed by timeout");
        ChannelState finalState = (ChannelState)selector.disconnected().get(node);
        Assertions.assertEquals((Object)channelState, (Object)finalState.state());
        return finalState;
    }

    public static void completeDelayedChannelClose(Selector selector, long currentTimeNanos) {
        selector.completeDelayedChannelClose(currentTimeNanos);
    }

    public static Map<?, ?> delayedClosingChannels(Selector selector) {
        return selector.delayedClosingChannels();
    }

    public static void waitTillServerConnectionWithAuthFailure(Selector serverSelector, Selector clientSelector, String errorMessage, String logMessage, boolean verifyMessagePrefix) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            NetworkTestUtils.poll(clientSelector);
            return !serverSelector.channels().isEmpty();
        }, "Timeout waiting for connection");
        KafkaChannel kafkaChannel = (KafkaChannel)serverSelector.channels().get(0);
        TestUtils.waitForCondition(() -> {
            NetworkTestUtils.poll(clientSelector);
            return kafkaChannel.state().exception() != null;
        }, "Timeout waiting for auth failure");
        AuthenticationException authenticationException = kafkaChannel.state().exception();
        Assertions.assertEquals((Object)errorMessage, (Object)authenticationException.getMessage());
        if (verifyMessagePrefix) {
            Assertions.assertTrue((boolean)authenticationException.logMessage().startsWith(logMessage), (String)("Unexpected logMessage: " + authenticationException.logMessage() + " expectedMessage: " + logMessage));
        } else {
            Assertions.assertEquals((Object)logMessage, (Object)authenticationException.logMessage());
        }
    }

    private static void poll(Selector selector) {
        try {
            selector.poll(50L);
        }
        catch (IOException e) {
            Assertions.fail((String)("Caught unexpected exception " + e));
        }
    }
}

