package org.keycloak.models.sessions.infinispan.changes;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.jboss.logging.Logger;
import org.keycloak.common.util.Retry;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.ModelDuplicateException;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.tracing.TracingProvider;

/* loaded from: input_file:org/keycloak/models/sessions/infinispan/changes/PersistentSessionsWorker.class */
public class PersistentSessionsWorker {
    private static final Logger LOG = Logger.getLogger(PersistentSessionsWorker.class);
    private final KeycloakSessionFactory factory;
    private final ArrayBlockingQueue<PersistentUpdate> asyncQueuePersistentUpdate;
    private final int maxBatchSize;
    private final List<Thread> threads = new ArrayList();
    private volatile boolean stop;

    /* loaded from: input_file:org/keycloak/models/sessions/infinispan/changes/PersistentSessionsWorker$BatchWorker.class */
    private class BatchWorker extends Thread {
        private final ArrayBlockingQueue<PersistentUpdate> queue;

        public BatchWorker(ArrayBlockingQueue<PersistentUpdate> arrayBlockingQueue) {
            this.queue = arrayBlockingQueue;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Thread.currentThread().setName(getClass().getName());
            while (!PersistentSessionsWorker.this.stop) {
                try {
                    process(this.queue);
                } catch (InterruptedException e) {
                    if (!PersistentSessionsWorker.this.stop) {
                        PersistentSessionsWorker.LOG.warn("Caught interrupted exception", e);
                    }
                } catch (RuntimeException e2) {
                    PersistentSessionsWorker.LOG.warn("Exception when processing queue events", e2);
                }
            }
        }

        private void process(ArrayBlockingQueue<PersistentUpdate> arrayBlockingQueue) throws InterruptedException {
            ArrayList arrayList = new ArrayList();
            PersistentUpdate poll = arrayBlockingQueue.poll(1L, TimeUnit.SECONDS);
            if (poll != null) {
                arrayList.add(poll);
                arrayBlockingQueue.drainTo(arrayList, PersistentSessionsWorker.this.maxBatchSize - 1);
                KeycloakModelUtils.runJobInTransaction(PersistentSessionsWorker.this.factory, keycloakSession -> {
                    TracingProvider provider = keycloakSession.getProvider(TracingProvider.class);
                    Tracer tracer = provider.getTracer("PersistentSessionsWorker");
                    SpanBuilder spanBuilder = tracer.spanBuilder("PersistentSessionsWorker.process");
                    Stream map = arrayList.stream().map(persistentUpdate -> {
                        return persistentUpdate.getSpan().getSpanContext();
                    });
                    Objects.requireNonNull(spanBuilder);
                    map.forEach(spanBuilder::addLink);
                    Span startSpan = provider.startSpan(spanBuilder);
                    LinkedList linkedList = new LinkedList();
                    try {
                        try {
                            arrayList.forEach(persistentUpdate2 -> {
                                SpanBuilder spanBuilder2 = tracer.spanBuilder("PersistentSessionsWorker.batch");
                                spanBuilder2.setParent(Context.current().with(persistentUpdate2.getSpan()));
                                spanBuilder2.addLink(startSpan.getSpanContext());
                                linkedList.add(spanBuilder2.startSpan());
                            });
                            PersistentSessionsWorker.LOG.debugf("Processing %d deferred session updates.", arrayList.size());
                            Retry.executeWithBackoff(i -> {
                                if (i < 2) {
                                    KeycloakModelUtils.runJobInTransaction(PersistentSessionsWorker.this.factory, keycloakSession -> {
                                        arrayList.forEach(persistentUpdate3 -> {
                                            persistentUpdate3.perform(keycloakSession);
                                        });
                                    });
                                    arrayList.forEach((v0) -> {
                                        v0.complete();
                                    });
                                    return;
                                }
                                PersistentSessionsWorker.LOG.warnf("Running single changes in iteration %d for %d entries", Integer.valueOf(i), Integer.valueOf(arrayList.size()));
                                ArrayList arrayList2 = new ArrayList();
                                ArrayList arrayList3 = new ArrayList();
                                arrayList.forEach(persistentUpdate3 -> {
                                    try {
                                        KeycloakSessionFactory keycloakSessionFactory = PersistentSessionsWorker.this.factory;
                                        Objects.requireNonNull(persistentUpdate3);
                                        KeycloakModelUtils.runJobInTransaction(keycloakSessionFactory, persistentUpdate3::perform);
                                        persistentUpdate3.complete();
                                        arrayList2.add(persistentUpdate3);
                                    } catch (ModelDuplicateException e) {
                                        provider.error(e);
                                        persistentUpdate3.fail(e);
                                        arrayList2.add(persistentUpdate3);
                                    } catch (Throwable th) {
                                        if (i <= 20) {
                                            arrayList3.add(th);
                                            return;
                                        }
                                        provider.error(th);
                                        persistentUpdate3.fail(th);
                                        arrayList2.add(persistentUpdate3);
                                    }
                                });
                                arrayList.removeAll(arrayList2);
                                if (arrayList3.isEmpty()) {
                                    return;
                                }
                                RuntimeException runtimeException = new RuntimeException("unable to complete some changes");
                                Objects.requireNonNull(runtimeException);
                                arrayList3.forEach(runtimeException::addSuppressed);
                                throw runtimeException;
                            }, Duration.of(10L, ChronoUnit.SECONDS), 0);
                            linkedList.forEach((v0) -> {
                                v0.end();
                            });
                            provider.endSpan();
                        } catch (RuntimeException e) {
                            provider.error(e);
                            arrayList.forEach(persistentUpdate3 -> {
                                persistentUpdate3.fail(e);
                            });
                            PersistentSessionsWorker.LOG.warnf(e, "Unable to write %d deferred session updates", Integer.valueOf(arrayList.size()));
                            linkedList.forEach((v0) -> {
                                v0.end();
                            });
                            provider.endSpan();
                        }
                    } catch (Throwable th) {
                        linkedList.forEach((v0) -> {
                            v0.end();
                        });
                        provider.endSpan();
                        throw th;
                    }
                });
            }
        }
    }

    public PersistentSessionsWorker(KeycloakSessionFactory keycloakSessionFactory, ArrayBlockingQueue<PersistentUpdate> arrayBlockingQueue, int i) {
        this.factory = keycloakSessionFactory;
        this.asyncQueuePersistentUpdate = arrayBlockingQueue;
        this.maxBatchSize = i;
    }

    public void start() {
        this.threads.add(new BatchWorker(this.asyncQueuePersistentUpdate));
        this.threads.forEach((v0) -> {
            v0.start();
        });
    }

    public void stop() {
        this.stop = true;
        this.threads.forEach((v0) -> {
            v0.interrupt();
        });
        this.threads.forEach(thread -> {
            try {
                thread.join(TimeUnit.MINUTES.toMillis(1L));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        });
    }
}
