/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.jdbc.sink;

import io.confluent.connect.jdbc.sink.DbMetadataQueries;
import io.confluent.connect.jdbc.sink.JdbcSinkConfig;
import io.confluent.connect.jdbc.sink.dialect.DbDialect;
import io.confluent.connect.jdbc.sink.metadata.DbTable;
import io.confluent.connect.jdbc.sink.metadata.DbTableColumn;
import io.confluent.connect.jdbc.sink.metadata.FieldsMetadata;
import io.confluent.connect.jdbc.sink.metadata.SinkRecordField;
import io.confluent.connect.jdbc.sink.metadata.TableMetadataLoadingCache;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DbStructure {
    private static final Logger log = LoggerFactory.getLogger(DbStructure.class);
    private final TableMetadataLoadingCache tableMetadataLoadingCache = new TableMetadataLoadingCache();
    private final DbDialect dbDialect;

    public DbStructure(DbDialect dbDialect) {
        this.dbDialect = dbDialect;
    }

    public boolean createOrAmendIfNecessary(JdbcSinkConfig config, Connection connection, String tableName, FieldsMetadata fieldsMetadata) throws SQLException {
        if (this.tableMetadataLoadingCache.get(connection, tableName) == null) {
            try {
                this.create(config, connection, tableName, fieldsMetadata);
            }
            catch (SQLException sqle) {
                log.warn("Create failed, will attempt amend if table already exists", (Throwable)sqle);
                if (DbMetadataQueries.doesTableExist(connection, tableName)) {
                    this.tableMetadataLoadingCache.refresh(connection, tableName);
                }
                throw sqle;
            }
        }
        return this.amendIfNecessary(config, connection, tableName, fieldsMetadata, config.maxRetries);
    }

    void create(JdbcSinkConfig config, Connection connection, String tableName, FieldsMetadata fieldsMetadata) throws SQLException {
        if (!config.autoCreate) {
            throw new ConnectException(String.format("Table %s is missing and auto-creation is disabled", tableName));
        }
        String sql = this.dbDialect.getCreateQuery(tableName, fieldsMetadata.allFields.values());
        log.info("Creating table:{} with SQL: {}", (Object)tableName, (Object)sql);
        try (Statement statement = connection.createStatement();){
            statement.executeUpdate(sql);
            connection.commit();
        }
        this.tableMetadataLoadingCache.refresh(connection, tableName);
    }

    boolean amendIfNecessary(JdbcSinkConfig config, Connection connection, String tableName, FieldsMetadata fieldsMetadata, int maxRetries) throws SQLException {
        DbTable tableMetadata = this.tableMetadataLoadingCache.get(connection, tableName);
        Map<String, DbTableColumn> dbColumns = tableMetadata.columns;
        Set<SinkRecordField> missingFields = this.missingFields(fieldsMetadata.allFields.values(), dbColumns.keySet());
        if (missingFields.isEmpty()) {
            return false;
        }
        for (SinkRecordField missingField : missingFields) {
            if (missingField.isOptional() || missingField.defaultValue() != null) continue;
            throw new ConnectException("Cannot ALTER to add missing field " + missingField + ", as it is not optional and does not have a default value");
        }
        if (!config.autoEvolve) {
            throw new ConnectException(String.format("Table %s is missing fields (%s) and auto-evolution is disabled", tableName, missingFields));
        }
        List<String> amendTableQueries = this.dbDialect.getAlterTable(tableName, missingFields);
        log.info("Amending table to add missing fields:{} maxRetries:{} with SQL: {}", new Object[]{missingFields, maxRetries, amendTableQueries});
        try (Statement statement = connection.createStatement();){
            for (String amendTableQuery : amendTableQueries) {
                statement.executeUpdate(amendTableQuery);
            }
            connection.commit();
        }
        catch (SQLException sqle) {
            if (maxRetries <= 0) {
                throw new ConnectException(String.format("Failed to amend table '%s' to add missing fields: %s", tableName, missingFields), (Throwable)sqle);
            }
            log.warn("Amend failed, re-attempting", (Throwable)sqle);
            this.tableMetadataLoadingCache.refresh(connection, tableName);
            return this.amendIfNecessary(config, connection, tableName, fieldsMetadata, maxRetries - 1);
        }
        this.tableMetadataLoadingCache.refresh(connection, tableName);
        return true;
    }

    Set<SinkRecordField> missingFields(Collection<SinkRecordField> fields, Set<String> dbColumnNames) {
        HashSet<SinkRecordField> missingFields = new HashSet<SinkRecordField>();
        for (SinkRecordField field : fields) {
            if (dbColumnNames.contains(field.name())) continue;
            missingFields.add(field);
        }
        return missingFields;
    }
}

