/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.remote.client.socket;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.remote.Communicant;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransactionCompletion;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.AbstractSiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.client.socket.EndpointConnection;
import org.apache.nifi.remote.client.socket.EndpointConnectionPool;
import org.apache.nifi.remote.protocol.DataPacket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Site-to-site client that obtains its connections from an {@link EndpointConnectionPool}.
 *
 * <p>Every {@link Transaction} returned by {@link #createTransaction(TransferDirection)} wraps the
 * protocol-level transaction together with the {@link EndpointConnection} it runs on. When the
 * transaction is completed the connection is passed to the pool's {@code offer}; when it is
 * cancelled or marked as errored the connection is passed to the pool's {@code terminate}.
 * Either way the connection is released at most once per transaction.
 */
public class SocketClient extends AbstractSiteToSiteClient {
    private static final Logger logger = LoggerFactory.getLogger(SocketClient.class);

    private final EndpointConnectionPool pool;
    private final boolean compress;
    private final String portName;
    private final long penalizationNanos;
    /** Port identifier from the config if one was given, otherwise resolved lazily from the port name. */
    private volatile String portIdentifier;
    /** Set by {@link #close()}; checked before a new transaction is created. */
    private volatile boolean closed = false;

    /**
     * Builds a client and its connection pool from the supplied configuration.
     *
     * @param config source of timeouts, port name/identifier, compression flag, SSL context, etc.
     */
    public SocketClient(SiteToSiteClientConfig config) {
        super(config);

        // Assign every field the RemoteDestination callback reads BEFORE the callback is handed
        // to the pool. The anonymous class dereferences SocketClient.this lazily, so if the pool
        // queried it during (or immediately after) its own construction it would otherwise
        // observe the default values 0 / false of not-yet-assigned final fields.
        this.compress = config.isUseCompression();
        this.portIdentifier = config.getPortIdentifier();
        this.portName = config.getPortName();
        this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS);

        final int commsTimeout = (int) config.getTimeout(TimeUnit.MILLISECONDS);
        final int idleExpiration = (int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS);
        this.pool = new EndpointConnectionPool(
                this.createRemoteDestination(config.getPortIdentifier(), config.getPortName()),
                commsTimeout,
                idleExpiration,
                config.getSslContext(),
                config.getEventReporter(),
                config.getPeerPersistenceFile(),
                this.siteInfoProvider,
                config.getLocalAddress());
    }

    /**
     * Reports whether the remote instance is secure, as answered by the site info provider.
     *
     * @return the value of {@code siteInfoProvider.isSecure()}
     * @throws IOException if the site info provider fails
     */
    @Override
    public boolean isSecure() throws IOException {
        return this.siteInfoProvider.isSecure();
    }

    /**
     * Returns the port identifier to use, resolving it from the port name on first need.
     *
     * <p>A successfully resolved identifier is cached in {@link #portIdentifier}; a failed lookup
     * is not cached, so the next call tries again.
     *
     * @param direction {@link TransferDirection#SEND} looks up an input port, anything else an
     *                  output port
     * @return the identifier, or {@code null} if the name could not be resolved
     * @throws IOException if the site info provider fails
     */
    private String getPortIdentifier(TransferDirection direction) throws IOException {
        final String cached = this.portIdentifier;
        if (cached != null) {
            return cached;
        }

        final String resolved;
        if (direction == TransferDirection.SEND) {
            resolved = this.siteInfoProvider.getInputPortIdentifier(this.portName);
        } else {
            resolved = this.siteInfoProvider.getOutputPortIdentifier(this.portName);
        }

        if (resolved == null) {
            logger.debug("Unable to resolve port [{}] to an identifier", this.portName);
        } else {
            logger.debug("Resolved port [{}] to identifier [{}]", this.portName, resolved);
            this.portIdentifier = resolved;
        }
        return resolved;
    }

    /**
     * Creates the {@link RemoteDestination} view of this client that is given to the pool.
     *
     * @param portId   value returned by {@code getIdentifier()} (may be {@code null} when only a
     *                 port name was configured)
     * @param portName value returned by {@code getName()}
     * @return a destination whose yield period and compression flag come from this client
     */
    private RemoteDestination createRemoteDestination(final String portId, final String portName) {
        return new RemoteDestination() {

            public String getIdentifier() {
                return portId;
            }

            public String getName() {
                return portName;
            }

            public long getYieldPeriod(TimeUnit timeUnit) {
                return timeUnit.convert(SocketClient.this.penalizationNanos, TimeUnit.NANOSECONDS);
            }

            public boolean isUseCompression() {
                return SocketClient.this.compress;
            }
        };
    }

    /**
     * Starts a new transaction on a pooled connection.
     *
     * @param direction whether data will be sent or received
     * @return a transaction wrapper that releases its connection when finished, or {@code null}
     *         if the pool returned no connection
     * @throws IllegalStateException if {@link #close()} has already been called
     * @throws IOException if the port name cannot be resolved to an identifier, or if the
     *                     protocol fails to start the transaction (the connection is terminated
     *                     in that case)
     */
    @Override
    public Transaction createTransaction(TransferDirection direction) throws IOException {
        if (this.closed) {
            throw new IllegalStateException("Client is closed");
        }

        final String portId = this.getPortIdentifier(direction);
        if (portId == null) {
            throw new IOException("Could not find Port with name '" + this.portName + "' for remote NiFi instance");
        }

        final EndpointConnection connection = this.pool.getEndpointConnection(direction, this.getConfig());
        if (connection == null) {
            return null;
        }

        final Transaction transaction;
        try {
            transaction = connection.getSocketClientProtocol()
                    .startTransaction(connection.getPeer(), connection.getCodec(), direction);
        } catch (Throwable t) {
            // The connection is in an unknown state after a failed start; do not reuse it.
            this.pool.terminate(connection);
            throw new IOException("Unable to create Transaction to communicate with " + connection.getPeer(), t);
        }

        // Holds the connection until the transaction ends. Each terminal operation claims it with
        // getAndSet(null), so the connection is released exactly once even if terminal
        // operations are invoked repeatedly or overlap.
        final AtomicReference<EndpointConnection> connectionRef = new AtomicReference<>(connection);

        return new Transaction() {

            @Override
            public void confirm() throws IOException {
                transaction.confirm();
            }

            @Override
            public TransactionCompletion complete() throws IOException {
                try {
                    return transaction.complete();
                } finally {
                    final EndpointConnection held = connectionRef.getAndSet(null);
                    if (held != null) {
                        SocketClient.this.pool.offer(held);
                    }
                }
            }

            @Override
            public void cancel(String explanation) throws IOException {
                try {
                    transaction.cancel(explanation);
                } finally {
                    final EndpointConnection held = connectionRef.getAndSet(null);
                    if (held != null) {
                        SocketClient.this.pool.terminate(held);
                    }
                }
            }

            @Override
            public void error() {
                try {
                    transaction.error();
                } finally {
                    final EndpointConnection held = connectionRef.getAndSet(null);
                    if (held != null) {
                        SocketClient.this.pool.terminate(held);
                    }
                }
            }

            @Override
            public void send(DataPacket dataPacket) throws IOException {
                transaction.send(dataPacket);
            }

            @Override
            public void send(byte[] content, Map<String, String> attributes) throws IOException {
                transaction.send(content, attributes);
            }

            @Override
            public DataPacket receive() throws IOException {
                return transaction.receive();
            }

            @Override
            public Transaction.TransactionState getState() throws IOException {
                return transaction.getState();
            }

            @Override
            public Communicant getCommunicant() {
                return transaction.getCommunicant();
            }
        };
    }

    /**
     * Marks this client as closed and shuts down the connection pool.
     *
     * <p>After this call {@link #createTransaction(TransferDirection)} throws
     * {@link IllegalStateException}.
     *
     * @throws IOException declared by the overridden method
     */
    @Override
    public void close() throws IOException {
        this.closed = true;
        this.pool.shutdown();
    }
}

