package com.github.msemys.esjc.subscription;

import com.github.msemys.esjc.ConnectionClosedException;
import com.github.msemys.esjc.ResolvedEvent;
import com.github.msemys.esjc.Subscription;
import com.github.msemys.esjc.SubscriptionDropReason;
import com.github.msemys.esjc.SubscriptionListener;
import com.github.msemys.esjc.UserCredentials;
import com.github.msemys.esjc.operation.AccessDeniedException;
import com.github.msemys.esjc.operation.CommandNotExpectedException;
import com.github.msemys.esjc.operation.InspectionDecision;
import com.github.msemys.esjc.operation.InspectionResult;
import com.github.msemys.esjc.operation.NotAuthenticatedException;
import com.github.msemys.esjc.operation.ServerErrorException;
import com.github.msemys.esjc.proto.EventStoreClientMessages;
import com.github.msemys.esjc.tcp.TcpCommand;
import com.github.msemys.esjc.tcp.TcpFlag;
import com.github.msemys.esjc.tcp.TcpPackage;
import com.github.msemys.esjc.util.Preconditions;
import com.github.msemys.esjc.util.Strings;
import com.github.msemys.esjc.util.Throwables;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import io.netty.channel.Channel;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/msemys/esjc/subscription/AbstractSubscriptionOperation.class */
/**
 * Skeleton implementation of {@link SubscriptionOperation}.
 *
 * <p>It sends the subscribe package, interprets the generic server replies
 * (dropped / not authenticated / bad request / not handled), and hands listener
 * callbacks to an {@link Executor}. Callbacks are placed on an internal queue and
 * drained by at most one scheduled task at a time, so they are invoked in the order
 * in which they were enqueued.
 *
 * <p>Subclasses supply the subscribe message, the concrete subscription object and
 * the handling of operation-specific packages.
 *
 * @param <T> concrete subscription type handed to the listener
 * @param <E> concrete event type handed to the listener
 */
public abstract class AbstractSubscriptionOperation<T extends Subscription, E extends ResolvedEvent> implements SubscriptionOperation {
    private static final Logger logger = LoggerFactory.getLogger(AbstractSubscriptionOperation.class);

    /** Number of pending listener actions above which the subscription is dropped. */
    private static final int MAX_QUEUE_SIZE = 2000;

    /** Completed with the subscription on confirmation, or exceptionally on a non user-initiated drop. */
    private final CompletableFuture<Subscription> result;
    private final TcpCommand subscribeCommand;
    protected final String streamId;
    protected final boolean resolveLinkTos;
    protected final UserCredentials userCredentials;
    protected final SubscriptionListener<T, E> listener;
    protected final Supplier<Channel> connectionSupplier;
    private final Executor executor;

    /** Set once by {@link #confirmSubscription(long, Long)}; {@code null} until then. */
    private T subscription;
    protected UUID correlationId;

    /** Pending listener callbacks, drained by {@link #run()}. */
    private final Queue<Runnable> actionQueue = new ConcurrentLinkedQueue<>();
    /** {@code true} while a drain task is scheduled or running. */
    private final AtomicBoolean actionExecuting = new AtomicBoolean();
    /** Flipped to {@code true} by the first call to {@code drop}; later drops are ignored. */
    private final AtomicBoolean unsubscribed = new AtomicBoolean();

    /**
     * Creates the operation. Declared {@code public}, but since the class is abstract
     * it can only be invoked from a subclass constructor.
     *
     * @param result             future that receives the outcome of the subscribe attempt (required)
     * @param subscribeCommand   command used for the subscribe package (required)
     * @param streamId           stream identifier; not validated here
     * @param resolveLinkTos     stored for subclasses; not used by this class
     * @param userCredentials    credentials sent with the subscribe package, or {@code null} for none
     * @param listener           receiver of event and close callbacks (required)
     * @param connectionSupplier source of the current connection (required)
     * @param executor           executor on which listener callbacks are run (required)
     * @throws NullPointerException presumably, when a required argument is {@code null}
     *                              (depends on {@code Preconditions.checkNotNull})
     */
    public AbstractSubscriptionOperation(CompletableFuture<Subscription> result,
                                         TcpCommand subscribeCommand,
                                         String streamId,
                                         boolean resolveLinkTos,
                                         UserCredentials userCredentials,
                                         SubscriptionListener<T, E> listener,
                                         Supplier<Channel> connectionSupplier,
                                         Executor executor) {
        Preconditions.checkNotNull(result, "result is null");
        Preconditions.checkNotNull(subscribeCommand, "subscribeCommand is null");
        Preconditions.checkNotNull(listener, "listener is null");
        Preconditions.checkNotNull(connectionSupplier, "connectionSupplier is null");
        Preconditions.checkNotNull(executor, "executor is null");
        this.result = result;
        this.subscribeCommand = subscribeCommand;
        this.streamId = streamId;
        this.resolveLinkTos = resolveLinkTos;
        this.userCredentials = userCredentials;
        this.listener = listener;
        this.connectionSupplier = connectionSupplier;
        this.executor = executor;
    }

    /**
     * Builds the message whose serialized bytes become the payload of the subscribe package.
     *
     * @return the subscribe message
     */
    protected abstract MessageLite createSubscribeMessage();

    /**
     * Creates the subscription object that is published once the subscription is confirmed.
     *
     * @param lastCommitPosition value passed through from {@link #confirmSubscription(long, Long)}
     * @param lastEventNumber    value passed through from {@link #confirmSubscription(long, Long)}
     * @return the new subscription
     */
    protected abstract T createSubscription(long lastCommitPosition, Long lastEventNumber);

    /**
     * Gives the subclass the first chance to handle an incoming package.
     *
     * @param tcpPackage package to inspect
     * @param builder    result builder to populate when the package is handled
     * @return {@code true} if the package was handled and {@code builder} holds the result;
     *         {@code false} to fall back to the generic handling in {@link #inspect(TcpPackage)}
     */
    protected abstract boolean inspect(TcpPackage tcpPackage, InspectionResult.Builder builder);

    /**
     * Sends the subscribe package over the given connection.
     *
     * @param correlationId correlation id to use for this subscribe attempt
     * @param connection    connection to write to (required)
     * @return {@code false} if the subscription is already confirmed or has been dropped
     *         (nothing is sent); {@code true} after the package has been written
     */
    @Override // com.github.msemys.esjc.subscription.SubscriptionOperation
    public boolean subscribe(UUID correlationId, Channel connection) {
        Preconditions.checkNotNull(connection, "connection is null");
        if (this.subscription != null || this.unsubscribed.get()) {
            return false;
        }
        this.correlationId = correlationId;
        connection.writeAndFlush(TcpPackage.newBuilder()
            .command(this.subscribeCommand)
            .flag(this.userCredentials != null ? TcpFlag.Authenticated : TcpFlag.None)
            .correlationId(correlationId)
            .login(this.userCredentials != null ? this.userCredentials.username : null)
            .password(this.userCredentials != null ? this.userCredentials.password : null)
            .data(createSubscribeMessage().toByteArray())
            .build());
        return true;
    }

    /**
     * Drops the subscription. Only the first call has any effect.
     *
     * <p>For any reason other than {@code UserInitiated} the result future is completed
     * exceptionally with {@code exception}. For a {@code UserInitiated} drop of a confirmed
     * subscription an unsubscribe package is written to {@code connection} when one is given.
     * If the subscription was confirmed, the listener's {@code onClose} is enqueued.
     *
     * @param reason     why the subscription is being dropped
     * @param exception  cause of the drop; required unless {@code reason} is {@code UserInitiated}
     * @param connection connection used to send the unsubscribe package, may be {@code null}
     */
    @Override // com.github.msemys.esjc.subscription.SubscriptionOperation
    public void drop(SubscriptionDropReason reason, Exception exception, Channel connection) {
        if (!this.unsubscribed.compareAndSet(false, true)) {
            return;
        }
        logger.trace("Subscription {} to {}: closing subscription, reason: {}", this.correlationId, streamId(), reason, exception);
        if (reason != SubscriptionDropReason.UserInitiated) {
            Preconditions.checkNotNull(exception, "No exception provided for subscription drop reason '%s'", reason);
            this.result.completeExceptionally(exception);
        }
        if (reason == SubscriptionDropReason.UserInitiated && this.subscription != null && connection != null) {
            connection.writeAndFlush(TcpPackage.newBuilder()
                .command(TcpCommand.UnsubscribeFromStream)
                .correlationId(this.correlationId)
                .data(EventStoreClientMessages.UnsubscribeFromStream.getDefaultInstance().toByteArray())
                .build());
        }
        if (this.subscription != null) {
            action(() -> this.listener.onClose(this.subscription, reason, exception));
        }
    }

    /**
     * Inspects an incoming package and decides how the operation should proceed.
     *
     * <p>The subclass hook {@link #inspect(TcpPackage, InspectionResult.Builder)} is consulted
     * first; packages it does not handle are processed generically here. Any exception raised
     * during inspection drops the subscription with reason {@code Unknown} and ends the operation.
     *
     * @param tcpPackage package received from the server
     * @return the inspection result
     */
    @Override // com.github.msemys.esjc.subscription.SubscriptionOperation
    public InspectionResult inspect(TcpPackage tcpPackage) {
        try {
            InspectionResult.Builder builder = InspectionResult.newBuilder();
            if (inspect(tcpPackage, builder)) {
                return builder.build();
            }
            switch (tcpPackage.command) {
                case SubscriptionDropped:
                    return inspectSubscriptionDropped(tcpPackage);
                case NotAuthenticated:
                    drop(SubscriptionDropReason.NotAuthenticated, new NotAuthenticatedException(Strings.defaultIfEmpty(Strings.newString(tcpPackage.data), "Authentication error")));
                    return InspectionResult.newBuilder().decision(InspectionDecision.EndOperation).description("NotAuthenticated").build();
                case BadRequest:
                    drop(SubscriptionDropReason.ServerError, new ServerErrorException(Strings.defaultIfEmpty(Strings.newString(tcpPackage.data), "<no message>")));
                    return InspectionResult.newBuilder().decision(InspectionDecision.EndOperation).description("BadRequest: " + Strings.newString(tcpPackage.data)).build();
                case NotHandled:
                    return inspectNotHandled(tcpPackage);
                default:
                    drop(SubscriptionDropReason.ServerError, new CommandNotExpectedException(tcpPackage.command.toString()));
                    return InspectionResult.newBuilder().decision(InspectionDecision.EndOperation).description(tcpPackage.command.toString()).build();
            }
        } catch (Exception e) {
            drop(SubscriptionDropReason.Unknown, e);
            return InspectionResult.newBuilder().decision(InspectionDecision.EndOperation).description("Exception - " + e.getMessage()).build();
        }
    }

    /** Handles a {@code SubscriptionDropped} package: drops with the matching reason and ends the operation. */
    private InspectionResult inspectSubscriptionDropped(TcpPackage tcpPackage) {
        EventStoreClientMessages.SubscriptionDropped dropped = newInstance(EventStoreClientMessages.SubscriptionDropped.getDefaultInstance(), tcpPackage.data);
        switch (dropped.getReason()) {
            case Unsubscribed:
                drop(SubscriptionDropReason.UserInitiated, null);
                break;
            case AccessDenied:
                drop(SubscriptionDropReason.AccessDenied, new AccessDeniedException(String.format("Subscription to '%s' failed due to access denied.", streamId())));
                break;
            case NotFound:
                drop(SubscriptionDropReason.NotFound, new IllegalArgumentException(String.format("Subscription to '%s' failed due to not found.", streamId())));
                break;
            default:
                logger.trace("Subscription dropped by server. Reason: {}.", dropped.getReason());
                drop(SubscriptionDropReason.Unknown, new CommandNotExpectedException(String.format("Unsubscribe reason: '%s'.", dropped.getReason())));
                break;
        }
        return InspectionResult.newBuilder().decision(InspectionDecision.EndOperation).description(String.format("SubscriptionDropped: %s", dropped.getReason())).build();
    }

    /**
     * Handles a {@code NotHandled} package: retries, or reconnects to the address carried in
     * the master info when the reason is {@code NotMaster}.
     *
     * @throws IllegalStateException presumably, if the subscription is already confirmed
     *                               (depends on {@code Preconditions.checkState})
     */
    private InspectionResult inspectNotHandled(TcpPackage tcpPackage) {
        Preconditions.checkState(this.subscription == null, "NotHandled command appeared while we were already subscribed.");
        EventStoreClientMessages.NotHandled notHandled = newInstance(EventStoreClientMessages.NotHandled.getDefaultInstance(), tcpPackage.data);
        switch (notHandled.getReason()) {
            case NotReady:
                return InspectionResult.newBuilder().decision(InspectionDecision.Retry).description("NotHandled - NotReady").build();
            case TooBusy:
                return InspectionResult.newBuilder().decision(InspectionDecision.Retry).description("NotHandled - TooBusy").build();
            case NotMaster:
                EventStoreClientMessages.NotHandled.MasterInfo masterInfo = newInstance(EventStoreClientMessages.NotHandled.MasterInfo.getDefaultInstance(), notHandled.getAdditionalInfo().toByteArray());
                return InspectionResult.newBuilder()
                    .decision(InspectionDecision.Reconnect)
                    .description("NotHandled - NotMaster")
                    .address(masterInfo.getExternalTcpAddress(), masterInfo.getExternalTcpPort())
                    .secureAddress(masterInfo.getExternalSecureTcpAddress(), masterInfo.getExternalSecureTcpPort())
                    .build();
            default:
                logger.error("Unknown NotHandledReason: {}.", notHandled.getReason());
                return InspectionResult.newBuilder().decision(InspectionDecision.Retry).description("NotHandled - <unknown>").build();
        }
    }

    /** Drops the subscription with reason {@code ConnectionClosed}. */
    @Override // com.github.msemys.esjc.subscription.SubscriptionOperation
    public void connectionClosed() {
        drop(SubscriptionDropReason.ConnectionClosed, new ConnectionClosedException("Connection was closed."));
    }

    /** Drops the subscription as {@code UserInitiated}, using the connection currently supplied. */
    public void unsubscribe() {
        drop(SubscriptionDropReason.UserInitiated, null, this.connectionSupplier.get());
    }

    /**
     * Marks the subscription as confirmed: creates the subscription object and completes
     * the result future with it. Intended to be called by subclasses.
     *
     * @param lastCommitPosition must be {@code >= -1}
     * @param lastEventNumber    forwarded to {@link #createSubscription(long, Long)}
     */
    public void confirmSubscription(long lastCommitPosition, Long lastEventNumber) {
        Preconditions.checkArgument(lastCommitPosition >= -1, "Invalid lastCommitPosition %d on subscription confirmation.", lastCommitPosition);
        Preconditions.checkState(this.subscription == null, "Double confirmation of subscription.");
        logger.trace("Subscription {} to {}: subscribed at CommitPosition: {}, EventNumber: {}.", this.correlationId, streamId(), lastCommitPosition, lastEventNumber);
        this.subscription = createSubscription(lastCommitPosition, lastEventNumber);
        this.result.complete(this.subscription);
    }

    /**
     * Enqueues delivery of an event to the listener. Events arriving after the subscription
     * has been dropped are ignored. Intended to be called by subclasses.
     *
     * @param event the event that appeared; the subscription must already be confirmed
     */
    public void eventAppeared(E event) {
        if (this.unsubscribed.get()) {
            return;
        }
        Preconditions.checkNotNull(this.subscription, "Subscription not confirmed, but event appeared!");
        logger.trace("Subscription {} to {}: event appeared ({}, {}, {} @ {}).",
            this.correlationId, streamId(), event.originalStreamId(), event.originalEventNumber(), event.originalEvent().eventType, event.originalPosition);
        action(() -> this.listener.onEvent(this.subscription, event));
    }

    /**
     * Writes a package to the connection currently returned by the connection supplier.
     * Intended to be called by subclasses.
     *
     * @param tcpPackage package to send
     */
    public void send(TcpPackage tcpPackage) {
        // NOTE(review): no null guard here, whereas drop() tolerates a null connection - confirm
        // whether the supplier can return null at the points where subclasses call send().
        this.connectionSupplier.get().writeAndFlush(tcpPackage);
    }

    /** Stream id for log and error messages; {@code "<all>"} stands in for an empty id. */
    private String streamId() {
        return Strings.defaultIfEmpty(this.streamId, "<all>");
    }

    /**
     * Enqueues a listener callback and makes sure a drain task is scheduled.
     * When the queue grows beyond {@link #MAX_QUEUE_SIZE} the subscription is dropped.
     */
    private void action(Runnable runnable) {
        this.actionQueue.offer(runnable);
        if (this.actionQueue.size() > MAX_QUEUE_SIZE) {
            drop(SubscriptionDropReason.ProcessingQueueOverflow, new SubscriptionBufferOverflowException("client buffer too big"));
        }
        // Only the caller that flips the flag schedules the drain task.
        if (this.actionExecuting.compareAndSet(false, true)) {
            this.executor.execute(this::run);
        }
    }

    /**
     * Drains the action queue on the executor.
     *
     * <p>A failing callback is logged and skipped. Without that guard a single exception
     * thrown by the listener would end this task with {@code actionExecuting} still set,
     * so no drain task would ever be scheduled again and all further callbacks
     * (including {@code onClose}) would be lost.
     */
    private void run() {
        do {
            Runnable next;
            while ((next = this.actionQueue.poll()) != null) {
                try {
                    next.run();
                } catch (Exception e) {
                    logger.error("Exception during executing user callback: {}", e.getMessage(), e);
                }
            }
            this.actionExecuting.set(false);
            // An action may have been enqueued between the last poll and the reset above;
            // if so, try to take the flag back and keep draining.
        } while (!this.actionQueue.isEmpty() && this.actionExecuting.compareAndSet(false, true));
    }

    /**
     * Parses {@code data} into a message of the same type as {@code prototype}.
     *
     * @param prototype instance whose parser is used
     * @param data      serialized message bytes
     * @param <R>       message type
     * @return the parsed message
     * @throws RuntimeException presumably, when the bytes cannot be parsed
     *                          (whatever {@code Throwables.propagate} produces)
     */
    @SuppressWarnings("unchecked") // the parser obtained from an R instance is expected to produce R
    public static <R extends MessageLite> R newInstance(R prototype, byte[] data) {
        try {
            return (R) prototype.getParserForType().parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw Throwables.propagate(e);
        }
    }
}
