/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.commandhandling.distributed.jgroups;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy;
import org.axonframework.commandhandling.distributed.CommandBusConnector;
import org.axonframework.commandhandling.distributed.CommandBusConnectorCommunicationException;
import org.axonframework.commandhandling.distributed.CommandCallbackRepository;
import org.axonframework.commandhandling.distributed.CommandCallbackWrapper;
import org.axonframework.commandhandling.distributed.CommandRouter;
import org.axonframework.commandhandling.distributed.ConsistentHash;
import org.axonframework.commandhandling.distributed.Member;
import org.axonframework.commandhandling.distributed.RoutingStrategy;
import org.axonframework.commandhandling.distributed.ServiceRegistryException;
import org.axonframework.commandhandling.distributed.SimpleMember;
import org.axonframework.commandhandling.distributed.commandfilter.DenyAll;
import org.axonframework.commandhandling.distributed.jgroups.ConnectionFailedException;
import org.axonframework.commandhandling.distributed.jgroups.DispatchMessage;
import org.axonframework.commandhandling.distributed.jgroups.JoinMessage;
import org.axonframework.commandhandling.distributed.jgroups.MembershipUpdateFailedException;
import org.axonframework.commandhandling.distributed.jgroups.ReplyMessage;
import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.serialization.Serializer;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.Receiver;
import org.jgroups.View;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Connector and router that uses a JGroups {@link JChannel} to exchange commands between nodes.
 * <p>
 * Three message types travel over the channel: {@link JoinMessage} (a node announcing its load factor and command
 * filter), {@link DispatchMessage} (a command to be handled by the receiving node's local segment) and
 * {@link ReplyMessage} (the outcome of a dispatched command). Known members are tracked in a
 * {@link ConsistentHash}, which is consulted by {@link #findDestination(CommandMessage)}.
 */
public class JGroupsConnector
implements CommandRouter,
Receiver,
CommandBusConnector {
    private static final Logger logger = LoggerFactory.getLogger(JGroupsConnector.class);
    private final CommandBus localSegment;
    // Callbacks awaiting a ReplyMessage, keyed by command identifier.
    private final CommandCallbackRepository<Address> callbackRepository = new CommandCallbackRepository();
    private final Serializer serializer;
    private final JoinCondition joinedCondition = new JoinCondition();
    // NOTE(review): plain HashMap that is touched from connect() as well as from the Receiver callbacks
    // (viewAccepted / receive) -- confirm these are never invoked concurrently.
    private final Map<Address, SimpleMember<Address>> members = new HashMap<Address, SimpleMember<Address>>();
    private final String clusterName;
    private final RoutingStrategy routingStrategy;
    private final JChannel channel;
    private final AtomicReference<ConsistentHash> consistentHash = new AtomicReference<ConsistentHash>(new ConsistentHash());
    // Last view seen by viewAccepted; null until the first view has been processed successfully.
    private volatile View currentView;
    private volatile int loadFactor = 0;
    private volatile Predicate<CommandMessage<?>> commandFilter = DenyAll.INSTANCE;

    /**
     * Creates a connector that routes commands using an {@link AnnotationRoutingStrategy}.
     *
     * @param localSegment the command bus on which commands received from other nodes are dispatched
     * @param channel      the JGroups channel used for all communication
     * @param clusterName  the name of the cluster to connect to
     * @param serializer   the serializer used to build and read dispatch and reply messages
     */
    public JGroupsConnector(CommandBus localSegment, JChannel channel, String clusterName, Serializer serializer) {
        this(localSegment, channel, clusterName, serializer, new AnnotationRoutingStrategy());
    }

    /**
     * Creates a connector with an explicit routing strategy.
     *
     * @param localSegment    the command bus on which commands received from other nodes are dispatched
     * @param channel         the JGroups channel used for all communication
     * @param clusterName     the name of the cluster to connect to
     * @param serializer      the serializer used to build and read dispatch and reply messages
     * @param routingStrategy the strategy that provides the routing key for a command
     */
    public JGroupsConnector(CommandBus localSegment, JChannel channel, String clusterName, Serializer serializer, RoutingStrategy routingStrategy) {
        this.localSegment = localSegment;
        this.serializer = serializer;
        this.channel = channel;
        this.clusterName = clusterName;
        this.routingStrategy = routingStrategy;
    }

    /**
     * Stores the given load factor and command filter for this node and broadcasts them to the cluster.
     *
     * @param loadFactor    the new load factor of this node
     * @param commandFilter the new filter describing the commands this node accepts
     */
    @Override
    public void updateMembership(int loadFactor, Predicate<CommandMessage<?>> commandFilter) {
        this.loadFactor = loadFactor;
        this.commandFilter = commandFilter;
        this.broadCastMembership();
    }

    /**
     * Sends a {@link JoinMessage} carrying the current load factor and command filter to all members. Does nothing
     * when the channel is not connected.
     *
     * @throws ServiceRegistryException when the message could not be sent
     */
    protected void broadCastMembership() throws ServiceRegistryException {
        try {
            if (this.channel.isConnected()) {
                Address localAddress = this.channel.getAddress();
                // A null destination addresses the message to every member of the cluster.
                this.channel.send(null, (Object)new JoinMessage(localAddress, this.loadFactor, this.commandFilter));
            }
        }
        catch (Exception e) {
            throw new ServiceRegistryException("Could not broadcast local membership details to the cluster", e);
        }
    }

    /**
     * Connects the channel to the configured cluster, broadcasts this node's membership and registers the local
     * node in the member map and the consistent hash.
     *
     * @throws ConnectionFailedException when the channel is already joined to a differently named cluster
     * @throws Exception                 when connecting the channel or broadcasting fails
     */
    public void connect() throws Exception {
        if (this.channel.getClusterName() != null && !this.clusterName.equals(this.channel.getClusterName())) {
            throw new ConnectionFailedException("Already joined cluster: " + this.channel.getClusterName());
        }
        this.channel.setReceiver((Receiver)this);
        this.channel.connect(this.clusterName);
        this.broadCastMembership();
        Address localAddress = this.channel.getAddress();
        String localName = this.channel.getName(localAddress);
        SimpleMember<Address> localMember = new SimpleMember<Address>(localName, localAddress, null);
        this.members.put(localAddress, localMember);
        this.consistentHash.updateAndGet(ch -> ch.with(localMember, this.loadFactor, this.commandFilter));
    }

    /**
     * Disconnects the underlying channel.
     */
    public void disconnect() {
        this.channel.disconnect();
    }

    /**
     * No-op: this connector writes no state to the given stream.
     */
    public void getState(OutputStream ostream) throws Exception {
    }

    /**
     * No-op: this connector reads no state from the given stream.
     */
    public void setState(InputStream istream) throws Exception {
    }

    /**
     * Handles a cluster view change.
     * <p>
     * On the first view, the local membership is broadcast and the join condition is marked. On later views that
     * differ from the current one, each newly joined member (other than this node) is sent this node's
     * {@link JoinMessage}, and each member that left is removed from the consistent hash and the member map.
     *
     * @param view the new view
     * @throws MembershipUpdateFailedException when membership details could not be sent
     */
    public void viewAccepted(View view) {
        if (this.currentView == null) {
            logger.info("Local segment ({}) joined the cluster. Broadcasting configuration.", (Object)this.channel.getAddress());
            try {
                this.broadCastMembership();
                this.joinedCondition.markJoined(true);
            }
            catch (Exception e) {
                // Release threads blocked in awaitJoined() instead of leaving them waiting forever. currentView
                // stays null, so a later view will retry the broadcast and may still mark the join successful.
                this.joinedCondition.markJoined(false);
                throw new MembershipUpdateFailedException("Failed to broadcast my settings", e);
            }
        } else if (!view.equals((Object)this.currentView)) {
            Address[][] diff = View.diff((View)this.currentView, (View)view);
            Address[] joined = diff[0];
            Address[] left = diff[1];
            Arrays.stream(joined).filter(member -> !member.equals(this.channel.getAddress())).forEach(member -> {
                logger.info("New member detected: [{}]. Sending it my configuration.", member);
                try {
                    this.channel.send(member, (Object)new JoinMessage(this.channel.getAddress(), this.loadFactor, this.commandFilter));
                }
                catch (Exception e) {
                    // Keep the cause so the reason for the failed send is not lost.
                    throw new MembershipUpdateFailedException("Failed to notify my existence to " + member, e);
                }
            });
            // Drop departed members from the hash first (this needs the member map), then from the map itself.
            Arrays.stream(left).forEach(lm -> this.consistentHash.updateAndGet(ch -> {
                SimpleMember<Address> member = this.members.get(lm);
                if (member == null) {
                    return ch;
                }
                return ch.without(member);
            }));
            Arrays.stream(left).forEach(this.members::remove);
        }
        this.currentView = view;
    }

    /**
     * Logs a warning that the given member is suspected.
     *
     * @param suspected_mbr the address of the suspected member
     */
    public void suspect(Address suspected_mbr) {
        logger.warn("Member is suspect: {}", (Object)suspected_mbr.toString());
    }

    /**
     * No-op.
     */
    public void block() {
    }

    /**
     * No-op.
     */
    public void unblock() {
    }

    /**
     * Dispatches an incoming channel message to the handler for its payload type. Payloads that are not a
     * {@link JoinMessage}, {@link DispatchMessage} or {@link ReplyMessage} are ignored.
     *
     * @param msg the received JGroups message
     */
    public void receive(Message msg) {
        Object message = msg.getObject();
        if (message instanceof JoinMessage) {
            this.processJoinMessage(msg, (JoinMessage)message);
        } else if (message instanceof DispatchMessage) {
            this.processDispatchMessage(msg, (DispatchMessage)message);
        } else if (message instanceof ReplyMessage) {
            this.processReplyMessage((ReplyMessage)message);
        }
    }

    /**
     * Completes the callback registered for the command the reply belongs to; logs a warning when no such callback
     * is registered.
     */
    private void processReplyMessage(ReplyMessage message) {
        CommandCallbackWrapper callbackWrapper = this.callbackRepository.fetchAndRemove(message.getCommandIdentifier());
        if (callbackWrapper == null) {
            logger.warn("Received a callback for a message that has either already received a callback, or which was not sent through this node. Ignoring.");
        } else if (message.isSuccess()) {
            callbackWrapper.success(message.getReturnValue(this.serializer));
        } else {
            callbackWrapper.fail(message.getError(this.serializer));
        }
    }

    /**
     * Dispatches a received command on the local segment. When the sender expects a reply, the outcome (or any
     * exception raised while dispatching) is sent back to the sender; otherwise a dispatch failure is only logged.
     */
    private <C, R> void processDispatchMessage(final Message msg, final DispatchMessage message) {
        if (message.isExpectReply()) {
            try {
                CommandMessage<?> commandMessage = message.getCommandMessage(this.serializer);
                this.localSegment.dispatch(commandMessage, new CommandCallback<C, R>(){

                    public void onSuccess(CommandMessage<? extends C> commandMessage, R result) {
                        JGroupsConnector.this.sendReply(msg.getSrc(), message.getCommandIdentifier(), result, null);
                    }

                    public void onFailure(CommandMessage<? extends C> commandMessage, Throwable cause) {
                        JGroupsConnector.this.sendReply(msg.getSrc(), message.getCommandIdentifier(), null, cause);
                    }
                });
            }
            catch (Exception e) {
                this.sendReply(msg.getSrc(), message.getCommandIdentifier(), null, e);
            }
        } else {
            try {
                this.localSegment.dispatch(message.getCommandMessage(this.serializer));
            }
            catch (Exception e) {
                logger.error("Could not dispatch command", (Throwable)e);
            }
        }
    }

    /**
     * Sends a {@link ReplyMessage} to the given address. If building or sending that reply fails, a second reply
     * carrying the failure is attempted; if that fails as well, the error is logged.
     */
    private <R> void sendReply(Address address, String commandIdentifier, R result, Throwable cause) {
        try {
            this.channel.send(address, (Object)new ReplyMessage(commandIdentifier, result, cause, this.serializer));
        }
        catch (Exception e) {
            try {
                this.channel.send(address, (Object)new ReplyMessage(commandIdentifier, null, e, this.serializer));
            }
            catch (Exception e1) {
                logger.error("Could not send reply", (Throwable)e1);
            }
        }
    }

    /**
     * Registers the sender of a {@link JoinMessage} in the member map and the consistent hash, using the load
     * factor and command filter carried by the message. When the channel has no name for the sender, a warning is
     * logged and the message is otherwise ignored.
     */
    private void processJoinMessage(Message message, JoinMessage joinMessage) {
        String joinedMember = this.channel.getName(message.getSrc());
        if (joinedMember != null) {
            int loadFactor = joinMessage.getLoadFactor();
            Predicate<CommandMessage<?>> commandFilter = joinMessage.messageFilter();
            SimpleMember<Address> member = new SimpleMember<Address>(joinedMember, message.getSrc(), null);
            this.members.put(member.endpoint(), member);
            this.consistentHash.updateAndGet(ch -> ch.with(member, loadFactor, commandFilter));
            if (logger.isInfoEnabled() && !message.getSrc().equals(this.channel.getAddress())) {
                logger.info("{} joined with load factor: {}", (Object)joinedMember, (Object)loadFactor);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Got a network of members: {}", this.members.values());
            }
        } else {
            logger.warn("Received join message from '{}', but a connection with the sender has been lost.", (Object)message.getSrc().toString());
        }
    }

    /**
     * Blocks until the join condition has been marked, i.e. until the first view has been processed.
     *
     * @return {@code true} if the join was marked successful, {@code false} otherwise
     * @throws InterruptedException when the thread is interrupted while waiting
     */
    public boolean awaitJoined() throws InterruptedException {
        this.joinedCondition.await();
        return this.joinedCondition.isJoined();
    }

    /**
     * Blocks until the join condition has been marked or the given timeout elapses.
     *
     * @param timeout  the maximum time to wait
     * @param timeUnit the unit of {@code timeout}
     * @return {@code true} if the join was marked successful; {@code false} if it was marked failed or has not
     * been marked yet
     * @throws InterruptedException when the thread is interrupted while waiting
     */
    public boolean awaitJoined(long timeout, TimeUnit timeUnit) throws InterruptedException {
        this.joinedCondition.await(timeout, timeUnit);
        return this.joinedCondition.isJoined();
    }

    /**
     * Returns the name of the underlying channel.
     *
     * @return the channel's name
     */
    public String getNodeName() {
        return this.channel.getName();
    }

    /**
     * Returns the consistent hash currently used for routing.
     *
     * @return the current consistent hash
     */
    protected ConsistentHash getConsistentHash() {
        return this.consistentHash.get();
    }

    /**
     * Sends the given command to the destination without expecting a reply.
     *
     * @param destination the member to send the command to
     * @param command     the command to send
     * @throws Exception when the destination exposes no JGroups address or the message could not be sent
     */
    @Override
    public <C> void send(Member destination, CommandMessage<? extends C> command) throws Exception {
        this.channel.send(this.resolveAddress(destination), (Object)new DispatchMessage(command, this.serializer, false));
    }

    /**
     * Sends the given command to the destination and registers the callback to be completed when the matching
     * reply arrives. If sending fails, the callback registration is removed again before the exception is
     * rethrown.
     *
     * @param destination the member to send the command to
     * @param command     the command to send
     * @param callback    the callback to invoke with the outcome reported by the destination
     * @throws Exception when the destination exposes no JGroups address or the message could not be sent
     */
    @Override
    public <C, R> void send(Member destination, CommandMessage<C> command, CommandCallback<? super C, R> callback) throws Exception {
        this.callbackRepository.store(command.getIdentifier(), new CommandCallbackWrapper<Member, C, R>(destination, command, callback));
        try {
            this.channel.send(this.resolveAddress(destination), (Object)new DispatchMessage(command, this.serializer, true));
        }
        catch (Exception e) {
            // No reply can arrive for a command that was never sent, so the stored callback would otherwise
            // remain in the repository indefinitely.
            this.callbackRepository.fetchAndRemove(command.getIdentifier());
            throw e;
        }
    }

    /**
     * Subscribes the handler to the given command name on the local segment.
     *
     * @param commandName the name of the command to handle
     * @param handler     the handler to subscribe
     * @return the registration returned by the local segment
     */
    @Override
    public Registration subscribe(String commandName, MessageHandler<? super CommandMessage<?>> handler) {
        return this.localSegment.subscribe(commandName, handler);
    }

    /**
     * Resolves the JGroups address of the given member.
     *
     * @param destination the member to resolve
     * @return the member's JGroups address
     * @throws CommandBusConnectorCommunicationException when the member exposes no JGroups endpoint
     */
    protected Address resolveAddress(Member destination) {
        return destination.getConnectionEndpoint(Address.class).orElseThrow(() -> new CommandBusConnectorCommunicationException("The target member doesn't expose a JGroups endpoint"));
    }

    /**
     * Looks up the member that should handle the given command, based on the routing key provided by the routing
     * strategy and the current consistent hash.
     *
     * @param message the command to find a destination for
     * @return the member selected by the consistent hash, if any
     */
    @Override
    public Optional<Member> findDestination(CommandMessage<?> message) {
        String routingKey = this.routingStrategy.getRoutingKey(message);
        return this.consistentHash.get().getMember(routingKey, message);
    }

    /**
     * One-shot latch that records whether joining the cluster succeeded.
     */
    private static final class JoinCondition {
        private final CountDownLatch joinCountDown = new CountDownLatch(1);
        private volatile boolean success;

        private JoinCondition() {
        }

        /** Blocks until {@link #markJoined(boolean)} has been called. */
        public void await() throws InterruptedException {
            this.joinCountDown.await();
        }

        /** Blocks until {@link #markJoined(boolean)} has been called or the timeout elapses. */
        public void await(long timeout, TimeUnit timeUnit) throws InterruptedException {
            this.joinCountDown.await(timeout, timeUnit);
        }

        /** Records the outcome and releases all waiting threads; the outcome is written before the latch opens. */
        private void markJoined(boolean joinSucceeded) {
            this.success = joinSucceeded;
            this.joinCountDown.countDown();
        }

        /** Returns the most recently recorded outcome; {@code false} until a successful join is recorded. */
        public boolean isJoined() {
            return this.success;
        }
    }
}

