/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.util;

import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.jet.impl.JetService;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.impl.MapEntries;
import com.hazelcast.map.impl.MapService;
import com.hazelcast.map.impl.operation.MapOperationProvider;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.ExecutionService;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.OperationFactory;
import com.hazelcast.spi.OperationService;
import com.hazelcast.spi.exception.RetryableException;
import com.hazelcast.spi.impl.operationservice.impl.operations.PartitionIteratingOperation;
import com.hazelcast.spi.partition.IPartitionService;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.util.CollectionUtil;
import com.hazelcast.util.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class AsyncMapWriter {
    private static final int TRY_COUNT = 10;
    private static final int TRY_PAUSE_MILLIS = 300;
    private final IPartitionService partitionService;
    private final OperationService operationService;
    private final MapService mapService;
    private final SerializationService serializationService;
    private final MapEntries[] outputBuffers;
    private final AtomicInteger numConcurrentOps;
    private final ExecutionService executionService;
    private final ILogger logger;
    private String mapName;
    private MapOperationProvider opProvider;

    public AsyncMapWriter(NodeEngine nodeEngine) {
        this.partitionService = nodeEngine.getPartitionService();
        this.operationService = nodeEngine.getOperationService();
        this.mapService = (MapService)nodeEngine.getService("hz:impl:mapService");
        this.outputBuffers = new MapEntries[this.partitionService.getPartitionCount()];
        this.serializationService = nodeEngine.getSerializationService();
        this.executionService = nodeEngine.getExecutionService();
        this.logger = nodeEngine.getLogger(this.getClass());
        JetService jetService = (JetService)nodeEngine.getService("hz:impl:jetService");
        this.numConcurrentOps = jetService.numConcurrentAsyncOps();
    }

    public void put(Object key, Object value) {
        Object keyData = this.serializationService.toData(key);
        Object valueData = this.serializationService.toData(value);
        int partitionId = this.partitionService.getPartitionId((Data)keyData);
        MapEntries entries = this.outputBuffers[partitionId];
        if (entries == null) {
            entries = this.outputBuffers[partitionId] = new MapEntries();
        }
        entries.add((Data)keyData, (Data)valueData);
    }

    public void setMapName(String mapName) {
        this.mapName = mapName;
        this.opProvider = this.mapService.getMapServiceContext().getMapOperationProvider(mapName);
    }

    public boolean tryFlushAsync(CompletableFuture<Void> completionFuture) {
        Map<Address, List<Integer>> memberPartitionsMap = this.partitionService.getMemberPartitionsMap();
        AtomicInteger pendingOps = new AtomicInteger(0);
        List<PartitionOpBuilder> ops = memberPartitionsMap.entrySet().stream().map(e -> this.opForMember((Address)e.getKey(), (List)e.getValue(), this.outputBuffers)).filter(Objects::nonNull).collect(Collectors.toList());
        if (ops.isEmpty()) {
            completionFuture.complete(null);
            return true;
        }
        if (!this.invokeOnCluster(ops, pendingOps, completionFuture, true)) {
            return false;
        }
        this.resetBuffers();
        return true;
    }

    private boolean tryRetry(int[] partitions, MapEntries[] entriesPerPtion, AtomicInteger pendingOps, CompletableFuture<Void> completionFuture) {
        assert (partitions.length == entriesPerPtion.length);
        HashMap<Address, Map.Entry> addrToEntries = new HashMap<Address, Map.Entry>();
        for (int index = 0; index < partitions.length; ++index) {
            int partition = partitions[index];
            MapEntries entries = entriesPerPtion[index];
            Address owner = this.partitionService.getPartitionOwnerOrWait(partition);
            assert (owner != null) : "null owner was returned";
            Map.Entry ptionsAndEntries = addrToEntries.computeIfAbsent(owner, a -> com.hazelcast.jet.Util.entry(new ArrayList(), new ArrayList()));
            ((List)ptionsAndEntries.getValue()).add(entries);
            ((List)ptionsAndEntries.getKey()).add(partition);
        }
        List<PartitionOpBuilder> retryOps = addrToEntries.entrySet().stream().map(e -> {
            PartitionOpBuilder h = new PartitionOpBuilder((Address)e.getKey());
            List entries = (List)((Map.Entry)e.getValue()).getValue();
            PartitionOpBuilder.access$002(h, entries.toArray(new MapEntries[0]));
            PartitionOpBuilder.access$102(h, CollectionUtil.toIntArray((Collection)((Map.Entry)e.getValue()).getKey()));
            return h;
        }).collect(Collectors.toList());
        return this.invokeOnCluster(retryOps, pendingOps, completionFuture, false);
    }

    private PartitionOpBuilder opForMember(Address member, List<Integer> partitions, MapEntries[] partitionToEntries) {
        PartitionOpBuilder builder = new PartitionOpBuilder(member);
        PartitionOpBuilder.access$002(builder, new MapEntries[partitions.size()]);
        PartitionOpBuilder.access$102(builder, new int[partitions.size()]);
        int index = 0;
        for (Integer partition : partitions) {
            if (partitionToEntries[partition] == null) continue;
            ((PartitionOpBuilder)builder).entries[index] = partitionToEntries[partition];
            ((PartitionOpBuilder)builder).partitions[index] = partition;
            ++index;
        }
        if (index == 0) {
            return null;
        }
        if (index < partitions.size()) {
            PartitionOpBuilder.access$002(builder, Arrays.copyOf(builder.entries, index));
            PartitionOpBuilder.access$102(builder, Arrays.copyOf(builder.partitions, index));
        }
        return builder;
    }

    private void resetBuffers() {
        Arrays.fill(this.outputBuffers, null);
    }

    private boolean invokeOnCluster(List<PartitionOpBuilder> opBuilders, AtomicInteger pendingOps, CompletableFuture<Void> completionFuture, boolean shouldRetry) {
        Preconditions.checkFalse(opBuilders.isEmpty(), "opBuilders is empty");
        if (!Util.tryIncrement(this.numConcurrentOps, opBuilders.size(), 1000)) {
            return false;
        }
        pendingOps.addAndGet(opBuilders.size());
        for (PartitionOpBuilder builder : opBuilders) {
            ExecutionCallback<Object> callback = Util.callbackOf(r -> {
                this.numConcurrentOps.decrementAndGet();
                ArrayList<Integer> failedPartitions = new ArrayList<Integer>();
                ArrayList<MapEntries> failedEntries = new ArrayList<MapEntries>();
                Throwable error = null;
                Object[] results = r.getResults();
                for (int idx = 0; idx < results.length; ++idx) {
                    Object o = results[idx];
                    if (!(o instanceof Throwable)) continue;
                    error = (Throwable)o;
                    if (error instanceof RetryableException) {
                        failedPartitions.add(builder.partitions[idx]);
                        failedEntries.add(builder.entries[idx]);
                        continue;
                    }
                    completionFuture.completeExceptionally((Throwable)o);
                    return;
                }
                if (error != null) {
                    if (!shouldRetry) {
                        completionFuture.completeExceptionally(error);
                        return;
                    }
                    MapEntries[] entries = failedEntries.toArray(new MapEntries[0]);
                    int[] partitions = CollectionUtil.toIntArray(failedPartitions);
                    Throwable originalErr = error;
                    this.executionService.schedule(() -> {
                        try {
                            pendingOps.decrementAndGet();
                            if (!this.tryRetry(partitions, entries, pendingOps, completionFuture)) {
                                completionFuture.completeExceptionally(originalErr);
                            }
                        }
                        catch (Exception e) {
                            this.logger.severe("Exception during retry", e);
                            completionFuture.completeExceptionally(originalErr);
                        }
                    }, 300L, TimeUnit.MILLISECONDS);
                    return;
                }
                if (pendingOps.decrementAndGet() == 0) {
                    completionFuture.complete(null);
                }
            }, throwable -> {
                this.numConcurrentOps.decrementAndGet();
                if (throwable instanceof RetryableException) {
                    if (!this.tryRetry(builder.partitions, builder.entries, pendingOps, completionFuture)) {
                        completionFuture.completeExceptionally((Throwable)throwable);
                    }
                } else {
                    completionFuture.completeExceptionally((Throwable)throwable);
                }
            });
            this.operationService.createInvocationBuilder("hz:impl:mapService", (Operation)builder.build(), builder.address).setTryCount(10).setTryPauseMillis(300L).setExecutionCallback(callback).invoke();
        }
        return true;
    }

    private class PartitionOpBuilder {
        private final Address address;
        private MapEntries[] entries;
        private int[] partitions;

        PartitionOpBuilder(Address address) {
            this.address = address;
        }

        private PartitionIteratingOperation build() {
            OperationFactory factory = AsyncMapWriter.this.opProvider.createPutAllOperationFactory(AsyncMapWriter.this.mapName, this.partitions, this.entries);
            return new PartitionIteratingOperation(factory, this.partitions);
        }

        public String toString() {
            return "PartitionOpBuilder{address=" + this.address + ", entryCount=" + this.entries.length + ", partitions=" + Arrays.toString(this.partitions) + '}';
        }

        static /* synthetic */ MapEntries[] access$002(PartitionOpBuilder x0, MapEntries[] x1) {
            x0.entries = x1;
            return x1;
        }

        static /* synthetic */ int[] access$102(PartitionOpBuilder x0, int[] x1) {
            x0.partitions = x1;
            return x1;
        }
    }
}

