package org.apache.pulsar.broker.service;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
import org.apache.pulsar.broker.transaction.buffer.impl.TableView;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.class */
public class SystemTopicTxnBufferSnapshotService<T> {
    private static final Logger log = LoggerFactory.getLogger(SystemTopicTxnBufferSnapshotService.class);
    protected final ConcurrentHashMap<NamespaceName, SystemTopicClient<T>> clients;
    protected final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
    protected final Class<T> schemaType;
    protected final EventType systemTopicType;
    private final ConcurrentHashMap<NamespaceName, ReferenceCountedWriter<T>> refCountedWriterMap;
    private final TableView<T> tableView;

    /* loaded from: input_file:org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService$ReferenceCountedWriter.class */
    public static class ReferenceCountedWriter<T> {
        private final AtomicLong referenceCount = new AtomicLong(1);
        private final NamespaceName namespaceName;
        private final CompletableFuture<SystemTopicClient.Writer<T>> future;
        private final SystemTopicTxnBufferSnapshotService<T> snapshotService;

        public ReferenceCountedWriter(NamespaceName namespaceName, CompletableFuture<SystemTopicClient.Writer<T>> completableFuture, SystemTopicTxnBufferSnapshotService<T> systemTopicTxnBufferSnapshotService) {
            this.namespaceName = namespaceName;
            this.snapshotService = systemTopicTxnBufferSnapshotService;
            this.future = completableFuture;
            this.future.exceptionally(th -> {
                SystemTopicTxnBufferSnapshotService.log.error("[{}] Failed to create TB snapshot writer.", namespaceName, th);
                systemTopicTxnBufferSnapshotService.refCountedWriterMap.remove(namespaceName, this);
                return null;
            });
        }

        public CompletableFuture<SystemTopicClient.Writer<T>> getFuture() {
            return this.future;
        }

        private synchronized boolean retain() {
            return this.referenceCount.incrementAndGet() > 0;
        }

        public synchronized void release() {
            if (this.referenceCount.decrementAndGet() == 0) {
                ((SystemTopicTxnBufferSnapshotService) this.snapshotService).refCountedWriterMap.remove(this.namespaceName, this);
                this.future.thenAccept(writer -> {
                    String topicName = writer.getSystemTopicClient().getTopicName().toString();
                    writer.closeAsync().exceptionally(th -> {
                        if (th != null) {
                            SystemTopicTxnBufferSnapshotService.log.error("[{}] Failed to close TB snapshot writer.", topicName, th);
                            return null;
                        }
                        if (!SystemTopicTxnBufferSnapshotService.log.isDebugEnabled()) {
                            return null;
                        }
                        SystemTopicTxnBufferSnapshotService.log.debug("[{}] Success to close TB snapshot writer.", topicName);
                        return null;
                    });
                });
            }
        }
    }

    public SystemTopicTxnBufferSnapshotService(PulsarService pulsarService, EventType eventType, Class<T> cls) throws PulsarServerException {
        PulsarClientImpl client = pulsarService.getClient();
        this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
        this.systemTopicType = eventType;
        this.schemaType = cls;
        this.clients = new ConcurrentHashMap<>();
        this.refCountedWriterMap = new ConcurrentHashMap<>();
        this.tableView = new TableView<>(this::createReader, client.getConfiguration().getOperationTimeoutMs(), pulsarService.getExecutor());
    }

    public CompletableFuture<SystemTopicClient.Reader<T>> createReader(TopicName topicName) {
        return getTransactionBufferSystemTopicClient(topicName.getNamespaceObject()).newReaderAsync();
    }

    public void removeClient(TopicName topicName, SystemTopicClientBase<T> systemTopicClientBase) {
        if (systemTopicClientBase.getReaders().size() == 0 && systemTopicClientBase.getWriters().size() == 0) {
            this.clients.remove(topicName.getNamespaceObject());
        }
    }

    public ReferenceCountedWriter<T> getReferenceWriter(NamespaceName namespaceName) {
        return this.refCountedWriterMap.compute(namespaceName, (namespaceName2, referenceCountedWriter) -> {
            return (referenceCountedWriter == null || !referenceCountedWriter.retain()) ? new ReferenceCountedWriter(namespaceName, getTransactionBufferSystemTopicClient(namespaceName).newWriterAsync(), this) : referenceCountedWriter;
        });
    }

    private SystemTopicClient<T> getTransactionBufferSystemTopicClient(NamespaceName namespaceName) {
        TopicName systemTopicName = NamespaceEventsSystemTopicFactory.getSystemTopicName(namespaceName, this.systemTopicType);
        if (systemTopicName == null) {
            throw new RuntimeException((Throwable) new PulsarClientException.InvalidTopicNameException("Can't get the TB system topic client for namespace " + String.valueOf(namespaceName) + " with type " + String.valueOf(this.systemTopicType) + "."));
        }
        return this.clients.computeIfAbsent(namespaceName, namespaceName2 -> {
            return this.namespaceEventsSystemTopicFactory.createTransactionBufferSystemTopicClient(systemTopicName, this, this.schemaType);
        });
    }

    public void close() throws Exception {
        for (Map.Entry<NamespaceName, SystemTopicClient<T>> entry : this.clients.entrySet()) {
            try {
                entry.getValue().close();
            } catch (Exception e) {
                log.error("Failed to close system topic client for namespace {}", entry.getKey(), e);
            }
        }
        this.clients.clear();
        for (Map.Entry<NamespaceName, ReferenceCountedWriter<T>> entry2 : this.refCountedWriterMap.entrySet()) {
            CompletableFuture<SystemTopicClient.Writer<T>> future = entry2.getValue().getFuture();
            if (!future.isCompletedExceptionally()) {
                future.thenAccept(writer -> {
                    try {
                        writer.close();
                    } catch (Exception e2) {
                        log.error("Failed to close writer for namespace {}", entry2.getKey(), e2);
                    }
                });
            }
        }
        this.refCountedWriterMap.clear();
    }

    public TableView<T> getTableView() {
        return this.tableView;
    }
}
