package io.r2dbc.postgresql;

import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.r2dbc.postgresql.api.ErrorDetails;
import io.r2dbc.postgresql.client.Binding;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ExtendedQueryMessageFlow;
import io.r2dbc.postgresql.client.QueryLogger;
import io.r2dbc.postgresql.client.TransactionStatus;
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.backend.BindComplete;
import io.r2dbc.postgresql.message.backend.CloseComplete;
import io.r2dbc.postgresql.message.backend.CommandComplete;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.NoData;
import io.r2dbc.postgresql.message.backend.ParseComplete;
import io.r2dbc.postgresql.message.backend.PortalSuspended;
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
import io.r2dbc.postgresql.message.frontend.Bind;
import io.r2dbc.postgresql.message.frontend.Close;
import io.r2dbc.postgresql.message.frontend.CompositeFrontendMessage;
import io.r2dbc.postgresql.message.frontend.Describe;
import io.r2dbc.postgresql.message.frontend.Execute;
import io.r2dbc.postgresql.message.frontend.ExecutionType;
import io.r2dbc.postgresql.message.frontend.Flush;
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
import io.r2dbc.postgresql.message.frontend.Parse;
import io.r2dbc.postgresql.message.frontend.Sync;
import io.r2dbc.postgresql.util.Operators;
import io.r2dbc.postgresql.util.PredicateUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.SynchronousSink;
import reactor.core.publisher.UnicastProcessor;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/r2dbc/postgresql/ExtendedFlowDelegate.class */
public class ExtendedFlowDelegate {
    /**
     * Predicate that {@code runQuery} passes to {@code Flux.filter} on the backend message stream;
     * messages for which it returns {@code false} are dropped before error handling runs.
     * Assigned once in the static initializer of this class.
     */
    static final Predicate<BackendMessage> RESULT_FRAME_FILTER;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/r2dbc/postgresql/ExtendedFlowDelegate$ExtendedFlowOperator.class */
    /**
     * Holds the state of a single extended-query exchange: the SQL text, its binding, the encoded
     * parameter values, the portal name and the statement cache used to resolve the statement name.
     *
     * <p>The inherited {@link AtomicInteger} value counts message batches produced by
     * {@link #getMessages(Collection)}. Each {@link ReadyForQuery} seen by {@link #test(BackendMessage)}
     * decrements it, and the predicate reports {@code true} once the count is no longer positive.
     */
    public static class ExtendedFlowOperator extends AtomicInteger implements Predicate<BackendMessage> {
        private final String sql;
        private final Binding binding;

        // Lazily resolved statement name; reset to null when the cached statement is evicted.
        @Nullable
        private volatile String name;
        private final StatementCache cache;
        private final List<ByteBuf> values;
        private final String portal;
        private final boolean forceBinary;

        /**
         * @param str            the SQL text of the statement
         * @param binding        the parameter binding (types and formats are read from it)
         * @param statementCache cache consulted for the statement name and prepare requirement
         * @param list           the encoded parameter values; released once in {@link #close(FluxSink)}
         * @param str2           the portal name used for Bind and Describe
         * @param z              whether binary result format is forced
         */
        public ExtendedFlowOperator(String str, Binding binding, StatementCache statementCache, List<ByteBuf> list, String str2, boolean z) {
            this.sql = str;
            this.binding = binding;
            this.cache = statementCache;
            this.values = list;
            this.portal = str2;
            this.forceBinary = z;
        }

        /**
         * Completes the request sink and then releases every parameter buffer once.
         *
         * @param fluxSink the sink feeding follow-up frontend messages into the exchange
         */
        public void close(FluxSink<FrontendMessage> fluxSink) {
            fluxSink.complete();
            this.values.forEach(ReferenceCountUtil::release);
        }

        /**
         * Forgets the locally remembered statement name and evicts the SQL from the statement cache.
         */
        public void evictCachedStatement() {
            synchronized (this) {
                this.name = null;
            }
            this.cache.evict(this.sql);
        }

        /**
         * Records the statement name for this binding/SQL pair in the statement cache.
         */
        public void hydrateStatementCache() {
            this.cache.put(this.binding, this.sql, getStatementName());
        }

        /**
         * @return this operator, usable as the completion predicate of an exchange
         */
        public Predicate<BackendMessage> takeUntil() {
            return this;
        }

        /**
         * Decrements the batch counter for every {@link ReadyForQuery} and signals completion when
         * the counter has dropped to zero or below. Other messages never touch the counter.
         *
         * @param backendMessage the message received from the server
         * @return {@code true} when this message is the final {@link ReadyForQuery} of the exchange
         */
        @Override // java.util.function.Predicate
        public boolean test(BackendMessage backendMessage) {
            return (backendMessage instanceof ReadyForQuery) && decrementAndGet() <= 0;
        }

        private boolean isPrepareRequired() {
            return this.cache.requiresPrepare(this.binding, this.sql);
        }

        /**
         * Returns the statement name, asking the cache for it on first use (or after an eviction).
         *
         * @return the statement name for this binding/SQL pair
         */
        public String getStatementName() {
            synchronized (this) {
                if (this.name == null) {
                    this.name = this.cache.getName(this.binding, this.sql);
                }
                return this.name;
            }
        }

        /**
         * Builds one batch of frontend messages: an optional Parse, then Bind and Describe for the
         * portal, followed by the caller-supplied trailing messages.
         *
         * <p>Every call increments the batch counter and retains each parameter buffer once more
         * (after rewinding its reader index), so the same values can be sent again on a retry.
         *
         * @param collection messages appended after Bind/Describe
         * @return a new mutable list holding the complete batch
         */
        public List<FrontendMessage.DirectEncoder> getMessages(Collection<FrontendMessage.DirectEncoder> collection) {
            incrementAndGet();
            List<FrontendMessage.DirectEncoder> messages = new ArrayList<>(6);
            if (isPrepareRequired()) {
                messages.add(new Parse(getStatementName(), this.binding.getParameterTypes(), this.sql));
            }
            for (ByteBuf value : this.values) {
                // Rewind so a re-sent Bind reads the value from the start, and retain for this batch.
                value.readerIndex(0);
                value.touch("ExtendedFlowOperator").retain();
            }
            messages.add(new Bind(this.portal, this.binding.getParameterFormats(), this.values, ExtendedQueryMessageFlow.resultFormat(this.forceBinary), getStatementName()));
            messages.add(new Describe(this.portal, ExecutionType.PORTAL));
            messages.addAll(collection);
            return messages;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/r2dbc/postgresql/ExtendedFlowDelegate$MessageFactory.class */
    /**
     * Factory producing one batch of frontend messages for an exchange. It is invoked for the
     * initial request and again when the statement has to be re-prepared.
     */
    @FunctionalInterface
    public interface MessageFactory {
        /**
         * Creates a new batch of messages.
         *
         * @return a mutable list; {@code handleReprepare} may insert a message at index 0
         */
        List<FrontendMessage.DirectEncoder> createMessages();
    }

    // Package-private no-arg constructor with an empty body; the methods of this class
    // visible in this file are all static, so no instance state is set up here.
    ExtendedFlowDelegate() {
    }

    /**
     * Runs a parameterized statement through the extended query flow and returns the backend
     * messages of the exchange.
     *
     * <p>Strategy selection:
     * <ul>
     * <li>fetch size {@code 0}: everything is fetched in one round trip ({@code fetchAll});</li>
     * <li>compatibility mode while the transaction status is {@code IDLE}: {@code fetchAll};</li>
     * <li>compatibility mode otherwise: cursored fetch using Sync ({@code fetchCursoredWithSync});</li>
     * <li>otherwise: cursored fetch using Flush ({@code fetchCursoredWithFlush}).</li>
     * </ul>
     *
     * <p>On the resulting stream the statement cache is hydrated when {@code ParseComplete} arrives,
     * the query is logged on subscription, discarded reference-counted messages are released,
     * messages rejected by {@link #RESULT_FRAME_FILTER} are dropped, and the remainder is passed
     * to {@code exceptionFactory.handleErrorResponse}.
     *
     * @param connectionResources source of the client, statement cache, portal names and configuration
     * @param exceptionFactory    handler applied to every message that passes the filter; must not be {@code null}
     * @param str                 the SQL text
     * @param binding             the parameter binding
     * @param list                the encoded parameter values
     * @param i                   the fetch size; {@code 0} selects the non-cursored flow
     * @return the stream of backend messages for this query
     */
    public static Flux<BackendMessage> runQuery(ConnectionResources connectionResources, ExceptionFactory exceptionFactory, String str, Binding binding, List<ByteBuf> list, int i) {
        StatementCache statementCache = connectionResources.getStatementCache();
        Client client = connectionResources.getClient();
        String portal = connectionResources.getPortalNameSupplier().get();
        boolean compatibilityMode = connectionResources.getConfiguration().isCompatibilityMode();
        boolean idleTransaction = client.getTransactionStatus() == TransactionStatus.IDLE;
        ExtendedFlowOperator operator = new ExtendedFlowOperator(str, binding, statementCache, list, portal, connectionResources.getConfiguration().isForceBinary());

        Flux<BackendMessage> exchange;
        if (i == 0 || (compatibilityMode && idleTransaction)) {
            exchange = fetchAll(operator, client, portal);
        } else if (compatibilityMode) {
            exchange = fetchCursoredWithSync(operator, client, portal, i);
        } else {
            exchange = fetchCursoredWithFlush(operator, client, portal, i);
        }

        // The bound method reference null-checks exceptionFactory when it is evaluated, which is
        // the same point at which the former explicit getClass() call performed that check.
        return exchange
            .doOnNext(message -> {
                if (message == ParseComplete.INSTANCE) {
                    operator.hydrateStatementCache();
                }
            })
            .doOnSubscribe(subscription -> QueryLogger.logQuery(client.getContext(), str))
            .doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release)
            .filter(RESULT_FRAME_FILTER)
            .handle(exceptionFactory::handleErrorResponse);
    }

    /**
     * Executes the portal without a row limit in a single batch: Execute with a max-row count of
     * {@code 0}, Close of the portal, then Sync.
     *
     * <p>Follow-up requests (only the re-prepare retry issued by {@code handleReprepare}) are fed
     * through a unicast processor concatenated after the initial batch. When the stream
     * terminates for any reason the operator is closed, which completes the request sink and
     * releases the parameter buffers.
     *
     * @param extendedFlowOperator per-query state and message builder
     * @param client               the client performing the exchange
     * @param str                  the portal name
     * @return the backend messages of the exchange
     */
    private static Flux<BackendMessage> fetchAll(ExtendedFlowOperator extendedFlowOperator, Client client, String str) {
        UnicastProcessor<FrontendMessage> requestsProcessor = UnicastProcessor.create(Queues.<FrontendMessage>small().get());
        FluxSink<FrontendMessage> requestsSink = requestsProcessor.sink();
        MessageFactory factory = () -> extendedFlowOperator.getMessages(Arrays.asList(new Execute(str, 0), new Close(str, ExecutionType.PORTAL), Sync.INSTANCE));

        // The explicit <FrontendMessage> witness keeps the request stream typed as the general
        // message type rather than the CompositeFrontendMessage subtype.
        Flux<FrontendMessage> requests = Flux.<FrontendMessage>just(new CompositeFrontendMessage(factory.createMessages())).concatWith(requestsProcessor);

        return client.exchange(extendedFlowOperator.takeUntil(), requests)
            .handle(handleReprepare(requestsSink, extendedFlowOperator, factory))
            .doFinally(ignored -> extendedFlowOperator.close(requestsSink))
            .as(Operators::discardOnCancel);
    }

    /**
     * Cursored fetch that terminates every round trip with Sync.
     *
     * <p>The initial batch ends with Execute (limited to {@code i} rows) and Sync. Because each
     * Sync yields a {@code ReadyForQuery}, the exchange predicate only consults the operator's
     * completion predicate after the {@code done} flag has been set. The handler reacts to
     * server messages as follows:
     * <ul>
     * <li>{@code CommandComplete}: send Close + Sync, complete the request sink, emit the message;</li>
     * <li>{@code CloseComplete}: complete the request sink, mark done, emit the message;</li>
     * <li>{@code ErrorResponse}: mark done, send Sync, complete the request sink, emit the message;</li>
     * <li>{@code PortalSuspended}: request the next chunk (Execute + Sync), or, if the subscriber
     * has cancelled, send Close + Sync and complete the request sink; the message is not emitted;</li>
     * <li>{@code NoData}: mark done, or, if cancelled, send Close + Sync and complete the request
     * sink; the message is not emitted;</li>
     * <li>anything else is emitted unchanged.</li>
     * </ul>
     *
     * @param extendedFlowOperator per-query state and message builder
     * @param client               the client performing the exchange
     * @param str                  the portal name
     * @param i                    the number of rows requested per Execute
     * @return the backend messages of the exchange
     */
    private static Flux<BackendMessage> fetchCursoredWithSync(ExtendedFlowOperator extendedFlowOperator, Client client, String str, int i) {
        UnicastProcessor<FrontendMessage> requestsProcessor = UnicastProcessor.create(Queues.<FrontendMessage>small().get());
        FluxSink<FrontendMessage> requestsSink = requestsProcessor.sink();
        AtomicBoolean cancelled = new AtomicBoolean(false);
        AtomicBoolean done = new AtomicBoolean(false);
        MessageFactory factory = () -> extendedFlowOperator.getMessages(Arrays.asList(new Execute(str, i), Sync.INSTANCE));
        Predicate<BackendMessage> takeUntil = extendedFlowOperator.takeUntil();

        Flux<FrontendMessage> requests = Flux.<FrontendMessage>just(new CompositeFrontendMessage(factory.createMessages())).concatWith(requestsProcessor);

        return client.exchange(message -> done.get() && takeUntil.test(message), requests)
            .handle(handleReprepare(requestsSink, extendedFlowOperator, factory))
            .<BackendMessage>handle((message, sink) -> {
                if (message instanceof CommandComplete) {
                    requestsSink.next(new Close(str, ExecutionType.PORTAL));
                    requestsSink.next(Sync.INSTANCE);
                    requestsSink.complete();
                    sink.next(message);
                } else if (message instanceof CloseComplete) {
                    requestsSink.complete();
                    done.set(true);
                    sink.next(message);
                } else if (message instanceof ErrorResponse) {
                    done.set(true);
                    requestsSink.next(Sync.INSTANCE);
                    requestsSink.complete();
                    sink.next(message);
                } else if (message instanceof PortalSuspended) {
                    if (cancelled.get()) {
                        requestsSink.next(new Close(str, ExecutionType.PORTAL));
                        requestsSink.next(Sync.INSTANCE);
                        requestsSink.complete();
                    } else {
                        // More rows are available: ask for the next chunk.
                        requestsSink.next(new Execute(str, i));
                        requestsSink.next(Sync.INSTANCE);
                    }
                } else if (message instanceof NoData) {
                    if (cancelled.get()) {
                        requestsSink.next(new Close(str, ExecutionType.PORTAL));
                        requestsSink.next(Sync.INSTANCE);
                        requestsSink.complete();
                    } else {
                        done.set(true);
                    }
                } else {
                    sink.next(message);
                }
            })
            .doFinally(ignored -> extendedFlowOperator.close(requestsSink))
            .as(flux -> Operators.discardOnCancel(flux, () -> cancelled.set(true)));
    }

    /**
     * Cursored fetch that requests each chunk with Execute + Flush and only sends Sync when the
     * portal is finished, failed, or the subscriber has cancelled.
     *
     * <p>The handler reacts to server messages as follows:
     * <ul>
     * <li>{@code CommandComplete}: send Close + Sync, complete the request sink, emit the message;</li>
     * <li>{@code ErrorResponse}: send Sync, complete the request sink, emit the message;</li>
     * <li>{@code PortalSuspended}: request the next chunk (Execute + Flush), or, if the subscriber
     * has cancelled, send Close + Sync and complete the request sink; the message is not emitted;</li>
     * <li>anything else is emitted unchanged.</li>
     * </ul>
     *
     * @param extendedFlowOperator per-query state and message builder
     * @param client               the client performing the exchange
     * @param str                  the portal name
     * @param i                    the number of rows requested per Execute
     * @return the backend messages of the exchange
     */
    private static Flux<BackendMessage> fetchCursoredWithFlush(ExtendedFlowOperator extendedFlowOperator, Client client, String str, int i) {
        UnicastProcessor<FrontendMessage> requestsProcessor = UnicastProcessor.create(Queues.<FrontendMessage>small().get());
        FluxSink<FrontendMessage> requestsSink = requestsProcessor.sink();
        AtomicBoolean cancelled = new AtomicBoolean(false);
        MessageFactory factory = () -> extendedFlowOperator.getMessages(Arrays.asList(new Execute(str, i), Flush.INSTANCE));

        Flux<FrontendMessage> requests = Flux.<FrontendMessage>just(new CompositeFrontendMessage(factory.createMessages())).concatWith(requestsProcessor);

        return client.exchange(extendedFlowOperator.takeUntil(), requests)
            .handle(handleReprepare(requestsSink, extendedFlowOperator, factory))
            .<BackendMessage>handle((message, sink) -> {
                if (message instanceof CommandComplete) {
                    requestsSink.next(new Close(str, ExecutionType.PORTAL));
                    requestsSink.next(Sync.INSTANCE);
                    requestsSink.complete();
                    sink.next(message);
                } else if (message instanceof ErrorResponse) {
                    requestsSink.next(Sync.INSTANCE);
                    requestsSink.complete();
                    sink.next(message);
                } else if (message instanceof PortalSuspended) {
                    if (cancelled.get()) {
                        requestsSink.next(new Close(str, ExecutionType.PORTAL));
                        requestsSink.next(Sync.INSTANCE);
                        requestsSink.complete();
                    } else {
                        // More rows are available: ask for the next chunk.
                        requestsSink.next(new Execute(str, i));
                        requestsSink.next(Flush.INSTANCE);
                    }
                } else {
                    sink.next(message);
                }
            })
            .doFinally(ignored -> extendedFlowOperator.close(requestsSink))
            .as(flux -> Operators.discardOnCancel(flux, () -> cancelled.set(true)));
    }

    /**
     * Creates a handler that transparently retries the statement once when the server reports
     * an error for which {@code requiresReprepare} returns {@code true}.
     *
     * <p>For every such error the cached statement is evicted. The first time this happens the
     * message batch is rebuilt through {@code messageFactory}, a Sync is placed in front of it if
     * the batch does not already contain one (presumably so the server leaves its error state
     * before the retry; confirm against the protocol documentation), and the batch is pushed to
     * {@code fluxSink}; the error itself is swallowed. Any later occurrence, and every other
     * message, is forwarded downstream unchanged.
     *
     * @param fluxSink             sink feeding follow-up frontend messages into the exchange
     * @param extendedFlowOperator per-query state whose cached statement is evicted
     * @param messageFactory       rebuilds the message batch for the retry
     * @return the handler to install with {@code Flux.handle}
     */
    private static BiConsumer<BackendMessage, SynchronousSink<BackendMessage>> handleReprepare(FluxSink<FrontendMessage> fluxSink, ExtendedFlowOperator extendedFlowOperator, MessageFactory messageFactory) {
        AtomicBoolean alreadyRetried = new AtomicBoolean(false);
        return (message, downstream) -> {
            boolean staleStatement = message instanceof ErrorResponse && requiresReprepare((ErrorResponse) message);
            if (staleStatement) {
                extendedFlowOperator.evictCachedStatement();
            }
            // Forward unless this is the first stale-statement error of the exchange.
            if (!staleStatement || !alreadyRetried.compareAndSet(false, true)) {
                downstream.next(message);
                return;
            }
            List<FrontendMessage.DirectEncoder> retryBatch = messageFactory.createMessages();
            boolean hasSync = retryBatch.contains(Sync.INSTANCE);
            if (!hasSync) {
                retryBatch.add(0, Sync.INSTANCE);
            }
            fluxSink.next(new CompositeFrontendMessage(retryBatch));
        };
    }

    /**
     * Decides whether an error indicates that the prepared statement must be parsed again.
     *
     * <p>This is the case for SQLSTATE {@code 26000}, and for SQLSTATE {@code 0A000} when the
     * reporting routine is {@code RevalidateCachedQuery} or {@code RevalidateCachedPlan}.
     *
     * @param errorResponse the error received from the server
     * @return {@code true} if the statement should be evicted and prepared again
     */
    private static boolean requiresReprepare(ErrorResponse errorResponse) {
        ErrorDetails details = new ErrorDetails(errorResponse.getFields());
        String sqlState = details.getCode();
        if ("26000".equals(sqlState)) {
            return true;
        }
        if ("0A000".equals(sqlState)) {
            // The routine is only looked up for this SQLSTATE.
            String routine = details.getRoutine().orElse(null);
            boolean revalidateQuery = "RevalidateCachedQuery".equals(routine);
            return revalidateQuery || "RevalidateCachedPlan".equals(routine);
        }
        return false;
    }

    static {
        // Accept every backend message except BindComplete and NoData instances.
        // The decompiled form referenced an undefined register variable inside the lambdas;
        // the two class-instance checks are spelled out as typed predicates instead.
        Predicate<BackendMessage> isBindComplete = BindComplete.class::isInstance;
        Predicate<BackendMessage> isNoData = NoData.class::isInstance;
        RESULT_FRAME_FILTER = PredicateUtils.not(PredicateUtils.or(isBindComplete, isNoData));
    }
}
