/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.nailgun;

import com.facebook.nailgun.CommandContext;
import com.facebook.nailgun.NGClientDisconnectReason;
import com.facebook.nailgun.NGClientListener;
import com.facebook.nailgun.NGHeartbeatListener;
import com.facebook.nailgun.NGOutputStream;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

public class NGCommunicator
implements Closeable {
    private static final Logger LOG = Logger.getLogger(NGCommunicator.class.getName());
    private final ExecutorService orchestratorExecutor;
    private final ExecutorService readExecutor;
    private final Socket socket;
    private final DataInputStream in;
    private final DataOutputStream out;
    private final Object readLock = new Object();
    private final Object writeLock = new Object();
    private final Object orchestratorEvent = new Object();
    private boolean shutdown = false;
    private InputStream stdin = null;
    private boolean eof = false;
    private boolean closed = false;
    private boolean inClosed = false;
    private boolean outClosed = false;
    private boolean isExited = false;
    private int remaining = 0;
    private AtomicBoolean clientConnected = new AtomicBoolean(true);
    private final Set<NGClientListener> clientListeners = new HashSet<NGClientListener>();
    private final Set<NGHeartbeatListener> heartbeatListeners = new HashSet<NGHeartbeatListener>();
    private static final long TERMINATION_TIMEOUT_MS = 1000L;
    private final int heartbeatTimeoutMillis;

    NGCommunicator(Socket socket, int heartbeatTimeoutMillis) throws IOException {
        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
        this.socket = socket;
        this.in = new DataInputStream(socket.getInputStream());
        this.out = new DataOutputStream(socket.getOutputStream());
        Thread mainThread = Thread.currentThread();
        final class NamedThreadFactory
        implements ThreadFactory {
            private final String threadName;

            public NamedThreadFactory(String threadName) {
                this.threadName = threadName;
            }

            @Override
            public Thread newThread(Runnable r) {
                SecurityManager s = System.getSecurityManager();
                ThreadGroup group = s != null ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
                Thread t = new Thread(group, r, this.threadName, 0L);
                if (t.isDaemon()) {
                    t.setDaemon(false);
                }
                if (t.getPriority() != 10) {
                    t.setPriority(10);
                }
                return t;
            }
        }
        this.orchestratorExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory(mainThread.getName() + " (NGCommunicator orchestrator)"));
        this.readExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory(mainThread.getName() + " (NGCommunicator reader)"));
    }

    CommandContext readCommandContext() throws IOException {
        ArrayList<String> remoteArgs = new ArrayList<String>();
        Properties remoteEnv = new Properties();
        String cwd = null;
        String command = null;
        while (command == null) {
            int bytesToRead = this.in.readInt();
            byte chunkType = this.in.readByte();
            byte[] b = new byte[bytesToRead];
            this.in.readFully(b);
            String line = new String(b, "UTF-8");
            switch (chunkType) {
                case 65: {
                    remoteArgs.add(line);
                    break;
                }
                case 69: {
                    int equalsIndex = line.indexOf(61);
                    if (equalsIndex <= 0) break;
                    remoteEnv.setProperty(line.substring(0, equalsIndex), line.substring(equalsIndex + 1));
                    break;
                }
                case 67: {
                    command = line;
                    break;
                }
                case 68: {
                    cwd = line;
                    break;
                }
            }
        }
        this.startBackgroundReceive();
        return new CommandContext(command, cwd, remoteEnv, remoteArgs);
    }

    private void startBackgroundReceive() {
        long futureTimeout = this.heartbeatTimeoutMillis + this.heartbeatTimeoutMillis / 10;
        this.orchestratorExecutor.submit(() -> {
            NGClientDisconnectReason reason = NGClientDisconnectReason.INTERNAL_ERROR;
            try {
                LOG.log(Level.FINE, "Orchestrator thread started");
                while (true) {
                    Future<Byte> readFuture;
                    Object object = this.orchestratorEvent;
                    synchronized (object) {
                        if (this.shutdown) {
                            break;
                        }
                        readFuture = this.readExecutor.submit(() -> {
                            try {
                                return this.readChunk();
                            }
                            catch (IOException e) {
                                throw new ExecutionException(e);
                            }
                        });
                    }
                    byte chunkType = futureTimeout > 0L ? readFuture.get(futureTimeout, TimeUnit.MILLISECONDS).byteValue() : readFuture.get().byteValue();
                    if (chunkType != 72) continue;
                    this.notifyHeartbeat();
                }
            }
            catch (InterruptedException e) {
                LOG.log(Level.WARNING, "NGCommunicator orchestrator was interrupted", e);
            }
            catch (ExecutionException e) {
                Throwable cause = NGCommunicator.getCause(e);
                if (cause instanceof EOFException) {
                    LOG.log(Level.FINE, "Socket is disconnected");
                    reason = NGClientDisconnectReason.SOCKET_ERROR;
                } else if (cause instanceof SocketTimeoutException) {
                    reason = NGClientDisconnectReason.SOCKET_TIMEOUT;
                    LOG.log(Level.WARNING, "Nailgun client socket timed out after " + this.heartbeatTimeoutMillis + " ms", cause);
                } else {
                    LOG.log(Level.WARNING, "Nailgun client read future raised an exception", cause);
                }
            }
            catch (TimeoutException e) {
                reason = NGClientDisconnectReason.HEARTBEAT;
                LOG.log(Level.WARNING, "Nailgun client read future timed out after " + futureTimeout + " ms", e);
            }
            catch (Throwable e) {
                LOG.log(Level.WARNING, "Nailgun orchestrator gets an exception ", e);
            }
            LOG.log(Level.FINE, "Nailgun client disconnected");
            this.clientConnected.set(false);
            this.setEof();
            this.waitTerminationAndNotifyClients(reason);
            LOG.log(Level.FINE, "Orchestrator thread finished");
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void waitTerminationAndNotifyClients(NGClientDisconnectReason reason) {
        while (true) {
            ArrayList<NGClientListener> listeners = new ArrayList<NGClientListener>();
            Object object = this.orchestratorEvent;
            synchronized (object) {
                if (this.shutdown) {
                    reason = NGClientDisconnectReason.SESSION_SHUTDOWN;
                }
                if (!this.clientListeners.isEmpty()) {
                    listeners.addAll(this.clientListeners);
                    this.clientListeners.clear();
                }
            }
            for (NGClientListener listener : listeners) {
                listener.clientDisconnected(reason);
            }
            object = this.orchestratorEvent;
            synchronized (object) {
                if (!this.clientListeners.isEmpty()) {
                    continue;
                }
                if (this.shutdown) {
                    return;
                }
                try {
                    this.orchestratorEvent.wait();
                }
                catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    private static Throwable getCause(Throwable e) {
        Throwable cause = e.getCause();
        if (cause == null) {
            return e;
        }
        if (cause instanceof ExecutionException) {
            return NGCommunicator.getCause(cause);
        }
        return cause;
    }

    void exit(int exitCode) {
        if (this.isExited) {
            return;
        }
        try {
            this.stopIn();
        }
        catch (IOException ex) {
            LOG.log(Level.WARNING, "Unable to close socket for reading while sending final exit code", ex);
        }
        try (PrintStream exit = new PrintStream(new NGOutputStream(this, 88));){
            exit.println(exitCode);
        }
        this.isExited = true;
        try {
            this.stopOut();
        }
        catch (IOException ex) {
            LOG.log(Level.WARNING, "Unable to close socket for writing while sending final exit code", ex);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void stopIn() throws IOException {
        if (this.inClosed) {
            return;
        }
        this.inClosed = true;
        LOG.log(Level.FINE, "Shutting down socket for input");
        this.setEof();
        Object object = this.orchestratorEvent;
        synchronized (object) {
            this.shutdown = true;
            this.orchestratorEvent.notifyAll();
        }
        this.socket.shutdownInput();
    }

    private void stopOut() throws IOException {
        if (this.outClosed) {
            return;
        }
        this.outClosed = true;
        LOG.log(Level.FINE, "Shutting down socket for output");
        this.socket.shutdownOutput();
    }

    @Override
    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.stopIn();
        this.stopOut();
        this.in.close();
        this.out.close();
        NGCommunicator.terminateExecutor(this.readExecutor, "read");
        NGCommunicator.terminateExecutor(this.orchestratorExecutor, "orchestrator");
        this.socket.close();
    }

    private static void terminateExecutor(ExecutorService service, String which) {
        boolean terminated;
        LOG.log(Level.FINE, "Shutting down {0} ExecutorService", which);
        service.shutdown();
        try {
            terminated = service.awaitTermination(1000L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            LOG.log(Level.WARNING, "Interruption is signaled in close(), terminating a thread forcefully");
            service.shutdownNow();
            return;
        }
        if (!terminated) {
            LOG.log(Level.WARNING, "{0} thread did not unblock on a signal within timeout and will be forcefully terminated", which);
            service.shutdownNow();
        }
    }

    private InputStream readPayload(InputStream in, int len) throws IOException {
        int currentRead;
        byte[] receiveBuffer = new byte[len];
        for (int totalRead = 0; totalRead < len; totalRead += currentRead) {
            currentRead = in.read(receiveBuffer, totalRead, len - totalRead);
            if (currentRead >= 0) continue;
            throw new EOFException("stdin EOF before payload read.");
        }
        return new ByteArrayInputStream(receiveBuffer);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private byte readChunk() throws IOException {
        try {
            return this.readChunkImpl();
        }
        catch (SocketException ex) {
            Object object = this.orchestratorEvent;
            synchronized (object) {
                if (this.shutdown) {
                    EOFException newException = new EOFException("NGCommunicator is shutting down");
                    newException.initCause(ex);
                    throw newException;
                }
            }
            throw ex;
        }
    }

    private byte readChunkImpl() throws IOException {
        int chunkLen = this.in.readInt();
        byte chunkType = this.in.readByte();
        switch (chunkType) {
            case 48: {
                LOG.log(Level.FINEST, "Got stdin chunk, len {0}", chunkLen);
                InputStream chunkStream = this.readPayload(this.in, chunkLen);
                this.setInput(chunkStream, chunkLen);
                break;
            }
            case 46: {
                LOG.log(Level.FINEST, "Got stdin closed chunk");
                this.setEof();
                break;
            }
            case 72: {
                LOG.log(Level.FINEST, "Got client heartbeat");
                break;
            }
            default: {
                LOG.log(Level.WARNING, "Unknown chunk type: {0}", Character.valueOf((char)chunkType));
                throw new IOException("Unknown stream type: " + (char)chunkType);
            }
        }
        return chunkType;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setInput(InputStream chunkStream, int chunkLen) throws IOException {
        Object object = this.readLock;
        synchronized (object) {
            if (this.remaining != 0) {
                throw new IOException("Data received before stdin stream was emptied");
            }
            this.stdin = chunkStream;
            this.remaining = chunkLen;
            this.readLock.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setEof() {
        Object object = this.readLock;
        synchronized (object) {
            this.eof = true;
            this.readLock.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    int receive(byte[] b, int offset, int length) throws IOException, InterruptedException {
        Object object = this.readLock;
        synchronized (object) {
            if (this.remaining > 0) {
                int bytesToRead = Math.min(this.remaining, length);
                int result = this.stdin.read(b, offset, bytesToRead);
                this.remaining -= result;
                return result;
            }
            if (this.eof) {
                return -1;
            }
        }
        this.sendSendInput();
        object = this.readLock;
        synchronized (object) {
            if (this.remaining == 0 && !this.eof) {
                this.readLock.wait();
            }
            return this.receive(b, offset, length);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void send(byte streamCode, byte[] b, int offset, int len) throws IOException {
        Object object = this.writeLock;
        synchronized (object) {
            this.out.writeInt(len);
            this.out.writeByte(streamCode);
            this.out.write(b, offset, len);
        }
        this.out.flush();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendSendInput() throws IOException {
        Object object = this.writeLock;
        synchronized (object) {
            this.out.writeInt(0);
            this.out.writeByte(83);
        }
        this.out.flush();
    }

    boolean isClientConnected() {
        return this.clientConnected.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    int available() {
        Object object = this.readLock;
        synchronized (object) {
            return this.remaining;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void addClientListener(NGClientListener listener) {
        Object object = this.orchestratorEvent;
        synchronized (object) {
            this.clientListeners.add(listener);
            this.orchestratorEvent.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void removeClientListener(NGClientListener listener) {
        Object object = this.orchestratorEvent;
        synchronized (object) {
            this.clientListeners.remove(listener);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void removeAllClientListeners() {
        Object object = this.orchestratorEvent;
        synchronized (object) {
            this.clientListeners.clear();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void addHeartbeatListener(NGHeartbeatListener listener) {
        Set<NGHeartbeatListener> set = this.heartbeatListeners;
        synchronized (set) {
            this.heartbeatListeners.add(listener);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void removeHeartbeatListener(NGHeartbeatListener listener) {
        Set<NGHeartbeatListener> set = this.heartbeatListeners;
        synchronized (set) {
            this.heartbeatListeners.remove(listener);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyHeartbeat() {
        ArrayList<NGHeartbeatListener> listeners;
        Set<NGHeartbeatListener> set = this.heartbeatListeners;
        synchronized (set) {
            if (this.heartbeatListeners.isEmpty()) {
                return;
            }
            listeners = new ArrayList<NGHeartbeatListener>(this.heartbeatListeners);
        }
        for (NGHeartbeatListener listener : listeners) {
            listener.heartbeatReceived();
        }
    }
}

