/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.util;

import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.IMap;
import com.hazelcast.core.PartitionAware;
import com.hazelcast.internal.serialization.impl.HeapData;
import com.hazelcast.jet.impl.JetService;
import com.hazelcast.jet.impl.execution.init.JetInitDataSerializerHook;
import com.hazelcast.jet.impl.util.AsyncSnapshotWriter;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Bits;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.partition.IPartitionService;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nonnull;

public class AsyncSnapshotWriterImpl
implements AsyncSnapshotWriter {
    private static final int DEFAULT_CHUNK_SIZE = 131072;
    final int usableChunkSize;
    final byte[] serializedByteArrayHeader = new byte[12];
    final byte[] valueTerminator;
    final AtomicInteger numConcurrentAsyncOps;
    private final IPartitionService partitionService;
    private final CustomByteArrayOutputStream[] buffers;
    private final int[] partitionKeys;
    private int partitionSequence;
    private final ILogger logger;
    private final NodeEngine nodeEngine;
    private final boolean useBigEndian;
    private final int memberCount;
    private Supplier<IMap<SnapshotDataKey, byte[]>> currentMap;
    private final AtomicReference<Throwable> lastError = new AtomicReference();
    private final AtomicInteger numActiveFlushes = new AtomicInteger();
    private long totalKeys;
    private long totalChunks;
    private long totalPayloadBytes;
    private final ExecutionCallback<Object> callback = new ExecutionCallback<Object>(){

        @Override
        public void onResponse(Object response) {
            assert (response == null) : "put operation overwrote a previous value: " + response;
            AsyncSnapshotWriterImpl.this.numActiveFlushes.decrementAndGet();
            AsyncSnapshotWriterImpl.this.numConcurrentAsyncOps.decrementAndGet();
        }

        @Override
        public void onFailure(Throwable t) {
            AsyncSnapshotWriterImpl.this.logger.severe("Error writing to snapshot map '" + ((IMap)AsyncSnapshotWriterImpl.this.currentMap.get()).getName() + "'", t);
            AsyncSnapshotWriterImpl.this.lastError.compareAndSet(null, t);
            AsyncSnapshotWriterImpl.this.numActiveFlushes.decrementAndGet();
            AsyncSnapshotWriterImpl.this.numConcurrentAsyncOps.decrementAndGet();
        }
    };

    public AsyncSnapshotWriterImpl(NodeEngine nodeEngine, int memberIndex, int memberCount) {
        this(131072, nodeEngine, memberIndex, memberCount);
    }

    AsyncSnapshotWriterImpl(int chunkSize, NodeEngine nodeEngine, int memberIndex, int memberCount) {
        this.nodeEngine = nodeEngine;
        this.partitionService = nodeEngine.getPartitionService();
        this.logger = nodeEngine.getLogger(this.getClass());
        this.memberCount = memberCount;
        this.useBigEndian = !nodeEngine.getHazelcastInstance().getConfig().getSerializationConfig().isUseNativeByteOrder() || ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
        Bits.writeInt(this.serializedByteArrayHeader, 4, -12, this.useBigEndian);
        this.buffers = new CustomByteArrayOutputStream[this.partitionService.getPartitionCount()];
        for (int i = 0; i < this.buffers.length; ++i) {
            this.buffers[i] = new CustomByteArrayOutputStream(chunkSize);
            this.buffers[i].write(this.serializedByteArrayHeader, 0, this.serializedByteArrayHeader.length);
        }
        JetService jetService = (JetService)nodeEngine.getService("hz:impl:jetService");
        this.partitionKeys = jetService.getSharedPartitionKeys();
        this.partitionSequence = memberIndex;
        this.numConcurrentAsyncOps = jetService.numConcurrentAsyncOps();
        byte[] valueTerminatorWithHeader = nodeEngine.getSerializationService().toData(SnapshotDataValueTerminator.INSTANCE).toByteArray();
        this.valueTerminator = Arrays.copyOfRange(valueTerminatorWithHeader, 4, valueTerminatorWithHeader.length);
        this.usableChunkSize = chunkSize - this.valueTerminator.length;
    }

    @Override
    public void setCurrentMap(String mapName) {
        assert (this.isEmpty()) : "writer not empty";
        if (this.currentMap != null && this.logger.isFineEnabled()) {
            this.logger.fine(String.format("Stats for %s: keys=%,d, chunks=%,d, bytes=%,d", this.currentMap.get().getName(), this.totalKeys, this.totalChunks, this.totalPayloadBytes));
        }
        this.currentMap = Util.memoize(() -> this.nodeEngine.getHazelcastInstance().getMap(mapName));
        this.totalPayloadBytes = 0L;
        this.totalChunks = 0L;
        this.totalKeys = 0L;
    }

    @Override
    @CheckReturnValue
    public boolean offer(Map.Entry<? extends Data, ? extends Data> entry) {
        int partitionId = this.partitionService.getPartitionId(entry.getKey());
        int length = entry.getKey().totalSize() + entry.getValue().totalSize() - 8;
        if (length > this.usableChunkSize) {
            return this.putAsyncToMap(partitionId, () -> {
                byte[] data = new byte[this.serializedByteArrayHeader.length + length + this.valueTerminator.length];
                int offset = 0;
                System.arraycopy(this.serializedByteArrayHeader, 0, data, offset, this.serializedByteArrayHeader.length);
                Bits.writeInt(data, offset += this.serializedByteArrayHeader.length - 4, length + this.valueTerminator.length, this.useBigEndian);
                this.copyWithoutHeader((Data)entry.getKey(), data, offset += 4);
                this.copyWithoutHeader((Data)entry.getValue(), data, offset += ((Data)entry.getKey()).totalSize() - 4);
                System.arraycopy(this.valueTerminator, 0, data, length, this.valueTerminator.length);
                return new HeapData(data);
            });
        }
        if (this.buffers[partitionId].size() + length > this.usableChunkSize && !this.flush(partitionId)) {
            return false;
        }
        this.writeWithoutHeader(entry.getKey(), this.buffers[partitionId]);
        this.writeWithoutHeader(entry.getValue(), this.buffers[partitionId]);
        ++this.totalKeys;
        return true;
    }

    private void copyWithoutHeader(Data src, byte[] dst, int dstOffset) {
        byte[] bytes = src.toByteArray();
        System.arraycopy(bytes, 4, dst, dstOffset, bytes.length - 4);
    }

    private void writeWithoutHeader(Data src, OutputStream dst) {
        byte[] bytes = src.toByteArray();
        try {
            dst.write(bytes, 4, bytes.length - 4);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @CheckReturnValue
    private boolean flush(int partitionId) {
        return this.containsOnlyHeader(this.buffers[partitionId]) || this.putAsyncToMap(partitionId, () -> this.getBufferContentsAndClear(this.buffers[partitionId]));
    }

    private boolean containsOnlyHeader(CustomByteArrayOutputStream buffer) {
        return buffer.size() == this.serializedByteArrayHeader.length;
    }

    private Data getBufferContentsAndClear(CustomByteArrayOutputStream buffer) {
        buffer.write(this.valueTerminator, 0, this.valueTerminator.length);
        byte[] data = buffer.toByteArray();
        this.updateSerializedBytesLength(data);
        this.totalPayloadBytes += (long)buffer.size;
        ++this.totalChunks;
        buffer.reset();
        buffer.write(this.serializedByteArrayHeader, 0, this.serializedByteArrayHeader.length);
        return new HeapData(data);
    }

    private void updateSerializedBytesLength(byte[] data) {
        Bits.writeInt(data, 8, data.length - this.serializedByteArrayHeader.length, this.useBigEndian);
    }

    @CheckReturnValue
    private boolean putAsyncToMap(int partitionId, Supplier<Data> dataSupplier) {
        if (!Util.tryIncrement(this.numConcurrentAsyncOps, 1, 1000)) {
            return false;
        }
        try {
            ICompletableFuture<byte[]> future = this.currentMap.get().putAsync(new SnapshotDataKey(this.partitionKeys[partitionId], this.partitionSequence), (byte[])dataSupplier.get());
            this.partitionSequence += this.memberCount;
            future.andThen(this.callback);
            this.numActiveFlushes.incrementAndGet();
        }
        catch (HazelcastInstanceNotActiveException ignored) {
            return false;
        }
        return true;
    }

    @Override
    @CheckReturnValue
    public boolean flush() {
        for (int i = 0; i < this.buffers.length; ++i) {
            if (this.flush(i)) continue;
            return false;
        }
        return true;
    }

    @Override
    public boolean hasPendingAsyncOps() {
        return this.numActiveFlushes.get() > 0;
    }

    @Override
    public Throwable getError() {
        return this.lastError.getAndSet(null);
    }

    @Override
    public boolean isEmpty() {
        return this.numActiveFlushes.get() == 0 && Arrays.stream(this.buffers).allMatch(this::containsOnlyHeader);
    }

    int partitionKey(int partitionId) {
        return this.partitionKeys[partitionId];
    }

    @Override
    public long getTotalPayloadBytes() {
        return this.totalPayloadBytes;
    }

    @Override
    public long getTotalKeys() {
        return this.totalKeys;
    }

    @Override
    public long getTotalChunks() {
        return this.totalChunks;
    }

    static class CustomByteArrayOutputStream
    extends OutputStream {
        private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
        private byte[] data;
        private int size;
        private int capacityLimit;

        CustomByteArrayOutputStream(int capacityLimit) {
            this.capacityLimit = capacityLimit;
            this.data = EMPTY_BYTE_ARRAY;
        }

        @Override
        public void write(int b) {
            this.ensureCapacity(this.size + 1);
            this.data[this.size] = (byte)b;
            ++this.size;
        }

        @Override
        public void write(@Nonnull byte[] b, int off, int len) {
            if (off < 0 || off > b.length || len < 0 || off + len - b.length > 0) {
                throw new IndexOutOfBoundsException("off=" + off + ", len=" + len);
            }
            this.ensureCapacity(this.size + len);
            System.arraycopy(b, off, this.data, this.size, len);
            this.size += len;
        }

        private void ensureCapacity(int minCapacity) {
            if (minCapacity - this.data.length > 0) {
                int newCapacity = this.data.length;
                while ((newCapacity = Math.max(1, newCapacity << 1)) - minCapacity < 0) {
                }
                if (newCapacity - this.capacityLimit > 0) {
                    throw new IllegalStateException("buffer full");
                }
                this.data = Arrays.copyOf(this.data, newCapacity);
            }
        }

        void reset() {
            this.size = 0;
        }

        @Nonnull
        byte[] toByteArray() {
            return Arrays.copyOf(this.data, this.size);
        }

        int size() {
            return this.size;
        }
    }

    public static final class SnapshotDataValueTerminator
    implements IdentifiedDataSerializable {
        public static final IdentifiedDataSerializable INSTANCE = new SnapshotDataValueTerminator();

        private SnapshotDataValueTerminator() {
        }

        @Override
        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        @Override
        public int getId() {
            return 28;
        }

        @Override
        public void writeData(ObjectDataOutput out) {
        }

        @Override
        public void readData(ObjectDataInput in) {
        }
    }

    public static final class SnapshotDataKey
    implements IdentifiedDataSerializable,
    PartitionAware {
        int partitionKey;
        int sequence;

        public SnapshotDataKey() {
        }

        SnapshotDataKey(int partitionKey, int sequence) {
            this.partitionKey = partitionKey;
            this.sequence = sequence;
        }

        public Object getPartitionKey() {
            return this.partitionKey;
        }

        public String toString() {
            return "SnapshotDataKey{partitionKey=" + this.partitionKey + ", sequence=" + this.sequence + '}';
        }

        @Override
        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        @Override
        public int getId() {
            return 27;
        }

        @Override
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeInt(this.partitionKey);
            out.writeInt(this.sequence);
        }

        @Override
        public void readData(ObjectDataInput in) throws IOException {
            this.partitionKey = in.readInt();
            this.sequence = in.readInt();
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            SnapshotDataKey that = (SnapshotDataKey)o;
            return this.partitionKey == that.partitionKey && this.sequence == that.sequence;
        }

        public int hashCode() {
            return Objects.hash(this.partitionKey, this.sequence);
        }
    }
}

