/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.jdbc.source;

import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.dialect.DatabaseDialects;
import io.confluent.connect.jdbc.source.BulkTableQuerier;
import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig;
import io.confluent.connect.jdbc.source.JdbcSourceTaskConfig;
import io.confluent.connect.jdbc.source.OffsetProtocols;
import io.confluent.connect.jdbc.source.TableQuerier;
import io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier;
import io.confluent.connect.jdbc.util.CachedConnectionProvider;
import io.confluent.connect.jdbc.util.ColumnDefinition;
import io.confluent.connect.jdbc.util.ColumnId;
import io.confluent.connect.jdbc.util.TableId;
import io.confluent.connect.jdbc.util.Version;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcSourceTask
extends SourceTask {
    private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class);
    private Time time;
    private JdbcSourceTaskConfig config;
    private DatabaseDialect dialect;
    private CachedConnectionProvider cachedConnectionProvider;
    private PriorityQueue<TableQuerier> tableQueue = new PriorityQueue();
    private final AtomicBoolean running = new AtomicBoolean(false);

    /**
     * Creates a task backed by a {@link SystemTime} clock.
     */
    public JdbcSourceTask() {
        this(new SystemTime());
    }

    /**
     * Creates a task that uses the supplied time source.
     *
     * <p>The time source is what {@code poll()} uses to read the current time and to sleep
     * between polls, and what {@code resetAndRequeueHead} uses to stamp a querier's reset time.
     *
     * @param time the clock to use for reading the current time and sleeping
     */
    public JdbcSourceTask(Time time) {
        this.time = time;
    }

    /**
     * Reports the connector version.
     *
     * @return the value provided by {@link Version#getVersion()}
     */
    @Override
    public String version() {
        return Version.getVersion();
    }

    /**
     * Starts the task.
     *
     * <p>In order, this method parses the task configuration, picks a database dialect (by
     * explicit name when {@code dialect.name} is set, otherwise from the connection URL),
     * creates the connection provider, loads any previously stored offsets when running in an
     * incremental mode, and finally enqueues one {@link TableQuerier} per assigned table (or a
     * single querier for the configured query). The task is marked as running at the end.
     *
     * @param properties the raw task configuration
     * @throws ConnectException if the configuration is invalid, if neither or both of
     *     {@code tables} and {@code query} are set, or if column validation fails
     */
    @Override
    public void start(Map<String, String> properties) {
        log.info("Starting JDBC source task");
        try {
            this.config = new JdbcSourceTaskConfig(properties);
        } catch (ConfigException e) {
            throw new ConnectException("Couldn't start JdbcSourceTask due to configuration error", e);
        }

        final String url = this.config.getString("connection.url");
        final int maxConnAttempts = this.config.getInt("connection.attempts");
        final long retryBackoff = this.config.getLong("connection.backoff.ms");

        // An explicitly configured dialect wins; otherwise let the URL decide.
        final String dialectName = this.config.getString("dialect.name");
        if (dialectName != null && !dialectName.trim().isEmpty()) {
            this.dialect = DatabaseDialects.create(dialectName, this.config);
        } else {
            this.dialect = DatabaseDialects.findBestFor(url, this.config);
        }
        log.info("Using JDBC dialect {}", this.dialect.name());

        this.cachedConnectionProvider = this.connectionProvider(maxConnAttempts, retryBackoff);

        final List<String> tables = this.config.getList("tables");
        final String query = this.config.getString("query");
        // Exactly one of "tables" and "query" must be provided.
        if (tables.isEmpty() && query.isEmpty() || !tables.isEmpty() && !query.isEmpty()) {
            throw new ConnectException("Invalid configuration: each JdbcSourceTask must have at least one table assigned to it or one query specified");
        }
        final TableQuerier.QueryMode queryMode = !query.isEmpty() ? TableQuerier.QueryMode.QUERY : TableQuerier.QueryMode.TABLE;
        final List<String> tablesOrQuery = queryMode == TableQuerier.QueryMode.QUERY ? Collections.singletonList(query) : tables;

        final String mode = this.config.getString("mode");
        final Map<String, List<Map<String, String>>> partitionsByTableFqn = new HashMap<String, List<Map<String, String>>>();
        // Stays null in bulk mode, where no stored offsets are consulted.
        Map<Map<String, String>, Map<String, Object>> offsets = null;
        if (mode.equals("incrementing") || mode.equals("timestamp") || mode.equals("timestamp+incrementing")) {
            final List<Map<String, String>> partitions = new ArrayList<Map<String, String>>(tables.size());
            switch (queryMode) {
                case TABLE: {
                    log.trace("Starting in TABLE mode");
                    for (String table : tables) {
                        // Each table may have offsets stored under more than one partition format.
                        List<Map<String, String>> tablePartitions = this.possibleTablePartitions(table);
                        partitions.addAll(tablePartitions);
                        partitionsByTableFqn.put(table, tablePartitions);
                    }
                    break;
                }
                case QUERY: {
                    log.trace("Starting in QUERY mode");
                    partitions.add(Collections.singletonMap("query", "query"));
                    break;
                }
                default: {
                    throw new ConnectException("Unknown query mode: " + queryMode);
                }
            }
            offsets = this.context.offsetStorageReader().offsets(partitions);
            log.trace("The partition offsets are {}", offsets);
        }

        final String incrementingColumn = this.config.getString("incrementing.column.name");
        final List<String> timestampColumns = this.config.getList("timestamp.column.name");
        final Long timestampDelayInterval = this.config.getLong("timestamp.delay.interval.ms");
        final boolean validateNonNulls = this.config.getBoolean("validate.non.null");
        final TimeZone timeZone = this.config.timeZone();
        final String suffix = this.config.getString("query.suffix").trim();
        // Loop-invariant, so read it once instead of once per table.
        final String topicPrefix = this.config.getString("topic.prefix");

        for (String tableOrQuery : tablesOrQuery) {
            final List<Map<String, String>> tablePartitionsToCheck;
            switch (queryMode) {
                case TABLE: {
                    if (validateNonNulls) {
                        this.validateNonNullable(mode, tableOrQuery, incrementingColumn, timestampColumns);
                    }
                    tablePartitionsToCheck = partitionsByTableFqn.get(tableOrQuery);
                    break;
                }
                case QUERY: {
                    Map<String, String> partition = Collections.singletonMap("query", "query");
                    tablePartitionsToCheck = Collections.singletonList(partition);
                    break;
                }
                default: {
                    throw new ConnectException("Unexpected query mode: " + queryMode);
                }
            }

            // Use the first candidate partition that has a stored offset.
            Map<String, Object> offset = null;
            if (offsets != null) {
                for (Map<String, String> toCheckPartition : tablePartitionsToCheck) {
                    offset = offsets.get(toCheckPartition);
                    if (offset != null) {
                        // Log the offset that was found, not the whole offsets map.
                        log.info("Found offset {} for partition {}", offset, toCheckPartition);
                        break;
                    }
                }
            }
            offset = this.computeInitialOffset(tableOrQuery, offset, timeZone);

            if (mode.equals("bulk")) {
                this.tableQueue.add(new BulkTableQuerier(this.dialect, queryMode, tableOrQuery, topicPrefix, suffix));
            } else if (mode.equals("incrementing")) {
                this.tableQueue.add(new TimestampIncrementingTableQuerier(this.dialect, queryMode, tableOrQuery, topicPrefix, null, incrementingColumn, offset, timestampDelayInterval, timeZone, suffix));
            } else if (mode.equals("timestamp")) {
                this.tableQueue.add(new TimestampIncrementingTableQuerier(this.dialect, queryMode, tableOrQuery, topicPrefix, timestampColumns, null, offset, timestampDelayInterval, timeZone, suffix));
            } else if (mode.equals("timestamp+incrementing")) {
                // equals (not endsWith) so this agrees with the offset-loading check above.
                this.tableQueue.add(new TimestampIncrementingTableQuerier(this.dialect, queryMode, tableOrQuery, topicPrefix, timestampColumns, incrementingColumn, offset, timestampDelayInterval, timeZone, suffix));
            }
        }

        this.running.set(true);
        log.info("Started JDBC source task");
    }

    /**
     * Builds the connection provider used by this task.
     *
     * <p>The returned provider extends {@link CachedConnectionProvider} so that its
     * {@code onConnect} hook, after running the inherited behaviour, switches auto-commit off
     * on the connection it is given.
     *
     * @param maxConnAttempts value passed through to the provider as its attempt limit
     * @param retryBackoff value passed through to the provider as its retry backoff
     * @return a new provider bound to this task's dialect
     */
    protected CachedConnectionProvider connectionProvider(int maxConnAttempts, long retryBackoff) {
        final CachedConnectionProvider provider = new CachedConnectionProvider(this.dialect, maxConnAttempts, retryBackoff) {
            @Override
            protected void onConnect(Connection conn) throws SQLException {
                super.onConnect(conn);
                conn.setAutoCommit(false);
            }
        };
        return provider;
    }

    /**
     * Lists the source partitions under which offsets for a table may have been stored.
     *
     * @param table the table name as configured; parsed by the dialect into a {@link TableId}
     * @return the protocol V1 partition followed by the protocol V0 partition
     */
    private List<Map<String, String>> possibleTablePartitions(String table) {
        final TableId parsedId = this.dialect.parseTableIdentifier(table);
        final Map<String, String> v1Partition = OffsetProtocols.sourcePartitionForProtocolV1(parsedId);
        final Map<String, String> v0Partition = OffsetProtocols.sourcePartitionForProtocolV0(parsedId);
        return Arrays.asList(v1Partition, v0Partition);
    }

    /**
     * Determines the offset a querier should start from.
     *
     * <p>A stored offset, when present, is returned unchanged. Otherwise the
     * {@code timestamp.initial} setting is consulted: when it is unset the result is
     * {@code null}; when it equals {@code JdbcSourceConnectorConfig.TIMESTAMP_INITIAL_CURRENT}
     * the current time is read from the database; in every other case the configured value is
     * used as-is. The chosen timestamp is returned in a map under the key {@code "timestamp"}.
     *
     * @param tableOrQuery the table name or query, used only for logging
     * @param partitionOffset the previously stored offset, or {@code null} if none was found
     * @param timezone the time zone of the calendar handed to the dialect when reading the
     *     database's current time
     * @return the stored offset, a new offset holding the initial timestamp, or {@code null}
     * @throws ConnectException if the database's current time cannot be read
     */
    protected Map<String, Object> computeInitialOffset(String tableOrQuery, Map<String, Object> partitionOffset, TimeZone timezone) {
        if (partitionOffset != null) {
            return partitionOffset;
        }
        Long timestampInitial = this.config.getLong("timestamp.initial");
        if (timestampInitial == null) {
            return null;
        }
        // Compare numerically: == on two boxed Longs would compare references, which only
        // happens to work for values inside the Long cache.
        if (timestampInitial.longValue() == JdbcSourceConnectorConfig.TIMESTAMP_INITIAL_CURRENT) {
            try {
                Connection con = this.cachedConnectionProvider.getConnection();
                Calendar cal = Calendar.getInstance(timezone);
                timestampInitial = this.dialect.currentTimeOnDB(con, cal).getTime();
            } catch (SQLException e) {
                throw new ConnectException("Error while getting initial timestamp from database", e);
            }
        }
        Map<String, Object> initialPartitionOffset = new HashMap<String, Object>();
        initialPartitionOffset.put("timestamp", timestampInitial);
        log.info("No offsets found for '{}', so using configured timestamp {}", tableOrQuery, timestampInitial);
        return initialPartitionOffset;
    }

    /**
     * Signals the task to stop.
     *
     * <p>This only clears the running flag. Connections and the dialect are released by
     * {@code poll()} once it observes that the flag has been cleared.
     */
    @Override
    public void stop() throws ConnectException {
        log.info("Stopping JDBC source task");
        this.running.set(false);
    }

    /**
     * Releases the connection provider and the dialect on a best-effort basis.
     *
     * <p>Failures while closing either resource are logged as warnings and not propagated.
     * Both fields are cleared afterwards, and the dialect is closed even if closing the
     * connection provider failed.
     */
    protected void closeResources() {
        log.info("Closing resources for JDBC source task");
        try {
            this.releaseConnectionProvider();
        } finally {
            this.releaseDialect();
        }
    }

    /** Closes the connection provider if one exists, then clears the field. */
    private void releaseConnectionProvider() {
        try {
            if (this.cachedConnectionProvider != null) {
                this.cachedConnectionProvider.close();
            }
        } catch (Throwable closeFailure) {
            log.warn("Error while closing the connections", closeFailure);
        } finally {
            this.cachedConnectionProvider = null;
        }
    }

    /** Closes the dialect if one exists, then clears the field. */
    private void releaseDialect() {
        try {
            if (this.dialect != null) {
                this.dialect.close();
            }
        } catch (Throwable closeFailure) {
            log.warn("Error while closing the {} dialect: ", this.dialect.name(), closeFailure);
        } finally {
            this.dialect = null;
        }
    }

    /**
     * Fetches the next batch of records from the querier at the head of the queue.
     *
     * <p>While the task is running, the head querier is inspected. If it is not in the middle
     * of a query and its next update is not yet due, the task sleeps for at most 100 ms and
     * re-checks the running flag. Otherwise up to {@code batch.max.rows} records are read. When
     * the querier runs out of rows it is reset and re-queued; if no rows were read at all the
     * loop moves on instead of returning.
     *
     * <p>Once the running flag has been cleared, the head querier (if any) is reset and the
     * task's resources are closed.
     *
     * @return a non-empty batch of records, or {@code null} if a query failed with a
     *     {@link SQLException} or the task has been stopped
     * @throws InterruptedException declared by the {@link SourceTask} contract
     */
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        log.trace("Polling for new data");
        while (this.running.get()) {
            final TableQuerier querier = this.tableQueue.peek();
            if (!querier.querying()) {
                // Not in the middle of a query: wait until the next update is due.
                final long nextUpdate = querier.getLastUpdate() + this.config.getInt("poll.interval.ms");
                final long now = this.time.milliseconds();
                // Sleep in short slices so a stop request is noticed promptly.
                final long sleepMs = Math.min(nextUpdate - now, 100L);
                if (sleepMs > 0L) {
                    log.trace("Waiting {} ms to poll {} next", nextUpdate - now, querier.toString());
                    this.time.sleep(sleepMs);
                    continue;
                }
            }

            final List<SourceRecord> results = new ArrayList<SourceRecord>();
            try {
                log.debug("Checking for next block of results from {}", querier.toString());
                querier.maybeStartQuery(this.cachedConnectionProvider.getConnection());
                final int batchMaxRows = this.config.getInt("batch.max.rows");
                boolean hadNext = true;
                while (results.size() < batchMaxRows && (hadNext = querier.next())) {
                    results.add(querier.extractRecord());
                }
                if (!hadNext) {
                    // Result set exhausted: schedule this querier for its next run.
                    this.resetAndRequeueHead(querier);
                }
                if (results.isEmpty()) {
                    log.trace("No updates for {}", querier.toString());
                    continue;
                }
                log.debug("Returning {} records for {}", results.size(), querier.toString());
                return results;
            } catch (SQLException sqle) {
                // Pass the exception as the trailing argument so its stack trace is logged.
                log.error("Failed to run query for table {}", querier.toString(), sqle);
                this.resetAndRequeueHead(querier);
                return null;
            } catch (Throwable t) {
                this.resetAndRequeueHead(querier);
                this.closeResources();
                throw t;
            }
        }

        // The task was asked to stop: reset the in-flight querier and release resources.
        final TableQuerier head = this.tableQueue.peek();
        if (head != null) {
            this.resetAndRequeueHead(head);
        }
        this.closeResources();
        return null;
    }

    /**
     * Removes the head of the queue, resets it with the current time, and adds it back.
     *
     * @param expectedHead the querier the caller believes is at the head of the queue; this is
     *     checked with an assertion only
     */
    private void resetAndRequeueHead(TableQuerier expectedHead) {
        log.debug("Resetting querier {}", expectedHead.toString());
        final TableQuerier actualHead = this.tableQueue.poll();
        assert actualHead == expectedHead;
        final long resetTime = this.time.milliseconds();
        expectedHead.reset(resetTime);
        this.tableQueue.add(expectedHead);
    }

    /**
     * Verifies that the columns used for offset tracking are declared NOT NULL.
     *
     * <p>The table's column definitions are read through the dialect with auto-commit
     * temporarily enabled; the connection's previous auto-commit setting is restored
     * afterwards. Column names are matched case-insensitively.
     *
     * @param incrementalMode the configured mode; only {@code "incrementing"},
     *     {@code "timestamp"} and {@code "timestamp+incrementing"} trigger a check
     * @param table the table whose columns are inspected
     * @param incrementingColumn the incrementing column name; may be {@code null}
     * @param timestampColumns the timestamp column names
     * @throws ConnectException if the incrementing column is nullable, if every timestamp
     *     column is nullable (or none was found), or if the metadata lookup fails
     */
    private void validateNonNullable(String incrementalMode, String table, String incrementingColumn, List<String> timestampColumns) {
        try {
            // Locale.ROOT keeps case folding independent of the JVM's default locale, so the
            // configured names and the database names are always folded the same way.
            HashSet<String> lowercaseTsColumns = new HashSet<String>();
            for (String timestampColumn : timestampColumns) {
                lowercaseTsColumns.add(timestampColumn.toLowerCase(Locale.ROOT));
            }

            boolean incrementingOptional = false;
            boolean atLeastOneTimestampNotOptional = false;
            Connection conn = this.cachedConnectionProvider.getConnection();
            boolean autoCommit = conn.getAutoCommit();
            try {
                conn.setAutoCommit(true);
                Map<ColumnId, ColumnDefinition> defnsById = this.dialect.describeColumns(conn, table, null);
                for (ColumnDefinition defn : defnsById.values()) {
                    String columnName = defn.id().name();
                    if (columnName.equalsIgnoreCase(incrementingColumn)) {
                        incrementingOptional = defn.isOptional();
                    } else if (lowercaseTsColumns.contains(columnName.toLowerCase(Locale.ROOT)) && !defn.isOptional()) {
                        atLeastOneTimestampNotOptional = true;
                    }
                }
            } finally {
                // Restore whatever auto-commit setting the connection had before.
                conn.setAutoCommit(autoCommit);
            }

            boolean usesIncrementing = incrementalMode.equals("incrementing") || incrementalMode.equals("timestamp+incrementing");
            boolean usesTimestamp = incrementalMode.equals("timestamp") || incrementalMode.equals("timestamp+incrementing");
            if (usesIncrementing && incrementingOptional) {
                throw new ConnectException("Cannot make incremental queries using incrementing column " + incrementingColumn + " on " + table + " because this column is nullable.");
            }
            if (usesTimestamp && !atLeastOneTimestampNotOptional) {
                throw new ConnectException("Cannot make incremental queries using timestamp columns " + timestampColumns + " on " + table + " because all of these columns nullable.");
            }
        } catch (SQLException e) {
            throw new ConnectException("Failed trying to validate that columns used for offsets are NOT NULL", e);
        }
    }
}

