/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.jdbc.sink;

import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.sink.DbStructure;
import io.confluent.connect.jdbc.sink.JdbcSinkConfig;
import io.confluent.connect.jdbc.sink.metadata.FieldsMetadata;
import io.confluent.connect.jdbc.sink.metadata.SchemaPair;
import io.confluent.connect.jdbc.util.ColumnId;
import io.confluent.connect.jdbc.util.TableId;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers {@link SinkRecord}s destined for a single table and writes them to the database
 * through one {@link PreparedStatement}, executed as a JDBC batch.
 *
 * <p>All buffered records share the same key/value {@link SchemaPair}. When a record with a
 * different schema pair arrives, the buffer is flushed and the statement, binder and field
 * metadata are rebuilt for the new schemas.
 *
 * <p>NOTE(review): no synchronization is visible in this class; presumably each instance is
 * used from a single thread - confirm against the caller.
 */
public class BufferedRecords {
    private static final Logger log = LoggerFactory.getLogger(BufferedRecords.class);

    private final TableId tableId;
    private final JdbcSinkConfig config;
    private final DatabaseDialect dbDialect;
    private final DbStructure dbStructure;
    private final Connection connection;

    /** Records accepted by {@link #add} that have not been flushed yet. */
    private List<SinkRecord> records = new ArrayList<>();
    /** Schemas the current statement was built for; {@code null} means "not initialized". */
    private SchemaPair currentSchemaPair;
    private FieldsMetadata fieldsMetadata;
    private PreparedStatement preparedStatement;
    private DatabaseDialect.StatementBinder preparedStatementBinder;

    /**
     * Creates an empty buffer. No database work happens until the first {@link #add} call.
     *
     * @param config      sink configuration (insert mode, primary-key mode, batch size, ...)
     * @param tableId     destination table
     * @param dbDialect   dialect used to build the SQL and the statement binder
     * @param dbStructure helper used to create/amend the table and look up its definition
     * @param connection  connection the prepared statement is created on
     */
    public BufferedRecords(JdbcSinkConfig config, TableId tableId, DatabaseDialect dbDialect, DbStructure dbStructure, Connection connection) {
        this.tableId = tableId;
        this.config = config;
        this.dbDialect = dbDialect;
        this.dbStructure = dbStructure;
        this.connection = connection;
    }

    /**
     * Adds a record to the buffer, flushing when required.
     *
     * <p>A flush is triggered when the buffer reaches {@code config.batchSize}, or when the
     * record's schema pair differs from the one currently buffered. In the latter case the
     * existing buffer is flushed first, the statement is rebuilt for the new schemas and the
     * record is then added.
     *
     * @param record the record to buffer
     * @return the records written to the database by this call; empty if nothing was flushed
     * @throws SQLException if preparing the statement or executing the batch fails
     */
    public List<SinkRecord> add(SinkRecord record) throws SQLException {
        final SchemaPair schemaPair = new SchemaPair(record.keySchema(), record.valueSchema());
        if (this.currentSchemaPair == null) {
            this.initializeFor(schemaPair);
        }

        final List<SinkRecord> flushed;
        if (this.currentSchemaPair.equals(schemaPair)) {
            this.records.add(record);
            if (this.records.size() >= this.config.batchSize) {
                log.debug("Flushing buffered records after exceeding configured batch size {}.", this.config.batchSize);
                flushed = this.flush();
            } else {
                flushed = Collections.emptyList();
            }
        } else {
            log.debug("Flushing buffered records after due to unequal schema pairs: current schemas: {}, next schemas: {}", this.currentSchemaPair, schemaPair);
            // flush() always returns a mutable list, so the result of the re-entrant add can
            // be appended to it.
            flushed = this.flush();
            // Clearing the schema pair forces the recursive add to rebuild the statement.
            this.currentSchemaPair = null;
            flushed.addAll(this.add(record));
        }
        return flushed;
    }

    /**
     * Binds every buffered record to the prepared statement, executes the batch and empties
     * the buffer.
     *
     * <p>NOTE(review): the binder is presumably what adds each record to the statement's
     * batch; that is not visible in this file.
     *
     * @return the records that were written; a new empty list if the buffer was empty
     * @throws SQLException     if binding or executing the batch fails
     * @throws ConnectException in INSERT mode when the summed update counts do not match the
     *                          number of buffered records, or when the insert mode is unknown
     */
    public List<SinkRecord> flush() throws SQLException {
        if (this.records.isEmpty()) {
            log.debug("Records is empty");
            return new ArrayList<>();
        }
        log.debug("Flushing {} buffered records", this.records.size());
        for (SinkRecord record : this.records) {
            this.preparedStatementBinder.bindRecord(record);
        }

        int totalUpdateCount = 0;
        boolean successNoInfo = false;
        for (int updateCount : this.preparedStatement.executeBatch()) {
            // SUCCESS_NO_INFO: the command succeeded but the driver cannot report a row count.
            if (updateCount == java.sql.Statement.SUCCESS_NO_INFO) {
                successNoInfo = true;
                continue;
            }
            totalUpdateCount += updateCount;
        }

        // The count check is skipped as soon as any entry reported SUCCESS_NO_INFO, because
        // the total is then known to be incomplete.
        if (totalUpdateCount != this.records.size() && !successNoInfo) {
            switch (this.config.insertMode) {
                case INSERT:
                    throw new ConnectException(String.format("Update count (%d) did not sum up to total number of records inserted (%d)", totalUpdateCount, this.records.size()));
                case UPSERT:
                case UPDATE:
                    // A mismatch is only logged in these modes, not treated as an error.
                    log.debug("{} records:{} resulting in in totalUpdateCount:{}", this.config.insertMode, this.records.size(), totalUpdateCount);
                    break;
                default:
                    throw new ConnectException("Unknown insert mode: " + this.config.insertMode);
            }
        }
        if (successNoInfo) {
            log.info("{} records:{} , but no count of the number of rows it affected is available", this.config.insertMode, this.records.size());
        }

        // Hand the old list to the caller and start a fresh buffer.
        final List<SinkRecord> flushedRecords = this.records;
        this.records = new ArrayList<>();
        return flushedRecords;
    }

    /**
     * Closes the current prepared statement, if any. Buffered records are left untouched.
     *
     * @throws SQLException if closing the statement fails
     */
    public void close() throws SQLException {
        log.info("Closing BufferedRecords with preparedStatement: {}", this.preparedStatement);
        if (this.preparedStatement != null) {
            this.preparedStatement.close();
            this.preparedStatement = null;
        }
    }

    /**
     * Rebuilds everything that depends on the record schemas: field metadata, table
     * structure, SQL text, prepared statement and binder.
     *
     * <p>{@code currentSchemaPair} is assigned last so that a failure in any step leaves the
     * buffer "not initialized" and the next {@link #add} retries the setup instead of running
     * against a missing or closed statement.
     */
    private void initializeFor(SchemaPair schemaPair) throws SQLException {
        this.fieldsMetadata = FieldsMetadata.extract(this.tableId.tableName(), this.config.pkMode, this.config.pkFields, this.config.fieldsWhitelist, schemaPair);
        this.dbStructure.createOrAmendIfNecessary(this.config, this.connection, this.tableId, this.fieldsMetadata);
        final String sql = this.getInsertSql();
        log.debug("{} sql: {}", this.config.insertMode, sql);
        // Release the statement built for the previous schemas before replacing it.
        this.close();
        this.preparedStatement = this.connection.prepareStatement(sql);
        this.preparedStatementBinder = this.dbDialect.statementBinder(this.preparedStatement, this.config.pkMode, schemaPair, this.fieldsMetadata, this.dbStructure.tableDefinition(this.connection, this.tableId), this.config.insertMode);
        this.currentSchemaPair = schemaPair;
    }

    /**
     * Builds the SQL text for the configured insert mode from the current field metadata.
     *
     * @throws ConnectException if UPSERT is requested without key fields, if the dialect does
     *                          not support UPSERT, or if the insert mode is not recognized
     */
    private String getInsertSql() {
        switch (this.config.insertMode) {
            case INSERT:
                return this.dbDialect.buildInsertStatement(this.tableId, this.asColumns(this.fieldsMetadata.keyFieldNames), this.asColumns(this.fieldsMetadata.nonKeyFieldNames));
            case UPSERT:
                if (this.fieldsMetadata.keyFieldNames.isEmpty()) {
                    throw new ConnectException(String.format("Write to table '%s' in UPSERT mode requires key field names to be known, check the primary key configuration", this.tableId));
                }
                try {
                    return this.dbDialect.buildUpsertQueryStatement(this.tableId, this.asColumns(this.fieldsMetadata.keyFieldNames), this.asColumns(this.fieldsMetadata.nonKeyFieldNames));
                } catch (UnsupportedOperationException e) {
                    // Keep the dialect's exception as the cause so its stack trace is not lost.
                    throw new ConnectException(String.format("Write to table '%s' in UPSERT mode is not supported with the %s dialect.", this.tableId, this.dbDialect.name()), e);
                }
            case UPDATE:
                return this.dbDialect.buildUpdateStatement(this.tableId, this.asColumns(this.fieldsMetadata.keyFieldNames), this.asColumns(this.fieldsMetadata.nonKeyFieldNames));
            default:
                throw new ConnectException("Invalid insert mode");
        }
    }

    /** Wraps each field name in a {@link ColumnId} belonging to this buffer's table. */
    private Collection<ColumnId> asColumns(Collection<String> names) {
        return names.stream().map(name -> new ColumnId(this.tableId, name)).collect(Collectors.toList());
    }
}

