package ca.uhn.fhir.jpa.dao.expunge;

import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ca/uhn/fhir/jpa/dao/expunge/PartitionRunner.class */
public class PartitionRunner {
    private static final Logger ourLog;
    private static final int MAX_POOL_SIZE = 1000;
    private final String myProcessName;
    private final String myThreadPrefix;
    private final int myBatchSize;
    private final int myThreadCount;
    static final /* synthetic */ boolean $assertionsDisabled;

    public PartitionRunner(String str, String str2, int i, int i2) {
        this.myProcessName = str;
        this.myThreadPrefix = str2;
        this.myBatchSize = i;
        this.myThreadCount = i2;
    }

    public void runInPartitionedThreads(List<ResourcePersistentId> list, Consumer<List<ResourcePersistentId>> consumer) {
        List<Callable<Void>> buildCallableTasks = buildCallableTasks(list, consumer);
        if (buildCallableTasks.size() == 0) {
            return;
        }
        if (buildCallableTasks.size() == 1) {
            try {
                buildCallableTasks.get(0).call();
                return;
            } catch (Exception e) {
                ourLog.error("Error while " + this.myProcessName, e);
                throw new InternalErrorException(Msg.code(1084) + e);
            }
        }
        ExecutorService buildExecutor = buildExecutor(buildCallableTasks.size());
        try {
            try {
                Iterator it = buildExecutor.invokeAll(buildCallableTasks).iterator();
                while (it.hasNext()) {
                    ((Future) it.next()).get();
                }
                buildExecutor.shutdown();
            } catch (InterruptedException e2) {
                ourLog.error("Interrupted while " + this.myProcessName, e2);
                Thread.currentThread().interrupt();
                buildExecutor.shutdown();
            } catch (ExecutionException e3) {
                ourLog.error("Error while " + this.myProcessName, e3);
                throw new InternalErrorException(Msg.code(1085) + e3);
            }
        } catch (Throwable th) {
            buildExecutor.shutdown();
            throw th;
        }
    }

    private List<Callable<Void>> buildCallableTasks(List<ResourcePersistentId> list, Consumer<List<ResourcePersistentId>> consumer) {
        ArrayList arrayList = new ArrayList();
        if (this.myBatchSize > list.size()) {
            ourLog.info("Splitting batch job of {} entries into chunks of {}", Integer.valueOf(list.size()), Integer.valueOf(this.myBatchSize));
        } else {
            ourLog.info("Creating batch job of {} entries", Integer.valueOf(list.size()));
        }
        for (List list2 : Lists.partition(list, this.myBatchSize)) {
            if (list2.size() > 0) {
                arrayList.add(() -> {
                    ourLog.info(this.myProcessName + " {} resources", Integer.valueOf(list2.size()));
                    consumer.accept(list2);
                    return null;
                });
            }
        }
        return arrayList;
    }

    private ExecutorService buildExecutor(int i) {
        int min = Math.min(i, this.myThreadCount);
        if (!$assertionsDisabled && min <= 0) {
            throw new AssertionError();
        }
        ourLog.info(this.myProcessName + " with {} threads", Integer.valueOf(min));
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(1000);
        return new ThreadPoolExecutor(min, 1000, 0L, TimeUnit.MILLISECONDS, linkedBlockingQueue, new BasicThreadFactory.Builder().namingPattern(this.myThreadPrefix + "-%d").daemon(false).priority(5).build(), (runnable, threadPoolExecutor) -> {
            ourLog.info("Note: " + this.myThreadPrefix + " executor queue is full ({} elements), waiting for a slot to become available!", Integer.valueOf(linkedBlockingQueue.size()));
            StopWatch stopWatch = new StopWatch();
            try {
                linkedBlockingQueue.put(runnable);
                ourLog.info("Slot become available after {}ms", Long.valueOf(stopWatch.getMillis()));
            } catch (InterruptedException e) {
                throw new RejectedExecutionException(Msg.code(1086) + "Task " + runnable.toString() + " rejected from " + e);
            }
        });
    }

    static {
        $assertionsDisabled = !PartitionRunner.class.desiredAssertionStatus();
        ourLog = LoggerFactory.getLogger(PartitionRunner.class);
    }
}
