/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.jdbc.source;

import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.util.ConnectionProvider;
import io.confluent.connect.jdbc.util.QuoteMethod;
import io.confluent.connect.jdbc.util.TableId;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thread that repeatedly reads the list of tables from the database, applies the configured
 * whitelist or blacklist, and requests a task reconfiguration from the {@link ConnectorContext}
 * whenever the filtered list differs from the previously stored one.
 *
 * <p>The most recent filtered list is published through an {@link AtomicReference}; callers
 * obtain it via {@link #tables()}.
 */
public class TableMonitorThread extends Thread {
    private static final Logger log = LoggerFactory.getLogger(TableMonitorThread.class);
    private final DatabaseDialect dialect;
    private final ConnectionProvider connectionProvider;
    private final ConnectorContext context;
    private final CountDownLatch shutdownLatch;
    private final long startupMs;
    private final long pollMs;
    private final Set<String> whitelist;
    private final Set<String> blacklist;
    private final AtomicReference<List<TableId>> tables;
    private final Time time;

    /**
     * Creates the monitor thread; the thread is not started by the constructor.
     *
     * @param dialect            dialect used to list tables and to render table identifiers
     * @param connectionProvider source of the connection used to list tables
     * @param context            connector context used to request reconfiguration and raise errors
     * @param startupMs          maximum time in milliseconds {@link #tables()} waits for the first
     *                           table list
     * @param pollMs             time in milliseconds to wait between two table list refreshes
     * @param whitelist          table names to include, or {@code null} for no whitelist
     * @param blacklist          table names to exclude, or {@code null} for no blacklist; only
     *                           consulted when {@code whitelist} is {@code null}
     * @param time               clock/wait abstraction used while awaiting the first table list
     */
    public TableMonitorThread(DatabaseDialect dialect, ConnectionProvider connectionProvider, ConnectorContext context, long startupMs, long pollMs, Set<String> whitelist, Set<String> blacklist, Time time) {
        this.dialect = dialect;
        this.connectionProvider = connectionProvider;
        this.context = context;
        this.shutdownLatch = new CountDownLatch(1);
        this.startupMs = startupMs;
        this.pollMs = pollMs;
        this.whitelist = whitelist;
        this.blacklist = blacklist;
        this.tables = new AtomicReference<>();
        this.time = time;
    }

    /**
     * Polls the table list until {@link #shutdown()} is called or an unexpected exception occurs.
     * Any exception thrown while updating the tables or requesting reconfiguration is reported
     * through {@link #fail(Throwable)} and rethrown, which ends the thread.
     */
    @Override
    public void run() {
        log.info("Starting thread to monitor tables.");
        while (this.shutdownLatch.getCount() > 0L) {
            try {
                if (this.updateTables()) {
                    this.context.requestTaskReconfiguration();
                }
            } catch (Exception e) {
                throw this.fail(e);
            }
            try {
                log.debug("Waiting {} ms to check for changed.", this.pollMs);
                // await() returns true once the latch has been counted down, i.e. on shutdown.
                boolean shuttingDown = this.shutdownLatch.await(this.pollMs, TimeUnit.MILLISECONDS);
                if (shuttingDown) {
                    return;
                }
            } catch (InterruptedException e) {
                // Deliberately not re-interrupting: with the flag set, every subsequent await()
                // would throw immediately and the loop would poll the database without pausing.
                log.error("Unexpected InterruptedException, ignoring: ", e);
            }
        }
    }

    /**
     * Returns the most recently stored filtered table list, waiting up to {@code startupMs}
     * milliseconds for the first list to become available.
     *
     * @return the current filtered table list, or {@code null} if none has been stored yet
     * @throws ConnectException if two or more tables in the list share the same unqualified table
     *                          name; the error is also raised on the connector context and the
     *                          monitor is signalled to shut down
     */
    public List<TableId> tables() {
        this.awaitTablesReady(this.startupMs);
        List<TableId> tablesSnapshot = this.tables.get();
        if (tablesSnapshot == null) {
            return null;
        }
        // Group by unqualified name and keep only the names that occur more than once.
        Map<String, List<TableId>> duplicates = tablesSnapshot.stream()
                .collect(Collectors.groupingBy(TableId::tableName))
                .entrySet().stream()
                .filter(entry -> entry.getValue().size() > 1)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        if (tablesSnapshot.isEmpty()) {
            log.debug("Based on the supplied filtering rules, there are no matching tables to read from");
        } else {
            log.debug("Based on the supplied filtering rules, the tables available to read from include: {}", this.dialect.expressionBuilder().appendList().delimitedBy(",").of(tablesSnapshot));
        }
        if (!duplicates.isEmpty()) {
            String configText;
            if (this.whitelist != null) {
                configText = "'table.whitelist'";
            } else if (this.blacklist != null) {
                configText = "'table.blacklist'";
            } else {
                configText = "'table.whitelist' or 'table.blacklist'";
            }
            String msg = "The connector uses the unqualified table name as the topic name and has detected duplicate unqualified table names. This could lead to mixed data types in the topic and downstream processing errors. To prevent such processing errors, the JDBC Source connector fails to start when it detects duplicate table name configurations. Update the connector's " + configText + " config to include exactly one table in each of the tables listed below.\n\t";
            throw this.fail(new ConnectException(msg + duplicates.values()));
        }
        return tablesSnapshot;
    }

    /**
     * Waits until a table list has been stored or the timeout elapses. Timeouts and interruptions
     * are logged and otherwise ignored; on interruption the calling thread's interrupt status is
     * restored so that the caller can still observe it.
     *
     * @param timeoutMs maximum time to wait, in milliseconds, added to the current time of
     *                  {@code time} to form the deadline
     */
    private void awaitTablesReady(long timeoutMs) {
        try {
            this.time.waitObject(this.tables, () -> this.tables.get() != null, this.time.milliseconds() + timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Timed out or interrupted while awaiting for tables being read.");
        } catch (TimeoutException e) {
            log.warn("Timed out or interrupted while awaiting for tables being read.");
        }
    }

    /**
     * Signals the monitor loop to stop by counting down the shutdown latch.
     */
    public void shutdown() {
        log.info("Shutting down thread monitoring tables.");
        this.shutdownLatch.countDown();
    }

    /**
     * Reads all tables through the dialect, filters them, stores the result and notifies threads
     * waiting on the {@code tables} reference.
     *
     * @return {@code true} if the newly stored list differs from the previously stored one
     *         (including the first time a list is stored); {@code false} if it is equal or if
     *         listing the tables failed with an {@link SQLException}
     */
    private boolean updateTables() {
        final List<TableId> allTables;
        try {
            allTables = this.dialect.tableIds(this.connectionProvider.getConnection());
            log.debug("Got the following tables: {}", allTables);
        } catch (SQLException e) {
            log.error("Error while trying to get updated table list, ignoring and waiting for next table poll interval", e);
            this.connectionProvider.close();
            return false;
        }
        final List<TableId> filteredTables = new ArrayList<>(allTables.size());
        for (TableId table : allTables) {
            if (this.isSelected(table)) {
                filteredTables.add(table);
            }
        }
        List<TableId> priorTablesSnapshot = this.tables.getAndSet(filteredTables);
        // Wake up any thread waiting on the reference's monitor for a table list to appear.
        synchronized (this.tables) {
            this.tables.notifyAll();
        }
        return !Objects.equals(priorTablesSnapshot, filteredTables);
    }

    /**
     * Decides whether a table passes the configured filters: the whitelist if one is set,
     * otherwise the blacklist if one is set, otherwise every table passes.
     */
    private boolean isSelected(TableId table) {
        if (this.whitelist != null) {
            return this.matchesFilter(this.whitelist, table);
        }
        if (this.blacklist != null) {
            return !this.matchesFilter(this.blacklist, table);
        }
        return true;
    }

    /**
     * Checks whether {@code names} contains the table's identifier rendered without quoting,
     * rendered with quoting, or its bare table name.
     */
    private boolean matchesFilter(Set<String> names, TableId table) {
        String unquoted = this.dialect.expressionBuilder().append(table, QuoteMethod.NEVER).toString();
        String quoted = this.dialect.expressionBuilder().append(table, QuoteMethod.ALWAYS).toString();
        return names.contains(unquoted) || names.contains(quoted) || names.contains(table.tableName());
    }

    /**
     * Logs the given cause, raises a wrapping {@link ConnectException} on the connector context
     * and counts down the shutdown latch.
     *
     * @param t the underlying cause
     * @return the wrapping exception, for the caller to throw
     */
    private RuntimeException fail(Throwable t) {
        String message = "Encountered an unrecoverable error while reading tables from the database";
        log.error(message, t);
        ConnectException exception = new ConnectException(message, t);
        this.context.raiseError(exception);
        this.shutdownLatch.countDown();
        return exception;
    }
}

