package com.espertech.esper.common.internal.epl.dataflow.core;

import com.espertech.esper.common.client.dataflow.core.EPDataFlowCancellationException;
import com.espertech.esper.common.client.dataflow.core.EPDataFlowEmitterOperator;
import com.espertech.esper.common.client.dataflow.core.EPDataFlowExecutionException;
import com.espertech.esper.common.client.dataflow.core.EPDataFlowInstance;
import com.espertech.esper.common.client.dataflow.core.EPDataFlowInstanceCaptive;
import com.espertech.esper.common.client.dataflow.core.EPDataFlowInstanceStatistics;
import com.espertech.esper.common.client.dataflow.core.EPDataFlowState;
import com.espertech.esper.common.internal.collection.Pair;
import com.espertech.esper.common.internal.context.aifactory.createdataflow.DataflowDesc;
import com.espertech.esper.common.internal.context.util.AgentInstanceContext;
import com.espertech.esper.common.internal.epl.dataflow.interfaces.DataFlowOpCloseContext;
import com.espertech.esper.common.internal.epl.dataflow.interfaces.DataFlowOpOpenContext;
import com.espertech.esper.common.internal.epl.dataflow.interfaces.DataFlowOperatorLifecycle;
import com.espertech.esper.common.internal.epl.dataflow.realize.OperatorStatisticsProvider;
import com.espertech.esper.common.internal.epl.dataflow.runnables.CompletionListener;
import com.espertech.esper.common.internal.epl.dataflow.runnables.GraphSourceRunnable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/espertech/esper/common/internal/epl/dataflow/core/EPDataFlowInstanceImpl.class */
public class EPDataFlowInstanceImpl implements EPDataFlowInstance {
    /** Class-wide SLF4J logger; used to report failures while closing operators. */
    private static final Logger log = LoggerFactory.getLogger(EPDataFlowInstanceImpl.class);
    /** Opaque user object handed in at construction and returned unchanged by getUserObject(). */
    private final Object dataFlowInstanceUserObject;
    /** Instance id handed in at construction; returned by getInstanceId() and passed to the audit provider on state changes. */
    private final String dataFlowInstanceId;
    /** Operator statistics provider handed in at construction; stored but not read by any method of this class. */
    private final OperatorStatisticsProvider statistics;
    /** Descriptor of the data flow: supplies the data flow name, the statement context and the operator build order. */
    private final DataflowDesc dataflowDesc;
    /** Agent instance context: supplies the audit provider and the class loader given to source threads. */
    private final AgentInstanceContext agentInstanceContext;
    /**
     * Operators keyed by operator number (kept in a TreeMap). The Boolean of each pair starts as false
     * and is set to true by callOperatorClose() once close has been attempted on a lifecycle operator.
     */
    private final Map<Integer, Pair<Object, Boolean>> operators;
    /** Source runnables of the data flow; cleared by cancel(). */
    private final List<GraphSourceRunnable> sourceRunnables;
    /** Statistics object returned by getStatistics(). */
    private final EPDataFlowInstanceStatistics statisticsProvider;
    /** Parameter map returned as-is by getParameters(). */
    private final Map<String, Object> parametersURIs;
    /** Current lifecycle state; written only through setState(). */
    private EPDataFlowState state;
    /** Threads created by start(), one per source runnable; stays null unless start() has been called. */
    private List<Thread> threads;
    /** Latches registered by join() callers when no threads exist; counted down by completed(). Lazily created. */
    private List<CountDownLatch> joinedThreadLatches;
    /** Thread that invoked run(); cancel() interrupts it and then resets this field to null. */
    private Thread runCurrentThread;

    public EPDataFlowInstanceImpl(Object obj, String str, OperatorStatisticsProvider operatorStatisticsProvider, Map<Integer, Object> map, List<GraphSourceRunnable> list, DataflowDesc dataflowDesc, AgentInstanceContext agentInstanceContext, EPDataFlowInstanceStatistics ePDataFlowInstanceStatistics, Map<String, Object> map2) {
        this.dataFlowInstanceUserObject = obj;
        this.dataFlowInstanceId = str;
        this.statistics = operatorStatisticsProvider;
        this.dataflowDesc = dataflowDesc;
        this.agentInstanceContext = agentInstanceContext;
        this.sourceRunnables = list;
        this.statisticsProvider = ePDataFlowInstanceStatistics;
        this.parametersURIs = map2;
        setState(EPDataFlowState.INSTANTIATED);
        this.operators = new TreeMap();
        for (Map.Entry<Integer, Object> entry : map.entrySet()) {
            this.operators.put(entry.getKey(), new Pair<>(entry.getValue(), false));
        }
    }

    /**
     * Returns the data flow name as reported by the data flow descriptor.
     *
     * @return data flow name
     */
    @Override
    public String getDataFlowName() {
        final String flowName = dataflowDesc.getDataflowName();
        return flowName;
    }

    /**
     * Returns the deployment id taken from the statement context of the data flow descriptor.
     *
     * @return deployment id
     */
    @Override
    public String getDataFlowDeploymentId() {
        final String deploymentId = dataflowDesc.getStatementContext().getDeploymentId();
        return deploymentId;
    }

    /**
     * Returns the lifecycle state most recently recorded for this instance.
     *
     * @return current state
     */
    @Override
    public EPDataFlowState getState() {
        return state;
    }

    /**
     * Executes the single source of this data flow synchronously on the calling thread.
     * <p>
     * Operators are opened first, the state becomes RUNNING while the source executes, and operators
     * are closed afterwards regardless of the outcome.
     *
     * @throws IllegalStateException            if the instance has already completed, was cancelled or is running
     * @throws UnsupportedOperationException    if the data flow does not have exactly one source
     * @throws EPDataFlowCancellationException  if the source execution was interrupted; the state becomes CANCELLED
     * @throws EPDataFlowExecutionException     if opening an operator fails or the source throws anything else;
     *                                          in the latter case the state becomes COMPLETE
     */
    @Override
    public synchronized void run() throws IllegalStateException, EPDataFlowExecutionException, EPDataFlowCancellationException {
        checkExecCompleteState();
        checkExecCancelledState();
        checkExecRunningState();

        final String flowName = dataflowDesc.getDataflowName();
        final boolean hasSingleSource = sourceRunnables.size() == 1;
        if (!hasSingleSource) {
            throw new UnsupportedOperationException("The data flow '" + flowName + "' has zero or multiple sources and requires the use of the start method instead");
        }

        callOperatorOpen();
        final GraphSourceRunnable onlySource = sourceRunnables.get(0);
        setState(EPDataFlowState.RUNNING);
        // Remembered so that cancel() can interrupt the thread executing the source.
        runCurrentThread = Thread.currentThread();

        try {
            onlySource.runSync();
            callOperatorClose();
            if (state == EPDataFlowState.CANCELLED) {
                // A cancel observed by now keeps the CANCELLED state.
                return;
            }
            setState(EPDataFlowState.COMPLETE);
        } catch (InterruptedException interrupted) {
            callOperatorClose();
            setState(EPDataFlowState.CANCELLED);
            throw new EPDataFlowCancellationException("Data flow '" + flowName + "' execution was cancelled", flowName);
        } catch (Throwable failure) {
            callOperatorClose();
            setState(EPDataFlowState.COMPLETE);
            throw new EPDataFlowExecutionException("Exception encountered running data flow '" + flowName + "': " + failure.getMessage(), failure, flowName);
        }
    }

    /**
     * Starts the data flow asynchronously: opens all operators, then launches one daemon thread per
     * source runnable and moves the instance into the RUNNING state.
     * <p>
     * The method is synchronized, like {@link #run()} and {@link #startCaptive()}. Because
     * {@link #completed()} is synchronized on the same monitor, a source thread that finishes before
     * this method returns has to wait until RUNNING has been recorded; without the lock the late
     * RUNNING transition could overwrite an already recorded COMPLETE state.
     * <p>
     * When the last source reports completion, {@link #completed()} is invoked.
     *
     * @throws IllegalStateException if the instance has already completed, was cancelled or is running
     */
    @Override
    public synchronized void start() throws IllegalStateException {
        checkExecCompleteState();
        checkExecCancelledState();
        checkExecRunningState();
        callOperatorOpen();

        // Counts the sources that have not yet finished; the one that brings it to zero completes the flow.
        final AtomicInteger remainingSources = new AtomicInteger(this.sourceRunnables.size());
        this.threads = new ArrayList<>();
        for (int i = 0; i < this.sourceRunnables.size(); i++) {
            GraphSourceRunnable graphSourceRunnable = this.sourceRunnables.get(i);
            Thread thread = new Thread(graphSourceRunnable, "esper." + this.dataflowDesc.getDataflowName() + "-" + i);
            thread.setContextClassLoader(this.agentInstanceContext.getClasspathImportServiceRuntime().getClassLoader());
            thread.setDaemon(true);
            graphSourceRunnable.addCompletionListener(new CompletionListener() {
                @Override
                public void completed() {
                    if (remainingSources.decrementAndGet() == 0) {
                        EPDataFlowInstanceImpl.this.completed();
                    }
                }
            });
            this.threads.add(thread);
            thread.start();
        }
        setState(EPDataFlowState.RUNNING);
    }

    /**
     * Starts the data flow in captive mode: records the RUNNING state, opens all operators and
     * returns a handle holding the emitter operators by name together with the source runnables.
     * No threads are created by this method.
     *
     * @return captive handle with the emitters (keyed by emitter name) and the source runnables
     * @throws IllegalStateException if the instance has already completed, was cancelled or is running
     */
    @Override
    public synchronized EPDataFlowInstanceCaptive startCaptive() {
        checkExecCompleteState();
        checkExecCancelledState();
        checkExecRunningState();
        setState(EPDataFlowState.RUNNING);
        callOperatorOpen();

        final Map<String, EPDataFlowEmitterOperator> emittersByName = new HashMap<>();
        for (Pair<Object, Boolean> tracked : operators.values()) {
            final Object operator = tracked.getFirst();
            if (!(operator instanceof EPDataFlowEmitterOperator)) {
                continue;
            }
            final EPDataFlowEmitterOperator emitter = (EPDataFlowEmitterOperator) operator;
            emittersByName.put(emitter.getName(), emitter);
        }
        return new EPDataFlowInstanceCaptive(emittersByName, sourceRunnables);
    }

    /**
     * Blocks the caller until the data flow has finished.
     * <p>
     * If threads were created by {@link #start()}, each of them is joined in turn. Otherwise a latch
     * is registered that {@link #completed()} counts down, and the caller waits on it unless the
     * state is already COMPLETE.
     *
     * @throws IllegalStateException if the instance has not been executed yet or has been cancelled
     * @throws InterruptedException  if the calling thread is interrupted while waiting
     */
    @Override
    public void join() throws IllegalStateException, InterruptedException {
        final String flowName = dataflowDesc.getDataflowName();
        if (state == EPDataFlowState.INSTANTIATED) {
            throw new IllegalStateException("Data flow '" + flowName + "' instance has not been executed, please use join after start or run");
        }
        if (state == EPDataFlowState.CANCELLED) {
            throw new IllegalStateException("Data flow '" + flowName + "' instance has been cancelled and cannot be joined");
        }

        final List<Thread> startedThreads = threads;
        if (startedThreads != null) {
            for (Thread worker : startedThreads) {
                worker.join();
            }
            return;
        }

        // No threads to join: register a latch under the instance lock for completed() to release.
        final CountDownLatch completion = new CountDownLatch(1);
        synchronized (this) {
            if (joinedThreadLatches == null) {
                joinedThreadLatches = new ArrayList<>();
            }
            joinedThreadLatches.add(completion);
        }
        if (state == EPDataFlowState.COMPLETE) {
            return;
        }
        completion.await();
    }

    /**
     * Cancels the data flow instance.
     * <p>
     * Nothing happens when the instance is already COMPLETE or CANCELLED. An instance that was never
     * executed is marked CANCELLED, its sources are dropped and its operators are closed. A running
     * instance has its sources shut down and its live threads interrupted (when started via
     * {@link #start()}), or the thread that called {@link #run()} interrupted; afterwards operators
     * are closed, the state becomes CANCELLED and the sources are dropped.
     */
    @Override
    public void cancel() {
        if (state == EPDataFlowState.COMPLETE || state == EPDataFlowState.CANCELLED) {
            return;
        }

        if (state == EPDataFlowState.INSTANTIATED) {
            setState(EPDataFlowState.CANCELLED);
            sourceRunnables.clear();
            callOperatorClose();
            return;
        }

        if (threads == null) {
            // Synchronous (run) or captive execution: only the remembered run thread can be interrupted.
            if (runCurrentThread != null) {
                runCurrentThread.interrupt();
            }
            runCurrentThread = null;
        } else {
            for (GraphSourceRunnable source : sourceRunnables) {
                source.shutdown();
            }
            for (Thread worker : threads) {
                if (!worker.isAlive() || worker.isInterrupted()) {
                    continue;
                }
                worker.interrupt();
            }
        }

        callOperatorClose();
        setState(EPDataFlowState.CANCELLED);
        sourceRunnables.clear();
    }

    /**
     * Returns the statistics object supplied when this instance was constructed.
     *
     * @return instance statistics
     */
    @Override
    public EPDataFlowInstanceStatistics getStatistics() {
        return statisticsProvider;
    }

    /**
     * Returns the user object supplied when this instance was constructed.
     *
     * @return user object
     */
    @Override
    public Object getUserObject() {
        return dataFlowInstanceUserObject;
    }

    /**
     * Returns the instance id supplied when this instance was constructed.
     *
     * @return instance id
     */
    @Override
    public String getInstanceId() {
        return dataFlowInstanceId;
    }

    /**
     * Returns the parameter map supplied when this instance was constructed (the same map object, not a copy).
     *
     * @return parameters
     */
    @Override
    public Map<String, Object> getParameters() {
        return parametersURIs;
    }

    /**
     * Signals that the data flow has finished: records COMPLETE unless the instance was cancelled,
     * closes the operators and releases every latch registered by {@link #join()} callers.
     */
    public synchronized void completed() {
        final boolean cancelled = state == EPDataFlowState.CANCELLED;
        if (!cancelled) {
            setState(EPDataFlowState.COMPLETE);
        }
        callOperatorClose();

        if (joinedThreadLatches == null) {
            return;
        }
        for (CountDownLatch waiting : joinedThreadLatches) {
            waiting.countDown();
        }
    }

    /**
     * Opens every operator that implements {@link DataFlowOperatorLifecycle}, following the operator
     * build order of the data flow descriptor.
     *
     * @throws EPDataFlowExecutionException if an operator's open method throws a runtime exception;
     *                                      the message names this data flow and the failing operator class
     */
    private void callOperatorOpen() {
        for (Integer num : this.dataflowDesc.getOperatorBuildOrder()) {
            Pair<Object, Boolean> pair = this.operators.get(num);
            if (pair.getFirst() instanceof DataFlowOperatorLifecycle) {
                try {
                    ((DataFlowOperatorLifecycle) pair.getFirst()).open(new DataFlowOpOpenContext(num.intValue()));
                } catch (RuntimeException e) {
                    // Report the actual data flow name rather than a fixed placeholder name.
                    String dataflowName = this.dataflowDesc.getDataflowName();
                    throw new EPDataFlowExecutionException("Exception encountered opening data flow '" + dataflowName + "' in operator " + pair.getFirst().getClass().getSimpleName() + ": " + e.getMessage(), e, dataflowName);
                }
            }
        }
    }

    /**
     * Closes every {@link DataFlowOperatorLifecycle} operator that has not been closed yet, in
     * operator build order. A runtime exception thrown by an operator's close method is logged and
     * does not stop the remaining operators from being closed; the operator is flagged as closed
     * either way, so close is attempted at most once per operator.
     */
    private synchronized void callOperatorClose() {
        for (Integer operatorNum : dataflowDesc.getOperatorBuildOrder()) {
            final Pair<Object, Boolean> tracked = operators.get(operatorNum);
            final boolean hasLifecycle = tracked.getFirst() instanceof DataFlowOperatorLifecycle;
            if (!hasLifecycle || tracked.getSecond().booleanValue()) {
                continue;
            }
            final DataFlowOperatorLifecycle lifecycle = (DataFlowOperatorLifecycle) tracked.getFirst();
            try {
                lifecycle.close(new DataFlowOpCloseContext(operatorNum.intValue()));
            } catch (RuntimeException closeFailure) {
                log.error("Exception encountered closing data flow '" + dataflowDesc.getDataflowName() + "': " + closeFailure.getMessage(), closeFailure);
            }
            tracked.setSecond(true);
        }
    }

    /**
     * Guards execution entry points against an instance that has already completed.
     *
     * @throws IllegalStateException if the current state is COMPLETE
     */
    private void checkExecCompleteState() {
        if (state != EPDataFlowState.COMPLETE) {
            return;
        }
        throw new IllegalStateException("Data flow '" + dataflowDesc.getDataflowName() + "' instance has already completed, please use instantiate to run the data flow again");
    }

    /**
     * Guards execution entry points against an instance that is currently running.
     *
     * @throws IllegalStateException if the current state is RUNNING
     */
    private void checkExecRunningState() {
        if (state != EPDataFlowState.RUNNING) {
            return;
        }
        throw new IllegalStateException("Data flow '" + dataflowDesc.getDataflowName() + "' instance is already running");
    }

    /**
     * Guards execution entry points against an instance that has been cancelled.
     *
     * @throws IllegalStateException if the current state is CANCELLED
     */
    private void checkExecCancelledState() {
        if (state != EPDataFlowState.CANCELLED) {
            return;
        }
        throw new IllegalStateException("Data flow '" + dataflowDesc.getDataflowName() + "' instance has been cancelled and cannot be run or started");
    }

    /**
     * Records a new lifecycle state after reporting the transition (previous state to new state)
     * to the audit provider of the agent instance context.
     *
     * @param ePDataFlowState the state to move to
     */
    private void setState(EPDataFlowState ePDataFlowState) {
        final String flowName = dataflowDesc.getDataflowName();
        agentInstanceContext.getAuditProvider().dataflowTransition(flowName, dataFlowInstanceId, state, ePDataFlowState, agentInstanceContext);
        state = ePDataFlowState;
    }
}
