/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.jdbc.sink;

import io.confluent.connect.jdbc.sink.JdbcSinkConfig;
import io.confluent.connect.jdbc.util.StringUtils;
import java.util.Locale;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

/**
 * Checks whether a {@link SinkRecord} is acceptable to the JDBC sink connector.
 *
 * <p>A validator reports a rejected record by throwing; the validators produced by the factory
 * methods of this interface throw {@link ConnectException}. Validators are combined with
 * {@link #and(RecordValidator)}.
 */
@FunctionalInterface
public interface RecordValidator {
    /** Validator that accepts every record without inspecting it. */
    RecordValidator NO_OP = record -> {};

    /**
     * Validates a single record.
     *
     * @param record the record to check
     */
    void validate(SinkRecord record);

    /**
     * Chains this validator with another one.
     *
     * <p>The combined validator runs this validator first and {@code other} second, so an
     * exception thrown by this validator prevents {@code other} from running. No wrapper is
     * created when one side contributes nothing:
     * <ul>
     *   <li>if {@code other} is {@code null}, {@link #NO_OP}, or this very instance, this
     *       validator is returned unchanged;</li>
     *   <li>otherwise, if this validator is {@link #NO_OP}, {@code other} is returned.</li>
     * </ul>
     *
     * @param other the validator to run after this one; may be {@code null}
     * @return a validator equivalent to running both in order
     */
    default RecordValidator and(RecordValidator other) {
        // Identity comparisons are intentional: only the shared NO_OP instance and the exact
        // same validator object are recognised as redundant.
        if (other == null || other == NO_OP || other == this) {
            return this;
        }
        if (this == NO_OP) {
            return other;
        }
        final RecordValidator first = this;
        return record -> {
            first.validate(record);
            other.validate(record);
        };
    }

    /**
     * Builds the validator that matches the given connector configuration.
     *
     * <ul>
     *   <li>{@code pkMode == RECORD_KEY}: a valid key is required.</li>
     *   <li>{@code pkMode == RECORD_VALUE} or {@code NONE}: a valid value is required.</li>
     *   <li>any other {@code pkMode}: the mode itself adds no requirement.</li>
     *   <li>{@code deleteEnabled}: a valid key is required, regardless of {@code pkMode}.</li>
     * </ul>
     *
     * <p>In the resulting validator the key check (if any) runs before the value check (if any).
     *
     * @param config the sink configuration whose {@code pkMode} and {@code deleteEnabled}
     *     fields select the checks
     * @return the composed validator, which is {@link #NO_OP} when no check applies
     */
    static RecordValidator create(JdbcSinkConfig config) {
        RecordValidator requiresKey = RecordValidator.requiresKey(config);
        RecordValidator requiresValue = RecordValidator.requiresValue(config);
        RecordValidator keyValidator = NO_OP;
        RecordValidator valueValidator = NO_OP;
        switch (config.pkMode) {
            case RECORD_KEY:
                keyValidator = keyValidator.and(requiresKey);
                break;
            case RECORD_VALUE:
            case NONE:
                valueValidator = valueValidator.and(requiresValue);
                break;
            default:
                // Remaining modes impose no key/value requirement of their own.
                break;
        }
        if (config.deleteEnabled) {
            // and() returns the same instance when requiresKey was already added above,
            // so the key check is never run twice.
            keyValidator = keyValidator.and(requiresKey);
        }
        return keyValidator.and(valueValidator);
    }

    /**
     * Creates a validator that accepts a record only if its value is non-null and its value
     * schema is non-null with type {@link Schema.Type#STRUCT}.
     *
     * <p>A rejected record causes a {@link ConnectException} whose message names the connector,
     * the {@code delete.enabled} and {@code pk.mode} settings, and the record's topic,
     * partition, offset and timestamp. The message is formatted with {@link Locale#ROOT} so it
     * reads the same regardless of the JVM's default locale.
     *
     * @param config the sink configuration; read when a record is rejected, to build the message
     * @return the value validator
     */
    static RecordValidator requiresValue(JdbcSinkConfig config) {
        return record -> {
            Schema valueSchema = record.valueSchema();
            if (record.value() != null
                    && valueSchema != null
                    && valueSchema.type() == Schema.Type.STRUCT) {
                return;
            }
            throw new ConnectException(String.format(
                    Locale.ROOT,
                    "Sink connector '%s' is configured with '%s=%s' and '%s=%s' and therefore requires "
                            + "records with a non-null Struct value and non-null Struct schema, but found record at "
                            + "(topic='%s',partition=%d,offset=%d,timestamp=%d) with a %s value and %s value schema.",
                    config.connectorName(),
                    "delete.enabled",
                    config.deleteEnabled,
                    "pk.mode",
                    config.pkMode.toString().toLowerCase(Locale.ROOT),
                    record.topic(),
                    record.kafkaPartition(),
                    record.kafkaOffset(),
                    record.timestamp(),
                    StringUtils.valueTypeOrNull(record.value()),
                    StringUtils.schemaTypeOrNull(valueSchema)));
        };
    }

    /**
     * Creates a validator that accepts a record only if its key is non-null and its key schema
     * is non-null with either type {@link Schema.Type#STRUCT} or a primitive type.
     *
     * <p>A rejected record causes a {@link ConnectException} whose message names the connector,
     * the {@code delete.enabled} and {@code pk.mode} settings, and the record's topic,
     * partition, offset and timestamp. The message is formatted with {@link Locale#ROOT} so it
     * reads the same regardless of the JVM's default locale.
     *
     * @param config the sink configuration; read when a record is rejected, to build the message
     * @return the key validator
     */
    static RecordValidator requiresKey(JdbcSinkConfig config) {
        return record -> {
            Schema keySchema = record.keySchema();
            if (record.key() != null
                    && keySchema != null
                    && (keySchema.type() == Schema.Type.STRUCT || keySchema.type().isPrimitive())) {
                return;
            }
            throw new ConnectException(String.format(
                    Locale.ROOT,
                    "Sink connector '%s' is configured with '%s=%s' and '%s=%s' and therefore requires "
                            + "records with a non-null key and non-null Struct or primitive key schema, but found "
                            + "record at (topic='%s',partition=%d,offset=%d,timestamp=%d) with a %s key and %s key schema.",
                    config.connectorName(),
                    "delete.enabled",
                    config.deleteEnabled,
                    "pk.mode",
                    config.pkMode.toString().toLowerCase(Locale.ROOT),
                    record.topic(),
                    record.kafkaPartition(),
                    record.kafkaOffset(),
                    record.timestamp(),
                    StringUtils.valueTypeOrNull(record.key()),
                    StringUtils.schemaTypeOrNull(keySchema)));
        };
    }
}

