package io.confluent.ksql.physical.scalablepush;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.streams.materialization.Row;
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import io.confluent.ksql.physical.scalablepush.locator.AllHostsLocator;
import io.confluent.ksql.physical.scalablepush.locator.PushLocator;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/physical/scalablepush/ScalablePushRegistry.class */
public class ScalablePushRegistry implements ProcessorSupplier<Object, GenericRow, Void, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(ScalablePushRegistry.class);
    private final PushLocator pushLocator;
    private final LogicalSchema logicalSchema;
    private final boolean isTable;
    private final boolean windowed;
    private final ConcurrentHashMap<QueryId, ProcessingQueue> processingQueues = new ConcurrentHashMap<>();
    private boolean closed = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/physical/scalablepush/ScalablePushRegistry$PeekProcessor.class */
    public final class PeekProcessor implements Processor<Object, GenericRow, Void, Void> {
        private PeekProcessor() {
        }

        public void init(ProcessorContext processorContext) {
        }

        public void process(Record<Object, GenericRow> record) {
            ScalablePushRegistry.this.handleRow(record);
        }

        public void close() {
        }
    }

    public ScalablePushRegistry(PushLocator pushLocator, LogicalSchema logicalSchema, boolean z, boolean z2) {
        this.pushLocator = pushLocator;
        this.logicalSchema = logicalSchema;
        this.isTable = z;
        this.windowed = z2;
    }

    public synchronized void close() {
        Iterator<ProcessingQueue> it = this.processingQueues.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        this.processingQueues.clear();
        this.closed = true;
    }

    public synchronized void register(ProcessingQueue processingQueue) {
        if (this.closed) {
            throw new IllegalStateException("Shouldn't register after closing");
        }
        this.processingQueues.put(processingQueue.getQueryId(), processingQueue);
    }

    public synchronized void unregister(ProcessingQueue processingQueue) {
        if (this.closed) {
            throw new IllegalStateException("Shouldn't unregister after closing");
        }
        this.processingQueues.remove(processingQueue.getQueryId());
    }

    public PushLocator getLocator() {
        return this.pushLocator;
    }

    public boolean isTable() {
        return this.isTable;
    }

    public boolean isWindowed() {
        return this.windowed;
    }

    @VisibleForTesting
    int numRegistered() {
        return this.processingQueues.size();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleRow(Record<Object, GenericRow> record) {
        Row of;
        Object key = record.key();
        GenericRow genericRow = (GenericRow) record.value();
        if ((key != null || this.logicalSchema.key().isEmpty()) && genericRow != null) {
            for (ProcessingQueue processingQueue : this.processingQueues.values()) {
                long timestamp = record.timestamp();
                try {
                    if (this.windowed) {
                        Windowed windowed = (Windowed) key;
                        of = WindowedRow.of(this.logicalSchema, new Windowed(GenericKey.fromList(((GenericKey) windowed.key()).values()), windowed.window()), GenericRow.fromList(genericRow.values()), timestamp);
                    } else {
                        of = Row.of(this.logicalSchema, GenericKey.fromList(key != null ? ((GenericKey) key).values() : Collections.emptyList()), GenericRow.fromList(genericRow.values()), timestamp);
                    }
                    processingQueue.offer(of);
                } catch (Throwable th) {
                    LOG.error("Error while offering row", th);
                }
            }
        }
    }

    /* renamed from: get, reason: merged with bridge method [inline-methods] */
    public Processor<Object, GenericRow, Void, Void> m212get() {
        return new PeekProcessor();
    }

    public static Optional<ScalablePushRegistry> create(LogicalSchema logicalSchema, Supplier<List<PersistentQueryMetadata>> supplier, boolean z, boolean z2, Map<String, Object> map) {
        Object obj = map.get("application.server");
        if (obj == null) {
            return Optional.empty();
        }
        if (!(obj instanceof String)) {
            throw new IllegalArgumentException("application.server not String");
        }
        try {
            return Optional.of(new ScalablePushRegistry(new AllHostsLocator(supplier, new URL((String) obj)), logicalSchema, z, z2));
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("application.server malformed: '" + obj + "'");
        }
    }
}
