package org.deeplearning4j.spark.parameterserver.pw;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.bytedeco.javacpp.Loader;
import org.deeplearning4j.api.storage.StatsStorageRouter;
import org.deeplearning4j.api.storage.listener.RoutingIterationListener;
import org.deeplearning4j.exception.DL4JInvalidConfigException;
import org.deeplearning4j.nn.api.Model;
import org.deeplearning4j.nn.api.Updater;
import org.deeplearning4j.nn.graph.ComputationGraph;
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
import org.deeplearning4j.nn.updater.BaseMultiLayerUpdater;
import org.deeplearning4j.optimize.api.TrainingListener;
import org.deeplearning4j.optimize.listeners.SleepyTrainingListener;
import org.deeplearning4j.optimize.solvers.accumulation.EncodedGradientsAccumulator;
import org.deeplearning4j.parallelism.ParallelWrapper;
import org.deeplearning4j.spark.parameterserver.conf.SharedTrainingConfiguration;
import org.deeplearning4j.spark.parameterserver.iterators.VirtualDataSetIterator;
import org.deeplearning4j.spark.parameterserver.iterators.VirtualIterator;
import org.deeplearning4j.spark.parameterserver.iterators.VirtualMultiDataSetIterator;
import org.deeplearning4j.spark.parameterserver.networking.v2.ModelParamsConsumer;
import org.deeplearning4j.spark.parameterserver.networking.v2.UpdaterParamsConsumer;
import org.deeplearning4j.spark.parameterserver.networking.v2.UpdatesConsumer;
import org.deeplearning4j.spark.parameterserver.networking.v2.WiredEncodingHandler;
import org.deeplearning4j.spark.parameterserver.training.SharedTrainingResult;
import org.deeplearning4j.spark.parameterserver.training.SharedTrainingWorker;
import org.deeplearning4j.spark.parameterserver.util.BlockingObserver;
import org.deeplearning4j.spark.parameterserver.util.CountingIterator;
import org.deeplearning4j.spark.util.SparkUtils;
import org.nd4j.linalg.api.ndarray.INDArray;
import org.nd4j.linalg.dataset.DataSet;
import org.nd4j.linalg.dataset.api.MultiDataSet;
import org.nd4j.linalg.factory.Nd4j;
import org.nd4j.parameterserver.distributed.conf.VoidConfiguration;
import org.nd4j.parameterserver.distributed.enums.TransportType;
import org.nd4j.parameterserver.distributed.util.NetworkOrganizer;
import org.nd4j.parameterserver.distributed.v2.ModelParameterServer;
import org.nd4j.parameterserver.distributed.v2.transport.UpdaterParametersProvider;
import org.nd4j.parameterserver.distributed.v2.transport.impl.AeronUdpTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/deeplearning4j/spark/parameterserver/pw/SharedTrainingWrapper.class */
public class SharedTrainingWrapper {
    private static final Logger log = LoggerFactory.getLogger(SharedTrainingWrapper.class);
    private static SharedTrainingWrapper INSTANCE = new SharedTrainingWrapper();
    private static AtomicLong LAST_INSTANCE_ID = new AtomicLong(Long.MIN_VALUE);
    protected ParallelWrapper wrapper;
    protected VirtualDataSetIterator iteratorDS;
    protected VirtualMultiDataSetIterator iteratorMDS;
    protected List<Iterator<DataSet>> iteratorsDS;
    protected List<Iterator<MultiDataSet>> iteratorsMDS;
    protected Throwable exception;
    protected EncodedGradientsAccumulator accumulator;
    protected Model originalModel;
    protected UpdatesConsumer consumer;
    protected AtomicBoolean isFirst = new AtomicBoolean(false);
    protected AtomicBoolean exceptionEncountered = new AtomicBoolean(false);
    protected ThreadLocal<AtomicInteger> iteratorDataSetCount = new ThreadLocal<>();
    protected ThreadLocal<BlockingObserver> observer = new ThreadLocal<>();

    protected SharedTrainingWrapper() {
        init();
    }

    protected void init() {
        this.iteratorsDS = new CopyOnWriteArrayList();
        this.iteratorsMDS = new CopyOnWriteArrayList();
        this.iteratorDS = new VirtualDataSetIterator(this.iteratorsDS);
    }

    public static synchronized SharedTrainingWrapper getInstance(long j) {
        if (LAST_INSTANCE_ID.get() != Long.MIN_VALUE && LAST_INSTANCE_ID.get() != j) {
            log.debug("Shutting down existing SharedTrainingWrapper instances; resetting state - previous instance ID {}, new instance ID {}", Long.valueOf(LAST_INSTANCE_ID.get()), Long.valueOf(j));
            if (INSTANCE.wrapper != null) {
                INSTANCE.wrapper.shutdown();
                INSTANCE.wrapper = null;
            }
            INSTANCE.iteratorsDS.clear();
            INSTANCE.iteratorsMDS.clear();
            INSTANCE.exceptionEncountered.set(false);
            INSTANCE.iteratorDataSetCount = new ThreadLocal<>();
            INSTANCE.accumulator = null;
            INSTANCE.originalModel = null;
            INSTANCE.consumer = null;
            LAST_INSTANCE_ID.set(j);
        }
        if (LAST_INSTANCE_ID.get() == Long.MIN_VALUE) {
            LAST_INSTANCE_ID.set(j);
        }
        return INSTANCE;
    }

    public void attachDS(Iterator<DataSet> it) {
        log.debug("Attaching thread...");
        if (this.iteratorDataSetCount.get() == null) {
            this.iteratorDataSetCount.set(new AtomicInteger(0));
        }
        AtomicInteger atomicInteger = this.iteratorDataSetCount.get();
        atomicInteger.set(0);
        VirtualIterator virtualIterator = new VirtualIterator(new CountingIterator(it, atomicInteger));
        BlockingObserver blockingObserver = new BlockingObserver(this.exceptionEncountered);
        virtualIterator.addObserver(blockingObserver);
        this.iteratorsDS.add(virtualIterator);
        this.observer.set(blockingObserver);
    }

    public void attachMDS(Iterator<MultiDataSet> it) {
        log.debug("Attaching thread...");
        if (this.iteratorDataSetCount.get() == null) {
            this.iteratorDataSetCount.set(new AtomicInteger(0));
        }
        AtomicInteger atomicInteger = this.iteratorDataSetCount.get();
        atomicInteger.set(0);
        VirtualIterator virtualIterator = new VirtualIterator(new CountingIterator(it, atomicInteger));
        BlockingObserver blockingObserver = new BlockingObserver(this.exceptionEncountered);
        virtualIterator.addObserver(blockingObserver);
        this.iteratorsMDS.add(virtualIterator);
        this.observer.set(blockingObserver);
    }

    public SharedTrainingResult run(SharedTrainingWorker sharedTrainingWorker) {
        if (!this.isFirst.compareAndSet(false, true)) {
            try {
                this.observer.get().waitTillDone();
                log.info("Feeder [{}] thread done...", Thread.currentThread().getName());
                if (this.exceptionEncountered.get()) {
                    throw new RuntimeException("Training failed due to exception in ParallelWrapper fit operation", (this.wrapper == null || this.exception != null) ? this.exception : this.wrapper.getException());
                }
                return SharedTrainingResult.builder().minibatchesPerExecutor(Collections.singletonMap(SparkUtils.getSparkExecutorId(), Integer.valueOf(this.iteratorDataSetCount.get().get()))).build();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        this.exceptionEncountered.set(false);
        this.exception = null;
        SharedTrainingConfiguration sharedTrainingConfiguration = (SharedTrainingConfiguration) sharedTrainingWorker.getBroadcastConfiguration().getValue();
        VoidConfiguration voidConfiguration = ((SharedTrainingConfiguration) sharedTrainingWorker.getBroadcastConfiguration().getValue()).getVoidConfiguration();
        ComputationGraph computationGraph = null;
        int numberOfDevices = Nd4j.getAffinityManager().getNumberOfDevices();
        int numberOfWorkersPerNode = sharedTrainingConfiguration.getNumberOfWorkersPerNode() > 0 ? sharedTrainingConfiguration.getNumberOfWorkersPerNode() : numberOfDevices > 1 ? numberOfDevices : Math.min(6, Math.max(1, Loader.totalCores() / 4));
        if (numberOfDevices > 1 && numberOfWorkersPerNode > numberOfDevices) {
            log.warn("WARNING! Using more workers then number of available computational devices!");
        }
        if (this.wrapper == null) {
            log.debug("Starting ParallelWrapper at thread {}", Long.valueOf(Thread.currentThread().getId()));
            computationGraph = sharedTrainingWorker.getInitialModel();
            if (computationGraph == null) {
                computationGraph = sharedTrainingWorker.getInitialModelGraph();
            }
            if (computationGraph == null) {
                throw new DL4JInvalidConfigException("No model was defined for training");
            }
            List<TrainingListener> listeners = sharedTrainingWorker.getListeners();
            if (listeners != null) {
                computationGraph.setListeners(listeners);
                StatsStorageRouter router = sharedTrainingWorker.getRouter();
                if (router != null) {
                    Iterator<TrainingListener> it = listeners.iterator();
                    while (it.hasNext()) {
                        RoutingIterationListener routingIterationListener = (TrainingListener) it.next();
                        if (routingIterationListener instanceof RoutingIterationListener) {
                            routingIterationListener.setStorageRouter(router);
                        }
                    }
                }
            }
            WiredEncodingHandler wiredEncodingHandler = new WiredEncodingHandler(sharedTrainingConfiguration.getThresholdAlgorithm(), sharedTrainingConfiguration.getResidualPostProcessor(), null, sharedTrainingConfiguration.isEncodingDebugMode());
            ModelParamsConsumer modelParamsConsumer = new ModelParamsConsumer();
            UpdaterParamsConsumer updaterParamsConsumer = new UpdaterParamsConsumer();
            if (this.accumulator == null) {
                this.accumulator = new EncodedGradientsAccumulator.Builder(numberOfWorkersPerNode).messageHandler(wiredEncodingHandler).thresholdAlgorithm(sharedTrainingConfiguration.getThresholdAlgorithm()).residualPostProcessor(sharedTrainingConfiguration.getResidualPostProcessor()).memoryParameters(sharedTrainingConfiguration.getBufferSize() > 0 ? sharedTrainingConfiguration.getBufferSize() : EncodedGradientsAccumulator.getOptimalBufferSize(computationGraph, numberOfWorkersPerNode, 2), numberOfWorkersPerNode * 2).encodingDebugMode(sharedTrainingConfiguration.isEncodingDebugMode()).build();
                String str = null;
                if (0 == 0 && voidConfiguration.getNetworkMask() != null) {
                    str = new NetworkOrganizer(voidConfiguration.getNetworkMask()).getMatchingAddress();
                }
                if (str == null) {
                    str = System.getenv("DL4J_VOID_IP");
                }
                if (str == null) {
                    str = "127.0.0.1";
                    log.warn("Can't get IP address to start VoidParameterServer client. Using localhost instead");
                }
                log.debug("Checking for ModelParameterServer existence");
                this.originalModel = computationGraph;
                if (!ModelParameterServer.getInstance().isInitialized()) {
                    log.info("Initializing transport [{}:{}] with root as [{}:{}]...", new Object[]{str, Integer.valueOf(voidConfiguration.getPortSupplier().getPort()), voidConfiguration.getControllerAddress(), Integer.valueOf(voidConfiguration.getUnicastControllerPort())});
                    AeronUdpTransport aeronUdpTransport = voidConfiguration.getTransportType() == TransportType.ROUTED_UDP ? new AeronUdpTransport(str, voidConfiguration.getPortSupplier().getPort(), voidConfiguration.getControllerAddress(), voidConfiguration.getUnicastControllerPort(), voidConfiguration) : null;
                    if (aeronUdpTransport == null) {
                        throw new DL4JInvalidConfigException("No Transport implementation was defined for this training session!");
                    }
                    this.consumer = UpdatesConsumer.builder().numWorkers(numberOfWorkersPerNode).accumulator(this.accumulator).params(computationGraph.params()).build();
                    this.accumulator.setExternalSource(this.consumer.getUpdatesQueue());
                    log.debug("Configuring transport...");
                    ModelParameterServer.getInstance().configure(voidConfiguration, aeronUdpTransport, new UpdaterParametersProvider() { // from class: org.deeplearning4j.spark.parameterserver.pw.SharedTrainingWrapper.1
                        public INDArray getUpdaterParameters() {
                            SharedTrainingWrapper.log.info("Serving updater parameters...");
                            Updater updater = null;
                            if (SharedTrainingWrapper.this.originalModel instanceof MultiLayerNetwork) {
                                updater = SharedTrainingWrapper.this.originalModel.getUpdater();
                            } else if (SharedTrainingWrapper.this.originalModel instanceof ComputationGraph) {
                                updater = SharedTrainingWrapper.this.originalModel.getUpdater();
                            }
                            if (updater == null) {
                                SharedTrainingWrapper.log.warn("No Updater in the model");
                                return null;
                            }
                            if (updater instanceof BaseMultiLayerUpdater) {
                                return ((BaseMultiLayerUpdater) updater).getStateViewArrayCopy();
                            }
                            SharedTrainingWrapper.log.error("Updater doesn't implement getStateViewArrayCopy()");
                            return null;
                        }
                    });
                    ModelParameterServer.getInstance().addUpdatesSubscriber(this.consumer);
                    ModelParameterServer.getInstance().addModelParamsSubscriber(modelParamsConsumer);
                    ModelParameterServer.getInstance().addUpdaterParamsSubscriber(updaterParamsConsumer);
                }
                log.debug("Starting ModelParameterServer...");
                ModelParameterServer.getInstance().launch();
                while (!ModelParameterServer.getInstance().getTransport().isIntroduced()) {
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException e2) {
                        throw new RuntimeException(e2);
                    }
                }
            }
            if (this.originalModel instanceof MultiLayerNetwork) {
                ((MultiLayerNetwork) computationGraph).setIterationCount(((Integer) ModelParameterServer.getInstance().getStartPosition().getFirst()).intValue());
                ((MultiLayerNetwork) computationGraph).setEpochCount(((Integer) ModelParameterServer.getInstance().getStartPosition().getSecond()).intValue());
            } else if (this.originalModel instanceof ComputationGraph) {
                computationGraph.getConfiguration().setIterationCount(((Integer) ModelParameterServer.getInstance().getStartPosition().getFirst()).intValue());
                computationGraph.getConfiguration().setEpochCount(((Integer) ModelParameterServer.getInstance().getStartPosition().getSecond()).intValue());
            }
            if (sharedTrainingConfiguration.getDebugLongerIterations() > 0) {
                log.warn("Adding SleepyListener: {} ms", Long.valueOf(sharedTrainingConfiguration.getDebugLongerIterations()));
                computationGraph.addListeners(new TrainingListener[]{SleepyTrainingListener.builder().timerIteration(sharedTrainingConfiguration.getDebugLongerIterations()).build()});
            }
            this.accumulator.markExternalUpdates(true);
            if (numberOfWorkersPerNode > 1) {
                this.wrapper = new ParallelWrapper.Builder(this.originalModel).workers(numberOfWorkersPerNode).workspaceMode(sharedTrainingConfiguration.getWorkspaceMode()).trainingMode(ParallelWrapper.TrainingMode.CUSTOM).gradientsAccumulator(this.accumulator).prefetchBuffer(sharedTrainingConfiguration.getPrefetchSize()).modelParamsSupplier(modelParamsConsumer).updaterParamsSupplier(updaterParamsConsumer).thresholdAlgorithm(sharedTrainingConfiguration.getThresholdAlgorithm()).residualPostProcessor(sharedTrainingConfiguration.getResidualPostProcessor()).build();
                this.wrapper.setExceptionEncountered(this.exceptionEncountered);
            } else {
                log.debug("Using standalone model instead...");
                this.accumulator.fallbackToSingleConsumerMode(true);
                this.accumulator.touch();
                INDArray m9get = modelParamsConsumer.m9get();
                if (m9get != null) {
                    log.info("Updating model params to the most recent ones...");
                    this.originalModel.params().assign(m9get);
                }
                if (computationGraph instanceof ComputationGraph) {
                    this.originalModel.getConfiguration().setTrainingWorkspaceMode(sharedTrainingConfiguration.getWorkspaceMode());
                    this.originalModel.setGradientsAccumulator(this.accumulator);
                } else if (computationGraph instanceof MultiLayerNetwork) {
                    this.originalModel.getLayerWiseConfigurations().setTrainingWorkspaceMode(sharedTrainingConfiguration.getWorkspaceMode());
                    this.originalModel.setGradientsAccumulator(this.accumulator);
                }
            }
        }
        if (this.consumer != null) {
            this.consumer.bypassMode(false);
        }
        if (this.iteratorDS == null && this.iteratorMDS == null) {
            throw new DL4JInvalidConfigException("No iterators were defined for training");
        }
        while (true) {
            try {
                if ((this.iteratorDS == null || !this.iteratorDS.hasNext()) && (this.iteratorMDS == null || !this.iteratorMDS.hasNext())) {
                    break;
                }
                if (this.wrapper != null) {
                    if (this.iteratorDS != null) {
                        this.wrapper.fit(this.iteratorDS);
                    } else {
                        this.wrapper.fit(this.iteratorMDS);
                    }
                } else if (this.iteratorDS != null) {
                    if (computationGraph instanceof ComputationGraph) {
                        this.originalModel.fit(this.iteratorDS);
                    } else if (computationGraph instanceof MultiLayerNetwork) {
                        this.originalModel.fit(this.iteratorDS);
                    }
                } else if (computationGraph instanceof ComputationGraph) {
                    this.originalModel.fit(this.iteratorMDS);
                } else if (computationGraph instanceof MultiLayerNetwork) {
                    this.originalModel.fit(this.iteratorMDS);
                }
                this.consumer.getUpdatesQueue().purge();
            } catch (Throwable th) {
                log.warn("Exception encountered during fit operation", th);
                this.exceptionEncountered.set(true);
                this.exception = th;
            }
        }
        EncodedGradientsAccumulator encodedGradientsAccumulator = this.wrapper != null ? (EncodedGradientsAccumulator) this.wrapper.getGradientsAccumulator() : this.accumulator;
        if (sharedTrainingConfiguration.isEpochReset()) {
            this.wrapper.shutdown();
            this.wrapper = null;
        }
        init();
        this.accumulator.reset();
        if (this.consumer != null) {
            this.consumer.bypassMode(true);
        }
        this.isFirst.set(false);
        log.info("Master thread done...");
        INDArray iNDArray = null;
        if (computationGraph instanceof ComputationGraph) {
            iNDArray = this.originalModel.getUpdater().getUpdaterStateViewArray();
        } else if (computationGraph instanceof MultiLayerNetwork) {
            iNDArray = this.originalModel.getUpdater().getStateViewArray();
        }
        return SharedTrainingResult.builder().aggregationsCount(1).scoreSum(this.originalModel.score()).updaterStateArray(iNDArray).listenerMetaData(new ArrayList()).listenerStaticInfo(new ArrayList()).listenerUpdates(new ArrayList()).minibatchesPerExecutor(Collections.singletonMap(SparkUtils.getSparkExecutorId(), Integer.valueOf(this.iteratorDataSetCount.get().get()))).thresholdAlgorithm(encodedGradientsAccumulator.getHandler().getAverageThresholdAlgorithm()).build();
    }

    public void passDataSet(DataSet dataSet) {
    }

    public void passDataSet(MultiDataSet multiDataSet) {
    }

    public void blockUntilFinished() throws InterruptedException {
        if (this.observer.get() == null) {
            throw new IllegalStateException("This method can't be called before iterators initialization");
        }
        this.observer.get().wait();
    }
}
