/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.metastorage.persistence;

import java.io.Serializable;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.LongStream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
import org.apache.ignite.internal.processors.cluster.BaselineTopology;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorageListener;
import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasAckMessage;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasMessage;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageClusterNodeData;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageHistoryCache;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageHistoryItem;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageJoiningNodeData;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageKeyValuePair;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateAckMessage;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateMessage;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageVersion;
import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
import org.apache.ignite.internal.processors.metastorage.persistence.DmsLocalMetaStorageLock;
import org.apache.ignite.internal.processors.metastorage.persistence.InMemoryCachedDistributedMetaStorageBridge;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteProducer;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public class DistributedMetaStorageImpl
extends GridProcessorAdapter
implements DistributedMetaStorage,
IgniteChangeGlobalStateSupport {
    /** Discovery data exchange component id: ordinal of {@code DiscoveryDataExchangeType.META_STORAGE}. */
    private static final int COMPONENT_ID = GridComponent.DiscoveryDataExchangeType.META_STORAGE.ordinal();
    /**
     * Default history size limit: 0x6400000 bytes (100 * 1024 * 1024).
     * NOTE(review): not referenced in the visible code; the same literal is inlined in {@link #histMaxBytes}.
     */
    private static final long DFLT_MAX_HISTORY_BYTES = 0x6400000L;
    /** Error message used when some node in the cluster does not support the distributed metastorage. */
    public static final String NOT_SUPPORTED_MSG = "Ignite cluster has nodes that don't support distributed metastorage feature. Writing cannot be completed.";
    /** Whether the local node is a client node (taken from the kernal context in the constructor). */
    private final boolean isClient;
    /** {@code true} only for non-client nodes whose configuration has persistence enabled. */
    private final boolean isPersistenceEnabled;
    /** Subscription processor used to register lifecycle listeners and to obtain distributed metastorage subscribers. */
    private final GridInternalSubscriptionProcessor isp;
    /** In-memory bridge that serves reads and iteration; re-created when a client node gets disconnected. */
    private volatile InMemoryCachedDistributedMetaStorageBridge bridge;
    /** Current local version of the distributed metastorage. */
    private volatile DistributedMetaStorageVersion ver;
    /** Registered listeners, each paired with the key predicate it was registered with. */
    final List<IgniteBiTuple<Predicate<String>, DistributedMetaStorageListener<Serializable>>> lsnrs = new CopyOnWriteArrayList<IgniteBiTuple<Predicate<String>, DistributedMetaStorageListener<Serializable>>>();
    /** Cache of history items. */
    private final DistributedMetaStorageHistoryCache histCache = new DistributedMetaStorageHistoryCache();
    /** History size limit, read from system property {@code IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES} (default 0x6400000). */
    private final long histMaxBytes = IgniteSystemProperties.getLong("IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES", 0x6400000L);
    /** Futures of in-flight update requests, keyed by request id. */
    private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> updateFuts = new ConcurrentHashMap<UUID, GridFutureAdapter<Boolean>>();
    /** Lock held while {@link #stopped} is changed (write) or while a new future is registered in {@link #updateFuts} (read). */
    private final ReadWriteLock updateFutsStopLock = new ReentrantReadWriteLock();
    /** Set when update futures are cancelled with the stop flag; new write futures are then completed immediately with a node stopping error. */
    private boolean stopped;
    /** Lock taken by the visible code around accesses to {@link #bridge}, {@link #ver} and {@link #histCache}. */
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    /** JDK marshaller used for values and discovery data. */
    private final JdkMarshaller marshaller;
    /** Writer worker; {@code null} when persistence is disabled. */
    private final DmsDataWriterWorker worker;

    /**
     * Creates the processor: captures node mode flags, creates the in-memory bridge and, when persistence
     * is enabled, the writer worker. Also subscribes to the local join future to notify subscribers
     * that the storage is ready for write.
     *
     * @param ctx Kernal context.
     */
    public DistributedMetaStorageImpl(GridKernalContext ctx) {
        super(ctx);
        this.isClient = ctx.clientNode();
        this.isPersistenceEnabled = !this.isClient && GridCacheUtils.isPersistenceEnabled(ctx.config());
        this.isp = ctx.internalSubscriptionProcessor();
        this.marshaller = ctx.marshallerContext().jdkMarshaller();
        this.bridge = new InMemoryCachedDistributedMetaStorageBridge(this.marshaller);
        if (this.isPersistenceEnabled) {
            String instanceName = ctx.igniteInstanceName();
            // Lock adapter that delegates to this processor's local metastorage lock methods.
            DmsLocalMetaStorageLock metaStorageLock = new DmsLocalMetaStorageLock(){

                @Override
                public void lock() {
                    DistributedMetaStorageImpl.this.localMetastorageLock();
                }

                @Override
                public void unlock() {
                    DistributedMetaStorageImpl.this.localMetastorageUnlock();
                }
            };
            this.worker = new DmsDataWriterWorker(instanceName, this.log, metaStorageLock, this::criticalError);
        } else {
            // No writer worker is needed without persistence.
            this.worker = null;
        }
        ctx.discovery().localJoinFuture().listen(this::notifyReadyForWrite);
    }

    /**
     * Starts the processor. Does nothing on daemon nodes. Without persistence the version is set to the
     * initial one; with persistence a metastorage lifecycle listener is registered instead. In both
     * cases discovery custom event listeners for update and ack messages are installed.
     *
     * @throws IgniteCheckedException Declared by the overridden method.
     */
    @Override
    public void start() throws IgniteCheckedException {
        if (this.ctx.isDaemon()) {
            return;
        }
        if (this.isPersistenceEnabled) {
            MetastorageLifecycleListener lifecycleLsnr = new MetastorageLifecycleListener(){

                @Override
                public void onReadyForRead(ReadOnlyMetastorage metastorage) throws IgniteCheckedException {
                    DistributedMetaStorageImpl.this.onMetaStorageReadyForRead(metastorage);
                }

                @Override
                public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
                    DistributedMetaStorageImpl.this.onMetaStorageReadyForWrite(metastorage);
                }
            };
            this.isp.registerMetastorageListener(lifecycleLsnr);
        } else {
            this.ver = DistributedMetaStorageVersion.INITIAL_VERSION;
        }
        GridDiscoveryManager discoMgr = this.ctx.discovery();
        discoMgr.setCustomEventListener(DistributedMetaStorageUpdateMessage.class, this::onUpdateMessage);
        discoMgr.setCustomEventListener(DistributedMetaStorageUpdateAckMessage.class, this::onAckMessage);
    }

    /**
     * Stops the writer worker under the write lock and then fails all pending update futures
     * with a node stopping error, marking the storage as stopped.
     *
     * @param cancel Cancel flag forwarded to the worker.
     */
    @Override
    public void onKernalStop(boolean cancel) {
        ReentrantReadWriteLock.WriteLock writeLock = this.lock.writeLock();
        writeLock.lock();
        try {
            this.stopWorker(cancel);
        }
        finally {
            writeLock.unlock();
            // Futures are cancelled after the write lock is released.
            NodeStoppingException stopErr = DistributedMetaStorageImpl.nodeStoppingException();
            this.cancelUpdateFutures(stopErr, true);
        }
    }

    /**
     * Cancels the writer worker if persistence is enabled. Must be called with the write lock held
     * by the current thread.
     * <p>
     * If the calling thread is interrupted while cancelling the worker, the error is logged and the
     * thread's interrupt status is restored so that callers further up the stack can still observe it.
     *
     * @param cancel Cancel flag forwarded to the worker.
     */
    private void stopWorker(boolean cancel) {
        assert (this.lock.isWriteLockedByCurrentThread());
        if (this.isPersistenceEnabled) {
            try {
                this.worker.cancel(cancel);
            }
            catch (InterruptedException e) {
                this.log.error("Cannot stop distributed metastorage worker.", e);
                // Do not swallow the interruption: re-set the flag cleared by the exception.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Notifies subscribers that the storage is ready for read, but only when persistence is disabled.
     */
    public void inMemoryReadyForRead() {
        if (this.isPersistenceEnabled) {
            return;
        }
        this.notifyReadyForRead();
    }

    /**
     * Calls {@code onReadyForRead(this)} on every distributed metastorage subscriber.
     */
    private void notifyReadyForRead() {
        for (DistributedMetastorageLifecycleListener lsnr : this.isp.getDistributedMetastorageSubscribers()) {
            lsnr.onReadyForRead(this);
        }
    }

    /**
     * Calls {@code onReadyForWrite(this)} on every distributed metastorage subscriber, unless the
     * given local join future completed with an error.
     *
     * @param fut Local join future.
     */
    private void notifyReadyForWrite(IgniteInternalFuture<DiscoveryLocalJoinData> fut) {
        if (fut.error() != null) {
            return;
        }
        for (DistributedMetastorageLifecycleListener lsnr : this.isp.getDistributedMetastorageSubscribers()) {
            lsnr.onReadyForWrite(this);
        }
    }

    /**
     * Activation callback; intentionally does nothing.
     *
     * @param kctx Kernal context (unused).
     */
    @Override
    public void onActivate(GridKernalContext kctx) {
        // No-op.
    }

    /**
     * Deactivation callback: on non-client nodes stops the writer worker (without the cancel flag)
     * under the write lock.
     *
     * @param kctx Kernal context (unused).
     */
    @Override
    public void onDeActivate(GridKernalContext kctx) {
        if (!this.isClient) {
            ReentrantReadWriteLock.WriteLock writeLock = this.lock.writeLock();
            writeLock.lock();
            try {
                this.stopWorker(false);
            }
            finally {
                writeLock.unlock();
            }
        }
    }

    /**
     * @return Value of {@code active()} for the current cluster state.
     */
    private boolean isActive() {
        boolean active = this.ctx.state().clusterState().active();
        return active;
    }

    /**
     * Restores state from the local metastorage once it becomes readable: reads the initial data into
     * the bridge (which yields the current version) and loads history items into the history cache.
     * Both the local metastorage lock and the write lock are held while doing so; subscribers are
     * notified about read readiness afterwards, outside of both locks.
     *
     * @param metastorage Read-only local metastorage.
     * @throws IgniteCheckedException If reading from the metastorage failed.
     */
    private void onMetaStorageReadyForRead(ReadOnlyMetastorage metastorage) throws IgniteCheckedException {
        assert (this.isPersistenceEnabled);
        this.localMetastorageLock();
        try {
            ReentrantReadWriteLock.WriteLock writeLock = this.lock.writeLock();
            writeLock.lock();
            try {
                this.ver = this.bridge.readInitialData(metastorage);
                String histPrefix = DistributedMetaStorageUtil.historyItemPrefix();
                metastorage.iterate(histPrefix, (key, val) -> {
                    this.addToHistoryCache(DistributedMetaStorageUtil.historyItemVer(key), (DistributedMetaStorageHistoryItem)val);
                }, true);
            }
            finally {
                writeLock.unlock();
            }
        }
        finally {
            this.localMetastorageUnlock();
        }
        this.notifyReadyForRead();
    }

    /**
     * Hands the writable metastorage to the worker and starts the worker in a new thread
     * named {@code dms-writer-thread}.
     *
     * @param metastorage Read-write local metastorage.
     */
    private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
        assert (this.isPersistenceEnabled);
        this.worker.setMetaStorage(metastorage);
        new IgniteThread(this.ctx.igniteInstanceName(), "dms-writer-thread", this.worker).start();
    }

    /**
     * @return Id of the current version.
     */
    @Override
    public long getUpdatesCount() {
        DistributedMetaStorageVersion curVer = this.ver;
        return curVer.id;
    }

    /**
     * Reads the value for the given key from the bridge under the read lock.
     *
     * @param key Key.
     * @return Value returned by the bridge, cast to the requested type.
     * @throws IgniteCheckedException If the bridge failed to read the value.
     */
    @Override
    @Nullable
    public <T extends Serializable> T read(@NotNull String key) throws IgniteCheckedException {
        ReentrantReadWriteLock.ReadLock readLock = this.lock.readLock();
        readLock.lock();
        try {
            Serializable val = this.bridge.read(key);
            return (T)val;
        }
        finally {
            readLock.unlock();
        }
    }

    /**
     * Marshals the value, starts a distributed write and waits for its completion.
     *
     * @param key Key.
     * @param val Value, must not be {@code null}.
     * @throws IgniteCheckedException Wrapping any checked failure of marshalling, sending or waiting.
     */
    @Override
    public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException {
        assert (val != null) : key;
        try {
            byte[] valBytes = DistributedMetaStorageUtil.marshal(this.marshaller, val);
            GridFutureAdapter<?> writeFut = this.startWrite(key, valBytes);
            writeFut.get();
        }
        catch (IgniteCheckedException cause) {
            throw new IgniteCheckedException("Write was failed", cause);
        }
    }

    /**
     * Marshals the value and starts a distributed write without waiting for it.
     *
     * @param key Key.
     * @param val Value, must not be {@code null}.
     * @return Future of the write.
     * @throws IgniteCheckedException If marshalling or starting the write failed.
     */
    @Override
    public GridFutureAdapter<?> writeAsync(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException {
        assert (val != null) : key;
        byte[] valBytes = DistributedMetaStorageUtil.marshal(this.marshaller, val);
        return this.startWrite(key, valBytes);
    }

    /**
     * Starts a distributed removal of the key; a removal is a write with a {@code null} value.
     *
     * @param key Key.
     * @return Future of the removal.
     * @throws IgniteCheckedException If starting the write failed.
     */
    @Override
    public GridFutureAdapter<?> removeAsync(@NotNull String key) throws IgniteCheckedException {
        GridFutureAdapter<?> rmvFut = this.startWrite(key, null);
        return rmvFut;
    }

    /**
     * Starts a distributed removal of the key (a write with a {@code null} value) and waits for it.
     *
     * @param key Key.
     * @throws IgniteCheckedException If starting the write or waiting for it failed.
     */
    @Override
    public void remove(@NotNull String key) throws IgniteCheckedException {
        GridFutureAdapter<?> rmvFut = this.startWrite(key, null);
        rmvFut.get();
    }

    /**
     * Synchronous compare-and-set: delegates to {@link #compareAndSetAsync} and waits for the result.
     *
     * @param key Key.
     * @param expVal Expected value, may be {@code null}.
     * @param newVal New value, must not be {@code null}.
     * @return Result of the CAS future.
     * @throws IgniteCheckedException Wrapping any checked failure of the operation.
     */
    @Override
    public boolean compareAndSet(@NotNull String key, @Nullable Serializable expVal, @NotNull Serializable newVal) throws IgniteCheckedException {
        assert (newVal != null) : key;
        try {
            GridFutureAdapter<Boolean> casFut = this.compareAndSetAsync(key, expVal, newVal);
            return casFut.get();
        }
        catch (IgniteCheckedException cause) {
            throw new IgniteCheckedException("Write was failed", cause);
        }
    }

    /**
     * Marshals the expected and the new value and starts a distributed compare-and-set.
     *
     * @param key Key.
     * @param expVal Expected value, may be {@code null}.
     * @param newVal New value, must not be {@code null}.
     * @return Future of the CAS operation.
     * @throws IgniteCheckedException If marshalling or starting the operation failed.
     */
    @Override
    public GridFutureAdapter<Boolean> compareAndSetAsync(@NotNull String key, @Nullable Serializable expVal, @NotNull Serializable newVal) throws IgniteCheckedException {
        assert (newVal != null) : key;
        byte[] expValBytes = DistributedMetaStorageUtil.marshal(this.marshaller, expVal);
        byte[] newValBytes = DistributedMetaStorageUtil.marshal(this.marshaller, newVal);
        return this.startCas(key, expValBytes, newValBytes);
    }

    /**
     * Synchronous compare-and-remove: a CAS whose new value is {@code null}.
     *
     * @param key Key.
     * @param expVal Expected value, must not be {@code null}.
     * @return Result of the CAS future.
     * @throws IgniteCheckedException If marshalling, starting the operation or waiting for it failed.
     */
    @Override
    public boolean compareAndRemove(@NotNull String key, @NotNull Serializable expVal) throws IgniteCheckedException {
        assert (expVal != null) : key;
        byte[] expValBytes = DistributedMetaStorageUtil.marshal(this.marshaller, expVal);
        GridFutureAdapter<Boolean> casFut = this.startCas(key, expValBytes, null);
        return casFut.get();
    }

    /**
     * Iterates over bridge entries with the given key prefix under the read lock.
     *
     * @param keyPrefix Key prefix.
     * @param cb Callback passed to the bridge.
     * @throws IgniteCheckedException If the bridge iteration failed.
     */
    @Override
    public void iterate(@NotNull String keyPrefix, @NotNull BiConsumer<String, ? super Serializable> cb) throws IgniteCheckedException {
        ReentrantReadWriteLock.ReadLock readLock = this.lock.readLock();
        readLock.lock();
        try {
            this.bridge.iterate(keyPrefix, cb);
        }
        finally {
            readLock.unlock();
        }
    }

    /**
     * Registers a listener together with the key predicate it is interested in.
     *
     * @param keyPred Key predicate.
     * @param lsnr Listener.
     */
    @Override
    public void listen(@NotNull Predicate<String> keyPred, DistributedMetaStorageListener<?> lsnr) {
        // The tuple is created through the raw type because the listener's value type is a wildcard.
        IgniteBiTuple predAndLsnr = new IgniteBiTuple(keyPred, lsnr);
        this.lsnrs.add(predAndLsnr);
    }

    /**
     * @return {@code META_STORAGE} discovery data exchange type.
     */
    @Override
    @Nullable
    public GridComponent.DiscoveryDataExchangeType discoveryDataType() {
        GridComponent.DiscoveryDataExchangeType type = GridComponent.DiscoveryDataExchangeType.META_STORAGE;
        return type;
    }

    /**
     * Adds this node's joining data (baseline topology id, current version and history) to the bag.
     * Client nodes send an empty history; other nodes send the content of the history cache.
     * <p>
     * The read lock is held for the whole operation and is always released, including on the
     * client path and when marshalling fails.
     *
     * @param dataBag Discovery data bag to add the marshalled joining data to.
     * @throws IgniteException If the joining data could not be marshalled.
     */
    @Override
    public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
        this.lock.readLock().lock();
        try {
            DistributedMetaStorageHistoryItem[] hist = this.isClient ? DistributedMetaStorageHistoryItem.EMPTY_ARRAY : this.histCache.toArray();
            DistributedMetaStorageJoiningNodeData data = new DistributedMetaStorageJoiningNodeData(this.getBaselineTopologyId(), this.ver, hist);
            dataBag.addJoiningNodeData(COMPONENT_ID, (Serializable)this.marshaller.marshal(data));
        }
        catch (IgniteCheckedException e) {
            throw new IgniteException(e);
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * @return Id of the baseline topology from the current cluster state, or {@code -1} if there is none.
     */
    private int getBaselineTopologyId() {
        BaselineTopology blt = this.ctx.state().clusterState().baselineTopology();
        if (blt == null) {
            return -1;
        }
        return blt.id();
    }

    /**
     * Validates a joining node by comparing its distributed metastorage version and history with the
     * local ones. Client nodes never reject. The comparison runs under the read lock.
     *
     * @param node Joining node.
     * @param discoData Joining node discovery data.
     * @return Validation error, or {@code null} if the node is allowed to join.
     */
    @Override
    @Nullable
    public IgniteNodeValidationResult validateNode(ClusterNode node, DiscoveryDataBag.JoiningNodeDiscoveryData discoData) {
        if (this.isClient) {
            return null;
        }
        this.lock.readLock().lock();
        try {
            DistributedMetaStorageVersion locVer = this.ver;
            if (!discoData.hasJoiningNodeData()) {
                // The joining node sent no data for this component.
                boolean reject = ReadableDistributedMetaStorage.isSupported(this.ctx) && locVer.id > 0L && !node.isClient() && !node.isDaemon();
                if (!reject) {
                    return null;
                }
                String unsupportedMsg = "Node not supporting distributed metastorage feature is not allowed to join the cluster";
                this.log.warning(unsupportedMsg);
                return new IgniteNodeValidationResult(node.id(), unsupportedMsg);
            }
            DistributedMetaStorageJoiningNodeData joiningData = this.getJoiningNodeData(discoData);
            if (joiningData == null) {
                return new IgniteNodeValidationResult(node.id(), "Cannot unmarshal joining node data");
            }
            if (!this.isPersistenceEnabled) {
                return null;
            }
            DistributedMetaStorageVersion remoteVer = joiningData.ver;
            DistributedMetaStorageHistoryItem[] remoteHist = joiningData.hist;
            int remoteHistSize = remoteHist.length;
            int remoteBltId = joiningData.bltId;
            boolean clusterIsActive = this.isActive();
            int locBltId = this.getBaselineTopologyId();
            int locHistSize = this.histCache.size();
            String conflictMsg = "Joining node has conflicting distributed metastorage data.";
            String largerVerMsg = "Attempting to join node with larger distributed metastorage version id. The node is most likely in invalid state and can't be joined.";
            String errorMsg;
            if (remoteVer.id < locVer.id - (long)locHistSize) {
                // Remote node is behind by more than the local history covers: nothing to compare.
                errorMsg = null;
            } else if (remoteVer.id < locVer.id) {
                // Remote node is behind within local history: replaying local history must yield the local version.
                DistributedMetaStorageVersion newRemoteVer = remoteVer.nextVersion(this::historyItem, remoteVer.id + 1L, locVer.id);
                errorMsg = newRemoteVer.equals(locVer) ? null : conflictMsg;
            } else if (remoteVer.id == locVer.id) {
                if (remoteVer.equals(locVer)) {
                    errorMsg = null;
                } else {
                    errorMsg = S.toString("Joining node has conflicting distributed metastorage data:", "clusterVersion", (Object)locVer, false, "joiningNodeVersion", (Object)remoteVer, false);
                }
            } else if (remoteVer.id <= locVer.id + (long)remoteHistSize) {
                // Remote node is ahead, and its history is long enough to reach back to the local version.
                if (clusterIsActive) {
                    errorMsg = largerVerMsg;
                } else if (remoteBltId < locBltId) {
                    errorMsg = conflictMsg;
                } else {
                    DistributedMetaStorageVersion newLocVer = locVer.nextVersion(remoteHist, remoteHistSize - (int)(remoteVer.id - locVer.id), remoteHistSize);
                    errorMsg = newLocVer.equals(remoteVer) ? null : conflictMsg;
                }
            } else {
                assert (remoteVer.id > locVer.id + (long)remoteHistSize);
                if (clusterIsActive) {
                    errorMsg = largerVerMsg;
                } else if (remoteBltId < locBltId) {
                    errorMsg = conflictMsg;
                } else {
                    errorMsg = "Joining node doesn't have enough history items in distributed metastorage data. Please check the order in which you start cluster nodes.";
                }
            }
            if (errorMsg == null) {
                errorMsg = this.validatePayload(joiningData);
            }
            if (errorMsg == null) {
                return null;
            }
            return new IgniteNodeValidationResult(node.id(), errorMsg);
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Checks that every value in the joining node's history can be unmarshalled.
     *
     * @param joiningData Joining node data.
     * @return Error message naming the first key whose value failed to unmarshal, or {@code null} if all succeeded.
     */
    private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData) {
        for (DistributedMetaStorageHistoryItem histItem : joiningData.hist) {
            int idx = 0;
            while (idx < histItem.keys.length) {
                try {
                    DistributedMetaStorageUtil.unmarshal(this.marshaller, histItem.valBytesArray[idx]);
                }
                catch (IgniteCheckedException ignored) {
                    // Only the offending key is reported; the cause is not propagated.
                    return "Unable to unmarshal key=" + histItem.keys[idx];
                }
                ++idx;
            }
        }
        return null;
    }

    /**
     * Applies updates brought by a joining node whose version is ahead of the local one. The missing
     * updates are taken from the tail of the joining node's history and applied in order under the
     * write lock. Nothing is done if the data is absent or cannot be unmarshalled, or if the feature
     * is not supported while the remote version id is positive.
     *
     * @param discoData Joining node discovery data.
     */
    @Override
    public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData discoData) {
        if (!discoData.hasJoiningNodeData()) {
            return;
        }
        DistributedMetaStorageJoiningNodeData joiningData = this.getJoiningNodeData(discoData);
        if (joiningData == null) {
            return;
        }
        DistributedMetaStorageVersion remoteVer = joiningData.ver;
        if (!ReadableDistributedMetaStorage.isSupported(this.ctx) && remoteVer.id > 0L) {
            return;
        }
        ReentrantReadWriteLock.WriteLock writeLock = this.lock.writeLock();
        writeLock.lock();
        try {
            DistributedMetaStorageVersion locVer = this.ver;
            if (remoteVer.id <= locVer.id) {
                return;
            }
            DistributedMetaStorageHistoryItem[] hist = joiningData.hist;
            if (remoteVer.id - locVer.id > (long)hist.length) {
                assert (false) : "Joining node is too far ahead [remoteVer=" + remoteVer + "]";
                return;
            }
            // The last (remoteVer.id - locVer.id) history items are exactly the updates missing locally.
            int firstIdx = hist.length - (int)(remoteVer.id - locVer.id);
            for (int idx = firstIdx; idx < hist.length; ++idx) {
                try {
                    this.completeWrite(hist[idx]);
                }
                catch (IgniteCheckedException ex) {
                    this.log.error("Unable to unmarshal new metastore data. update=" + hist[idx], ex);
                }
            }
        }
        finally {
            writeLock.unlock();
        }
    }

    /**
     * Prepares the common data sent to a joining node. Depending on how the joining node's version
     * compares with the local one, the data contains only the version, the missing updates, or the
     * full data together with history (empty history for client joiners).
     * <p>
     * Nothing is added on client nodes, when common data was already collected, when the joiner sent
     * no data, when the feature is not supported, or when the joiner's data cannot be unmarshalled.
     *
     * @param dataBag Discovery data bag.
     */
    @Override
    public void collectGridNodeData(DiscoveryDataBag dataBag) {
        if (this.isClient || dataBag.commonDataCollectedFor(COMPONENT_ID)) {
            return;
        }
        DiscoveryDataBag.JoiningNodeDiscoveryData joinerData = dataBag.newJoinerDiscoveryData(COMPONENT_ID);
        if (!joinerData.hasJoiningNodeData() || !ReadableDistributedMetaStorage.isSupported(this.ctx)) {
            return;
        }
        DistributedMetaStorageJoiningNodeData joiningData = this.getJoiningNodeData(joinerData);
        if (joiningData == null) {
            return;
        }
        DistributedMetaStorageVersion remoteVer = joiningData.ver;
        ReentrantReadWriteLock.ReadLock readLock = this.lock.readLock();
        readLock.lock();
        try {
            DistributedMetaStorageVersion locVer = this.ver;
            DistributedMetaStorageClusterNodeData nodeData;
            if (remoteVer.id >= locVer.id) {
                // Joiner is not behind: only the version is sent.
                nodeData = new DistributedMetaStorageClusterNodeData(remoteVer, null, null, null);
            } else if (locVer.id - remoteVer.id <= (long)this.histCache.size() && !dataBag.isJoiningNodeClient()) {
                // Joiner is behind within the local history: send just the missing updates.
                DistributedMetaStorageHistoryItem[] updates = this.history(remoteVer.id + 1L, locVer.id);
                nodeData = new DistributedMetaStorageClusterNodeData(this.ver, null, null, updates);
            } else {
                // Otherwise send the full data; clients get an empty history.
                DistributedMetaStorageVersion ver0 = this.ver;
                DistributedMetaStorageKeyValuePair[] fullData = this.bridge.localFullData();
                DistributedMetaStorageHistoryItem[] hist;
                if (dataBag.isJoiningNodeClient()) {
                    hist = DistributedMetaStorageHistoryItem.EMPTY_ARRAY;
                } else {
                    hist = this.history(this.ver.id - (long)this.histCache.size() + 1L, locVer.id);
                }
                nodeData = new DistributedMetaStorageClusterNodeData(ver0, fullData, hist, null);
            }
            dataBag.addGridCommonData(COMPONENT_ID, nodeData);
        }
        finally {
            readLock.unlock();
        }
    }

    /**
     * Unmarshals the joining node data of this component.
     *
     * @param discoData Joining node discovery data; its payload must not be {@code null}.
     * @return Unmarshalled data, or {@code null} if unmarshalling failed (the error is logged).
     */
    @Nullable
    private DistributedMetaStorageJoiningNodeData getJoiningNodeData(DiscoveryDataBag.JoiningNodeDiscoveryData discoData) {
        byte[] rawData = (byte[])discoData.joiningNodeData();
        assert (rawData != null);
        DistributedMetaStorageJoiningNodeData res;
        try {
            res = (DistributedMetaStorageJoiningNodeData)this.marshaller.unmarshal(rawData, U.gridClassLoader());
        }
        catch (IgniteCheckedException e) {
            this.log.error("Unable to unmarshal joinging node data for distributed metastorage component.", e);
            res = null;
        }
        return res;
    }

    /**
     * Client disconnect callback: under the write lock resets the bridge and the version to their
     * initial state and fails all pending update futures (without marking the storage as stopped).
     *
     * @param reconnectFut Reconnect future (unused).
     */
    @Override
    public void onDisconnected(IgniteFuture<?> reconnectFut) {
        assert (this.isClient);
        ReentrantReadWriteLock.WriteLock writeLock = this.lock.writeLock();
        writeLock.lock();
        try {
            this.bridge = new InMemoryCachedDistributedMetaStorageBridge(this.marshaller);
            this.ver = DistributedMetaStorageVersion.INITIAL_VERSION;
            IgniteCheckedException disconnectErr = new IgniteCheckedException("Client was disconnected during the operation.");
            this.cancelUpdateFutures(disconnectErr, false);
        }
        finally {
            writeLock.unlock();
        }
    }

    /**
     * Completes all pending update futures with the given error and forgets them. Runs under the
     * write side of the update futures stop lock and also sets the {@code stopped} flag.
     *
     * @param e Error to complete the futures with.
     * @param stop New value of the {@code stopped} flag.
     */
    private void cancelUpdateFutures(Exception e, boolean stop) {
        this.updateFutsStopLock.writeLock().lock();
        try {
            this.stopped = stop;
            this.updateFuts.values().forEach(pendingFut -> pendingFut.onDone(e));
            this.updateFuts.clear();
        }
        finally {
            this.updateFutsStopLock.writeLock().unlock();
        }
    }

    /**
     * @return New exception with the "Node is stopping." message.
     */
    private static NodeStoppingException nodeStoppingException() {
        NodeStoppingException stopErr = new NodeStoppingException("Node is stopping.");
        return stopErr;
    }

    /**
     * Client reconnect callback: subscribes to the new local join future so that subscribers are
     * notified about write readiness again.
     *
     * @param clusterRestarted Cluster restarted flag (unused).
     * @return Always {@code null}.
     */
    @Override
    public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) {
        assert (this.isClient);
        GridDiscoveryManager discoMgr = this.ctx.discovery();
        discoMgr.localJoinFuture().listen(this::notifyReadyForWrite);
        return null;
    }

    /**
     * @param specificVer Version id.
     * @return Item returned by the history cache for the given version id.
     */
    private DistributedMetaStorageHistoryItem historyItem(long specificVer) {
        DistributedMetaStorageHistoryItem cached = this.histCache.get(specificVer);
        return cached;
    }

    /**
     * Collects history items for the inclusive range of version ids.
     *
     * @param startVer First version id (inclusive).
     * @param actualVer Last version id (inclusive).
     * @return Items in ascending version order; empty if {@code startVer > actualVer}.
     */
    private DistributedMetaStorageHistoryItem[] history(long startVer, long actualVer) {
        int cnt = startVer > actualVer ? 0 : (int)(actualVer - startVer + 1L);
        DistributedMetaStorageHistoryItem[] items = new DistributedMetaStorageHistoryItem[cnt];
        for (int i = 0; i < cnt; ++i) {
            items[i] = this.historyItem(startVer + (long)i);
        }
        return items;
    }

    /**
     * @return Full data as returned by the bridge.
     */
    private DistributedMetaStorageKeyValuePair[] localFullData() {
        DistributedMetaStorageKeyValuePair[] fullData = this.bridge.localFullData();
        return fullData;
    }

    /**
     * Applies the common data received from the cluster on join, under the write lock. In order:
     * full data (version, listener notification, bridge), history cache, persistence worker update,
     * and finally individual updates.
     *
     * @param data Grid discovery data.
     * @throws IgniteException If there is no common data while this non-client node has a positive
     *      version id, or if applying the data failed with a checked exception.
     */
    @Override
    public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
        ReentrantReadWriteLock.WriteLock writeLock = this.lock.writeLock();
        writeLock.lock();
        try {
            DistributedMetaStorageClusterNodeData clusterData = (DistributedMetaStorageClusterNodeData)data.commonData();
            if (clusterData == null) {
                if (!this.isClient && this.ver.id > 0L) {
                    throw new IgniteException("Cannot join the cluster because it doesn't support distributed metastorage feature and this node has not empty distributed metastorage data");
                }
                return;
            }
            if (clusterData.fullData != null) {
                this.ver = clusterData.ver;
                this.notifyListenersBeforeReadyForWrite(clusterData.fullData);
                this.bridge.writeFullNodeData(clusterData);
            }
            if (clusterData.hist != null) {
                this.clearHistoryCache();
                int histLen = clusterData.hist.length;
                for (int i = 0; i < histLen; ++i) {
                    // The last history item corresponds to the current version id.
                    long histVer = this.ver.id - (long)(histLen - 1) + (long)i;
                    this.addToHistoryCache(histVer, clusterData.hist[i]);
                }
            }
            if (this.isPersistenceEnabled && clusterData.fullData != null) {
                this.worker.update(clusterData);
            }
            if (clusterData.updates != null) {
                for (DistributedMetaStorageHistoryItem missedUpdate : clusterData.updates) {
                    this.completeWrite(missedUpdate);
                }
            }
        }
        catch (IgniteCheckedException ex) {
            throw new IgniteException("Cannot join the cluster", ex);
        }
        finally {
            writeLock.unlock();
        }
    }

    /**
     * Rejects keys whose local key representation is longer than 64 bytes, unless long keys are
     * supported.
     * <p>
     * NOTE(review): {@code getBytes()} without arguments uses the platform default charset, while the
     * error message speaks about UTF8 - confirm which encoding the underlying storage relies on.
     *
     * @param key Key to check.
     * @throws IgniteCheckedException If the key is too long.
     */
    private void checkMaxKeyLengthExceeded(String key) throws IgniteCheckedException {
        if (!DistributedMetaStorage.longKeysSupported(this.ctx)) {
            int locKeyLen = DistributedMetaStorageUtil.localKey(key).getBytes().length;
            if (locKeyLen > 64) {
                int maxKeyLen = 64 - DistributedMetaStorageUtil.localKeyPrefix().getBytes().length;
                throw new IgniteCheckedException("Key is too long. Maximum key length is " + maxKeyLen + " bytes in UTF8");
            }
        }
    }

    /**
     * Registers a future for a new write request and sends the update message through discovery.
     * <p>
     * If sending the message fails, the registered future is removed from the pending futures map
     * before the failure propagates, so that it does not stay there with nobody to complete it.
     *
     * @param key Key.
     * @param valBytes Marshalled value, or {@code null} to remove the key.
     * @return Future of the write; already completed if the write was rejected before sending.
     * @throws IgniteCheckedException If the write cannot be prepared or the message cannot be sent.
     */
    private GridFutureAdapter<?> startWrite(String key, byte[] valBytes) throws IgniteCheckedException {
        UUID reqId = UUID.randomUUID();
        GridFutureAdapter<Boolean> fut = this.prepareWriteFuture(key, reqId);
        if (fut.isDone()) {
            return fut;
        }
        DistributedMetaStorageUpdateMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, valBytes);
        boolean sent = false;
        try {
            this.ctx.discovery().sendCustomEvent(msg);
            sent = true;
        }
        finally {
            if (!sent) {
                // No ack will ever arrive for this request: drop the future instead of leaking it.
                this.updateFuts.remove(reqId, fut);
            }
        }
        return fut;
    }

    /**
     * Registers a future for a new compare-and-set request and sends the CAS message through discovery.
     * <p>
     * If sending the message fails, the registered future is removed from the pending futures map
     * before the failure propagates, so that it does not stay there with nobody to complete it.
     *
     * @param key Key.
     * @param expValBytes Marshalled expected value.
     * @param newValBytes Marshalled new value, or {@code null} to remove the key.
     * @return Future of the operation; already completed if it was rejected before sending.
     * @throws IgniteCheckedException If the operation cannot be prepared or the message cannot be sent.
     */
    private GridFutureAdapter<Boolean> startCas(String key, byte[] expValBytes, byte[] newValBytes) throws IgniteCheckedException {
        UUID reqId = UUID.randomUUID();
        GridFutureAdapter<Boolean> fut = this.prepareWriteFuture(key, reqId);
        if (fut.isDone()) {
            return fut;
        }
        DistributedMetaStorageCasMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expValBytes, newValBytes);
        boolean sent = false;
        try {
            this.ctx.discovery().sendCustomEvent(msg);
            sent = true;
        }
        finally {
            if (!sent) {
                // No ack will ever arrive for this request: drop the future instead of leaking it.
                this.updateFuts.remove(reqId, fut);
            }
        }
        return fut;
    }

    /**
     * Validates that a write is possible and registers a future for it.
     * <p>
     * The returned future is already completed with a node stopping error when the support check
     * fails because the node has stopped, or when the storage is marked as stopped. Otherwise the
     * future is stored in the pending futures map under the request id.
     *
     * @param key Key to be written.
     * @param reqId Request id.
     * @return Future for the request.
     * @throws IgniteCheckedException If the feature is not supported by the cluster or the key is too long.
     */
    private GridFutureAdapter<Boolean> prepareWriteFuture(String key, UUID reqId) throws IgniteCheckedException {
        boolean supported;
        try {
            supported = ReadableDistributedMetaStorage.isSupported(this.ctx);
        }
        catch (Exception e) {
            boolean nodeStopped = X.hasCause((Throwable)e, IgniteSpiException.class) && e.getMessage() != null && e.getMessage().contains("Node stopped.");
            if (!nodeStopped) {
                throw e;
            }
            GridFutureAdapter<Boolean> stoppedFut = new GridFutureAdapter<Boolean>();
            stoppedFut.onDone(DistributedMetaStorageImpl.nodeStoppingException());
            return stoppedFut;
        }
        if (!supported) {
            throw new IgniteCheckedException(NOT_SUPPORTED_MSG);
        }
        this.checkMaxKeyLengthExceeded(key);
        GridFutureAdapter<Boolean> fut = new GridFutureAdapter<Boolean>();
        this.updateFutsStopLock.readLock().lock();
        try {
            if (this.stopped) {
                fut.onDone(DistributedMetaStorageImpl.nodeStoppingException());
            } else {
                this.updateFuts.put(reqId, fut);
            }
        }
        finally {
            this.updateFutsStopLock.readLock().unlock();
        }
        return fut;
    }

    /**
     * Applies a distributed metastorage update message locally.
     *
     * Messages that already carry an error are ignored. If distributed metastorage is not
     * supported, the message is marked with {@code NOT_SUPPORTED_MSG} and nothing is applied.
     * Otherwise CAS messages are routed to {@link #completeCas} and all other messages to
     * {@link #completeWrite}.
     *
     * @param topVer Topology version (not used by this method).
     * @param node Cluster node (not used by this method).
     * @param msg Update message to apply.
     */
    private void onUpdateMessage(AffinityTopologyVersion topVer, ClusterNode node, DistributedMetaStorageUpdateMessage msg) {
        boolean alreadyFailed = msg.errorMessage() != null;
        if (alreadyFailed) {
            return;
        }
        if (!ReadableDistributedMetaStorage.isSupported(this.ctx)) {
            msg.errorMessage(NOT_SUPPORTED_MSG);
            return;
        }
        try {
            boolean casMsg = msg instanceof DistributedMetaStorageCasMessage;
            if (!casMsg) {
                this.completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.value()));
            } else {
                this.completeCas((DistributedMetaStorageCasMessage)msg);
            }
        }
        catch (IgniteInterruptedCheckedException e) {
            // Interruption is converted to an unchecked exception without invoking the failure processor.
            throw U.convertException(e);
        }
        catch (Error | IgniteCheckedException e) {
            // Any other checked failure or Error goes through criticalError.
            throw this.criticalError(e);
        }
    }

    /**
     * Completes the locally registered future that matches an acknowledgement message.
     *
     * The future is removed from {@code updateFuts} by the message's request id. If no such
     * future exists, nothing happens. When the message carries an error text, the future fails
     * with an {@link IllegalStateException}; otherwise it completes with the CAS
     * {@code updated()} flag for CAS acknowledgements, or with {@code null} for other
     * acknowledgements.
     *
     * @param topVer Topology version (not used by this method).
     * @param node Cluster node (not used by this method).
     * @param msg Acknowledgement message.
     */
    private void onAckMessage(AffinityTopologyVersion topVer, ClusterNode node, DistributedMetaStorageUpdateAckMessage msg) {
        GridFutureAdapter<Boolean> pendingFut = (GridFutureAdapter<Boolean>)this.updateFuts.remove(msg.requestId());
        if (pendingFut == null) {
            return;
        }
        String err = msg.errorMessage();
        if (err != null) {
            pendingFut.onDone(new IllegalStateException(err));
            return;
        }
        Boolean updated = null;
        if (msg instanceof DistributedMetaStorageCasAckMessage) {
            updated = Boolean.valueOf(((DistributedMetaStorageCasAckMessage)msg).updated());
        }
        pendingFut.onDone(updated);
    }

    /**
     * Reports a critical failure to the failure processor and rethrows it.
     *
     * This method never returns normally; the {@link RuntimeException} return type only lets
     * callers write {@code throw criticalError(e)} so that the compiler sees the end of the
     * control flow.
     *
     * Rethrow rules:
     * <ul>
     *     <li>{@link Error} is rethrown as is;</li>
     *     <li>{@link IgniteCheckedException} is converted with {@code U.convertException};</li>
     *     <li>{@link RuntimeException} is rethrown as is;</li>
     *     <li>any other throwable is wrapped into an {@link IgniteException}.</li>
     * </ul>
     * The last two branches replace an unconditional cast to {@link IgniteCheckedException},
     * which would have failed with a {@link ClassCastException} and hidden the original error.
     *
     * @param e Failure to report.
     * @return Nothing; always throws.
     */
    private RuntimeException criticalError(Throwable e) {
        this.ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
        if (e instanceof Error) {
            throw (Error)e;
        }
        if (e instanceof IgniteCheckedException) {
            throw U.convertException((IgniteCheckedException)e);
        }
        if (e instanceof RuntimeException) {
            throw (RuntimeException)e;
        }
        throw new IgniteException(e);
    }

    /**
     * Applies a history item to the local state.
     *
     * Under the write lock the item is first reduced by {@link #optimizeHistoryItem}; if
     * nothing remains, the method returns without any change. Otherwise the version is
     * advanced, listeners are notified for every key, all values are written to the bridge
     * and the item is added to the history cache. After the lock is released, the item is
     * handed to the worker when persistence is enabled and the history is shrunk.
     *
     * Note: this source was produced by a decompiler, which reported that it removed a
     * self-catching try block in this method (possible behaviour change versus the original).
     *
     * @param histItem History item to apply.
     * @throws IgniteCheckedException If listener notification, the bridge write or the worker update fails.
     */
    private void completeWrite(DistributedMetaStorageHistoryItem histItem) throws IgniteCheckedException {
        this.lock.writeLock().lock();
        try {
            histItem = this.optimizeHistoryItem(histItem);
            if (histItem == null) {
                return;
            }
            this.ver = this.ver.nextVersion(histItem);
            String[] itemKeys = histItem.keys;
            byte[][] itemVals = histItem.valBytesArray;
            int cnt = itemKeys.length;
            // All listeners are notified before any value is written, so the lazily read
            // "old" value still comes from the bridge state prior to this update.
            for (int idx = 0; idx < cnt; idx++) {
                String key = itemKeys[idx];
                byte[] valBytes = itemVals[idx];
                this.notifyListeners(key, () -> this.bridge.read(key), () -> DistributedMetaStorageUtil.unmarshal(this.marshaller, valBytes));
            }
            for (int idx = 0; idx < cnt; idx++) {
                this.bridge.write(itemKeys[idx], itemVals[idx]);
            }
            this.addToHistoryCache(this.ver.id, histItem);
        }
        finally {
            this.lock.writeLock().unlock();
        }
        if (this.isPersistenceEnabled) {
            this.worker.update(histItem);
        }
        this.shrinkHistory();
    }

    /**
     * Drops from a history item every key whose new value bytes equal the marshalled bytes
     * currently returned by the bridge for that key.
     *
     * @param histItem History item to reduce.
     * @return {@code null} if no key changes its value, the same {@code histItem} instance if
     *      every key changes, or a new item that holds only the changed keys in their original order.
     */
    @Nullable
    private DistributedMetaStorageHistoryItem optimizeHistoryItem(DistributedMetaStorageHistoryItem histItem) {
        String[] keys = histItem.keys;
        byte[][] vals = histItem.valBytesArray;
        int total = keys.length;
        // Bit i is set when entry i would not change the stored value.
        BitSet unchanged = new BitSet(total);
        for (int idx = 0; idx < total; idx++) {
            if (Arrays.equals(vals[idx], this.bridge.readMarshalled(keys[idx]))) {
                unchanged.set(idx);
            }
        }
        int changed = total - unchanged.cardinality();
        if (changed == 0) {
            return null;
        }
        if (changed == total) {
            return histItem;
        }
        String[] changedKeys = new String[changed];
        byte[][] changedVals = new byte[changed][];
        int dst = 0;
        // Walk only the clear bits, i.e. the entries that actually change something.
        for (int src = unchanged.nextClearBit(0); src < total; src = unchanged.nextClearBit(src + 1)) {
            changedKeys[dst] = keys[src];
            changedVals[dst] = vals[src];
            dst++;
        }
        return new DistributedMetaStorageHistoryItem(changedKeys, changedVals);
    }

    /**
     * Applies a compare-and-set message.
     *
     * Nothing is done when the message is already marked as not matching. Otherwise the
     * current value read from the bridge is compared (via {@link Objects#deepEquals}) with the
     * unmarshalled expected value: on a match the new value is written through
     * {@link #completeWrite}; on a mismatch the message is marked as not matching.
     *
     * @param msg CAS message.
     * @throws IgniteCheckedException If reading, unmarshalling or writing fails.
     */
    private void completeCas(DistributedMetaStorageCasMessage msg) throws IgniteCheckedException {
        if (!msg.matches()) {
            return;
        }
        Serializable curVal = this.bridge.read(msg.key());
        Serializable expVal = DistributedMetaStorageUtil.unmarshal(this.marshaller, msg.expectedValue());
        if (Objects.deepEquals(curVal, expVal)) {
            this.completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.value()));
        } else {
            msg.setMatches(false);
        }
    }

    /**
     * Stores a history item in the history cache under the given version.
     * Does nothing when the {@code isClient} flag is set.
     * Asserts that the current thread holds the write lock.
     *
     * @param ver Version key for the item.
     * @param histItem History item to store.
     */
    void addToHistoryCache(long ver, DistributedMetaStorageHistoryItem histItem) {
        assert this.lock.isWriteLockedByCurrentThread();
        if (this.isClient) {
            return;
        }
        this.histCache.put(ver, histItem);
    }

    /**
     * Removes all entries from the history cache.
     * Asserts that the current thread holds the write lock.
     */
    void clearHistoryCache() {
        assert (this.lock.isWriteLockedByCurrentThread());
        this.histCache.clear();
    }

    /**
     * Evicts the oldest history cache entries while the cache size in bytes exceeds
     * {@code histMaxBytes} and more than one entry remains. Runs under the write lock.
     * When persistence is enabled, each eviction is also reported to the worker.
     */
    private void shrinkHistory() {
        this.lock.writeLock().lock();
        try {
            while (this.histCache.sizeInBytes() > this.histMaxBytes && this.histCache.size() > 1) {
                this.histCache.removeOldest();
                if (this.isPersistenceEnabled) {
                    // Current version id minus the number of entries still cached;
                    // presumably the version of the entry that was just evicted.
                    long removedVer = this.ver.id - (long)this.histCache.size();
                    this.worker.removeHistItem(removedVer);
                }
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Notifies listeners about the difference between the data currently returned by
     * {@code bridge.localFullData()} and the supplied new data.
     *
     * Both arrays are walked in parallel and compared by key with {@link String#compareTo},
     * which assumes that both are sorted by key (TODO confirm against callers). A key present
     * only in the old data is reported with a {@code null} new value, a key present only in
     * the new data with a {@code null} old value, and a key present in both with both values.
     * Asserts that the current thread holds the write lock.
     *
     * @param newData New key-value pairs.
     * @throws IgniteCheckedException If listener notification fails.
     */
    private void notifyListenersBeforeReadyForWrite(DistributedMetaStorageKeyValuePair[] newData) throws IgniteCheckedException {
        assert this.lock.isWriteLockedByCurrentThread();
        DistributedMetaStorageKeyValuePair[] oldData = this.bridge.localFullData();
        int oldPos = 0;
        int newPos = 0;
        while (oldPos < oldData.length || newPos < newData.length) {
            // Negative: the key exists only in old data; positive: only in new data; zero: in both.
            int order;
            if (newPos >= newData.length) {
                order = -1;
            } else if (oldPos >= oldData.length) {
                order = 1;
            } else {
                order = oldData[oldPos].key.compareTo(newData[newPos].key);
            }
            if (order < 0) {
                String removedKey = oldData[oldPos].key;
                byte[] removedBytes = oldData[oldPos].valBytes;
                this.notifyListeners(removedKey, () -> DistributedMetaStorageUtil.unmarshal(this.marshaller, removedBytes), () -> null);
                ++oldPos;
            } else if (order > 0) {
                String addedKey = newData[newPos].key;
                byte[] addedBytes = newData[newPos].valBytes;
                this.notifyListeners(addedKey, () -> null, () -> DistributedMetaStorageUtil.unmarshal(this.marshaller, addedBytes));
                ++newPos;
            } else {
                String sharedKey = oldData[oldPos].key;
                byte[] beforeBytes = oldData[oldPos].valBytes;
                byte[] afterBytes = newData[newPos].valBytes;
                this.notifyListeners(sharedKey, () -> DistributedMetaStorageUtil.unmarshal(this.marshaller, beforeBytes), () -> DistributedMetaStorageUtil.unmarshal(this.marshaller, afterBytes));
                ++oldPos;
                ++newPos;
            }
        }
    }

    /**
     * Invokes every registered listener whose key predicate accepts the given key.
     *
     * The old and new values are produced lazily, at most once, and only if at least one
     * predicate matches; the new value is produced before the old one. An exception thrown by
     * a listener is logged and does not stop the notification of the remaining listeners.
     *
     * @param key Updated key.
     * @param oldValProducer Producer of the previous value.
     * @param newValProducer Producer of the new value.
     * @throws IgniteCheckedException If one of the value producers fails.
     */
    private void notifyListeners(String key, @NotNull IgniteProducer<Serializable> oldValProducer, @NotNull IgniteProducer<Serializable> newValProducer) throws IgniteCheckedException {
        Serializable oldVal = null;
        Serializable newVal = null;
        boolean produced = false;
        for (IgniteBiTuple<Predicate<String>, DistributedMetaStorageListener<Serializable>> registration : this.lsnrs) {
            Predicate<String> keyFilter = registration.get1();
            if (keyFilter.test(key)) {
                if (!produced) {
                    newVal = newValProducer.produce();
                    oldVal = oldValProducer.produce();
                    produced = true;
                }
                DistributedMetaStorageListener<Serializable> lsnr = registration.get2();
                try {
                    lsnr.onUpdate(key, oldVal, newVal);
                }
                catch (Exception e) {
                    // A failing listener must not prevent the others from being notified.
                    this.log.error(S.toString("Failed to notify distributed metastorage update listener", "key", (Object)key, false, "oldVal", (Object)oldVal, false, "newVal", (Object)newVal, false, "lsnr", lsnr, false), e);
                }
            }
        }
    }

    /**
     * Acquires the checkpoint read lock of the database manager obtained from the cache
     * shared context. Counterpart of {@code localMetastorageUnlock()}.
     */
    private void localMetastorageLock() {
        this.ctx.cache().context().database().checkpointReadLock();
    }

    /**
     * Releases the checkpoint read lock of the database manager obtained from the cache
     * shared context. Counterpart of {@code localMetastorageLock()}.
     */
    private void localMetastorageUnlock() {
        this.ctx.cache().context().database().checkpointReadUnlock();
    }
}

