/*
 * Decompiled with CFR 0.152.
 */
package com.marcnuri.yakc.reactivex;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.marcnuri.yakc.KubernetesClient;
import com.marcnuri.yakc.api.KubernetesException;
import com.marcnuri.yakc.api.WatchEvent;
import com.marcnuri.yakc.api.WatchException;
import com.marcnuri.yakc.model.ListModel;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.disposables.Disposable;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import okhttp3.Call;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import retrofit2.Converter;

public class WatchOnSubscribe<T>
implements ObservableOnSubscribe<WatchEvent<T>>,
Disposable {
    private final OkHttpClient noTimeoutClient;
    private final Request request;
    private final Converter<ResponseBody, WatchEvent<T>> converter;
    private final ExecutorService executorService;
    private final AtomicBoolean disposed = new AtomicBoolean(false);

    public WatchOnSubscribe(Type responseType, Request request, KubernetesClient kubernetesClient) throws KubernetesException {
        this.converter = kubernetesClient.getRetrofit().responseBodyConverter((Type)WatchOnSubscribe.parametrizedWatchEventType(WatchOnSubscribe.resolveListModelParameterType(responseType)), new Annotation[0]);
        this.request = request;
        this.noTimeoutClient = kubernetesClient.getOkHttpClient().newBuilder().readTimeout(Duration.ZERO).build();
        this.executorService = Executors.newSingleThreadExecutor();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void subscribe(ObservableEmitter<WatchEvent<T>> emitter) {
        Callable<Void> responseReader = () -> {
            HttpUrl updatedUrl = this.request.url().newBuilder().addQueryParameter("watch", "true").build();
            Request updatedRequest = this.request.newBuilder().url(updatedUrl).build();
            try (Response response = this.noTimeoutClient.newCall(updatedRequest).execute();
                 InputStream is = Optional.ofNullable(response.body()).map(ResponseBody::source).orElseThrow(() -> new WatchException("Response contains no body", response)).inputStream();
                 InputStreamReader isr = new InputStreamReader(is);
                 BufferedReader br = new BufferedReader(isr);){
                String line;
                if (!response.isSuccessful()) {
                    emitter.tryOnError((Throwable)KubernetesException.forResponse((String)"Error opening Watch connection", (Response)response));
                }
                while (response.isSuccessful() && !emitter.isDisposed() && (line = br.readLine()) != null) {
                    WatchEvent next = (WatchEvent)this.converter.convert((Object)ResponseBody.create((MediaType)MediaType.get((String)"application/json"), (String)line));
                    emitter.onNext((Object)next);
                }
            }
            catch (IOException ex) {
                emitter.tryOnError((Throwable)ex);
            }
            emitter.onComplete();
            this.executorService.shutdownNow();
            return null;
        };
        emitter.setDisposable((Disposable)this);
        try {
            this.executorService.invokeAll(Collections.singletonList(responseReader));
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        finally {
            this.executorService.shutdownNow();
        }
    }

    public void dispose() {
        this.executorService.shutdownNow();
        this.disposed.set(true);
    }

    public boolean isDisposed() {
        return this.disposed.get();
    }

    private static JavaType parametrizedWatchEventType(JavaType listParameterModelType) {
        return TypeFactory.defaultInstance().constructParametricType(WatchEvent.class, new JavaType[]{listParameterModelType});
    }

    private static JavaType resolveListModelParameterType(Type responseType) throws KubernetesException {
        return TypeFactory.defaultInstance().constructType(responseType).getInterfaces().stream().filter(t -> t.getRawClass() == ListModel.class).findFirst().map(listModelInterface -> listModelInterface.containedType(0)).orElseThrow(() -> new WatchException("Watch is intended to be run for endpoints returning a ListModel instance", null));
    }
}

