package org.apache.qpid.server.protocol.v1_0;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.protocol.LinkModel;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.util.Action;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/qpid/server/protocol/v1_0/LinkImpl.class */
/**
 * {@link Link_1_0} implementation identified by a remote container id, a link name and a
 * {@link Role}, holding at most one current {@link LinkEndpoint}.
 *
 * <p>When an attach arrives on a session other than the one that owns the current endpoint, the
 * request is treated as a "link steal": it is queued and the queue is drained one request at a
 * time. For each request the current endpoint (if any) is closed with {@link LinkError#STOLEN}
 * and the attach is then retried.
 *
 * <p>{@link #attach}, {@link #linkClosed()}, {@link #discardEndpoint()} and the internal
 * queue-draining step are {@code synchronized} on this instance; the endpoint and termini fields
 * are {@code volatile}.
 *
 * @param <S> source terminus type
 * @param <T> target terminus type
 */
public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Link_1_0<S, T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(LinkImpl.class);
    private final String _remoteContainerId;
    private final String _linkName;
    private final Role _role;
    private final LinkRegistry<S, T> _linkRegistry;
    /** Pending steal requests; only touched from synchronized methods. */
    private final Queue<ThiefInformation> _thiefQueue;
    private volatile LinkEndpoint<S, T> _linkEndpoint;
    private volatile S _source;
    private volatile T _target;
    /** True while the thief queue is being drained; only touched from synchronized methods. */
    private boolean _stealingInProgress;
    private final Queue<Action<? super Link_1_0<S, T>>> _deleteTasks;

    /**
     * One queued steal request: the session and attach frame of the would-be new owner together
     * with the future that is completed once the request has been processed.
     */
    private class ThiefInformation {
        private final Session_1_0 _session;
        private final Attach _attach;
        private final SettableFuture<LinkEndpoint<S, T>> _future;

        ThiefInformation(Session_1_0 session, Attach attach, SettableFuture<LinkEndpoint<S, T>> future) {
            this._session = session;
            this._attach = attach;
            this._future = future;
        }

        Session_1_0 getSession() {
            return this._session;
        }

        Attach getAttach() {
            return this._attach;
        }

        SettableFuture<LinkEndpoint<S, T>> getFuture() {
            return this._future;
        }
    }

    /**
     * Creates a link with no endpoint and no termini.
     *
     * @param str          remote container id
     * @param str2         link name
     * @param role         role of this link
     * @param linkRegistry registry notified when the link changes or is closed
     */
    public LinkImpl(String str, String str2, Role role, LinkRegistry<S, T> linkRegistry) {
        this._thiefQueue = new LinkedList<>();
        this._deleteTasks = new ConcurrentLinkedQueue<>();
        this._remoteContainerId = str;
        this._linkName = str2;
        this._role = role;
        this._linkRegistry = linkRegistry;
    }

    /**
     * Creates a link from an existing definition, copying its identity and termini.
     *
     * @param linkDefinition definition supplying remote container id, name, role, source and target
     * @param linkRegistry   registry notified when the link changes or is closed
     */
    public LinkImpl(LinkDefinition<S, T> linkDefinition, LinkRegistry<S, T> linkRegistry) {
        this(linkDefinition.getRemoteContainerId(), linkDefinition.getName(), linkDefinition.getRole(), linkRegistry);
        setTermini(linkDefinition.getSource(), linkDefinition.getTarget());
    }

    /**
     * Handles an incoming attach for this link.
     *
     * <ul>
     *   <li>If there is no current endpoint, or the current endpoint belongs to {@code session_1_0},
     *       the endpoint is created if necessary, receives the attach, the registry is notified and
     *       an already-completed future is returned.</li>
     *   <li>Otherwise the attach is queued as a steal request and the returned future completes
     *       once that request has been processed. If the principals of the two connections differ,
     *       an authorisation check is performed first.</li>
     *   <li>Any exception raised along the way is logged at debug level and converted into an
     *       {@link ErrantLinkEndpoint}, returned in an already-completed future.</li>
     * </ul>
     *
     * @param session_1_0 session on which the attach was received
     * @param attach      the attach frame
     * @return future yielding the endpoint that now represents this link for the session
     */
    @Override // org.apache.qpid.server.protocol.v1_0.Link_1_0
    public final synchronized ListenableFuture<? extends LinkEndpoint<S, T>> attach(Session_1_0 session_1_0, Attach attach) {
        try {
            // NOTE(review): an attach carrying the same role as this link is rejected; presumably
            // _role is the local role and Attach#getRole the peer's role - confirm.
            if (this._role == attach.getRole()) {
                throw new AmqpErrorException(new Error(AmqpError.ILLEGAL_STATE, "Cannot switch SendingLink to ReceivingLink and vice versa"));
            }
            if (this._linkEndpoint == null || session_1_0.equals(this._linkEndpoint.getSession())) {
                if (this._linkEndpoint == null) {
                    this._linkEndpoint = createLinkEndpoint(session_1_0, attach);
                }
                this._linkEndpoint.receiveAttach(attach);
                this._linkRegistry.linkChanged(this);
                return Futures.immediateFuture(this._linkEndpoint);
            }
            // A different session wants this link. A different principal must pass an explicit
            // authorisation check before the steal is queued.
            if (!Objects.equals(this._linkEndpoint.getSession().getConnection().getPrincipal(), session_1_0.getConnection().getPrincipal())) {
                (this._linkEndpoint instanceof SendingLinkEndpoint
                        ? (ConfiguredObject) ((SendingLinkEndpoint) this._linkEndpoint).getDestination().getMessageSource()
                        : session_1_0.getReceivingDestination(this, (Target) getTarget()).getMessageDestination())
                        .authorise(attach.getRole() == Role.SENDER
                                ? Operation.PERFORM_ACTION("publish")
                                : Operation.PERFORM_ACTION("consume"));
            }
            SettableFuture<LinkEndpoint<S, T>> stolenEndpointFuture = SettableFuture.create();
            this._thiefQueue.add(new ThiefInformation(session_1_0, attach, stolenEndpointFuture));
            startLinkStealingIfNecessary();
            return stolenEndpointFuture;
        } catch (Exception e) {
            LOGGER.debug("Error attaching link", e);
            return rejectLink(session_1_0, e);
        }
    }

    /**
     * Runs and removes every registered delete task, discards the current endpoint and notifies
     * the registry that this link has been closed.
     */
    @Override // org.apache.qpid.server.protocol.v1_0.Link_1_0
    public synchronized void linkClosed() {
        Iterator<Action<? super Link_1_0<S, T>>> tasks = this._deleteTasks.iterator();
        while (tasks.hasNext()) {
            // Remove only after the task has run, so a task that throws stays registered.
            tasks.next().performAction(this);
            tasks.remove();
        }
        discardEndpoint();
        this._linkRegistry.linkClosed(this);
    }

    /** Forgets the current endpoint without notifying the registry. */
    @Override // org.apache.qpid.server.protocol.v1_0.Link_1_0
    public synchronized void discardEndpoint() {
        this._linkEndpoint = null;
    }

    /** @return the link name */
    @Override // org.apache.qpid.server.protocol.v1_0.LinkDefinition
    public final String getName() {
        return this._linkName;
    }

    /** @return the role of this link */
    @Override // org.apache.qpid.server.protocol.v1_0.LinkDefinition
    public Role getRole() {
        return this._role;
    }

    /** @return the current source terminus, possibly {@code null} */
    @Override // org.apache.qpid.server.protocol.v1_0.LinkDefinition
    public S getSource() {
        return this._source;
    }

    /**
     * Replaces the source terminus, leaving the target unchanged.
     *
     * @param s new source terminus
     */
    @Override // org.apache.qpid.server.protocol.v1_0.Link_1_0
    public void setSource(S s) {
        setTermini(s, this._target);
    }

    /** @return the current target terminus, possibly {@code null} */
    @Override // org.apache.qpid.server.protocol.v1_0.LinkDefinition
    public T getTarget() {
        return this._target;
    }

    /**
     * Replaces the target terminus, leaving the source unchanged.
     *
     * @param t new target terminus
     */
    @Override // org.apache.qpid.server.protocol.v1_0.Link_1_0
    public void setTarget(T t) {
        setTermini(this._source, t);
    }

    /**
     * Replaces both termini. The two fields are written one after the other without locking.
     *
     * @param s new source terminus
     * @param t new target terminus
     */
    @Override // org.apache.qpid.server.protocol.v1_0.Link_1_0
    public void setTermini(S s, T t) {
        this._source = s;
        this._target = t;
    }

    /** @return the highest terminus durability reported by the link registry */
    @Override // org.apache.qpid.server.protocol.v1_0.Link_1_0
    public TerminusDurability getHighestSupportedTerminusDurability() {
        return this._linkRegistry.getHighestSupportedTerminusDurability();
    }

    /** @return the remote container id */
    @Override // org.apache.qpid.server.protocol.v1_0.LinkDefinition
    public String getRemoteContainerId() {
        return this._remoteContainerId;
    }

    /**
     * Chooses the endpoint implementation: a sending endpoint when this link's role is
     * {@link Role#SENDER}; otherwise a transaction-coordinator receiving endpoint when the attach
     * target is a {@link Coordinator}, else a standard receiving endpoint.
     */
    private LinkEndpoint<S, T> createLinkEndpoint(Session_1_0 session, Attach attach) {
        return this._role == Role.SENDER
                ? new SendingLinkEndpoint(session, this)
                : attach.getTarget() instanceof Coordinator
                        ? new TxnCoordinatorReceivingLinkEndpoint(session, this)
                        : new StandardReceivingLinkEndpoint(session, this);
    }

    /**
     * Installs an {@link ErrantLinkEndpoint} describing the failure as the current endpoint and
     * returns it in an already-completed future. An {@link AmqpErrorException} contributes its own
     * error; anything else is reported as {@link AmqpError#INTERNAL_ERROR} with the throwable's
     * message.
     */
    private ListenableFuture<? extends LinkEndpoint<S, T>> rejectLink(Session_1_0 session, Throwable failure) {
        // NOTE(review): this overwrites _linkEndpoint even when a healthy endpoint owned by another
        // session is currently installed (e.g. a steal attempt that failed authorisation) - confirm
        // that this is intended.
        if (failure instanceof AmqpErrorException) {
            this._linkEndpoint = new ErrantLinkEndpoint(this, session, ((AmqpErrorException) failure).getError());
        } else {
            this._linkEndpoint = new ErrantLinkEndpoint(this, session, new Error(AmqpError.INTERNAL_ERROR, failure.getMessage()));
        }
        return Futures.immediateFuture(this._linkEndpoint);
    }

    /** Starts draining the thief queue unless a drain is already running. */
    private void startLinkStealingIfNecessary() {
        if (this._stealingInProgress) {
            return;
        }
        this._stealingInProgress = true;
        stealLink();
    }

    /**
     * Processes the next queued steal request, if any. When the request finishes (successfully or
     * not) its future is completed and this method is invoked again for the following request.
     * When the queue is empty the in-progress flag is cleared.
     */
    private synchronized void stealLink() {
        final ThiefInformation thief = this._thiefQueue.poll();
        if (thief == null) {
            this._stealingInProgress = false;
            return;
        }
        AbstractConfiguredObject.addFutureCallback(doStealLink(thief.getSession(), thief.getAttach()), new FutureCallback<LinkEndpoint<S, T>>() {
            @Override
            public void onSuccess(LinkEndpoint<S, T> linkEndpoint) {
                thief.getFuture().set(linkEndpoint);
                LinkImpl.this.stealLink();
            }

            @Override
            public void onFailure(Throwable th) {
                thief.getFuture().setException(th);
                LinkImpl.this.stealLink();
            }
        }, MoreExecutors.directExecutor());
    }

    /**
     * Performs a single steal. If an endpoint is currently installed, the work is handed to that
     * endpoint's session via {@code doOnIOThreadAsync}: the endpoint is closed with
     * {@link LinkError#STOLEN} and the attach is retried. With no endpoint installed the attach is
     * retried immediately on the calling thread.
     *
     * @return future completed with the retried attach's endpoint, or failed with the error raised
     */
    private ListenableFuture<LinkEndpoint<S, T>> doStealLink(Session_1_0 session, Attach attach) {
        final SettableFuture<LinkEndpoint<S, T>> result = SettableFuture.create();
        final LinkEndpoint<S, T> owner = this._linkEndpoint;
        if (owner == null) {
            doLinkStealAndHandleExceptions(session, attach, result);
            return result;
        }
        owner.getSession().doOnIOThreadAsync(() -> {
            try {
                // Re-read the field: the endpoint may have been discarded or replaced before this task ran.
                LinkEndpoint<S, T> current = this._linkEndpoint;
                if (current != null) {
                    current.close(new Error(LinkError.STOLEN, String.format("Link is being stolen by connection '%s'", session.getConnection())));
                }
            } catch (RuntimeException e) {
                // Complete the future before propagating; otherwise the thief would never be
                // answered and the steal queue would stay blocked behind this request forever.
                result.setException(e);
                throw e;
            }
            doLinkStealAndHandleExceptions(session, attach, result);
        });
        return result;
    }

    /**
     * Retries the attach for a thief and transfers the outcome to {@code result}. The future is
     * completed on every path so that the steal queue can always advance.
     */
    private void doLinkStealAndHandleExceptions(Session_1_0 session, Attach attach, SettableFuture<LinkEndpoint<S, T>> result) {
        try {
            result.set(attach(session, attach).get());
        } catch (InterruptedException e) {
            result.setException(e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            // setException rejects null, so fall back to the wrapper when no cause is attached.
            result.setException(cause != null ? cause : e);
        } catch (RuntimeException e) {
            // e.g. CancellationException from get(); must still complete the future.
            result.setException(e);
        }
    }

    /**
     * Registers a task to be run by {@link #linkClosed()}.
     *
     * @param action task to register
     */
    public void addDeleteTask(Action<? super LinkModel> action) {
        this._deleteTasks.add(action);
    }

    /**
     * Unregisters a task previously passed to {@link #addDeleteTask(Action)}.
     *
     * @param action task to remove
     */
    public void removeDeleteTask(Action<? super LinkModel> action) {
        this._deleteTasks.remove(action);
    }
}
