/*
 * Decompiled with CFR 0.152.
 */
package com.github.msemys.esjc;

import com.github.msemys.esjc.AllEventsIterator;
import com.github.msemys.esjc.AllEventsSlice;
import com.github.msemys.esjc.AllEventsSpliterator;
import com.github.msemys.esjc.CannotEstablishConnectionException;
import com.github.msemys.esjc.CatchUpSubscription;
import com.github.msemys.esjc.CatchUpSubscriptionListener;
import com.github.msemys.esjc.CatchUpSubscriptionSettings;
import com.github.msemys.esjc.ConnectionClosedException;
import com.github.msemys.esjc.DeleteResult;
import com.github.msemys.esjc.EventData;
import com.github.msemys.esjc.EventReadResult;
import com.github.msemys.esjc.EventStore;
import com.github.msemys.esjc.EventStoreException;
import com.github.msemys.esjc.EventStoreListener;
import com.github.msemys.esjc.PersistentSubscription;
import com.github.msemys.esjc.PersistentSubscriptionCreateResult;
import com.github.msemys.esjc.PersistentSubscriptionDeleteResult;
import com.github.msemys.esjc.PersistentSubscriptionListener;
import com.github.msemys.esjc.PersistentSubscriptionSettings;
import com.github.msemys.esjc.PersistentSubscriptionUpdateResult;
import com.github.msemys.esjc.Position;
import com.github.msemys.esjc.RawStreamMetadataResult;
import com.github.msemys.esjc.RecordedEvent;
import com.github.msemys.esjc.ResolvedEvent;
import com.github.msemys.esjc.RetryableResolvedEvent;
import com.github.msemys.esjc.Settings;
import com.github.msemys.esjc.StreamEventsIterator;
import com.github.msemys.esjc.StreamEventsSlice;
import com.github.msemys.esjc.StreamEventsSpliterator;
import com.github.msemys.esjc.StreamMetadata;
import com.github.msemys.esjc.StreamMetadataResult;
import com.github.msemys.esjc.Subscription;
import com.github.msemys.esjc.SubscriptionListener;
import com.github.msemys.esjc.SystemSettings;
import com.github.msemys.esjc.Transaction;
import com.github.msemys.esjc.UserCredentials;
import com.github.msemys.esjc.VolatileSubscriptionListener;
import com.github.msemys.esjc.WriteAttemptResult;
import com.github.msemys.esjc.WriteResult;
import com.github.msemys.esjc.event.Event;
import com.github.msemys.esjc.event.EventQueue;
import com.github.msemys.esjc.event.Events;
import com.github.msemys.esjc.node.EndpointDiscoverer;
import com.github.msemys.esjc.node.NodeEndpoints;
import com.github.msemys.esjc.operation.AppendToStreamOperation;
import com.github.msemys.esjc.operation.CommitTransactionOperation;
import com.github.msemys.esjc.operation.CreatePersistentSubscriptionOperation;
import com.github.msemys.esjc.operation.DeletePersistentSubscriptionOperation;
import com.github.msemys.esjc.operation.DeleteStreamOperation;
import com.github.msemys.esjc.operation.Operation;
import com.github.msemys.esjc.operation.ReadAllEventsBackwardOperation;
import com.github.msemys.esjc.operation.ReadAllEventsForwardOperation;
import com.github.msemys.esjc.operation.ReadEventOperation;
import com.github.msemys.esjc.operation.ReadStreamEventsBackwardOperation;
import com.github.msemys.esjc.operation.ReadStreamEventsForwardOperation;
import com.github.msemys.esjc.operation.StartTransactionOperation;
import com.github.msemys.esjc.operation.TransactionalWriteOperation;
import com.github.msemys.esjc.operation.TryAppendToStreamOperation;
import com.github.msemys.esjc.operation.UpdatePersistentSubscriptionOperation;
import com.github.msemys.esjc.operation.manager.OperationItem;
import com.github.msemys.esjc.operation.manager.OperationManager;
import com.github.msemys.esjc.ssl.CommonNameTrustManagerFactory;
import com.github.msemys.esjc.subscription.AllCatchUpSubscription;
import com.github.msemys.esjc.subscription.PersistentSubscriptionChannel;
import com.github.msemys.esjc.subscription.PersistentSubscriptionOperation;
import com.github.msemys.esjc.subscription.StreamCatchUpSubscription;
import com.github.msemys.esjc.subscription.VolatileSubscriptionOperation;
import com.github.msemys.esjc.subscription.manager.SubscriptionItem;
import com.github.msemys.esjc.subscription.manager.SubscriptionManager;
import com.github.msemys.esjc.system.SystemStreams;
import com.github.msemys.esjc.task.CloseConnection;
import com.github.msemys.esjc.task.EstablishTcpConnection;
import com.github.msemys.esjc.task.StartConnection;
import com.github.msemys.esjc.task.StartOperation;
import com.github.msemys.esjc.task.StartPersistentSubscription;
import com.github.msemys.esjc.task.StartSubscription;
import com.github.msemys.esjc.task.Task;
import com.github.msemys.esjc.task.TaskQueue;
import com.github.msemys.esjc.tcp.TcpPackage;
import com.github.msemys.esjc.tcp.TcpPackageDecoder;
import com.github.msemys.esjc.tcp.TcpPackageEncoder;
import com.github.msemys.esjc.tcp.handler.AuthenticationHandler;
import com.github.msemys.esjc.tcp.handler.HeartbeatHandler;
import com.github.msemys.esjc.tcp.handler.IdentificationHandler;
import com.github.msemys.esjc.tcp.handler.OperationHandler;
import com.github.msemys.esjc.transaction.TransactionManager;
import com.github.msemys.esjc.util.EmptyArrays;
import com.github.msemys.esjc.util.Numbers;
import com.github.msemys.esjc.util.Preconditions;
import com.github.msemys.esjc.util.Ranges;
import com.github.msemys.esjc.util.Strings;
import com.github.msemys.esjc.util.SystemTime;
import com.github.msemys.esjc.util.Threads;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.ScheduledFuture;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteOrder;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.net.ssl.TrustManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventStoreTcp
implements EventStore {
    // Logger for this client. Note that the category is EventStore.class (the interface), not EventStoreTcp.class.
    private static final Logger logger = LoggerFactory.getLogger(EventStore.class);
    // 0x4000000 = 67,108,864 bytes (64 MiB). NOTE(review): the frame decoder built in the constructor
    // repeats this literal instead of referencing the constant.
    private static final int MAX_FRAME_LENGTH = 0x4000000;
    // Netty NIO event loop group created with a DefaultThreadFactory named "esio". It is also handed to the
    // endpoint discoverer factory as a ScheduledExecutorService and used by connect() to schedule the timer.
    private final EventLoopGroup group = new NioEventLoopGroup(0, (ThreadFactory)new DefaultThreadFactory("esio"));
    // Netty client bootstrap; fully configured in the constructor.
    private final Bootstrap bootstrap;
    // Both managers are created from the settings in the constructor and passed to the OperationHandler
    // that is installed in the channel pipeline.
    private final OperationManager operationManager;
    private final SubscriptionManager subscriptionManager;
    // Client settings supplied at construction time.
    private final Settings settings;
    // Not assigned in the visible part of the file; presumably the currently active channel — TODO confirm.
    private volatile Channel connection;
    // Starts out as INVALID; transitions are not visible in this part of the file.
    private volatile ConnectingPhase connectingPhase = ConnectingPhase.INVALID;
    // Handle of the periodic timerTick task that connect() schedules every 200 ms.
    private volatile ScheduledFuture timer;
    // Passed to StartTransactionOperation and to Transaction instances created by continueTransaction().
    private final TransactionManager transactionManager = new TransactionManagerImpl();
    // Task queue running on this.executor(); handlers for each task type are registered in the constructor.
    private final TaskQueue tasks;
    // Created by settings.endpointDiscovererFactory; the constructor rejects a null discoverer.
    private final EndpointDiscoverer discoverer;
    // Reset by connect() when the client is not yet running.
    private final ReconnectionInfo reconnectionInfo = new ReconnectionInfo();
    // Initialised to SystemTime.zero(); its use is not visible in this part of the file.
    private final SystemTime lastOperationTimeoutCheck = SystemTime.zero();
    // Event queue running on this.executor().
    private final EventQueue events;
    // Monitor that connect() holds while checking isRunning() and starting the timer.
    private final Object mutex = new Object();

    /**
     * Builds the client from the given settings: configures the Netty bootstrap and its channel pipeline,
     * creates the operation and subscription managers, obtains the endpoint discoverer and registers the
     * task handlers.
     *
     * @param settings client settings; checked with {@code Preconditions.checkNotNull}
     */
    protected EventStoreTcp(final Settings settings) {
        Preconditions.checkNotNull(settings, "settings is null");
        // Socket options come straight from settings.tcpSettings; the nested (Bootstrap) casts are decompiler
        // output for the fluent builder chain.
        this.bootstrap = (Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().option(ChannelOption.SO_KEEPALIVE, (Object)settings.tcpSettings.keepAlive)).option(ChannelOption.TCP_NODELAY, (Object)settings.tcpSettings.noDelay)).option(ChannelOption.SO_SNDBUF, (Object)settings.tcpSettings.sendBufferSize)).option(ChannelOption.SO_RCVBUF, (Object)settings.tcpSettings.receiveBufferSize)).option(ChannelOption.WRITE_BUFFER_WATER_MARK, (Object)new WriteBufferWaterMark(settings.tcpSettings.writeBufferLowWaterMark, settings.tcpSettings.writeBufferHighWaterMark))).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)((int)settings.tcpSettings.connectTimeout.toMillis()))).group(this.group)).channel(NioSocketChannel.class)).handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            // NOTE(review): this method reads operationManager and subscriptionManager, which are assigned
            // only after the bootstrap has been built; that is fine as long as no channel is initialised
            // during construction — confirm.
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (settings.sslSettings.useSslConnection) {
                    // A fresh SslContext is built for every channel that gets initialised.
                    SslContextBuilder builder = SslContextBuilder.forClient();
                    switch (settings.sslSettings.validationMode) {
                        case NONE: {
                            // No certificate validation at all.
                            builder = builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
                            break;
                        }
                        case COMMON_NAME: {
                            builder = builder.trustManager((TrustManagerFactory)((Object)new CommonNameTrustManagerFactory(settings.sslSettings.certificateCommonName)));
                            break;
                        }
                        case CERTIFICATE: {
                            // Last case, so the missing break does not fall through into anything.
                            builder = builder.trustManager(settings.sslSettings.certificateFile);
                        }
                    }
                    SslContext sslContext = builder.build();
                    pipeline.addLast("ssl", (ChannelHandler)sslContext.newHandler(ch.alloc()));
                }
                // Inbound framing: little-endian 4-byte length prefix at offset 0, stripped from the frame;
                // 0x4000000 is the same value as MAX_FRAME_LENGTH.
                pipeline.addLast("frame-decoder", (ChannelHandler)new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, 0x4000000, 0, 4, 0, 4, true));
                pipeline.addLast("package-decoder", (ChannelHandler)new TcpPackageDecoder());
                // Outbound framing: little-endian 4-byte length prefix.
                pipeline.addLast("frame-encoder", (ChannelHandler)new LengthFieldPrepender(ByteOrder.LITTLE_ENDIAN, 4, 0, false));
                pipeline.addLast("package-encoder", (ChannelHandler)new TcpPackageEncoder());
                // Only the second (writer-idle) timeout is set, to the heartbeat interval; the other two are 0.
                pipeline.addLast("idle-state-handler", (ChannelHandler)new IdleStateHandler(0L, settings.heartbeatInterval.toMillis(), 0L, TimeUnit.MILLISECONDS));
                pipeline.addLast("heartbeat-handler", (ChannelHandler)new HeartbeatHandler(settings.heartbeatTimeout));
                // Completion callbacks of the handlers below are routed back to this client instance.
                pipeline.addLast("authentication-handler", (ChannelHandler)new AuthenticationHandler(settings.userCredentials, settings.operationTimeout).whenComplete(x$0 -> EventStoreTcp.this.onAuthenticationCompleted(x$0)));
                pipeline.addLast("identification-handler", (ChannelHandler)new IdentificationHandler(settings.connectionName, settings.operationTimeout).whenComplete(x$0 -> EventStoreTcp.this.onIdentificationCompleted(x$0)));
                pipeline.addLast("operation-handler", (ChannelHandler)new OperationHandler(EventStoreTcp.this.operationManager, EventStoreTcp.this.subscriptionManager).whenBadRequest(x$0 -> EventStoreTcp.this.onBadRequest(x$0)).whenChannelError(x$0 -> EventStoreTcp.this.onChannelError(x$0)).whenReconnect(x$0 -> EventStoreTcp.this.onReconnect(x$0)));
            }
        });
        this.operationManager = new OperationManager(settings);
        this.subscriptionManager = new SubscriptionManager(settings);
        this.settings = settings;
        // The event loop group doubles as the scheduler handed to the discoverer factory.
        this.discoverer = settings.endpointDiscovererFactory.create(settings, (ScheduledExecutorService)this.group);
        Preconditions.checkNotNull(this.discoverer, "endpoint discoverer cannot be null");
        // One handle(...) overload is registered per task type.
        this.tasks = new TaskQueue(this.executor());
        this.tasks.register(StartConnection.class, this::handle);
        this.tasks.register(CloseConnection.class, this::handle);
        this.tasks.register(EstablishTcpConnection.class, this::handle);
        this.tasks.register(StartOperation.class, this::handle);
        this.tasks.register(StartSubscription.class, this::handle);
        this.tasks.register(StartPersistentSubscription.class, this::handle);
        this.events = new EventQueue(this.executor());
    }

    /**
     * Queues a {@link DeleteStreamOperation} for the given stream.
     *
     * @param stream          stream name; rejected when null or empty
     * @param expectedVersion expected stream version, forwarded to the operation unchanged
     * @param hardDelete      hard-delete flag, forwarded to the operation unchanged
     * @param userCredentials credentials forwarded to the operation
     * @return the future that was handed to the queued operation
     */
    @Override
    public CompletableFuture<DeleteResult> deleteStream(String stream, long expectedVersion, boolean hardDelete, UserCredentials userCredentials) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        final CompletableFuture<DeleteResult> future = new CompletableFuture<>();
        final DeleteStreamOperation operation = new DeleteStreamOperation(future, this.settings.requireMaster, stream, expectedVersion, hardDelete, userCredentials);
        enqueue(operation);
        return future;
    }

    /**
     * Queues an {@link AppendToStreamOperation} that carries the given events.
     *
     * @param stream          stream name; rejected when null or empty
     * @param expectedVersion expected stream version, forwarded to the operation unchanged
     * @param events          events to append; rejected when null
     * @param userCredentials credentials forwarded to the operation
     * @return the future that was handed to the queued operation
     */
    @Override
    public CompletableFuture<WriteResult> appendToStream(String stream, long expectedVersion, Iterable<EventData> events, UserCredentials userCredentials) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        Preconditions.checkNotNull(events, "events is null");
        final CompletableFuture<WriteResult> future = new CompletableFuture<>();
        final AppendToStreamOperation operation = new AppendToStreamOperation(future, this.settings.requireMaster, stream, expectedVersion, events, userCredentials);
        enqueue(operation);
        return future;
    }

    /**
     * Queues a {@link TryAppendToStreamOperation} that carries the given events.
     *
     * @param stream          stream name; rejected when null or empty
     * @param expectedVersion expected stream version, forwarded to the operation unchanged
     * @param events          events to append; rejected when null
     * @param userCredentials credentials forwarded to the operation
     * @return the future that was handed to the queued operation
     */
    @Override
    public CompletableFuture<WriteAttemptResult> tryAppendToStream(String stream, long expectedVersion, Iterable<EventData> events, UserCredentials userCredentials) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        Preconditions.checkNotNull(events, "events is null");
        final CompletableFuture<WriteAttemptResult> future = new CompletableFuture<>();
        final TryAppendToStreamOperation operation = new TryAppendToStreamOperation(future, this.settings.requireMaster, stream, expectedVersion, events, userCredentials);
        enqueue(operation);
        return future;
    }

    /**
     * Queues a {@link StartTransactionOperation} bound to this client's transaction manager.
     *
     * @param stream          stream name; rejected when null or empty
     * @param expectedVersion expected stream version, forwarded to the operation unchanged
     * @param userCredentials credentials forwarded to the operation
     * @return the future that was handed to the queued operation
     */
    @Override
    public CompletableFuture<Transaction> startTransaction(String stream, long expectedVersion, UserCredentials userCredentials) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        final CompletableFuture<Transaction> future = new CompletableFuture<>();
        final StartTransactionOperation operation = new StartTransactionOperation(future, this.settings.requireMaster, stream, expectedVersion, this.transactionManager, userCredentials);
        enqueue(operation);
        return future;
    }

    /**
     * Creates a {@link Transaction} handle for an existing transaction id, bound to this client's
     * transaction manager. Nothing is queued or sent by this method itself.
     *
     * @param transactionId   id of the transaction to wrap
     * @param userCredentials credentials stored in the returned handle
     * @return a new transaction handle
     */
    @Override
    public Transaction continueTransaction(long transactionId, UserCredentials userCredentials) {
        final Transaction resumed = new Transaction(transactionId, userCredentials, this.transactionManager);
        return resumed;
    }

    /**
     * Queues a {@link ReadEventOperation} for a single event.
     *
     * @param stream          stream name; rejected when null or empty
     * @param eventNumber     event number; values below {@code -1} are rejected
     * @param resolveLinkTos  forwarded to the operation unchanged
     * @param userCredentials credentials forwarded to the operation
     * @return the future that was handed to the queued operation
     */
    @Override
    public CompletableFuture<EventReadResult> readEvent(String stream, long eventNumber, boolean resolveLinkTos, UserCredentials userCredentials) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        Preconditions.checkArgument(eventNumber >= -1L, "eventNumber out of range");
        final CompletableFuture<EventReadResult> future = new CompletableFuture<>();
        final ReadEventOperation operation = new ReadEventOperation(future, stream, eventNumber, resolveLinkTos, this.settings.requireMaster, userCredentials);
        enqueue(operation);
        return future;
    }

    /**
     * Queues a {@link ReadStreamEventsForwardOperation}.
     *
     * @param stream          stream name; rejected when null or empty
     * @param eventNumber     starting event number; rejected when negative
     * @param maxCount        maximum number of events; must lie within {@code Ranges.BATCH_SIZE_RANGE}
     * @param resolveLinkTos  forwarded to the operation unchanged
     * @param userCredentials credentials forwarded to the operation
     * @return the future that was handed to the queued operation
     */
    @Override
    public CompletableFuture<StreamEventsSlice> readStreamEventsForward(String stream, long eventNumber, int maxCount, boolean resolveLinkTos, UserCredentials userCredentials) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        Preconditions.checkArgument(!Numbers.isNegative(eventNumber), "eventNumber should not be negative");
        Preconditions.checkArgument(Ranges.BATCH_SIZE_RANGE.contains(maxCount), "maxCount is out of range. Allowed range: %s.", Ranges.BATCH_SIZE_RANGE.toString());
        final CompletableFuture<StreamEventsSlice> future = new CompletableFuture<>();
        final ReadStreamEventsForwardOperation operation = new ReadStreamEventsForwardOperation(future, stream, eventNumber, maxCount, resolveLinkTos, this.settings.requireMaster, userCredentials);
        enqueue(operation);
        return future;
    }

    /**
     * Queues a {@link ReadStreamEventsBackwardOperation}.
     *
     * @param stream          stream name; rejected when null or empty
     * @param eventNumber     starting event number; values below {@code -1} are rejected
     * @param maxCount        maximum number of events; must lie within {@code Ranges.BATCH_SIZE_RANGE}
     * @param resolveLinkTos  forwarded to the operation unchanged
     * @param userCredentials credentials forwarded to the operation
     * @return the future that was handed to the queued operation
     */
    @Override
    public CompletableFuture<StreamEventsSlice> readStreamEventsBackward(String stream, long eventNumber, int maxCount, boolean resolveLinkTos, UserCredentials userCredentials) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        Preconditions.checkArgument(eventNumber >= -1L, "eventNumber out of range");
        Preconditions.checkArgument(Ranges.BATCH_SIZE_RANGE.contains(maxCount), "maxCount is out of range. Allowed range: %s.", Ranges.BATCH_SIZE_RANGE.toString());
        final CompletableFuture<StreamEventsSlice> future = new CompletableFuture<>();
        final ReadStreamEventsBackwardOperation operation = new ReadStreamEventsBackwardOperation(future, stream, eventNumber, maxCount, resolveLinkTos, this.settings.requireMaster, userCredentials);
        enqueue(operation);
        return future;
    }

    /**
     * Queues a {@link ReadAllEventsForwardOperation}.
     *
     * @param position        starting position, forwarded to the operation unchanged (not validated here)
     * @param maxCount        maximum number of events; must lie within {@code Ranges.BATCH_SIZE_RANGE}
     * @param resolveLinkTos  forwarded to the operation unchanged
     * @param userCredentials credentials forwarded to the operation
     * @return the future that was handed to the queued operation
     */
    @Override
    public CompletableFuture<AllEventsSlice> readAllEventsForward(Position position, int maxCount, boolean resolveLinkTos, UserCredentials userCredentials) {
        Preconditions.checkArgument(Ranges.BATCH_SIZE_RANGE.contains(maxCount), "maxCount is out of range. Allowed range: %s.", Ranges.BATCH_SIZE_RANGE.toString());
        final CompletableFuture<AllEventsSlice> future = new CompletableFuture<>();
        final ReadAllEventsForwardOperation operation = new ReadAllEventsForwardOperation(future, position, maxCount, resolveLinkTos, this.settings.requireMaster, userCredentials);
        enqueue(operation);
        return future;
    }

    /**
     * Queues a {@link ReadAllEventsBackwardOperation}.
     *
     * @param position        starting position, forwarded to the operation unchanged (not validated here)
     * @param maxCount        maximum number of events; must lie within {@code Ranges.BATCH_SIZE_RANGE}
     * @param resolveLinkTos  forwarded to the operation unchanged
     * @param userCredentials credentials forwarded to the operation
     * @return the future that was handed to the queued operation
     */
    @Override
    public CompletableFuture<AllEventsSlice> readAllEventsBackward(Position position, int maxCount, boolean resolveLinkTos, UserCredentials userCredentials) {
        Preconditions.checkArgument(Ranges.BATCH_SIZE_RANGE.contains(maxCount), "maxCount is out of range. Allowed range: %s.", Ranges.BATCH_SIZE_RANGE.toString());
        final CompletableFuture<AllEventsSlice> future = new CompletableFuture<>();
        final ReadAllEventsBackwardOperation operation = new ReadAllEventsBackwardOperation(future, position, maxCount, resolveLinkTos, this.settings.requireMaster, userCredentials);
        enqueue(operation);
        return future;
    }

    /**
     * Creates an iterator over a stream's events whose batches are fetched with
     * {@link #readStreamEventsForward}.
     *
     * @param stream          stream name; rejected when null or empty
     * @param eventNumber     starting event number; rejected when negative
     * @param batchSize       size of each read; must lie within {@code Ranges.BATCH_SIZE_RANGE}
     * @param resolveLinkTos  forwarded to every read
     * @param userCredentials credentials forwarded to every read
     * @return a new {@link StreamEventsIterator}
     */
    @Override
    public Iterator<ResolvedEvent> iterateStreamEventsForward(String stream, long eventNumber, int batchSize, boolean resolveLinkTos, UserCredentials userCredentials) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        Preconditions.checkArgument(!Numbers.isNegative(eventNumber), "eventNumber should not be negative");
        Preconditions.checkArgument(Ranges.BATCH_SIZE_RANGE.contains(batchSize), "batchSize is out of range. Allowed range: %s.", Ranges.BATCH_SIZE_RANGE.toString());
        final StreamEventsIterator iterator = new StreamEventsIterator(eventNumber, from -> readStreamEventsForward(stream, (long)from, batchSize, resolveLinkTos, userCredentials));
        return iterator;
    }

    /**
     * Creates an iterator over a stream's events whose batches are fetched with
     * {@link #readStreamEventsBackward}.
     *
     * <p>The event number is validated here with the same rule that {@link #readStreamEventsBackward}
     * applies, so that an invalid value is rejected when the iterator is requested rather than when
     * the iterator later triggers a read.
     *
     * @param stream          stream name; rejected when null or empty
     * @param eventNumber     starting event number; values below {@code -1} are rejected
     * @param batchSize       size of each read; must lie within {@code Ranges.BATCH_SIZE_RANGE}
     * @param resolveLinkTos  forwarded to every read
     * @param userCredentials credentials forwarded to every read
     * @return a new {@link StreamEventsIterator}
     */
    @Override
    public Iterator<ResolvedEvent> iterateStreamEventsBackward(String stream, long eventNumber, int batchSize, boolean resolveLinkTos, UserCredentials userCredentials) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        // Same bound and message as readStreamEventsBackward, applied eagerly.
        Preconditions.checkArgument(eventNumber >= -1L, "eventNumber out of range");
        Preconditions.checkArgument(Ranges.BATCH_SIZE_RANGE.contains(batchSize), "batchSize is out of range. Allowed range: %s.", Ranges.BATCH_SIZE_RANGE.toString());
        return new StreamEventsIterator(eventNumber, i -> this.readStreamEventsBackward(stream, (long)i, batchSize, resolveLinkTos, userCredentials));
    }

    /**
     * Creates an iterator over all events whose batches are fetched with {@link #readAllEventsForward}.
     *
     * @param position        starting position, handed to the iterator unchanged
     * @param batchSize       size of each read; must lie within {@code Ranges.BATCH_SIZE_RANGE}
     * @param resolveLinkTos  forwarded to every read
     * @param userCredentials credentials forwarded to every read
     * @return a new {@link AllEventsIterator}
     */
    @Override
    public Iterator<ResolvedEvent> iterateAllEventsForward(Position position, int batchSize, boolean resolveLinkTos, UserCredentials userCredentials) {
        Preconditions.checkArgument(Ranges.BATCH_SIZE_RANGE.contains(batchSize), "batchSize is out of range. Allowed range: %s.", Ranges.BATCH_SIZE_RANGE.toString());
        final AllEventsIterator iterator = new AllEventsIterator(position, from -> readAllEventsForward((Position)from, batchSize, resolveLinkTos, userCredentials));
        return iterator;
    }

    /**
     * Creates an iterator over all events whose batches are fetched with {@link #readAllEventsBackward}.
     *
     * @param position        starting position, handed to the iterator unchanged
     * @param batchSize       size of each read; must lie within {@code Ranges.BATCH_SIZE_RANGE}
     * @param resolveLinkTos  forwarded to every read
     * @param userCredentials credentials forwarded to every read
     * @return a new {@link AllEventsIterator}
     */
    @Override
    public Iterator<ResolvedEvent> iterateAllEventsBackward(Position position, int batchSize, boolean resolveLinkTos, UserCredentials userCredentials) {
        Preconditions.checkArgument(Ranges.BATCH_SIZE_RANGE.contains(batchSize), "batchSize is out of range. Allowed range: %s.", Ranges.BATCH_SIZE_RANGE.toString());
        final AllEventsIterator iterator = new AllEventsIterator(position, from -> readAllEventsBackward((Position)from, batchSize, resolveLinkTos, userCredentials));
        return iterator;
    }

    /**
     * Creates a sequential {@link Stream} over a stream's events whose batches are fetched with
     * {@link #readStreamEventsForward}.
     *
     * <p>The stream name and event number are validated here with the same rules and messages as
     * {@link #iterateStreamEventsForward}, so that invalid arguments are rejected when the stream is
     * requested instead of surfacing only once the lazily evaluated stream performs its first read.
     *
     * @param stream          stream name; rejected when null or empty
     * @param eventNumber     starting event number; rejected when negative
     * @param batchSize       size of each read; must lie within {@code Ranges.BATCH_SIZE_RANGE}
     * @param resolveLinkTos  forwarded to every read
     * @param userCredentials credentials forwarded to every read
     * @return a non-parallel stream backed by a {@link StreamEventsSpliterator}
     */
    @Override
    public Stream<ResolvedEvent> streamEventsForward(String stream, long eventNumber, int batchSize, boolean resolveLinkTos, UserCredentials userCredentials) {
        // Eager argument checks, mirroring iterateStreamEventsForward.
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        Preconditions.checkArgument(!Numbers.isNegative(eventNumber), "eventNumber should not be negative");
        Preconditions.checkArgument(Ranges.BATCH_SIZE_RANGE.contains(batchSize), "batchSize is out of range. Allowed range: %s.", Ranges.BATCH_SIZE_RANGE.toString());
        return StreamSupport.stream(new StreamEventsSpliterator(eventNumber, i -> this.readStreamEventsForward(stream, (long)i, batchSize, resolveLinkTos, userCredentials)), false);
    }

    /**
     * Creates a sequential {@link Stream} over a stream's events whose batches are fetched with
     * {@link #readStreamEventsBackward}.
     *
     * <p>The stream name and event number are validated here with the same rules and messages that
     * {@link #readStreamEventsBackward} applies, so that invalid arguments are rejected when the stream
     * is requested instead of surfacing only once the lazily evaluated stream performs its first read.
     *
     * @param stream          stream name; rejected when null or empty
     * @param eventNumber     starting event number; values below {@code -1} are rejected
     * @param batchSize       size of each read; must lie within {@code Ranges.BATCH_SIZE_RANGE}
     * @param resolveLinkTos  forwarded to every read
     * @param userCredentials credentials forwarded to every read
     * @return a non-parallel stream backed by a {@link StreamEventsSpliterator}
     */
    @Override
    public Stream<ResolvedEvent> streamEventsBackward(String stream, long eventNumber, int batchSize, boolean resolveLinkTos, UserCredentials userCredentials) {
        // Eager argument checks, mirroring readStreamEventsBackward.
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        Preconditions.checkArgument(eventNumber >= -1L, "eventNumber out of range");
        Preconditions.checkArgument(Ranges.BATCH_SIZE_RANGE.contains(batchSize), "batchSize is out of range. Allowed range: %s.", Ranges.BATCH_SIZE_RANGE.toString());
        return StreamSupport.stream(new StreamEventsSpliterator(eventNumber, i -> this.readStreamEventsBackward(stream, (long)i, batchSize, resolveLinkTos, userCredentials)), false);
    }

    /**
     * Creates a sequential {@link Stream} over all events whose batches are fetched with
     * {@link #readAllEventsForward}.
     *
     * @param position        starting position, handed to the spliterator unchanged
     * @param batchSize       size of each read; must lie within {@code Ranges.BATCH_SIZE_RANGE}
     * @param resolveLinkTos  forwarded to every read
     * @param userCredentials credentials forwarded to every read
     * @return a non-parallel stream backed by an {@link AllEventsSpliterator}
     */
    @Override
    public Stream<ResolvedEvent> streamAllEventsForward(Position position, int batchSize, boolean resolveLinkTos, UserCredentials userCredentials) {
        Preconditions.checkArgument(Ranges.BATCH_SIZE_RANGE.contains(batchSize), "batchSize is out of range. Allowed range: %s.", Ranges.BATCH_SIZE_RANGE.toString());
        final AllEventsSpliterator spliterator = new AllEventsSpliterator(position, from -> readAllEventsForward((Position)from, batchSize, resolveLinkTos, userCredentials));
        return StreamSupport.stream(spliterator, false);
    }

    /**
     * Creates a sequential {@link Stream} over all events whose batches are fetched with
     * {@link #readAllEventsBackward}.
     *
     * @param position        starting position, handed to the spliterator unchanged
     * @param batchSize       size of each read; must lie within {@code Ranges.BATCH_SIZE_RANGE}
     * @param resolveLinkTos  forwarded to every read
     * @param userCredentials credentials forwarded to every read
     * @return a non-parallel stream backed by an {@link AllEventsSpliterator}
     */
    @Override
    public Stream<ResolvedEvent> streamAllEventsBackward(Position position, int batchSize, boolean resolveLinkTos, UserCredentials userCredentials) {
        Preconditions.checkArgument(Ranges.BATCH_SIZE_RANGE.contains(batchSize), "batchSize is out of range. Allowed range: %s.", Ranges.BATCH_SIZE_RANGE.toString());
        final AllEventsSpliterator spliterator = new AllEventsSpliterator(position, from -> readAllEventsBackward((Position)from, batchSize, resolveLinkTos, userCredentials));
        return StreamSupport.stream(spliterator, false);
    }

    /**
     * Queues a {@link StartSubscription} task for a volatile subscription to one stream, using the
     * client's configured retry count and operation timeout.
     *
     * @param stream          stream name; rejected when null or empty
     * @param resolveLinkTos  forwarded to the task unchanged
     * @param listener        subscription listener; rejected when null
     * @param userCredentials credentials forwarded to the task
     * @return the future that was handed to the queued task
     */
    @Override
    public CompletableFuture<Subscription> subscribeToStream(String stream, boolean resolveLinkTos, VolatileSubscriptionListener listener, UserCredentials userCredentials) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        Preconditions.checkNotNull(listener, "listener is null");
        final CompletableFuture<Subscription> future = new CompletableFuture<>();
        final StartSubscription task = new StartSubscription(future, stream, resolveLinkTos, userCredentials, listener, this.settings.maxOperationRetries, this.settings.operationTimeout);
        enqueue(task);
        return future;
    }

    /**
     * Queues a {@link StartSubscription} task for a volatile subscription with an empty stream name,
     * using the client's configured retry count and operation timeout.
     *
     * @param resolveLinkTos  forwarded to the task unchanged
     * @param listener        subscription listener; rejected when null
     * @param userCredentials credentials forwarded to the task
     * @return the future that was handed to the queued task
     */
    @Override
    public CompletableFuture<Subscription> subscribeToAll(boolean resolveLinkTos, VolatileSubscriptionListener listener, UserCredentials userCredentials) {
        Preconditions.checkNotNull(listener, "listener is null");
        final CompletableFuture<Subscription> future = new CompletableFuture<>();
        // The empty stream name is what distinguishes this call from subscribeToStream.
        final StartSubscription task = new StartSubscription(future, "", resolveLinkTos, userCredentials, listener, this.settings.maxOperationRetries, this.settings.operationTimeout);
        enqueue(task);
        return future;
    }

    /**
     * Creates and starts a {@link StreamCatchUpSubscription} for the given stream.
     *
     * @param stream          stream name; rejected when null or empty
     * @param eventNumber     event number handed to the subscription unchanged (may be a boxed null;
     *                        not validated here)
     * @param settings        catch-up settings; rejected when null
     * @param listener        subscription listener; rejected when null
     * @param userCredentials credentials handed to the subscription
     * @return the subscription, after {@code start()} has been called on it
     */
    @Override
    public CatchUpSubscription subscribeToStreamFrom(String stream, Long eventNumber, CatchUpSubscriptionSettings settings, CatchUpSubscriptionListener listener, UserCredentials userCredentials) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        Preconditions.checkNotNull(listener, "listener is null");
        Preconditions.checkNotNull(settings, "settings is null");
        final StreamCatchUpSubscription catchUp = new StreamCatchUpSubscription(this, stream, eventNumber, settings.resolveLinkTos, listener, userCredentials, settings.readBatchSize, settings.maxLiveQueueSize, executor());
        catchUp.start();
        return catchUp;
    }

    /**
     * Creates and starts an {@link AllCatchUpSubscription}.
     *
     * @param position        position handed to the subscription unchanged (not validated here)
     * @param settings        catch-up settings; rejected when null
     * @param listener        subscription listener; rejected when null
     * @param userCredentials credentials handed to the subscription
     * @return the subscription, after {@code start()} has been called on it
     */
    @Override
    public CatchUpSubscription subscribeToAllFrom(Position position, CatchUpSubscriptionSettings settings, CatchUpSubscriptionListener listener, UserCredentials userCredentials) {
        Preconditions.checkNotNull(listener, "listener is null");
        Preconditions.checkNotNull(settings, "settings is null");
        final AllCatchUpSubscription catchUp = new AllCatchUpSubscription((EventStore)this, position, settings.resolveLinkTos, listener, userCredentials, settings.readBatchSize, settings.maxLiveQueueSize, executor());
        catchUp.start();
        return catchUp;
    }

    /**
     * Creates a {@link PersistentSubscription} whose start hook queues a
     * {@link StartPersistentSubscription} task on this client, then starts it.
     *
     * @param stream          stream name; rejected when null or empty
     * @param groupName       subscription group name; rejected when null or empty
     * @param listener        subscription listener; rejected when null
     * @param userCredentials credentials handed to the subscription
     * @param bufferSize      buffer size; must be positive
     * @param autoAck         auto-acknowledge flag handed to the subscription
     * @return whatever {@code start()} of the new subscription returns
     */
    @Override
    public CompletableFuture<PersistentSubscription> subscribeToPersistent(String stream, String groupName, PersistentSubscriptionListener listener, UserCredentials userCredentials, int bufferSize, boolean autoAck) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(groupName), "groupName is null or empty");
        Preconditions.checkNotNull(listener, "listener is null");
        Preconditions.checkArgument(Numbers.isPositive(bufferSize), "bufferSize should be positive");
        final PersistentSubscription persistent = new PersistentSubscription(groupName, stream, listener, userCredentials, bufferSize, autoAck, executor()){

            // Start hook: wraps the arguments in a task and queues it on the enclosing client.
            @Override
            protected CompletableFuture<Subscription> startSubscription(String subscriptionId, String streamId, int bufferSize, SubscriptionListener<PersistentSubscriptionChannel, RetryableResolvedEvent> listener, UserCredentials userCredentials) {
                final CompletableFuture<Subscription> started = new CompletableFuture<>();
                final StartPersistentSubscription task = new StartPersistentSubscription(started, subscriptionId, streamId, bufferSize, userCredentials, listener, EventStoreTcp.this.settings.maxOperationRetries, EventStoreTcp.this.settings.operationTimeout);
                EventStoreTcp.this.enqueue(task);
                return started;
            }
        };
        return persistent.start();
    }

    /**
     * Queues a {@link CreatePersistentSubscriptionOperation}.
     *
     * @param stream          stream name; rejected when null or empty
     * @param groupName       subscription group name; rejected when null or empty
     * @param settings        persistent subscription settings; rejected when null
     * @param userCredentials credentials forwarded to the operation
     * @return the future that was handed to the queued operation
     */
    @Override
    public CompletableFuture<PersistentSubscriptionCreateResult> createPersistentSubscription(String stream, String groupName, PersistentSubscriptionSettings settings, UserCredentials userCredentials) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(groupName), "groupName is null or empty");
        Preconditions.checkNotNull(settings, "settings is null");
        final CompletableFuture<PersistentSubscriptionCreateResult> future = new CompletableFuture<>();
        final CreatePersistentSubscriptionOperation operation = new CreatePersistentSubscriptionOperation(future, stream, groupName, settings, userCredentials);
        enqueue(operation);
        return future;
    }

    /**
     * Queues an {@link UpdatePersistentSubscriptionOperation}.
     *
     * @param stream          stream name; rejected when null or empty
     * @param groupName       subscription group name; rejected when null or empty
     * @param settings        persistent subscription settings; rejected when null
     * @param userCredentials credentials forwarded to the operation
     * @return the future that was handed to the queued operation
     */
    @Override
    public CompletableFuture<PersistentSubscriptionUpdateResult> updatePersistentSubscription(String stream, String groupName, PersistentSubscriptionSettings settings, UserCredentials userCredentials) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(groupName), "groupName is null or empty");
        Preconditions.checkNotNull(settings, "settings is null");
        final CompletableFuture<PersistentSubscriptionUpdateResult> future = new CompletableFuture<>();
        final UpdatePersistentSubscriptionOperation operation = new UpdatePersistentSubscriptionOperation(future, stream, groupName, settings, userCredentials);
        enqueue(operation);
        return future;
    }

    /**
     * Queues a {@link DeletePersistentSubscriptionOperation}.
     *
     * @param stream          stream name; rejected when null or empty
     * @param groupName       subscription group name; rejected when null or empty
     * @param userCredentials credentials forwarded to the operation
     * @return the future that was handed to the queued operation
     */
    @Override
    public CompletableFuture<PersistentSubscriptionDeleteResult> deletePersistentSubscription(String stream, String groupName, UserCredentials userCredentials) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(groupName), "groupName is null or empty");
        final CompletableFuture<PersistentSubscriptionDeleteResult> future = new CompletableFuture<>();
        final DeletePersistentSubscriptionOperation operation = new DeletePersistentSubscriptionOperation(future, stream, groupName, userCredentials);
        enqueue(operation);
        return future;
    }

    /**
     * Writes stream metadata by appending a single {@code $metadata} event to the stream's metastream.
     *
     * @param stream                    stream name; rejected when null or empty, or when it is itself
     *                                  a metastream according to {@code SystemStreams.isMetastream}
     * @param expectedMetastreamVersion expected version of the metastream, forwarded to the operation
     * @param metadata                  raw metadata used as the event's JSON data (not validated here)
     * @param userCredentials           credentials forwarded to the operation
     * @return the future that was handed to the queued operation
     */
    @Override
    public CompletableFuture<WriteResult> setStreamMetadata(String stream, long expectedMetastreamVersion, byte[] metadata, UserCredentials userCredentials) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        Preconditions.checkArgument(!SystemStreams.isMetastream(stream), "Setting metadata for metastream '%s' is not supported", stream);
        final CompletableFuture<WriteResult> future = new CompletableFuture<>();
        final EventData metadataEvent = EventData.newBuilder()
            .type("$metadata")
            .jsonData(metadata)
            .build();
        final String metastream = SystemStreams.metastreamOf(stream);
        enqueue(new AppendToStreamOperation(future, this.settings.requireMaster, metastream, expectedMetastreamVersion, Collections.singletonList(metadataEvent), userCredentials));
        return future;
    }

    /**
     * Reads a stream's metadata through {@link #getStreamMetadataAsRawBytes} and converts the raw bytes
     * into a {@link StreamMetadataResult}.
     *
     * <p>Missing or zero-length raw metadata yields {@code StreamMetadata.empty()}; otherwise the bytes
     * are parsed with {@code StreamMetadata.fromJson}, and any exception raised while doing so fails the
     * returned future.
     *
     * @param stream          stream name, validated by {@link #getStreamMetadataAsRawBytes}
     * @param userCredentials credentials forwarded to the raw read
     * @return a future completed with the parsed metadata, or exceptionally when the raw read or the
     *         parsing fails
     */
    @Override
    public CompletableFuture<StreamMetadataResult> getStreamMetadata(String stream, UserCredentials userCredentials) {
        final CompletableFuture<StreamMetadataResult> parsed = new CompletableFuture<>();
        getStreamMetadataAsRawBytes(stream, userCredentials).whenComplete((raw, failure) -> {
            if (failure != null) {
                parsed.completeExceptionally(failure);
                return;
            }
            final boolean hasMetadata = raw.streamMetadata != null && raw.streamMetadata.length != 0;
            if (!hasMetadata) {
                parsed.complete(new StreamMetadataResult(raw.stream, raw.isStreamDeleted, raw.metastreamVersion, StreamMetadata.empty()));
                return;
            }
            // Only the JSON-parsing branch is guarded, so that parse failures reach the caller's future.
            try {
                parsed.complete(new StreamMetadataResult(raw.stream, raw.isStreamDeleted, raw.metastreamVersion, StreamMetadata.fromJson(raw.streamMetadata)));
            }
            catch (Exception e) {
                parsed.completeExceptionally(e);
            }
        });
        return parsed;
    }

    /**
     * Reads the event at number {@code -1} from the stream's metastream and maps the read status onto a
     * {@link RawStreamMetadataResult}.
     *
     * <ul>
     *   <li>{@code Success}: the original event's number and data are returned; if the resolved event
     *       has no original event an empty result with version {@code -1} is returned; if the read
     *       result carries no event at all the future fails.</li>
     *   <li>{@code NotFound} / {@code NoStream}: empty result with version {@code -1}.</li>
     *   <li>{@code StreamDeleted}: empty result flagged as deleted with version {@code Long.MAX_VALUE}.</li>
     *   <li>Any other status: the future fails with an {@link IllegalStateException}.</li>
     * </ul>
     *
     * @param stream          stream name; rejected when null or empty
     * @param userCredentials credentials forwarded to the read
     * @return a future completed with the raw metadata, or exceptionally when the read fails
     */
    @Override
    public CompletableFuture<RawStreamMetadataResult> getStreamMetadataAsRawBytes(String stream, UserCredentials userCredentials) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream), "stream is null or empty");
        final CompletableFuture<RawStreamMetadataResult> rawFuture = new CompletableFuture<>();
        readEvent(SystemStreams.metastreamOf(stream), -1L, false, userCredentials).whenComplete((read, failure) -> {
            if (failure != null) {
                rawFuture.completeExceptionally(failure);
                return;
            }
            switch (read.status) {
                case Success: {
                    if (read.event == null) {
                        rawFuture.completeExceptionally(new Exception("Event is null while operation result is Success."));
                    } else {
                        final RecordedEvent metaEvent = read.event.originalEvent();
                        final RawStreamMetadataResult found;
                        if (metaEvent == null) {
                            found = new RawStreamMetadataResult(stream, false, -1L, EmptyArrays.EMPTY_BYTES);
                        } else {
                            found = new RawStreamMetadataResult(stream, false, metaEvent.eventNumber, metaEvent.data);
                        }
                        rawFuture.complete(found);
                    }
                    break;
                }
                case NotFound:
                case NoStream: {
                    rawFuture.complete(new RawStreamMetadataResult(stream, false, -1L, EmptyArrays.EMPTY_BYTES));
                    break;
                }
                case StreamDeleted: {
                    rawFuture.complete(new RawStreamMetadataResult(stream, true, Long.MAX_VALUE, EmptyArrays.EMPTY_BYTES));
                    break;
                }
                default: {
                    rawFuture.completeExceptionally(new IllegalStateException("Unexpected ReadEventResult: " + read.status));
                    break;
                }
            }
        });
        return rawFuture;
    }

    /**
     * Stores the given system settings by appending a single {@code $settings} event, carrying
     * the settings as JSON, to the {@code $settings} stream with expected version {@code -2}
     * (presumably "any version" -- TODO confirm).
     *
     * @param settings        settings to persist; rejected by the precondition check when null
     * @param userCredentials credentials forwarded unchanged to the append
     * @return the future returned by {@code appendToStream}
     */
    @Override
    public CompletableFuture<WriteResult> setSystemSettings(SystemSettings settings, UserCredentials userCredentials) {
        Preconditions.checkNotNull(settings, "settings is null");
        EventData settingsEvent = EventData.newBuilder()
            .type("$settings")
            .jsonData(settings.toJson())
            .build();
        return this.appendToStream("$settings", -2L, Collections.singletonList(settingsEvent), userCredentials);
    }

    /**
     * Starts the client: if it is not already running, resets the reconnection bookkeeping and
     * schedules {@code timerTick} every 200 ms; then enqueues a {@code StartConnection} task.
     * A failure of that task is only logged here, it is not propagated to the caller.
     * <p>
     * NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour
     * change" for this method, so the original bytecode may differ slightly.
     */
    @Override
    public void connect() {
        synchronized (this.mutex) {
            if (!this.isRunning()) {
                this.reconnectionInfo.reset();
                this.timer = this.group.scheduleAtFixedRate(this::timerTick, 200L, 200L, TimeUnit.MILLISECONDS);
            }
        }
        CompletableFuture<Void> startResult = new CompletableFuture<>();
        startResult.whenComplete((ignored, failure) -> {
            if (failure != null) {
                logger.error("Unable to connect: {}", (Object)failure.getMessage());
            }
        });
        this.tasks.enqueue(new StartConnection(startResult, this.discoverer));
    }

    /**
     * Disconnects the client on behalf of the caller, using the reason {@code "user initiated"}
     * and no cause.
     */
    @Override
    public void disconnect() {
        disconnect("user initiated", null);
    }

    /**
     * Disconnects with reason {@code "shutdown"}, shuts down the configured executor when it is
     * an {@link ExecutorService}, and gracefully shuts down the event loop group.
     */
    @Override
    public void shutdown() {
        this.disconnect("shutdown", null);
        Executor configuredExecutor = this.executor();
        if (configuredExecutor instanceof ExecutorService) {
            ((ExecutorService)configuredExecutor).shutdown();
        }
        this.group.shutdownGracefully();
    }

    /**
     * Tears the client down while holding {@code mutex}. Does nothing when the client is not
     * running. Otherwise, in this order: cancels and clears the timer, cleans up the operation
     * and subscription managers with {@code cause}, closes the TCP connection, marks the
     * connecting phase {@code INVALID}, fires the client-disconnected event and logs the reason.
     * <p>
     * NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour
     * change" for this method, so the original bytecode may differ slightly.
     *
     * @param reason human-readable reason, used for logging and for closing the connection
     * @param cause  throwable handed to the managers' clean-up; may be {@code null}
     */
    private void disconnect(String reason, Throwable cause) {
        synchronized (this.mutex) {
            if (!this.isRunning()) {
                return;
            }
            this.timer.cancel(true);
            this.timer = null;
            this.operationManager.cleanUp(cause);
            this.subscriptionManager.cleanUp(cause);
            this.closeTcpConnection(reason);
            this.connectingPhase = ConnectingPhase.INVALID;
            this.fireEvent(Events.clientDisconnected());
            logger.info("Disconnected, reason: {}", (Object)reason);
        }
    }

    /**
     * Tells whether the periodic timer exists and has not completed yet.
     *
     * @return {@code true} when a timer is set and {@code isDone()} is {@code false}
     */
    private boolean isRunning() {
        if (this.timer == null) {
            return false;
        }
        return !this.timer.isDone();
    }

    /**
     * Returns the settings instance this client holds.
     *
     * @return the client's settings
     */
    @Override
    public Settings settings() {
        return settings;
    }

    /**
     * Registers a listener with the client's event dispatcher.
     *
     * @param listener listener to register; rejected by the precondition check when null
     */
    @Override
    public void addListener(EventStoreListener listener) {
        Preconditions.checkNotNull(listener, "listener is null");
        events.register(listener);
    }

    /**
     * Unregisters a listener from the client's event dispatcher.
     *
     * @param listener listener to unregister; rejected by the precondition check when null
     */
    @Override
    public void removeListener(EventStoreListener listener) {
        Preconditions.checkNotNull(listener, "listener is null");
        events.unregister(listener);
    }

    /**
     * Shortcut for the executor configured in the client's settings.
     *
     * @return {@code settings.executor}
     */
    private Executor executor() {
        return settings.executor;
    }

    /**
     * Hands a client event to the event dispatcher's queue.
     *
     * @param event event to enqueue
     */
    private void fireEvent(Event event) {
        events.enqueue(event);
    }

    /**
     * Reacts to the outcome of authentication: {@code SUCCESS} and {@code IGNORED} advance to
     * the identification phase, any other status fires an authentication-failed event.
     *
     * @param status status reported by the authentication handler
     */
    private void onAuthenticationCompleted(AuthenticationHandler.AuthenticationStatus status) {
        boolean accepted = status == AuthenticationHandler.AuthenticationStatus.SUCCESS
            || status == AuthenticationHandler.AuthenticationStatus.IGNORED;
        if (!accepted) {
            this.fireEvent(Events.authenticationFailed());
            return;
        }
        this.gotoIdentificationPhase();
    }

    /**
     * Reacts to the outcome of identification: only {@code SUCCESS} advances to the connected
     * phase; every other status is ignored here.
     *
     * @param status status reported by the identification handler
     */
    private void onIdentificationCompleted(IdentificationHandler.IdentificationStatus status) {
        if (status != IdentificationHandler.IdentificationStatus.SUCCESS) {
            return;
        }
        this.gotoConnectedPhase();
    }

    /**
     * Handles a connection-wide bad-request package by closing the connection. The package data
     * is converted to a string and used as the error text, falling back to
     * {@code "<no message>"} when it is empty.
     *
     * @param tcpPackage the bad-request package received from the server
     */
    private void onBadRequest(TcpPackage tcpPackage) {
        String serverMessage = Strings.defaultIfEmpty(Strings.newString(tcpPackage.data), "<no message>");
        EventStoreException cause = new EventStoreException("Bad request received from server. Error: " + serverMessage);
        this.handle(new CloseConnection("Connection-wide BadRequest received. Too dangerous to continue.", cause));
    }

    /**
     * Handles an error raised while processing a TCP package. When the settings ask to
     * disconnect on channel errors the connection is closed with the error as cause; otherwise
     * the error is logged and published as an error-occurred event.
     *
     * @param throwable the error raised by the channel
     */
    private void onChannelError(Throwable throwable) {
        if (!this.settings().disconnectOnTcpChannelError) {
            logger.error("Failed processing TCP package", throwable);
            this.fireEvent(Events.errorOccurred(throwable));
            return;
        }
        this.handle(new CloseConnection("Error when processing TCP package", throwable));
    }

    /**
     * Callback for a reconnect request; delegates to {@code reconnectTo}.
     *
     * @param nodeEndpoints endpoints of the node to reconnect to
     */
    private void onReconnect(NodeEndpoints nodeEndpoints) {
        reconnectTo(nodeEndpoints);
    }

    /**
     * Periodic housekeeping, scheduled by {@code connect()}.
     * <ul>
     *   <li>{@code CONNECTED}: runs the operation timeout check.</li>
     *   <li>{@code INIT}, while in the {@code RECONNECTING} phase and once the reconnection delay
     *       has elapsed: counts a reconnection attempt, closes the connection when the configured
     *       non-negative limit is exceeded, otherwise announces the reconnect, lets the operation
     *       manager check timeouts and starts endpoint discovery.</li>
     *   <li>Any other state: nothing to do.</li>
     * </ul>
     * Exceptions are caught and logged rather than propagated.
     */
    private void timerTick() {
        try {
            ConnectionState state = this.connectionState();
            if (state == ConnectionState.CONNECTED) {
                this.checkOperationTimeout();
                return;
            }
            if (state != ConnectionState.INIT) {
                return;
            }
            boolean reconnectDue = this.connectingPhase == ConnectingPhase.RECONNECTING
                && this.reconnectionInfo.timestamp.isElapsed(this.settings.reconnectionDelay);
            if (!reconnectDue) {
                return;
            }
            logger.debug("Checking reconnection...");
            this.reconnectionInfo.inc();
            boolean limitExceeded = this.settings.maxReconnections >= 0
                && this.reconnectionInfo.reconnectionAttempt > this.settings.maxReconnections;
            if (limitExceeded) {
                this.handle(new CloseConnection("Reconnection limit reached"));
                return;
            }
            this.fireEvent(Events.clientReconnecting());
            this.operationManager.checkTimeoutsAndRetry(this.connection);
            this.discoverEndpoint(null);
        }
        catch (Exception e) {
            logger.error("Error occurred in timer thread", (Throwable)e);
        }
    }

    /**
     * Once the configured check interval has elapsed since the last check, lets the operation
     * and subscription managers check timeouts and retry against the current connection, then
     * records the time of this check.
     */
    private void checkOperationTimeout() {
        if (!this.lastOperationTimeoutCheck.isElapsed(this.settings.operationTimeoutCheckInterval)) {
            return;
        }
        this.operationManager.checkTimeoutsAndRetry(this.connection);
        this.subscriptionManager.checkTimeoutsAndRetry(this.connection);
        this.lastOperationTimeoutCheck.update();
    }

    /**
     * Moves to the {@code IDENTIFICATION} phase, or falls back to the reconnecting phase when
     * there is no connection.
     */
    private void gotoIdentificationPhase() {
        if (this.connection != null) {
            this.connectingPhase = ConnectingPhase.IDENTIFICATION;
            return;
        }
        logger.debug("connection was null when going to Identification Phase, going to Reconnecting Phase instead");
        this.gotoReconnectingPhase();
    }

    /**
     * Moves to the {@code CONNECTED} phase: resets the reconnection bookkeeping, fires the
     * client-connected event with the remote address and runs the operation timeout check.
     * Falls back to the reconnecting phase when there is no connection.
     */
    private void gotoConnectedPhase() {
        if (this.connection != null) {
            this.connectingPhase = ConnectingPhase.CONNECTED;
            this.reconnectionInfo.reset();
            this.fireEvent(Events.clientConnected((InetSocketAddress)this.connection.remoteAddress()));
            this.checkOperationTimeout();
            return;
        }
        logger.debug("connection was null when going to Connected Phase, going to Reconnecting Phase instead");
        this.gotoReconnectingPhase();
    }

    /**
     * Switches the connecting phase to {@code RECONNECTING} and refreshes the reconnection
     * timestamp.
     */
    private void gotoReconnectingPhase() {
        connectingPhase = ConnectingPhase.RECONNECTING;
        reconnectionInfo.touch();
    }

    /**
     * Reconnects to another node. The secure endpoint is chosen when SSL is enabled and a secure
     * endpoint is present, otherwise the plain TCP endpoint. With no endpoint the connection is
     * closed. The switch only happens while connected to a different remote address: the current
     * TCP connection is closed, the phase is set to {@code ENDPOINT_DISCOVERY} and a connection
     * to the given endpoints is established.
     *
     * @param endpoints endpoints of the node to reconnect to
     */
    private void reconnectTo(NodeEndpoints endpoints) {
        boolean preferSecure = this.settings.sslSettings.useSslConnection && endpoints.secureTcpEndpoint != null;
        InetSocketAddress endpoint = preferSecure ? endpoints.secureTcpEndpoint : endpoints.tcpEndpoint;
        if (endpoint == null) {
            this.handle(new CloseConnection("No endpoint is specified while trying to reconnect."));
            return;
        }
        if (this.connectionState() != ConnectionState.CONNECTED || this.connection.remoteAddress().equals(endpoint)) {
            // Not connected, or already connected to the requested endpoint.
            return;
        }
        String message = String.format("Connection '%s': going to reconnect to [%s]. Current endpoint: [%s, L%s].", this.connection.id(), endpoint, this.connection.remoteAddress(), this.connection.localAddress());
        logger.trace(message);
        this.closeTcpConnection(message);
        this.connectingPhase = ConnectingPhase.ENDPOINT_DISCOVERY;
        this.handle(new EstablishTcpConnection(endpoints));
    }

    /**
     * Starts endpoint discovery, but only in state {@code INIT} and phase {@code RECONNECTING};
     * in any other situation nothing happens and {@code result} is left untouched.
     * <p>
     * The phase becomes {@code ENDPOINT_DISCOVERY} and the discoverer is asked for a node,
     * passing the current remote address when a connection exists. On success an
     * {@code EstablishTcpConnection} task is enqueued and {@code result} completes normally; on
     * failure a {@code CloseConnection} task is enqueued and {@code result} completes
     * exceptionally with a {@code CannotEstablishConnectionException}.
     *
     * @param result future to complete with the discovery outcome; may be {@code null}
     */
    private void discoverEndpoint(CompletableFuture<Void> result) {
        logger.debug("Discovering endpoint...");
        if (this.connectionState() != ConnectionState.INIT || this.connectingPhase != ConnectingPhase.RECONNECTING) {
            return;
        }
        this.connectingPhase = ConnectingPhase.ENDPOINT_DISCOVERY;
        InetSocketAddress currentEndpoint = this.connection != null ? (InetSocketAddress)this.connection.remoteAddress() : null;
        this.discoverer.discover(currentEndpoint).whenComplete((nodeEndpoints, failure) -> {
            if (failure != null) {
                this.tasks.enqueue(new CloseConnection("Failed to resolve TCP endpoint to which to connect.", failure));
                if (result != null) {
                    result.completeExceptionally(new CannotEstablishConnectionException("Cannot resolve target end point.", failure));
                }
                return;
            }
            this.tasks.enqueue(new EstablishTcpConnection((NodeEndpoints)nodeEndpoints));
            if (result != null) {
                result.complete(null);
            }
        });
    }

    /**
     * Closes the current TCP connection, waiting at most the configured close timeout for the
     * close to finish. When there is no connection the client simply moves to the reconnecting
     * phase.
     * <p>
     * Failures while closing are logged and never propagated. If the waiting thread is
     * interrupted, its interrupt status is restored so that callers further up the stack can
     * still observe the interruption. A close that does not finish within the timeout is logged
     * instead of being silently ignored.
     *
     * @param reason human-readable reason, used for logging only
     */
    private void closeTcpConnection(String reason) {
        if (this.connection == null) {
            this.gotoReconnectingPhase();
            return;
        }
        logger.debug("Closing TCP connection, reason: {}", (Object)reason);
        try {
            boolean closedInTime = this.connection.close().await(this.settings.tcpSettings.closeTimeout.toMillis());
            if (!closedInTime) {
                logger.warn("TCP connection was not closed within {}", (Object)this.settings.tcpSettings.closeTimeout);
            }
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Catching the exception clears the flag; put it back for the caller.
                Thread.currentThread().interrupt();
            }
            logger.warn("Unable to close connection gracefully", (Throwable)e);
        }
    }

    /**
     * Called when the TCP connection has closed. If a connection is still referenced, its
     * subscribed-and-dropped subscriptions are purged and a connection-closed event is fired.
     * In every case the connection reference is cleared and the client moves to the reconnecting
     * phase.
     */
    private void onTcpConnectionClosed() {
        boolean hadConnection = this.connection != null;
        if (hadConnection) {
            this.subscriptionManager.purgeSubscribedAndDropped(this.connection.id());
            this.fireEvent(Events.connectionClosed());
        }
        this.connection = null;
        this.gotoReconnectingPhase();
    }

    /**
     * Processes a request to start the connection.
     * <ul>
     *   <li>{@code INIT}: unless discovery is already in progress the phase is set to
     *       {@code RECONNECTING}; endpoint discovery is then requested with the task's future.</li>
     *   <li>{@code CONNECTED} / {@code CONNECTING}: the task's future fails with an
     *       {@link IllegalStateException} because a connection is already active.</li>
     *   <li>{@code CLOSED}: the task's future fails with a {@code ConnectionClosedException}.</li>
     * </ul>
     *
     * @param task the start-connection task
     * @throws IllegalStateException when the connection state is none of the above
     */
    private void handle(StartConnection task) {
        logger.debug("StartConnection");
        ConnectionState state = this.connectionState();
        if (state == ConnectionState.INIT) {
            if (this.connectingPhase != ConnectingPhase.ENDPOINT_DISCOVERY) {
                this.connectingPhase = ConnectingPhase.RECONNECTING;
            }
            this.discoverEndpoint(task.result);
        } else if (state == ConnectionState.CONNECTED || state == ConnectionState.CONNECTING) {
            task.result.completeExceptionally(new IllegalStateException(String.format("Connection %s is already active.", this.connection)));
        } else if (state == ConnectionState.CLOSED) {
            task.result.completeExceptionally(new ConnectionClosedException("Connection is closed"));
        } else {
            throw new IllegalStateException("Unknown connection state");
        }
    }

    /**
     * Processes a request to open a TCP connection to a discovered node. The secure endpoint is
     * chosen when SSL is enabled and a secure endpoint is present, otherwise the plain TCP
     * endpoint; with no endpoint the connection is closed.
     * <p>
     * The connect attempt is made only in state {@code INIT} and phase
     * {@code ENDPOINT_DISCOVERY}. The phase becomes {@code CONNECTION_ESTABLISHING}; on success
     * it becomes {@code AUTHENTICATION}, the channel is stored and a close listener is attached
     * that calls {@code onTcpConnectionClosed()}; on failure {@code closeTcpConnection} is called.
     *
     * @param task task carrying the endpoints of the node to connect to
     */
    private void handle(EstablishTcpConnection task) {
        boolean preferSecure = this.settings.sslSettings.useSslConnection && task.endpoints.secureTcpEndpoint != null;
        InetSocketAddress endpoint = preferSecure ? task.endpoints.secureTcpEndpoint : task.endpoints.tcpEndpoint;
        if (endpoint == null) {
            this.handle(new CloseConnection("No endpoint to node specified."));
            return;
        }
        logger.debug("Connecting to [{}]...", (Object)endpoint);
        if (this.connectionState() != ConnectionState.INIT || this.connectingPhase != ConnectingPhase.ENDPOINT_DISCOVERY) {
            return;
        }
        this.connectingPhase = ConnectingPhase.CONNECTION_ESTABLISHING;
        this.bootstrap.connect((SocketAddress)endpoint).addListener(connectFuture -> {
            if (!connectFuture.isSuccess()) {
                this.closeTcpConnection("unable to connect");
                return;
            }
            logger.info("Connection to [{}, L{}] established.", (Object)connectFuture.channel().remoteAddress(), (Object)connectFuture.channel().localAddress());
            this.connectingPhase = ConnectingPhase.AUTHENTICATION;
            this.connection = connectFuture.channel();
            this.connection.closeFuture().addListener(closeFuture -> {
                logger.info("Connection to [{}, L{}] closed.", (Object)closeFuture.channel().remoteAddress(), (Object)closeFuture.channel().localAddress());
                this.onTcpConnectionClosed();
            });
        });
    }

    /**
     * Processes a request to close the connection. A throwable attached to the task is always
     * logged as an error. When the connection state is already {@code CLOSED} the request is
     * ignored; otherwise an attached throwable is published as an error-occurred event and the
     * client is disconnected with the task's reason and throwable.
     *
     * @param task task carrying the close reason and an optional throwable
     */
    private void handle(CloseConnection task) {
        boolean hasCause = task.throwable != null;
        if (hasCause) {
            logger.error(task.reason, task.throwable);
        }
        if (this.connectionState() == ConnectionState.CLOSED) {
            logger.debug("CloseConnection IGNORED because connection is CLOSED, reason: {}", (Object)task.reason);
            return;
        }
        logger.debug("CloseConnection, reason: {}", (Object)task.reason);
        if (hasCause) {
            this.fireEvent(Events.errorOccurred(task.throwable));
        }
        this.disconnect(task.reason, task.throwable);
    }

    /**
     * Routes a newly submitted operation according to the current connection state.
     * <ul>
     *   <li>{@code INIT} with phase {@code INVALID}: the operation is failed with
     *       {@link IllegalStateException} ("No connection").</li>
     *   <li>{@code INIT} in any other phase, or {@code CONNECTING}: the operation is enqueued in
     *       the operation manager.</li>
     *   <li>{@code CONNECTED}: the operation is scheduled on the current connection.</li>
     *   <li>{@code CLOSED}: the operation is failed with {@code ConnectionClosedException}.</li>
     * </ul>
     * Each queued or scheduled item carries the configured maximum retries and operation timeout.
     *
     * @param task task wrapping the operation to start
     * @throws IllegalStateException when the connection state is none of the above
     */
    private void handle(StartOperation task) {
        Operation operation = task.operation;
        switch (this.connectionState()) {
            case INIT: {
                if (this.connectingPhase == ConnectingPhase.INVALID) {
                    operation.fail(new IllegalStateException("No connection"));
                    break;
                }
                // Deliberate fall-through: INIT in a valid phase is treated like CONNECTING.
            }
            case CONNECTING: {
                logger.debug("StartOperation enqueue {}, {}, {}, {}.", new Object[]{operation.getClass().getSimpleName(), operation, this.settings.maxOperationRetries, this.settings.operationTimeout});
                this.operationManager.enqueueOperation(new OperationItem(operation, this.settings.maxOperationRetries, this.settings.operationTimeout));
                break;
            }
            case CONNECTED: {
                logger.debug("StartOperation schedule {}, {}, {}, {}.", new Object[]{operation.getClass().getSimpleName(), operation, this.settings.maxOperationRetries, this.settings.operationTimeout});
                this.operationManager.scheduleOperation(new OperationItem(operation, this.settings.maxOperationRetries, this.settings.operationTimeout), this.connection);
                break;
            }
            case CLOSED: {
                operation.fail(new ConnectionClosedException("Connection is closed"));
                break;
            }
            default: {
                throw new IllegalStateException("Unknown connection state");
            }
        }
    }

    /**
     * Routes a request to start a volatile subscription according to the current connection
     * state.
     * <ul>
     *   <li>{@code INIT} with phase {@code INVALID}: the task's future fails with
     *       {@link IllegalStateException} ("No connection").</li>
     *   <li>{@code INIT} in any other phase, {@code CONNECTING} or {@code CONNECTED}: a
     *       {@code VolatileSubscriptionOperation} is created; it is started immediately on the
     *       current connection when {@code CONNECTED}, otherwise it is enqueued.</li>
     *   <li>{@code CLOSED}: the task's future fails with {@code ConnectionClosedException}.</li>
     * </ul>
     *
     * @param task task describing the subscription to start
     * @throws IllegalStateException when the connection state is none of the above
     */
    private void handle(StartSubscription task) {
        // Captured once so that logging and dispatch below agree on the same state.
        ConnectionState state = this.connectionState();
        switch (state) {
            case INIT: {
                if (this.connectingPhase == ConnectingPhase.INVALID) {
                    task.result.completeExceptionally(new IllegalStateException("No connection"));
                    break;
                }
                // Deliberate fall-through: INIT in a valid phase shares the code below.
            }
            case CONNECTED: 
            case CONNECTING: {
                // The operation receives a supplier, so it reads the connection field lazily.
                VolatileSubscriptionOperation operation = new VolatileSubscriptionOperation(task.result, task.streamId, task.resolveLinkTos, task.userCredentials, task.listener, () -> this.connection, this.executor());
                logger.debug("StartSubscription {} {}, {}, {}, {}.", new Object[]{state == ConnectionState.CONNECTED ? "fire" : "enqueue", operation.getClass().getSimpleName(), operation, task.maxRetries, task.timeout});
                SubscriptionItem item = new SubscriptionItem(operation, task.maxRetries, task.timeout);
                if (state == ConnectionState.CONNECTED) {
                    this.subscriptionManager.startSubscription(item, this.connection);
                    break;
                }
                this.subscriptionManager.enqueueSubscription(item);
                break;
            }
            case CLOSED: {
                task.result.completeExceptionally(new ConnectionClosedException("Connection is closed"));
                break;
            }
            default: {
                throw new IllegalStateException("Unknown connection state");
            }
        }
    }

    /**
     * Routes a request to start a persistent subscription according to the current connection
     * state.
     * <ul>
     *   <li>{@code INIT} with phase {@code INVALID}: the task's future fails with
     *       {@link IllegalStateException} ("No connection").</li>
     *   <li>{@code INIT} in any other phase, {@code CONNECTING} or {@code CONNECTED}: a
     *       {@code PersistentSubscriptionOperation} is created; it is started immediately on the
     *       current connection when {@code CONNECTED}, otherwise it is enqueued.</li>
     *   <li>{@code CLOSED}: the task's future fails with {@code ConnectionClosedException}.</li>
     * </ul>
     *
     * @param task task describing the persistent subscription to start
     * @throws IllegalStateException when the connection state is none of the above
     */
    private void handle(StartPersistentSubscription task) {
        // Captured once so that logging and dispatch below agree on the same state.
        ConnectionState state = this.connectionState();
        switch (state) {
            case INIT: {
                if (this.connectingPhase == ConnectingPhase.INVALID) {
                    task.result.completeExceptionally(new IllegalStateException("No connection"));
                    break;
                }
                // Deliberate fall-through: INIT in a valid phase shares the code below.
            }
            case CONNECTED: 
            case CONNECTING: {
                // The operation receives a supplier, so it reads the connection field lazily.
                PersistentSubscriptionOperation operation = new PersistentSubscriptionOperation(task.result, task.subscriptionId, task.streamId, task.bufferSize, task.userCredentials, task.listener, () -> this.connection, this.executor());
                logger.debug("StartSubscription {} {}, {}, {}, {}.", new Object[]{state == ConnectionState.CONNECTED ? "fire" : "enqueue", operation.getClass().getSimpleName(), operation, task.maxRetries, task.timeout});
                SubscriptionItem item = new SubscriptionItem(operation, task.maxRetries, task.timeout);
                if (state == ConnectionState.CONNECTED) {
                    this.subscriptionManager.startSubscription(item, this.connection);
                    break;
                }
                this.subscriptionManager.enqueueSubscription(item);
                break;
            }
            case CLOSED: {
                task.result.completeExceptionally(new ConnectionClosedException("Connection is closed"));
                break;
            }
            default: {
                throw new IllegalStateException("Unknown connection state");
            }
        }
    }

    /**
     * Submits an operation for execution. While the operation manager's total operation count
     * is at or above the configured maximum queue size, the calling thread sleeps in steps of
     * {@code Threads.sleepUninterruptibly(1L)} before the operation is wrapped in a
     * {@code StartOperation} task and enqueued.
     *
     * @param operation operation to submit
     */
    private void enqueue(Operation operation) {
        while (this.operationManager.totalOperationCount() >= this.settings.maxOperationQueueSize) {
            // Back-pressure: wait for the queue to drain.
            Threads.sleepUninterruptibly(1L);
        }
        StartOperation startTask = new StartOperation(operation);
        this.enqueue(startTask);
    }

    /**
     * Adds a task to the task queue, first calling {@code connect()} (under {@code mutex}) when
     * the client is not running.
     * <p>
     * NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour
     * change" for this method, so the original bytecode may differ slightly.
     *
     * @param task task to enqueue
     */
    private void enqueue(Task task) {
        synchronized (this.mutex) {
            if (!this.isRunning()) {
                this.connect();
            }
        }
        logger.trace("enqueueing task {}.", (Object)task.getClass().getSimpleName());
        this.tasks.enqueue(task);
    }

    /**
     * Derives the connection state from the channel and the connecting phase.
     *
     * @return {@code INIT} when there is no connection; {@code CLOSED} when the connection is
     *         not open; {@code CONNECTED} when it is open, active and the phase is
     *         {@code CONNECTED}; {@code CONNECTING} otherwise
     */
    private ConnectionState connectionState() {
        if (this.connection == null) {
            return ConnectionState.INIT;
        }
        if (!this.connection.isOpen()) {
            return ConnectionState.CLOSED;
        }
        boolean fullyConnected = this.connection.isActive() && this.connectingPhase == ConnectingPhase.CONNECTED;
        return fullyConnected ? ConnectionState.CONNECTED : ConnectionState.CONNECTING;
    }

    /**
     * Bookkeeping for reconnection: the number of attempts made and the time of the most recent
     * change.
     */
    private static class ReconnectionInfo {
        /** Number of reconnection attempts counted since the last {@link #reset()}. */
        int reconnectionAttempt;

        /** Updated by every {@link #inc()}, {@link #reset()} and {@link #touch()}. */
        final SystemTime timestamp = SystemTime.zero();

        private ReconnectionInfo() {
        }

        /** Counts one more attempt and refreshes the timestamp. */
        void inc() {
            reconnectionAttempt += 1;
            touch();
        }

        /** Clears the attempt counter and refreshes the timestamp. */
        void reset() {
            reconnectionAttempt = 0;
            touch();
        }

        /** Refreshes the timestamp without changing the attempt counter. */
        void touch() {
            timestamp.update();
        }
    }

    /**
     * {@code TransactionManager} that turns transaction calls into operations enqueued on the
     * enclosing client.
     */
    private class TransactionManagerImpl
    implements TransactionManager {
        private TransactionManagerImpl() {
        }

        /**
         * Enqueues a transactional write of the given events.
         *
         * @param transaction     transaction to write to; rejected by the precondition check when null
         * @param events          events to write; rejected by the precondition check when null
         * @param userCredentials credentials forwarded unchanged to the operation
         * @return the future handed to the write operation
         */
        @Override
        public CompletableFuture<Void> write(Transaction transaction, Iterable<EventData> events, UserCredentials userCredentials) {
            Preconditions.checkNotNull(transaction, "transaction is null");
            Preconditions.checkNotNull(events, "events is null");
            CompletableFuture<Void> future = new CompletableFuture<>();
            EventStoreTcp.this.enqueue(new TransactionalWriteOperation(
                future,
                EventStoreTcp.this.settings.requireMaster,
                transaction.transactionId,
                events,
                userCredentials));
            return future;
        }

        /**
         * Enqueues the commit of the given transaction.
         *
         * @param transaction     transaction to commit; rejected by the precondition check when null
         * @param userCredentials credentials forwarded unchanged to the operation
         * @return the future handed to the commit operation
         */
        @Override
        public CompletableFuture<WriteResult> commit(Transaction transaction, UserCredentials userCredentials) {
            Preconditions.checkNotNull(transaction, "transaction is null");
            CompletableFuture<WriteResult> future = new CompletableFuture<>();
            EventStoreTcp.this.enqueue(new CommitTransactionOperation(
                future,
                EventStoreTcp.this.settings.requireMaster,
                transaction.transactionId,
                userCredentials));
            return future;
        }
    }

    private static enum ConnectingPhase {
        INVALID,
        RECONNECTING,
        ENDPOINT_DISCOVERY,
        CONNECTION_ESTABLISHING,
        AUTHENTICATION,
        IDENTIFICATION,
        CONNECTED;

    }

    private static enum ConnectionState {
        INIT,
        CONNECTING,
        CONNECTED,
        CLOSED;

    }
}

