package shaded.org.apache.zeppelin.io.atomix.core.workqueue.impl;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import shaded.org.apache.zeppelin.com.google.common.base.MoreObjects;
import shaded.org.apache.zeppelin.com.google.common.base.Throwables;
import shaded.org.apache.zeppelin.com.google.common.collect.ImmutableList;
import shaded.org.apache.zeppelin.com.google.common.collect.Maps;
import shaded.org.apache.zeppelin.com.google.common.collect.Queues;
import shaded.org.apache.zeppelin.com.google.common.collect.Sets;
import shaded.org.apache.zeppelin.io.atomix.core.workqueue.Task;
import shaded.org.apache.zeppelin.io.atomix.core.workqueue.WorkQueueStats;
import shaded.org.apache.zeppelin.io.atomix.core.workqueue.WorkQueueType;
import shaded.org.apache.zeppelin.io.atomix.primitive.service.AbstractPrimitiveService;
import shaded.org.apache.zeppelin.io.atomix.primitive.service.BackupInput;
import shaded.org.apache.zeppelin.io.atomix.primitive.service.BackupOutput;
import shaded.org.apache.zeppelin.io.atomix.primitive.session.Session;
import shaded.org.apache.zeppelin.io.atomix.primitive.session.SessionId;
import shaded.org.apache.zeppelin.io.atomix.utils.serializer.Namespace;
import shaded.org.apache.zeppelin.io.atomix.utils.serializer.Serializer;

/* loaded from: input_file:shaded/org/apache/zeppelin/io/atomix/core/workqueue/impl/DefaultWorkQueueService.class */
/**
 * Work queue primitive service.
 *
 * <p>State held by the service:
 * <ul>
 *   <li>{@code unassignedTasks} - tasks that have been added but not yet taken;</li>
 *   <li>{@code assignments} - tasks that were taken, keyed by task id, together with the id of the
 *       session that took them;</li>
 *   <li>{@code registeredWorkers} - sessions that get a {@code taskAvailable()} callback whenever
 *       {@link #add(Collection)} is invoked;</li>
 *   <li>{@code totalCompleted} - number of tasks completed since construction or the last
 *       {@link #clear()}.</li>
 * </ul>
 */
public class DefaultWorkQueueService extends AbstractPrimitiveService<WorkQueueClient> implements WorkQueueService {
    // NOTE: the registration order below is reproduced exactly from the original; do not reorder
    // without confirming how the serializer assigns type ids.
    private static final Serializer SERIALIZER = Serializer.using(
            Namespace.builder()
                    .register(WorkQueueType.instance().namespace())
                    .register(TaskAssignment.class)
                    .register(new HashMap<>().keySet().getClass())
                    .register(ArrayDeque.class)
                    .register(SessionId.class)
                    .build());

    private final AtomicLong totalCompleted;
    private Queue<Task<byte[]>> unassignedTasks;
    private Map<String, TaskAssignment> assignments;
    private Set<SessionId> registeredWorkers;

    /**
     * Immutable pairing of a task with the numeric id of the session that took it.
     *
     * <p>This class is registered with {@code SERIALIZER}; its field names and layout are left
     * untouched for that reason. (Decompiler note: the class was originally declared private.)
     */
    public static class TaskAssignment {
        private final long sessionId;
        private final Task<byte[]> task;

        /**
         * @param sessionId numeric id of the session the task is assigned to
         * @param task the assigned task
         */
        public TaskAssignment(long sessionId, Task<byte[]> task) {
            this.sessionId = sessionId;
            this.task = task;
        }

        /** Returns the numeric id of the session that holds the task. */
        public long sessionId() {
            return sessionId;
        }

        /** Returns the assigned task. */
        public Task<byte[]> task() {
            return task;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(getClass())
                    .add("sessionId", sessionId)
                    .add("task", task)
                    .toString();
        }
    }

    /** Creates a service with an empty queue, no assignments, no workers and a zero counter. */
    public DefaultWorkQueueService() {
        super(WorkQueueType.instance(), WorkQueueClient.class);
        this.totalCompleted = new AtomicLong(0L);
        this.unassignedTasks = Queues.newArrayDeque();
        this.assignments = Maps.newHashMap();
        this.registeredWorkers = Sets.newLinkedHashSet();
    }

    /** Returns the serializer shared by all instances of this service. */
    @Override
    public Serializer serializer() {
        return SERIALIZER;
    }

    /**
     * Writes the service state to {@code backupOutput}: registered workers, assignments,
     * unassigned tasks, then the completed counter. {@link #restore(BackupInput)} reads the
     * values back in the same order.
     *
     * @param backupOutput destination of the snapshot
     */
    @Override
    public void backup(BackupOutput backupOutput) {
        backupOutput.writeObject(registeredWorkers);
        backupOutput.writeObject(assignments);
        backupOutput.writeObject(unassignedTasks);
        backupOutput.writeLong(totalCompleted.get());
    }

    /**
     * Replaces the service state with the values read from {@code backupInput}, in the order
     * written by {@link #backup(BackupOutput)}.
     *
     * @param backupInput source of the snapshot
     */
    @Override
    @SuppressWarnings("unchecked") // element types are fixed by what backup() writes
    public void restore(BackupInput backupInput) {
        this.registeredWorkers = (Set<SessionId>) backupInput.readObject();
        this.assignments = (Map<String, TaskAssignment>) backupInput.readObject();
        this.unassignedTasks = (Queue<Task<byte[]>>) backupInput.readObject();
        this.totalCompleted.set(backupInput.readLong());
    }

    /**
     * Builds a statistics snapshot: completed = counter value, pending = number of unassigned
     * tasks, in progress = number of current assignments.
     *
     * @return the current statistics
     */
    @Override
    public WorkQueueStats stats() {
        return WorkQueueStats.builder()
                .withTotalCompleted(totalCompleted.get())
                .withTotalPending(unassignedTasks.size())
                .withTotalInProgress(assignments.size())
                .build();
    }

    /**
     * Drops all unassigned tasks, all assignments and all worker registrations, and resets the
     * completed counter to zero.
     */
    @Override
    public void clear() {
        unassignedTasks.clear();
        assignments.clear();
        registeredWorkers.clear();
        totalCompleted.set(0L);
    }

    /** Adds the current session to the set of registered workers. */
    @Override
    public void register() {
        registeredWorkers.add(getCurrentSession().sessionId());
    }

    /** Removes the current session from the set of registered workers. */
    @Override
    public void unregister() {
        registeredWorkers.remove(getCurrentSession().sessionId());
    }

    /**
     * Enqueues one task per payload and then invokes {@code taskAvailable()} on every registered
     * worker session.
     *
     * <p>Task ids have the form {@code <session id>:<current index>:<position>}, where
     * {@code position} is the zero-based iteration position of the payload within
     * {@code collection}.
     *
     * @param collection payloads to enqueue
     */
    @Override
    public void add(Collection<byte[]> collection) {
        int position = 0;
        for (byte[] payload : collection) {
            String taskId = String.format(
                    "%d:%d:%d", getCurrentSession().sessionId().id(), getCurrentIndex(), position++);
            unassignedTasks.add(new Task<>(taskId, payload));
        }
        // Workers are notified even when the collection was empty, as in the original code.
        for (SessionId worker : registeredWorkers) {
            getSession(worker).accept(client -> client.taskAvailable());
        }
    }

    /**
     * Moves up to {@code i} tasks from the head of the unassigned queue into the assignment map,
     * recording the current session as their holder.
     *
     * @param i maximum number of tasks to take; values below one yield an empty result
     * @return the taken tasks in queue order; an immutable empty list when nothing is queued
     */
    @Override
    public Collection<Task<byte[]>> take(int i) {
        try {
            if (unassignedTasks.isEmpty()) {
                return ImmutableList.of();
            }
            long holder = getCurrentSession().sessionId().id().longValue();
            // The bound is computed once, before any task is polled.
            int count = Math.min(i, unassignedTasks.size());
            Collection<Task<byte[]>> taken = new ArrayList<>();
            for (int n = 0; n < count; n++) {
                Task<byte[]> next = unassignedTasks.poll();
                assignments.put(next.taskId(), new TaskAssignment(holder, next));
                taken.add(next);
            }
            return taken;
        } catch (RuntimeException e) {
            getLogger().warn("State machine update failed", e);
            throw e;
        }
    }

    /**
     * Marks tasks as completed. An id is honoured only when it is currently assigned to the
     * calling session; unknown ids and ids held by another session are skipped silently. Each
     * honoured id removes the assignment and increments the completed counter.
     *
     * @param collection ids of the tasks to complete
     */
    @Override
    public void complete(Collection<String> collection) {
        try {
            for (String taskId : collection) {
                TaskAssignment assignment = assignments.get(taskId);
                if (assignment != null
                        && assignment.sessionId() == getCurrentSession().sessionId().id().longValue()) {
                    assignments.remove(taskId);
                    totalCompleted.incrementAndGet();
                }
            }
        } catch (RuntimeException e) {
            getLogger().warn("State machine update failed", e);
            throw e;
        }
    }

    /** Evicts the expired session; see {@link #evictWorker(SessionId)}. */
    @Override
    public void onExpire(Session session) {
        evictWorker(session.sessionId());
    }

    /** Evicts the closed session; see {@link #evictWorker(SessionId)}. */
    @Override
    public void onClose(Session session) {
        evictWorker(session.sessionId());
    }

    /**
     * Unregisters the given session and returns every task it currently holds to the tail of the
     * unassigned queue, removing the corresponding assignments.
     *
     * @param sessionId id of the session to evict
     */
    private void evictWorker(SessionId sessionId) {
        registeredWorkers.remove(sessionId);
        long evictedId = sessionId.id().longValue();
        Iterator<Map.Entry<String, TaskAssignment>> entries = assignments.entrySet().iterator();
        while (entries.hasNext()) {
            TaskAssignment assignment = entries.next().getValue();
            if (assignment.sessionId() == evictedId) {
                unassignedTasks.add(assignment.task());
                entries.remove();
            }
        }
    }
}
