package io.confluent.ksql.rest.server;

import com.clearspring.analytics.util.Lists;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.ServiceManager;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.rest.util.DiscoverRemoteHostsUtil;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.HostStatus;
import io.confluent.ksql.util.KsqlHostInfo;
import java.net.URI;
import java.net.URL;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/confluent/ksql/rest/server/HeartbeatAgent.class */
public final class HeartbeatAgent {
    private static final int SERVICE_TIMEOUT_SEC = 2;
    private static final int CHECK_HEARTBEAT_DELAY_MS = 1000;
    private static final int SEND_HEARTBEAT_DELAY_MS = 100;
    private static final int DISCOVER_CLUSTER_DELAY_MS = 50;
    private static final Logger LOG = LogManager.getLogger(HeartbeatAgent.class);
    private final KsqlEngine engine;
    private final ServiceContext serviceContext;
    private final HeartbeatConfig config;
    private final List<HostStatusListener> hostStatusListeners;
    private final ScheduledExecutorService scheduledExecutorService;
    private KsqlHostInfo localHost;
    private URL localUrl;
    private final ServiceManager serviceManager = new ServiceManager(Arrays.asList(new DiscoverClusterService(), new SendHeartbeatService(), new CheckHeartbeatService()));
    private final ConcurrentHashMap<KsqlHostInfo, TreeMap<Long, HeartbeatInfo>> receivedHeartbeats = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<KsqlHostInfo, HostStatus> hostsStatus = new ConcurrentHashMap<>();
    private final Clock clock = Clock.systemUTC();

    /* loaded from: input_file:io/confluent/ksql/rest/server/HeartbeatAgent$Builder.class */
    public static class Builder {
        private int nestedThreadPoolSize;
        private long nestedHeartbeatSendIntervalMs;
        private long nestedHeartbeatCheckIntervalMs;
        private long nestedDiscoverClusterIntervalMs;
        private long nestedHeartbeatWindowMs;
        private long nestedHeartbeatMissedThreshold;
        private List<HostStatusListener> nestedHostStatusListeners = Lists.newArrayList();

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder threadPoolSize(int i) {
            this.nestedThreadPoolSize = i;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder heartbeatSendInterval(long j) {
            this.nestedHeartbeatSendIntervalMs = j;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder heartbeatCheckInterval(long j) {
            this.nestedHeartbeatCheckIntervalMs = j;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder heartbeatWindow(long j) {
            this.nestedHeartbeatWindowMs = j;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder heartbeatMissedThreshold(long j) {
            this.nestedHeartbeatMissedThreshold = j;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder discoverClusterInterval(long j) {
            this.nestedDiscoverClusterIntervalMs = j;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder addHostStatusListener(HostStatusListener hostStatusListener) {
            this.nestedHostStatusListeners.add(hostStatusListener);
            return this;
        }

        public HeartbeatAgent build(KsqlEngine ksqlEngine, ServiceContext serviceContext) {
            return new HeartbeatAgent(ksqlEngine, serviceContext, new HeartbeatConfig(this.nestedThreadPoolSize, this.nestedHeartbeatSendIntervalMs, this.nestedHeartbeatCheckIntervalMs, this.nestedHeartbeatWindowMs, this.nestedHeartbeatMissedThreshold, this.nestedDiscoverClusterIntervalMs), this.nestedHostStatusListeners);
        }
    }

    /* loaded from: input_file:io/confluent/ksql/rest/server/HeartbeatAgent$CheckHeartbeatService.class */
    class CheckHeartbeatService extends AbstractScheduledService {
        CheckHeartbeatService() {
        }

        protected void runOneIteration() {
            long millis = HeartbeatAgent.this.clock.millis();
            runWithWindow(millis - HeartbeatAgent.this.config.heartbeatWindowMs, millis);
        }

        @VisibleForTesting
        void runWithWindow(long j, long j2) {
            try {
                processHeartbeats(j, j2);
            } catch (Throwable th) {
                Logger logger = HeartbeatAgent.LOG;
                th.getMessage();
                logger.error("Failed to process heartbeats for window start = " + j + " end = " + logger + " with exception " + j2, th);
            }
        }

        protected AbstractScheduledService.Scheduler scheduler() {
            return AbstractScheduledService.Scheduler.newFixedRateSchedule(1000L, HeartbeatAgent.this.config.heartbeatCheckIntervalMs, TimeUnit.MILLISECONDS);
        }

        protected ScheduledExecutorService executor() {
            return HeartbeatAgent.this.scheduledExecutorService;
        }

        private void processHeartbeats(long j, long j2) {
            TreeMap<Long, HeartbeatInfo> treeMap;
            if (HeartbeatAgent.this.receivedHeartbeats.isEmpty()) {
                HeartbeatAgent.this.hostsStatus.replaceAll((ksqlHostInfo, hostStatus) -> {
                    return !ksqlHostInfo.equals(HeartbeatAgent.this.localHost) ? hostStatus.withHostAlive(false) : hostStatus;
                });
                notifyListeners();
                return;
            }
            for (Map.Entry<KsqlHostInfo, HostStatus> entry : HeartbeatAgent.this.hostsStatus.entrySet()) {
                KsqlHostInfo key = entry.getKey();
                entry.getValue();
                if (!key.equals(HeartbeatAgent.this.localHost)) {
                    TreeMap<Long, HeartbeatInfo> treeMap2 = HeartbeatAgent.this.receivedHeartbeats.get(key);
                    if (treeMap2 == null || treeMap2.isEmpty()) {
                        HeartbeatAgent.this.hostsStatus.computeIfPresent(key, (ksqlHostInfo2, hostStatus2) -> {
                            return hostStatus2.withHostAlive(false);
                        });
                    } else {
                        synchronized (treeMap2) {
                            HeartbeatAgent.LOG.debug("Process heartbeats: {} of host: {}", treeMap2, key);
                            treeMap2.headMap(Long.valueOf(j)).clear();
                            treeMap = new TreeMap<>((SortedMap<Long, ? extends HeartbeatInfo>) treeMap2.subMap(Long.valueOf(j), true, Long.valueOf(j2), true));
                            HeartbeatAgent.LOG.debug("Process heartbeats: {} of host: {}, window start: {}, window end: {}", treeMap, key, Long.valueOf(j), Long.valueOf(j2));
                        }
                        boolean decideStatus = decideStatus(key, j, j2, treeMap);
                        if (!decideStatus) {
                            HeartbeatAgent.LOG.info("Host: {} marked as dead.", key);
                        }
                        HeartbeatAgent.this.hostsStatus.computeIfPresent(key, (ksqlHostInfo3, hostStatus3) -> {
                            return hostStatus3.withHostAlive(decideStatus).withLastStatusUpdateMs(j2);
                        });
                    }
                }
            }
            notifyListeners();
        }

        private void notifyListeners() {
            Iterator<HostStatusListener> it = HeartbeatAgent.this.hostStatusListeners.iterator();
            while (it.hasNext()) {
                try {
                    it.next().onHostStatusUpdated(HeartbeatAgent.this.getHostsStatus());
                } catch (Throwable th) {
                    HeartbeatAgent.LOG.error("Error while notifying listener", th);
                }
            }
        }

        private boolean decideStatus(KsqlHostInfo ksqlHostInfo, long j, long j2, TreeMap<Long, HeartbeatInfo> treeMap) {
            long j3 = 0;
            long j4 = j;
            if (treeMap.isEmpty()) {
                return false;
            }
            Iterator<Long> it = treeMap.keySet().iterator();
            while (it.hasNext()) {
                long longValue = it.next().longValue();
                if (longValue >= j2) {
                    break;
                }
                if (longValue - HeartbeatAgent.this.config.heartbeatSendIntervalMs > j4) {
                    j3 = ((longValue - j4) - 1) / HeartbeatAgent.this.config.heartbeatSendIntervalMs;
                    HeartbeatAgent.LOG.debug("Host: {} missed: {} heartbeats, current heartbeat: {}, previous heartbeat: {}, send interval: {}.", ksqlHostInfo, Long.valueOf(j3), Long.valueOf(longValue), Long.valueOf(j4), Long.valueOf(HeartbeatAgent.this.config.heartbeatSendIntervalMs));
                } else {
                    j3 = 0;
                }
                j4 = longValue;
            }
            if ((j2 - j4) - 1 > 0) {
                j3 = ((j2 - j4) - 1) / HeartbeatAgent.this.config.heartbeatSendIntervalMs;
                HeartbeatAgent.LOG.debug("Host: {} missed: {} heartbeats, window end: {}, previous heartbeat: {}, send interval: {}.", ksqlHostInfo, Long.valueOf(j3), Long.valueOf(j2), Long.valueOf(j4), Long.valueOf(HeartbeatAgent.this.config.heartbeatSendIntervalMs));
            }
            HeartbeatAgent.LOG.debug("Host: {} has {} missing heartbeats", ksqlHostInfo, Long.valueOf(j3));
            return j3 < HeartbeatAgent.this.config.heartbeatMissedThreshold;
        }
    }

    /* loaded from: input_file:io/confluent/ksql/rest/server/HeartbeatAgent$DiscoverClusterService.class */
    class DiscoverClusterService extends AbstractScheduledService {
        DiscoverClusterService() {
        }

        protected void runOneIteration() {
            try {
                List persistentQueries = HeartbeatAgent.this.engine.getPersistentQueries();
                if (persistentQueries.isEmpty()) {
                    return;
                }
                for (HostInfo hostInfo : DiscoverRemoteHostsUtil.getRemoteHosts(persistentQueries, HeartbeatAgent.this.localHost)) {
                    HeartbeatAgent.this.hostsStatus.computeIfAbsent(new KsqlHostInfo(hostInfo.host(), hostInfo.port()), ksqlHostInfo -> {
                        return new HostStatus(true, HeartbeatAgent.this.clock.millis());
                    });
                }
            } catch (Throwable th) {
                HeartbeatAgent.LOG.error("Failed to discover cluster with exception " + th.getMessage(), th);
            }
        }

        protected AbstractScheduledService.Scheduler scheduler() {
            return AbstractScheduledService.Scheduler.newFixedRateSchedule(50L, HeartbeatAgent.this.config.discoverClusterIntervalMs, TimeUnit.MILLISECONDS);
        }

        protected ScheduledExecutorService executor() {
            return HeartbeatAgent.this.scheduledExecutorService;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/HeartbeatAgent$HeartbeatConfig.class */
    public static class HeartbeatConfig {
        private final int threadPoolSize;
        private final long heartbeatSendIntervalMs;
        private final long heartbeatCheckIntervalMs;
        private final long heartbeatWindowMs;
        private final long heartbeatMissedThreshold;
        private final long discoverClusterIntervalMs;

        HeartbeatConfig(int i, long j, long j2, long j3, long j4, long j5) {
            this.threadPoolSize = i;
            this.heartbeatSendIntervalMs = j;
            this.heartbeatCheckIntervalMs = j2;
            this.heartbeatWindowMs = j3;
            this.heartbeatMissedThreshold = j4;
            this.discoverClusterIntervalMs = j5;
        }
    }

    /* loaded from: input_file:io/confluent/ksql/rest/server/HeartbeatAgent$HeartbeatInfo.class */
    public static class HeartbeatInfo {
        private final long timestamp;

        public HeartbeatInfo(long j) {
            this.timestamp = j;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        public String toString() {
            return String.valueOf(this.timestamp);
        }
    }

    /* loaded from: input_file:io/confluent/ksql/rest/server/HeartbeatAgent$HostStatusListener.class */
    public interface HostStatusListener {
        void onHostStatusUpdated(Map<KsqlHostInfo, HostStatus> map);
    }

    /* loaded from: input_file:io/confluent/ksql/rest/server/HeartbeatAgent$SendHeartbeatService.class */
    class SendHeartbeatService extends AbstractScheduledService {
        SendHeartbeatService() {
        }

        protected void runOneIteration() {
            Iterator<Map.Entry<KsqlHostInfo, HostStatus>> it = HeartbeatAgent.this.hostsStatus.entrySet().iterator();
            while (it.hasNext()) {
                KsqlHostInfo key = it.next().getKey();
                try {
                    if (!key.equals(HeartbeatAgent.this.localHost)) {
                        URI buildRemoteUri = ServerUtil.buildRemoteUri(HeartbeatAgent.this.localUrl, key.host(), key.port());
                        HeartbeatAgent.LOG.debug("Send heartbeat to host {} at {}", key, Long.valueOf(HeartbeatAgent.this.clock.millis()));
                        HeartbeatAgent.this.serviceContext.getKsqlClient().makeAsyncHeartbeatRequest(buildRemoteUri, HeartbeatAgent.this.localHost, HeartbeatAgent.this.clock.millis());
                    }
                } catch (Throwable th) {
                    HeartbeatAgent.LOG.error("Request to server: " + String.valueOf(key) + " failed with exception: " + th.getMessage(), th);
                }
            }
        }

        protected AbstractScheduledService.Scheduler scheduler() {
            return AbstractScheduledService.Scheduler.newFixedRateSchedule(100L, HeartbeatAgent.this.config.heartbeatSendIntervalMs, TimeUnit.MILLISECONDS);
        }

        protected ScheduledExecutorService executor() {
            return HeartbeatAgent.this.scheduledExecutorService;
        }
    }

    public static Builder builder() {
        return new Builder();
    }

    private HeartbeatAgent(KsqlEngine ksqlEngine, ServiceContext serviceContext, HeartbeatConfig heartbeatConfig, List<HostStatusListener> list) {
        this.engine = (KsqlEngine) Objects.requireNonNull(ksqlEngine, "engine");
        this.serviceContext = (ServiceContext) Objects.requireNonNull(serviceContext, "serviceContext");
        this.config = (HeartbeatConfig) Objects.requireNonNull(heartbeatConfig, "configuration parameters");
        this.hostStatusListeners = (List) Objects.requireNonNull(list, "heartbeatListeners");
        this.scheduledExecutorService = Executors.newScheduledThreadPool(heartbeatConfig.threadPoolSize);
    }

    public void receiveHeartbeat(KsqlHostInfo ksqlHostInfo, long j) {
        TreeMap<Long, HeartbeatInfo> computeIfAbsent = this.receivedHeartbeats.computeIfAbsent(ksqlHostInfo, ksqlHostInfo2 -> {
            return new TreeMap();
        });
        synchronized (computeIfAbsent) {
            LOG.debug("Receive heartbeat at: {} from host: {} ", Long.valueOf(j), ksqlHostInfo);
            computeIfAbsent.put(Long.valueOf(j), new HeartbeatInfo(j));
        }
    }

    public Map<KsqlHostInfo, HostStatus> getHostsStatus() {
        return Collections.unmodifiableMap(this.hostsStatus);
    }

    @VisibleForTesting
    void setHostsStatus(Map<KsqlHostInfo, HostStatus> map) {
        this.hostsStatus.putAll(map);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startAgent() {
        try {
            this.serviceManager.startAsync().awaitHealthy(2L, TimeUnit.SECONDS);
        } catch (IllegalStateException | TimeoutException e) {
            LOG.error("Failed to start heartbeat services with exception " + e.getMessage(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stopAgent() {
        try {
            this.serviceManager.stopAsync().awaitStopped(2L, TimeUnit.SECONDS);
        } catch (IllegalStateException | TimeoutException e) {
            LOG.error("Failed to stop heartbeat services with exception " + e.getMessage(), e);
        } finally {
            this.scheduledExecutorService.shutdownNow();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setLocalAddress(String str) {
        HostInfo parseHostInfo = ServerUtil.parseHostInfo(str);
        this.localHost = new KsqlHostInfo(parseHostInfo.host(), parseHostInfo.port());
        try {
            this.localUrl = new URL(str);
            Preconditions.checkState(this.hostsStatus.isEmpty(), "expected empty host status map on startup");
            this.hostsStatus.putIfAbsent(this.localHost, new HostStatus(true, this.clock.millis()));
        } catch (Exception e) {
            throw new IllegalStateException("Failed to convert remote host info to URL. remoteInfo: " + this.localHost.host() + ":" + this.localHost.host());
        }
    }
}
