/*
 * Decompiled with CFR 0.152.
 */
package com.marcnuri.yakc.reactivex;

import com.marcnuri.yakc.KubernetesClient;
import com.marcnuri.yakc.api.ExecMessage;
import com.marcnuri.yakc.api.KubernetesException;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.disposables.Disposable;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.HttpUrl;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;

public class ExecOnSubscribe
implements ObservableOnSubscribe<ExecMessage>,
Disposable {
    private final Request request;
    private final KubernetesClient kubernetesClient;
    private final AtomicBoolean disposed = new AtomicBoolean(false);
    private AtomicReference<WebSocket> webSocket = new AtomicReference<Object>(null);

    public ExecOnSubscribe(Request request, KubernetesClient kubernetesClient) {
        this.request = request;
        this.kubernetesClient = kubernetesClient;
    }

    public void subscribe(final ObservableEmitter<ExecMessage> emitter) throws Exception {
        emitter.setDisposable((Disposable)this);
        final CountDownLatch cdl = new CountDownLatch(1);
        HttpUrl updatedUrl = this.request.url().newBuilder().addQueryParameter("stdout", "true").addQueryParameter("stderr", "true").build();
        Request updatedRequest = this.request.newBuilder().url(updatedUrl).build();
        this.webSocket.set(this.kubernetesClient.getOkHttpClient().newWebSocket(updatedRequest, new WebSocketListener(){

            public void onMessage(WebSocket webSocket, String text) {
                emitter.onNext((Object)ExecMessage.builder().standardStream(ExecMessage.StandardStream.STDOUT).message(text).build());
            }

            public void onMessage(WebSocket webSocket, ByteString bytes) {
                emitter.onNext((Object)ExecMessage.builder().standardStream(ExecMessage.StandardStream.fromByte((int)bytes.getByte(0))).message(bytes.substring(1).utf8()).build());
            }

            public void onClosing(WebSocket webSocket, int code, String reason) {
                ExecOnSubscribe.this.dispose();
            }

            public void onClosed(WebSocket webSocket, int code, String reason) {
                emitter.onComplete();
                this.close();
            }

            public void onFailure(WebSocket webSocket, Throwable t, Response response) {
                String message = Optional.ofNullable(response).map(Response::body).map(rb -> {
                    try {
                        return rb.string();
                    }
                    catch (IOException ex) {
                        return null;
                    }
                }).orElse(t.getMessage());
                emitter.onError((Throwable)new KubernetesException(message, response));
                this.close();
            }

            private void close() {
                ExecOnSubscribe.this.disposed.set(true);
                cdl.countDown();
            }
        }));
        cdl.await();
    }

    public void dispose() {
        Optional.ofNullable(this.webSocket.get()).ifPresent(ws -> ws.close(1000, null));
    }

    public boolean isDisposed() {
        return this.disposed.get();
    }
}

