/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.applicationinsights.internal.channel.common;

import com.microsoft.applicationinsights.core.dependencies.google.common.base.Preconditions;
import com.microsoft.applicationinsights.internal.channel.TransmissionDispatcher;
import com.microsoft.applicationinsights.internal.channel.TransmissionsLoader;
import com.microsoft.applicationinsights.internal.channel.common.Transmission;
import com.microsoft.applicationinsights.internal.channel.common.TransmissionFileSystemOutput;
import com.microsoft.applicationinsights.internal.channel.common.TransmissionPolicy;
import com.microsoft.applicationinsights.internal.channel.common.TransmissionPolicyStateFetcher;
import com.microsoft.applicationinsights.internal.logger.InternalLogger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public final class ActiveTransmissionLoader
implements TransmissionsLoader {
    public static final int MAX_THREADS_ALLOWED = 10;
    private static final int DEFAULT_NUMBER_OF_THREADS = 1;
    private static final long DEFAULT_SLEEP_INTERVAL_WHEN_NO_TRANSMISSIONS_FOUND_IN_MILLS = 2000L;
    private static final long DEFAULT_SLEEP_INTERVAL_AFTER_DISPATCHING_IN_MILLS = 100L;
    private final TransmissionFileSystemOutput fileSystem;
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final TransmissionDispatcher dispatcher;
    private final CountDownLatch latch;
    private final TransmissionPolicyStateFetcher transmissionPolicyFetcher;
    private final Thread[] threads;
    private final long sleepIntervalWhenNoTransmissionsFoundInMills;

    public ActiveTransmissionLoader(TransmissionFileSystemOutput fileSystem, TransmissionPolicyStateFetcher transmissionPolicy, TransmissionDispatcher dispatcher) {
        this(fileSystem, dispatcher, transmissionPolicy, 1);
    }

    public ActiveTransmissionLoader(TransmissionFileSystemOutput fileSystem, TransmissionDispatcher dispatcher, TransmissionPolicyStateFetcher transmissionPolicy, int numberOfThreads) {
        Preconditions.checkNotNull(fileSystem, "fileSystem must be a non-null value");
        Preconditions.checkNotNull(dispatcher, "dispatcher must be a non-null value");
        Preconditions.checkNotNull(transmissionPolicy, "transmissionPolicy must be a non-null value");
        Preconditions.checkArgument(numberOfThreads > 0, "numberOfThreads must be a positive number");
        Preconditions.checkArgument(numberOfThreads < 10, "numberOfThreads must be smaller than %s", 10);
        this.sleepIntervalWhenNoTransmissionsFoundInMills = 2000L;
        this.transmissionPolicyFetcher = transmissionPolicy;
        this.fileSystem = fileSystem;
        this.dispatcher = dispatcher;
        this.threads = new Thread[numberOfThreads];
        this.latch = new CountDownLatch(numberOfThreads);
        String threadNameFmt = String.format("%s-worker-%%d", ActiveTransmissionLoader.class.getSimpleName());
        for (int i = 0; i < numberOfThreads; ++i) {
            this.threads[i] = new Thread(new Runnable(){

                @Override
                public void run() {
                    ActiveTransmissionLoader.this.latch.countDown();
                    block9: while (!ActiveTransmissionLoader.this.done.get()) {
                        try {
                            TransmissionPolicy currentTransmissionState = ActiveTransmissionLoader.this.transmissionPolicyFetcher.getCurrentState();
                            switch (currentTransmissionState) {
                                case UNBLOCKED: {
                                    ActiveTransmissionLoader.this.fetchNext(true);
                                    continue block9;
                                }
                                case BACKOFF: 
                                case BLOCKED_BUT_CAN_BE_PERSISTED: {
                                    Thread.sleep(100L);
                                    continue block9;
                                }
                                case BLOCKED_AND_CANNOT_BE_PERSISTED: {
                                    ActiveTransmissionLoader.this.fetchNext(false);
                                    continue block9;
                                }
                            }
                            InternalLogger.INSTANCE.error("Could not find transmission policy '%s'", new Object[]{currentTransmissionState});
                            Thread.sleep(100L);
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                        catch (ThreadDeath td) {
                            throw td;
                        }
                        catch (Throwable throwable) {
                        }
                    }
                }
            }, String.format(threadNameFmt, i));
            this.threads[i].setDaemon(true);
        }
    }

    @Override
    public synchronized boolean load(boolean waitForThreadsToStart) {
        for (Thread thread : this.threads) {
            thread.start();
        }
        if (!waitForThreadsToStart) {
            return true;
        }
        try {
            this.latch.await();
            return true;
        }
        catch (InterruptedException e) {
            InternalLogger.INSTANCE.error("Interrupted waiting for threads to start: %s", e.toString());
            Thread.currentThread().interrupt();
            return false;
        }
    }

    @Override
    public void stop(long timeout, TimeUnit timeUnit) {
        this.done.set(true);
        this.interruptAllThreads();
        this.joinAllThreads();
    }

    private void joinAllThreads() {
        for (Thread thread : this.threads) {
            try {
                thread.join();
            }
            catch (InterruptedException e) {
                InternalLogger.INSTANCE.error("Interrupted during join of active transmission loader, exception: %s", e.toString());
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    private void interruptAllThreads() {
        for (Thread thread : this.threads) {
            thread.interrupt();
        }
    }

    private void fetchNext(boolean shouldDispatch) throws InterruptedException {
        Transmission transmission = this.fileSystem.fetchOldestFile();
        if (transmission == null) {
            Thread.sleep(this.sleepIntervalWhenNoTransmissionsFoundInMills);
        } else {
            if (shouldDispatch) {
                this.dispatcher.dispatch(transmission);
            }
            Thread.sleep(100L);
        }
    }
}

