/*
 * Decompiled with CFR 0.152.
 */
package com.github.msemys.esjc.subscription;

import com.github.msemys.esjc.ConnectionClosedException;
import com.github.msemys.esjc.ResolvedEvent;
import com.github.msemys.esjc.Subscription;
import com.github.msemys.esjc.SubscriptionDropReason;
import com.github.msemys.esjc.SubscriptionListener;
import com.github.msemys.esjc.UserCredentials;
import com.github.msemys.esjc.operation.AccessDeniedException;
import com.github.msemys.esjc.operation.CommandNotExpectedException;
import com.github.msemys.esjc.operation.InspectionDecision;
import com.github.msemys.esjc.operation.InspectionResult;
import com.github.msemys.esjc.operation.NotAuthenticatedException;
import com.github.msemys.esjc.operation.ServerErrorException;
import com.github.msemys.esjc.proto.EventStoreClientMessages;
import com.github.msemys.esjc.subscription.SubscriptionBufferOverflowException;
import com.github.msemys.esjc.subscription.SubscriptionOperation;
import com.github.msemys.esjc.tcp.TcpCommand;
import com.github.msemys.esjc.tcp.TcpFlag;
import com.github.msemys.esjc.tcp.TcpPackage;
import com.github.msemys.esjc.util.Preconditions;
import com.github.msemys.esjc.util.Strings;
import com.github.msemys.esjc.util.Throwables;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import io.netty.channel.Channel;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Skeleton of a subscription operation: sends the subscribe request, interprets the
 * server's replies, and forwards events and the final close notification to a
 * {@link SubscriptionListener}.
 *
 * <p>Listener callbacks are never invoked directly by the calling thread. They are
 * appended to an internal queue and drained on the supplied {@link Executor}; the
 * {@code actionExecuting} flag ensures that at most one drain task is scheduled at a time,
 * so callbacks run in the order they were queued.
 *
 * @param <T> subscription type created by {@link #createSubscription(long, Long)} and handed to the listener
 * @param <E> resolved event type delivered to the listener
 */
public abstract class AbstractSubscriptionOperation<T extends Subscription, E extends ResolvedEvent>
        implements SubscriptionOperation {
    private static final Logger logger = LoggerFactory.getLogger(AbstractSubscriptionOperation.class);

    /** Number of pending listener callbacks above which the subscription is dropped. */
    private static final int MAX_QUEUE_SIZE = 2000;

    /** Completed with the subscription on confirmation, or exceptionally on a non user-initiated drop. */
    private final CompletableFuture<Subscription> result;
    private final TcpCommand subscribeCommand;
    protected final String streamId;
    protected final boolean resolveLinkTos;
    protected final UserCredentials userCredentials;
    protected final SubscriptionListener<T, E> listener;
    protected final Supplier<Channel> connectionSupplier;
    private final Executor executor;

    /** Pending listener callbacks, drained by {@link #run()}. */
    private final Queue<Runnable> actionQueue = new ConcurrentLinkedQueue<>();
    /** {@code true} while a drain task is scheduled or running. */
    private final AtomicBoolean actionExecuting = new AtomicBoolean();
    /** Set by {@link #confirmSubscription(long, Long)}; {@code null} until then. */
    private T subscription;
    /** Flipped exactly once by the first call to {@code drop}. */
    private final AtomicBoolean unsubscribed = new AtomicBoolean();
    protected UUID correlationId;

    /**
     * Creates the operation.
     *
     * @param result             future that reports the outcome of the subscribe attempt (required)
     * @param subscribeCommand   TCP command used for the subscribe request (required)
     * @param streamId           stream identifier; an empty value is rendered as {@code <all>} in log and error messages
     * @param resolveLinkTos     stored for use by subclasses
     * @param userCredentials    credentials attached to the subscribe request, or {@code null} for none
     * @param listener           receiver of event and close callbacks (required)
     * @param connectionSupplier source of the current connection (required)
     * @param executor           executor on which listener callbacks are run (required)
     */
    protected AbstractSubscriptionOperation(CompletableFuture<Subscription> result,
                                            TcpCommand subscribeCommand,
                                            String streamId,
                                            boolean resolveLinkTos,
                                            UserCredentials userCredentials,
                                            SubscriptionListener<T, E> listener,
                                            Supplier<Channel> connectionSupplier,
                                            Executor executor) {
        Preconditions.checkNotNull(result, "result is null");
        Preconditions.checkNotNull(subscribeCommand, "subscribeCommand is null");
        Preconditions.checkNotNull(listener, "listener is null");
        Preconditions.checkNotNull(connectionSupplier, "connectionSupplier is null");
        Preconditions.checkNotNull(executor, "executor is null");
        this.result = result;
        this.subscribeCommand = subscribeCommand;
        this.streamId = streamId;
        this.resolveLinkTos = resolveLinkTos;
        this.userCredentials = userCredentials;
        this.listener = listener;
        this.connectionSupplier = connectionSupplier;
        this.executor = executor;
    }

    /**
     * Builds the payload of the subscribe request sent by {@link #subscribe(UUID, Channel)}.
     *
     * @return message whose serialized bytes become the package data
     */
    protected abstract MessageLite createSubscribeMessage();

    /**
     * Creates the subscription object once the server has confirmed the subscription.
     *
     * @param lastCommitPosition value passed to {@link #confirmSubscription(long, Long)}
     * @param lastEventNumber    value passed to {@link #confirmSubscription(long, Long)}
     * @return subscription that completes the result future and is handed to the listener
     */
    protected abstract T createSubscription(long lastCommitPosition, Long lastEventNumber);

    /**
     * Gives the subclass the first chance to handle an incoming package.
     *
     * @param tcpPackage incoming package
     * @param builder    builder the subclass fills in when it handles the package
     * @return {@code true} if the package was handled, in which case {@code builder.build()} is
     *         returned from {@link #inspect(TcpPackage)}; {@code false} to fall back to the
     *         generic handling in this class
     */
    protected abstract boolean inspect(TcpPackage tcpPackage, InspectionResult.Builder builder);

    /**
     * Sends the subscribe request over the given connection.
     *
     * @param correlationId correlation id recorded on this operation and attached to the request
     * @param connection    connection to write to (required)
     * @return {@code false} without sending anything if the subscription is already confirmed or
     *         has been dropped; {@code true} otherwise
     */
    @Override
    public boolean subscribe(UUID correlationId, Channel connection) {
        Preconditions.checkNotNull(connection, "connection is null");
        if (this.subscription != null || this.unsubscribed.get()) {
            return false;
        }
        this.correlationId = correlationId;
        connection.writeAndFlush(TcpPackage.newBuilder()
            .command(this.subscribeCommand)
            .flag(this.userCredentials != null ? TcpFlag.Authenticated : TcpFlag.None)
            .correlationId(correlationId)
            .login(this.userCredentials != null ? this.userCredentials.username : null)
            .password(this.userCredentials != null ? this.userCredentials.password : null)
            .data(this.createSubscribeMessage().toByteArray())
            .build());
        return true;
    }

    /**
     * Closes the subscription. Only the first call has any effect; later calls return silently.
     *
     * <p>For every reason other than {@link SubscriptionDropReason#UserInitiated} the result
     * future is completed exceptionally. For a user-initiated drop of a confirmed subscription
     * an unsubscribe package is written to {@code connection} when one is given. If the
     * subscription was confirmed, the listener's {@code onClose} callback is queued.
     *
     * @param reason     why the subscription is being dropped
     * @param exception  cause; required unless {@code reason} is {@code UserInitiated}
     * @param connection connection used for the unsubscribe package, may be {@code null}
     */
    @Override
    public void drop(SubscriptionDropReason reason, Exception exception, Channel connection) {
        if (!this.unsubscribed.compareAndSet(false, true)) {
            return;
        }
        // The trailing exception has no placeholder; SLF4J logs a trailing Throwable as the stack trace.
        logger.trace("Subscription {} to {}: closing subscription, reason: {}", this.correlationId, this.streamId(), reason, exception);

        boolean userInitiated = reason == SubscriptionDropReason.UserInitiated;
        if (!userInitiated) {
            Preconditions.checkNotNull(exception, "No exception provided for subscription drop reason '%s'", reason);
            this.result.completeExceptionally(exception);
        }
        if (userInitiated && this.subscription != null && connection != null) {
            connection.writeAndFlush(TcpPackage.newBuilder()
                .command(TcpCommand.UnsubscribeFromStream)
                .correlationId(this.correlationId)
                .data(EventStoreClientMessages.UnsubscribeFromStream.getDefaultInstance().toByteArray())
                .build());
        }
        if (this.subscription != null) {
            this.action(() -> this.listener.onClose(this.subscription, reason, exception));
        }
    }

    /**
     * Interprets a package received for this operation.
     *
     * <p>The subclass hook {@link #inspect(TcpPackage, InspectionResult.Builder)} is consulted
     * first. Otherwise the generic commands are handled here: {@code SubscriptionDropped},
     * {@code NotAuthenticated}, {@code BadRequest} and {@code NotHandled}. Any other command
     * drops the subscription with {@link SubscriptionDropReason#ServerError}. Any exception
     * raised while inspecting drops the subscription with {@link SubscriptionDropReason#Unknown}.
     *
     * @param tcpPackage incoming package
     * @return decision describing how the operation should proceed
     */
    @Override
    public InspectionResult inspect(TcpPackage tcpPackage) {
        try {
            InspectionResult.Builder builder = InspectionResult.newBuilder();
            if (this.inspect(tcpPackage, builder)) {
                return builder.build();
            }
            switch (tcpPackage.command) {
                case SubscriptionDropped:
                    return this.inspectSubscriptionDropped(tcpPackage);
                case NotAuthenticated:
                    this.drop(SubscriptionDropReason.NotAuthenticated, new NotAuthenticatedException(Strings.defaultIfEmpty(Strings.newString(tcpPackage.data), "Authentication error")));
                    return endOperation("NotAuthenticated");
                case BadRequest:
                    this.drop(SubscriptionDropReason.ServerError, new ServerErrorException(Strings.defaultIfEmpty(Strings.newString(tcpPackage.data), "<no message>")));
                    return endOperation("BadRequest: " + Strings.newString(tcpPackage.data));
                case NotHandled:
                    return this.inspectNotHandled(tcpPackage);
                default:
                    this.drop(SubscriptionDropReason.ServerError, new CommandNotExpectedException(tcpPackage.command.toString()));
                    return endOperation(tcpPackage.command.toString());
            }
        } catch (Exception e) {
            this.drop(SubscriptionDropReason.Unknown, e);
            return endOperation("Exception - " + e.getMessage());
        }
    }

    /** Handles a {@code SubscriptionDropped} package: maps the server's reason to a drop reason and ends the operation. */
    private InspectionResult inspectSubscriptionDropped(TcpPackage tcpPackage) {
        EventStoreClientMessages.SubscriptionDropped subscriptionDropped =
            newInstance(EventStoreClientMessages.SubscriptionDropped.getDefaultInstance(), tcpPackage.data);
        switch (subscriptionDropped.getReason()) {
            case Unsubscribed:
                this.drop(SubscriptionDropReason.UserInitiated, null);
                break;
            case AccessDenied:
                this.drop(SubscriptionDropReason.AccessDenied, new AccessDeniedException(String.format("Subscription to '%s' failed due to access denied.", this.streamId())));
                break;
            case NotFound:
                this.drop(SubscriptionDropReason.NotFound, new IllegalArgumentException(String.format("Subscription to '%s' failed due to not found.", this.streamId())));
                break;
            default:
                logger.trace("Subscription dropped by server. Reason: {}.", subscriptionDropped.getReason());
                this.drop(SubscriptionDropReason.Unknown, new CommandNotExpectedException(String.format("Unsubscribe reason: '%s'.", subscriptionDropped.getReason())));
                break;
        }
        return endOperation(String.format("SubscriptionDropped: %s", subscriptionDropped.getReason()));
    }

    /**
     * Handles a {@code NotHandled} package, which is only legal before the subscription is
     * confirmed: {@code NotMaster} requests a reconnect to the advertised master, every other
     * reason (including unknown ones) requests a retry.
     */
    private InspectionResult inspectNotHandled(TcpPackage tcpPackage) {
        Preconditions.checkState(this.subscription == null, "NotHandled command appeared while we were already subscribed.");
        EventStoreClientMessages.NotHandled notHandled =
            newInstance(EventStoreClientMessages.NotHandled.getDefaultInstance(), tcpPackage.data);
        switch (notHandled.getReason()) {
            case NotReady:
                return retry("NotHandled - NotReady");
            case TooBusy:
                return retry("NotHandled - TooBusy");
            case NotMaster: {
                EventStoreClientMessages.NotHandled.MasterInfo masterInfo =
                    newInstance(EventStoreClientMessages.NotHandled.MasterInfo.getDefaultInstance(), notHandled.getAdditionalInfo().toByteArray());
                return InspectionResult.newBuilder()
                    .decision(InspectionDecision.Reconnect)
                    .description("NotHandled - NotMaster")
                    .address(masterInfo.getExternalTcpAddress(), masterInfo.getExternalTcpPort())
                    .secureAddress(masterInfo.getExternalSecureTcpAddress(), masterInfo.getExternalSecureTcpPort())
                    .build();
            }
            default:
                logger.error("Unknown NotHandledReason: {}.", notHandled.getReason());
                return retry("NotHandled - <unknown>");
        }
    }

    /** Builds an {@link InspectionDecision#EndOperation} result with the given description. */
    private static InspectionResult endOperation(String description) {
        return InspectionResult.newBuilder().decision(InspectionDecision.EndOperation).description(description).build();
    }

    /** Builds an {@link InspectionDecision#Retry} result with the given description. */
    private static InspectionResult retry(String description) {
        return InspectionResult.newBuilder().decision(InspectionDecision.Retry).description(description).build();
    }

    /** Drops the subscription with {@link SubscriptionDropReason#ConnectionClosed}. */
    @Override
    public void connectionClosed() {
        this.drop(SubscriptionDropReason.ConnectionClosed, new ConnectionClosedException("Connection was closed."));
    }

    /**
     * Drops the subscription as {@link SubscriptionDropReason#UserInitiated}, using the
     * connection currently returned by the connection supplier for the unsubscribe package.
     */
    public void unsubscribe() {
        this.drop(SubscriptionDropReason.UserInitiated, null, this.connectionSupplier.get());
    }

    /**
     * Records that the server confirmed the subscription: creates the subscription object and
     * completes the result future with it.
     *
     * @param lastCommitPosition must be {@code >= -1}
     * @param lastEventNumber    passed through to {@link #createSubscription(long, Long)}
     * @throws IllegalStateException presumably (via {@code Preconditions.checkState}) if the
     *                               subscription was already confirmed
     */
    protected void confirmSubscription(long lastCommitPosition, Long lastEventNumber) {
        Preconditions.checkArgument(lastCommitPosition >= -1L, "Invalid lastCommitPosition %d on subscription confirmation.", lastCommitPosition);
        Preconditions.checkState(this.subscription == null, "Double confirmation of subscription.");
        logger.trace("Subscription {} to {}: subscribed at CommitPosition: {}, EventNumber: {}.", this.correlationId, this.streamId(), lastCommitPosition, lastEventNumber);
        this.subscription = this.createSubscription(lastCommitPosition, lastEventNumber);
        this.result.complete(this.subscription);
    }

    /**
     * Queues delivery of an event to the listener. Events arriving after the subscription has
     * been dropped are ignored.
     *
     * @param event event received from the server; the subscription must already be confirmed
     */
    protected void eventAppeared(E event) {
        if (this.unsubscribed.get()) {
            return;
        }
        Preconditions.checkNotNull(this.subscription, "Subscription not confirmed, but event appeared!");
        logger.trace("Subscription {} to {}: event appeared ({}, {}, {} @ {}).", this.correlationId, this.streamId(), event.originalStreamId(), event.originalEventNumber(), event.originalEvent().eventType, event.originalPosition);
        this.action(() -> this.listener.onEvent(this.subscription, event));
    }

    /**
     * Writes a package to the connection currently returned by the connection supplier.
     * The supplied connection is not null-checked here.
     *
     * @param tcpPackage package to send
     */
    protected void send(TcpPackage tcpPackage) {
        this.connectionSupplier.get().writeAndFlush(tcpPackage);
    }

    /** Stream id for log and error messages; an empty id is rendered as {@code <all>}. */
    private String streamId() {
        return Strings.defaultIfEmpty(this.streamId, "<all>");
    }

    /**
     * Queues a listener callback and makes sure a drain task is scheduled. If the queue grows
     * beyond {@link #MAX_QUEUE_SIZE} the subscription is dropped with
     * {@link SubscriptionDropReason#ProcessingQueueOverflow}.
     */
    private void action(Runnable action) {
        this.actionQueue.offer(action);
        // NOTE(review): ConcurrentLinkedQueue.size() traverses the queue, so this check is O(n) per enqueue.
        if (this.actionQueue.size() > MAX_QUEUE_SIZE) {
            this.drop(SubscriptionDropReason.ProcessingQueueOverflow, new SubscriptionBufferOverflowException("client buffer too big"));
        }
        if (this.actionExecuting.compareAndSet(false, true)) {
            this.executor.execute(this::run);
        }
    }

    /**
     * Drains the callback queue. After the queue appears empty the executing flag is released
     * and the queue is re-checked, so a callback enqueued between the last poll and the release
     * is not left stranded.
     */
    private void run() {
        do {
            Runnable action;
            while ((action = this.actionQueue.poll()) != null) {
                try {
                    action.run();
                } catch (Exception e) {
                    // A failing listener callback must not abort the loop: leaving with
                    // actionExecuting still true would prevent every later callback from running.
                    logger.error("Exception during executing user callback: {}", e.getMessage(), e);
                }
            }
            this.actionExecuting.set(false);
        } while (!this.actionQueue.isEmpty() && this.actionExecuting.compareAndSet(false, true));
    }

    /**
     * Parses {@code data} into a message of the same type as {@code message}.
     *
     * @param message prototype instance supplying the parser
     * @param data    serialized message bytes
     * @param <R>     message type
     * @return parsed message
     * @throws RuntimeException as produced by {@code Throwables.propagate} when the bytes are not a valid message
     */
    @SuppressWarnings("unchecked") // the parser comes from an instance of R, so casting its result back to R is assumed safe
    protected static <R extends MessageLite> R newInstance(R message, byte[] data) {
        try {
            return (R) message.getParserForType().parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw Throwables.propagate(e);
        }
    }
}

