/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AkkaRpcServiceUtils {
    private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcServiceUtils.class);
    private static final String AKKA_TCP = "akka.tcp";
    private static final String AKKA_SSL_TCP = "akka.ssl.tcp";
    private static final String SIMPLE_AKKA_CONFIG_TEMPLATE = "akka {remote {netty.tcp {maximum-frame-size = %s}}}";
    private static final String MAXIMUM_FRAME_SIZE_PATH = "akka.remote.netty.tcp.maximum-frame-size";
    private static final AtomicLong nextNameOffset = new AtomicLong(0L);

    public static RpcService createRpcService(String hostname, String portRangeDefinition, Configuration configuration) throws Exception {
        ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, hostname, portRangeDefinition, LOG);
        return AkkaRpcServiceUtils.instantiateAkkaRpcService(configuration, actorSystem);
    }

    public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception {
        ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, hostname, port, LOG);
        return AkkaRpcServiceUtils.instantiateAkkaRpcService(configuration, actorSystem);
    }

    public static RpcService createRpcService(String hostname, String portRangeDefinition, Configuration configuration, String actorSystemName, @Nonnull BootstrapTools.ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception {
        ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, actorSystemName, hostname, portRangeDefinition, LOG, actorSystemExecutorConfiguration);
        return AkkaRpcServiceUtils.instantiateAkkaRpcService(configuration, actorSystem);
    }

    @Nonnull
    private static RpcService instantiateAkkaRpcService(Configuration configuration, ActorSystem actorSystem) {
        return new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration));
    }

    public static String getRpcUrl(String hostname, int port, String endpointName, HighAvailabilityServicesUtils.AddressResolution addressResolution, Configuration config) throws UnknownHostException {
        Preconditions.checkNotNull((Object)config, (String)"config is null");
        boolean sslEnabled = config.getBoolean(AkkaOptions.SSL_ENABLED) && SSLUtils.isInternalSSLEnabled(config);
        return AkkaRpcServiceUtils.getRpcUrl(hostname, port, endpointName, addressResolution, sslEnabled ? AkkaProtocol.SSL_TCP : AkkaProtocol.TCP);
    }

    public static String getRpcUrl(String hostname, int port, String endpointName, HighAvailabilityServicesUtils.AddressResolution addressResolution, AkkaProtocol akkaProtocol) throws UnknownHostException {
        String protocolPrefix;
        Preconditions.checkNotNull((Object)hostname, (String)"hostname is null");
        Preconditions.checkNotNull((Object)endpointName, (String)"endpointName is null");
        Preconditions.checkArgument((port > 0 && port <= 65535 ? 1 : 0) != 0, (Object)"port must be in [1, 65535]");
        String string = protocolPrefix = akkaProtocol == AkkaProtocol.SSL_TCP ? AKKA_SSL_TCP : AKKA_TCP;
        if (addressResolution == HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION) {
            InetAddress.getByName(hostname);
        }
        String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString((String)hostname, (int)port);
        return String.format("%s://flink@%s/user/%s", protocolPrefix, hostPort, endpointName);
    }

    public static String createRandomName(String prefix) {
        long nameOffset;
        Preconditions.checkNotNull((Object)prefix, (String)"Prefix must not be null.");
        while (!nextNameOffset.compareAndSet(nameOffset = nextNameOffset.get(), nameOffset + 1L)) {
        }
        return prefix + '_' + nameOffset;
    }

    public static long extractMaximumFramesize(Configuration configuration) {
        String maxFrameSizeStr = configuration.getString(AkkaOptions.FRAMESIZE);
        String akkaConfigStr = String.format(SIMPLE_AKKA_CONFIG_TEMPLATE, maxFrameSizeStr);
        Config akkaConfig = ConfigFactory.parseString((String)akkaConfigStr);
        return akkaConfig.getBytes(MAXIMUM_FRAME_SIZE_PATH);
    }

    private AkkaRpcServiceUtils() {
    }

    public static enum AkkaProtocol {
        TCP,
        SSL_TCP;

    }
}

