/*
 * Decompiled with CFR 0.152.
 */
package name.nkonev.r2dbc.migrate.core;

import io.r2dbc.spi.Batch;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import io.r2dbc.spi.ValidationDepth;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.logging.Level;
import java.util.stream.Collectors;
import name.nkonev.r2dbc.migrate.core.BunchOfResourcesEntry;
import name.nkonev.r2dbc.migrate.core.Dialect;
import name.nkonev.r2dbc.migrate.core.FileReader;
import name.nkonev.r2dbc.migrate.core.H2Queries;
import name.nkonev.r2dbc.migrate.core.H2TableLocker;
import name.nkonev.r2dbc.migrate.core.Locker;
import name.nkonev.r2dbc.migrate.core.MSSqlQueries;
import name.nkonev.r2dbc.migrate.core.MSSqlTableLocker;
import name.nkonev.r2dbc.migrate.core.MariadbQueries;
import name.nkonev.r2dbc.migrate.core.MariadbSessionLocker;
import name.nkonev.r2dbc.migrate.core.MariadbTableLocker;
import name.nkonev.r2dbc.migrate.core.MigrationMetadata;
import name.nkonev.r2dbc.migrate.core.MigrationMetadataFactory;
import name.nkonev.r2dbc.migrate.core.MySqlQueries;
import name.nkonev.r2dbc.migrate.core.MySqlSessionLocker;
import name.nkonev.r2dbc.migrate.core.MySqlTableLocker;
import name.nkonev.r2dbc.migrate.core.PostgreSqlAdvisoryLocker;
import name.nkonev.r2dbc.migrate.core.PostgreSqlQueries;
import name.nkonev.r2dbc.migrate.core.PostgreSqlTableLocker;
import name.nkonev.r2dbc.migrate.core.R2dbcMigrateProperties;
import name.nkonev.r2dbc.migrate.core.SqlQueries;
import name.nkonev.r2dbc.migrate.reader.MigrateResource;
import name.nkonev.r2dbc.migrate.reader.MigrateResourceReader;
import org.apache.commons.text.StringSubstitutor;
import org.apache.commons.text.lookup.StringLookupFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry;

public abstract class R2dbcMigrate {
    private static final Logger LOGGER = Loggers.getLogger(R2dbcMigrate.class);
    private static final String ROWS_UPDATED = "By '{}' {} rows updated";

    private static Dialect getSqlDialect(R2dbcMigrateProperties properties, Connection connection) {
        if (properties.getDialect() == null) {
            Optional<String> maybeDb = Optional.ofNullable(connection.getMetadata()).map(md -> md.getDatabaseProductName()).map(s -> s.toLowerCase());
            if (maybeDb.isPresent()) {
                if (maybeDb.get().contains("postgres")) {
                    return Dialect.POSTGRESQL;
                }
                if (maybeDb.get().contains("microsoft")) {
                    return Dialect.MSSQL;
                }
                if (maybeDb.get().contains("mysql")) {
                    return Dialect.MYSQL;
                }
                if (maybeDb.get().contains("h2")) {
                    return Dialect.H2;
                }
                if (maybeDb.get().contains("maria")) {
                    return Dialect.MARIADB;
                }
            }
            throw new RuntimeException("Cannot recognize dialect. Try to set it explicitly.");
        }
        switch (properties.getDialect()) {
            case POSTGRESQL: {
                return Dialect.POSTGRESQL;
            }
            case MSSQL: {
                return Dialect.MSSQL;
            }
            case MYSQL: {
                return Dialect.MYSQL;
            }
            case H2: {
                return Dialect.H2;
            }
            case MARIADB: {
                return Dialect.MARIADB;
            }
        }
        throw new RuntimeException("Unsupported dialect: " + properties.getDialect());
    }

    private static <T> Flux<T> withAutoCommit(Connection connection, Publisher<T> action) {
        return Mono.from((Publisher)connection.setAutoCommit(true)).thenMany(action);
    }

    private static Mono<Void> transactionalWrap(Connection connection, boolean transactional, Publisher<? extends Result> migrationThings, String info) {
        Mono migrationResult = Flux.from(migrationThings).flatMap(Result::getRowsUpdated).switchIfEmpty((Publisher)Mono.just((Object)0L)).reduceWith(() -> 0L, Long::sum).doOnSuccess(aLong -> LOGGER.info(ROWS_UPDATED, new Object[]{info, aLong}));
        Mono result = transactional ? Mono.from((Publisher)connection.beginTransaction()).then(migrationResult).then(Mono.from((Publisher)connection.commitTransaction())) : R2dbcMigrate.withAutoCommit(connection, migrationResult).then();
        return result;
    }

    private static <T> Mono<Void> transactionalWrapUnchecked(Connection connection, boolean transactional, Publisher<T> migrationThings) {
        Flux migrationResult = Flux.from(migrationThings);
        Mono result = transactional ? Mono.from((Publisher)connection.beginTransaction()).thenMany((Publisher)migrationResult).then(Mono.from((Publisher)connection.commitTransaction())) : R2dbcMigrate.withAutoCommit(connection, migrationResult).then();
        return result;
    }

    private static Mono<Void> waitForDatabase(ConnectionFactory connectionFactory, R2dbcMigrateProperties properties) {
        if (!properties.isWaitForDatabase()) {
            return Mono.empty();
        }
        Mono testResult = Mono.usingWhen((Publisher)Mono.defer(() -> {
            LOGGER.info("Creating new test connection");
            return Mono.from((Publisher)connectionFactory.create());
        }), connection -> Flux.from((Publisher)connection.validate(ValidationDepth.REMOTE)).filter(Boolean.TRUE::equals).switchIfEmpty((Publisher)Mono.error((Throwable)new RuntimeException("Connection is not valid"))).then(Flux.from((Publisher)connection.createStatement(properties.getValidationQuery()).execute()).flatMap(o -> o.map(R2dbcMigrate.getResultSafely("validation_result", String.class, "__VALIDATION_RESULT_NOT_PROVIDED"))).filter(s -> {
            LOGGER.info("Comparing expected value '{}' with provided result '{}'", new Object[]{properties.getValidationQueryExpectedResultValue(), s});
            return properties.getValidationQueryExpectedResultValue().equals(s);
        }).switchIfEmpty((Publisher)Mono.error((Throwable)new RuntimeException("Not matched result of test query"))).last()), connection -> {
            LOGGER.info("Closing test connection");
            return connection.close();
        }).log("R2dbcMigrateCreatingTestConnection", Level.FINE, new SignalType[0]);
        return testResult.timeout(properties.getValidationQueryTimeout()).retryWhen((Retry)Retry.fixedDelay((long)properties.getConnectionMaxRetries(), (Duration)properties.getValidationRetryDelay()).doBeforeRetry(retrySignal -> LOGGER.warn("Retrying to get database connection due {}: {}", new Object[]{retrySignal.failure().getClass(), retrySignal.failure().getMessage()}))).doOnSuccess(o -> LOGGER.info("Successfully got result '{}' of test query", new Object[]{o})).then();
    }

    public static Mono<Void> migrate(ConnectionFactory connectionFactory, R2dbcMigrateProperties properties, MigrateResourceReader resourceReader, SqlQueries maybeUserSqlQueries, Locker maybeLocker) {
        LOGGER.info("Configured with {}", new Object[]{properties});
        if (!properties.isEnable()) {
            return Mono.empty();
        }
        List<Tuple2<MigrateResource, MigrationMetadata>> allFileResources = R2dbcMigrate.getFileResources(properties, resourceReader);
        LOGGER.info("Found {} sql scripts, see details below", new Object[]{allFileResources.size()});
        List<Tuple2<MigrateResource, MigrationMetadata>> premigrationResources = allFileResources.stream().filter(objects -> ((MigrationMetadata)objects.getT2()).isPremigration()).collect(Collectors.toList());
        LOGGER.info("Found {} premigration sql scripts", new Object[]{premigrationResources.size()});
        List migrationResources = allFileResources.stream().filter(objects -> !((MigrationMetadata)objects.getT2()).isPremigration()).collect(Collectors.toList());
        LOGGER.info("Found {} migration sql scripts", new Object[]{migrationResources.size()});
        return R2dbcMigrate.waitForDatabase(connectionFactory, properties).then(R2dbcMigrate.premigrate(connectionFactory, properties, premigrationResources)).then(Mono.usingWhen((Publisher)connectionFactory.create(), connection -> R2dbcMigrate.doWork(connection, properties, migrationResources, maybeUserSqlQueries, maybeLocker), Connection::close).onErrorResume(throwable -> R2dbcMigrate.releaseLockAfterError(throwable, connectionFactory, properties, maybeLocker).then(Mono.error((Throwable)throwable))));
    }

    private static Mono<Void> ensureInternals(Connection connection, SqlQueries sqlQueries, Locker locker) {
        Batch createInternals = connection.createBatch();
        sqlQueries.createInternalTables().forEach(arg_0 -> ((Batch)createInternals).add(arg_0));
        locker.createInternalTables().forEach(arg_0 -> ((Batch)createInternals).add(arg_0));
        Publisher createInternalTables = createInternals.execute();
        return R2dbcMigrate.transactionalWrap(connection, true, (Publisher<? extends Result>)createInternalTables, "Making internal tables");
    }

    private static Mono<Void> acquireOrWaitForLock(Connection connection, Locker locker, R2dbcMigrateProperties properties) {
        Mono lockStatement = Mono.from((Publisher)locker.tryAcquireLock(connection).execute());
        Mono<? extends Object> lockResult = locker.extractResultOrError((Mono<? extends Result>)lockStatement);
        Mono waitForLock = lockResult.retryWhen((Retry)Retry.fixedDelay((long)properties.getAcquireLockMaxRetries(), (Duration)properties.getAcquireLockRetryDelay()).doAfterRetry(retrySignal -> LOGGER.warn("Waiting for lock")));
        return R2dbcMigrate.transactionalWrapUnchecked(connection, true, waitForLock);
    }

    private static List<Tuple2<MigrateResource, MigrationMetadata>> getFileResources(R2dbcMigrateProperties properties, MigrateResourceReader resourceReader) {
        ArrayList<Tuple2<MigrateResource, MigrationMetadata>> allResources = new ArrayList<Tuple2<MigrateResource, MigrationMetadata>>();
        block4: for (BunchOfResourcesEntry resourceEntry : properties.getResources()) {
            switch (resourceEntry.getType()) {
                case CONVENTIONALLY_NAMED_FILES: {
                    allResources.addAll(R2dbcMigrate.processConventionallyNamedFiles(resourceEntry, resourceReader));
                    continue block4;
                }
                case JUST_FILE: {
                    allResources.addAll(R2dbcMigrate.processJustFile(resourceEntry, resourceReader));
                    continue block4;
                }
            }
            throw new IllegalArgumentException("Wrong resourceEntry's type " + resourceEntry.getType());
        }
        List<Tuple2<MigrateResource, MigrationMetadata>> sortedResources = allResources.stream().sorted((o1, o2) -> {
            MigrationMetadata migrationMetadata1 = (MigrationMetadata)o1.getT2();
            MigrationMetadata migrationMetadata2 = (MigrationMetadata)o2.getT2();
            return Long.compare(migrationMetadata1.getVersion(), migrationMetadata2.getVersion());
        }).peek(objects -> LOGGER.debug("From {} got metadata {}", new Object[]{objects.getT1(), objects.getT2()})).collect(Collectors.toList());
        return sortedResources;
    }

    private static List<Tuple2<MigrateResource, MigrationMetadata>> processConventionallyNamedFiles(BunchOfResourcesEntry resourceEntry, MigrateResourceReader resourceReader) {
        if (Objects.nonNull(resourceEntry.getVersion())) {
            LOGGER.warn("For {} set version will be ignored because you cannot set it for type CONVENTIONALLY_NAMED_FILES", new Object[]{resourceEntry});
        }
        if (Objects.nonNull(resourceEntry.getDescription())) {
            LOGGER.warn("For {} set description will be ignored because you cannot set it for type CONVENTIONALLY_NAMED_FILES", new Object[]{resourceEntry});
        }
        if (Objects.nonNull(resourceEntry.getSplitByLine())) {
            LOGGER.warn("For {} set splitByLine will be ignored because you cannot set it for type CONVENTIONALLY_NAMED_FILES", new Object[]{resourceEntry});
        }
        if (Objects.nonNull(resourceEntry.getTransactional())) {
            LOGGER.warn("For {} set transactional will be ignored because you cannot set it for type CONVENTIONALLY_NAMED_FILES", new Object[]{resourceEntry});
        }
        if (Objects.nonNull(resourceEntry.getPremigration())) {
            LOGGER.warn("For {} set premigration will be ignored because you cannot set it for type CONVENTIONALLY_NAMED_FILES", new Object[]{resourceEntry});
        }
        if (Objects.nonNull(resourceEntry.getSubstitute())) {
            LOGGER.warn("For {} set substitute will be ignored because you cannot set it for type CONVENTIONALLY_NAMED_FILES", new Object[]{resourceEntry});
        }
        List<String> resourcesPaths = resourceEntry.getResourcesPaths();
        ArrayList<Tuple2<MigrateResource, MigrationMetadata>> readResources = new ArrayList<Tuple2<MigrateResource, MigrationMetadata>>();
        for (String resourcesPath : resourcesPaths) {
            List<Tuple2> readResourcesPortion = resourceReader.getResources(resourcesPath).stream().filter(Objects::nonNull).filter(MigrateResource::isReadable).map(resource -> {
                LOGGER.debug("Reading {}", new Object[]{resource});
                MigrationMetadata migrationMetadata = MigrationMetadataFactory.getMigrationMetadata(resource.getFilename());
                return Tuples.of((Object)resource, (Object)migrationMetadata);
            }).toList();
            readResources.addAll(readResourcesPortion);
        }
        return readResources;
    }

    private static List<Tuple2<MigrateResource, MigrationMetadata>> processJustFile(BunchOfResourcesEntry resourceEntry, MigrateResourceReader resourceReader) {
        if (Objects.isNull(resourceEntry.getVersion())) {
            throw new IllegalArgumentException("Missed version for " + resourceEntry);
        }
        if (Objects.isNull(resourceEntry.getDescription())) {
            throw new IllegalArgumentException("Missed description for " + resourceEntry);
        }
        if (resourceEntry.getResourcesPaths().size() != 1) {
            throw new IllegalArgumentException("For " + resourceEntry + " of type JUST_FILE you cannot provide != 1 resourcesPaths. Consider using several entries with JUST_FILE instead.");
        }
        String resourcePath = resourceEntry.getResourcePath();
        List gotResources = resourceReader.getResources(resourcePath);
        if (gotResources.size() != 1) {
            throw new IllegalStateException("ResourceEntry " + resourceEntry + " should provide exactly one file. We got " + gotResources.size());
        }
        MigrateResource gotResource = (MigrateResource)gotResources.get(0);
        if (!gotResource.isReadable()) {
            throw new IllegalStateException("Resource " + gotResource + " should be readable");
        }
        MigrationMetadata migrationMetadata = MigrationMetadataFactory.getMigrationMetadata(resourceEntry.getVersion(), resourceEntry.getDescription(), resourceEntry.getSplitByLine(), resourceEntry.getTransactional(), resourceEntry.getPremigration(), resourceEntry.getSubstitute());
        return Collections.singletonList(Tuples.of((Object)gotResource, (Object)migrationMetadata));
    }

    private static Mono<Void> releaseLock(Connection connection, Locker locker) {
        return R2dbcMigrate.transactionalWrap(connection, true, (Publisher<? extends Result>)locker.releaseLock(connection).execute(), "Releasing lock");
    }

    private static Mono<Void> releaseLockAfterError(Throwable throwable, ConnectionFactory connectionFactory, R2dbcMigrateProperties properties, Locker maybeUserLocker) {
        LOGGER.error("Got error during migration, will release lock", throwable);
        return Mono.usingWhen((Publisher)connectionFactory.create(), connection -> {
            Dialect sqlDialect = R2dbcMigrate.getSqlDialect(properties, connection);
            Locker locker = R2dbcMigrate.getUserOrDeterminedLocker(sqlDialect, properties, maybeUserLocker);
            return R2dbcMigrate.transactionalWrap(connection, false, (Publisher<? extends Result>)locker.releaseLock((Connection)connection).execute(), "Releasing lock after error");
        }, Connection::close);
    }

    private static Mono<Void> premigrate(ConnectionFactory connectionFactory, R2dbcMigrateProperties properties, List<Tuple2<MigrateResource, MigrationMetadata>> premigrationResources) {
        if (premigrationResources.isEmpty()) {
            return Mono.empty();
        }
        return Mono.usingWhen((Publisher)Mono.defer(() -> {
            LOGGER.info("Creating new premigration connection");
            return Mono.from((Publisher)connectionFactory.create());
        }), connection -> Flux.fromIterable((Iterable)premigrationResources).concatMap(tuple2 -> R2dbcMigrate.makeMigration(connection, properties, (Tuple2<MigrateResource, MigrationMetadata>)tuple2).log("R2dbcMigrateMakePreMigrationWork", Level.FINE, new SignalType[0]), 1).then(), connection -> {
            LOGGER.info("Closing premigration connection");
            return connection.close();
        }).log("R2dbcMigrateCreatingPreMigrationConnection", Level.FINE, new SignalType[0]);
    }

    private static Mono<Void> doWork(Connection connection, R2dbcMigrateProperties properties, List<Tuple2<MigrateResource, MigrationMetadata>> migrationResources, SqlQueries maybeUserSqlQueries, Locker maybeLocker) {
        Dialect sqlDialect = R2dbcMigrate.getSqlDialect(properties, connection);
        SqlQueries sqlQueries = R2dbcMigrate.getUserOrDeterminedSqlQueries(sqlDialect, properties, maybeUserSqlQueries);
        Locker locker = R2dbcMigrate.getUserOrDeterminedLocker(sqlDialect, properties, maybeLocker);
        return R2dbcMigrate.ensureInternals(connection, sqlQueries, locker).log("R2dbcMigrateEnsuringInternals", Level.FINE, new SignalType[0]).then(R2dbcMigrate.acquireOrWaitForLock(connection, locker, properties).log("R2dbcMigrateAcquiringLock", Level.FINE, new SignalType[0])).then(R2dbcMigrate.getDatabaseVersionOrZero(sqlQueries, connection, properties).log("R2dbcMigrateGetDatabaseVersion", Level.FINE, new SignalType[0])).flatMap(currentVersion -> {
            LOGGER.info("Database version is {}", new Object[]{currentVersion});
            return Flux.fromIterable((Iterable)migrationResources).log("R2dbcMigrateRequestingMigrationFiles", Level.FINE, new SignalType[0]).filter(objects -> ((MigrationMetadata)objects.getT2()).getVersion() > currentVersion).concatMap(tuple2 -> R2dbcMigrate.makeMigration(connection, properties, (Tuple2<MigrateResource, MigrationMetadata>)tuple2).log("R2dbcMigrateMakeMigrationWork", Level.FINE, new SignalType[0]).then(R2dbcMigrate.writeMigrationMetadata(connection, sqlQueries, (Tuple2<MigrateResource, MigrationMetadata>)tuple2).log("R2dbcMigrateWritingMigrationMetadata", Level.FINE, new SignalType[0])), 1).then(R2dbcMigrate.releaseLock(connection, locker).log("R2dbcMigrateReleasingLock", Level.FINE, new SignalType[0]));
        });
    }

    private static SqlQueries getUserOrDeterminedSqlQueries(Dialect sqlDialect, R2dbcMigrateProperties properties, SqlQueries maybeUserSqlQueries) {
        SqlQueries sqlQueries = Objects.requireNonNullElseGet(maybeUserSqlQueries, () -> switch (sqlDialect) {
            default -> throw new IncompatibleClassChangeError();
            case Dialect.POSTGRESQL -> new PostgreSqlQueries(properties.getMigrationsSchema(), properties.getMigrationsTable());
            case Dialect.MSSQL -> new MSSqlQueries(properties.getMigrationsSchema(), properties.getMigrationsTable());
            case Dialect.MYSQL -> new MySqlQueries(properties.getMigrationsSchema(), properties.getMigrationsTable());
            case Dialect.H2 -> new H2Queries(properties.getMigrationsSchema(), properties.getMigrationsTable());
            case Dialect.MARIADB -> new MariadbQueries(properties.getMigrationsSchema(), properties.getMigrationsTable());
        });
        LOGGER.debug("Instantiated {}", new Object[]{sqlQueries.getClass()});
        return sqlQueries;
    }

    private static Locker getUserOrDeterminedLocker(Dialect sqlDialect, R2dbcMigrateProperties properties, Locker maybeUserLocker) {
        Locker locker = Objects.requireNonNullElseGet(maybeUserLocker, () -> switch (sqlDialect) {
            default -> throw new IncompatibleClassChangeError();
            case Dialect.POSTGRESQL -> {
                if (properties.isPreferDbSpecificLock()) {
                    yield new PostgreSqlAdvisoryLocker(properties.getMigrationsSchema(), properties.getMigrationsLockTable());
                }
                yield new PostgreSqlTableLocker(properties.getMigrationsSchema(), properties.getMigrationsLockTable());
            }
            case Dialect.MSSQL -> new MSSqlTableLocker(properties.getMigrationsSchema(), properties.getMigrationsLockTable());
            case Dialect.MYSQL -> {
                if (properties.isPreferDbSpecificLock()) {
                    yield new MySqlSessionLocker(properties.getMigrationsSchema(), properties.getMigrationsLockTable());
                }
                yield new MySqlTableLocker(properties.getMigrationsSchema(), properties.getMigrationsLockTable());
            }
            case Dialect.H2 -> new H2TableLocker(properties.getMigrationsSchema(), properties.getMigrationsLockTable());
            case Dialect.MARIADB -> properties.isPreferDbSpecificLock() ? new MariadbSessionLocker(properties.getMigrationsSchema(), properties.getMigrationsLockTable()) : new MariadbTableLocker(properties.getMigrationsSchema(), properties.getMigrationsLockTable());
        });
        LOGGER.debug("Instantiated {}", new Object[]{locker.getClass()});
        return locker;
    }

    private static Mono<Void> makeMigration(Connection connection, R2dbcMigrateProperties properties, Tuple2<MigrateResource, MigrationMetadata> tt) {
        LOGGER.info("Applying {}", new Object[]{tt.getT2()});
        return R2dbcMigrate.transactionalWrap(connection, ((MigrationMetadata)tt.getT2()).isTransactional(), R2dbcMigrate.getMigrateResultPublisher(properties, connection, (MigrateResource)tt.getT1(), (MigrationMetadata)tt.getT2()), ((MigrationMetadata)tt.getT2()).toString());
    }

    private static Mono<Void> writeMigrationMetadata(Connection connection, SqlQueries sqlQueries, Tuple2<MigrateResource, MigrationMetadata> tt) {
        return R2dbcMigrate.transactionalWrap(connection, true, (Publisher<? extends Result>)sqlQueries.createInsertMigrationStatement(connection, (MigrationMetadata)tt.getT2()).execute(), "Writing metadata version " + ((MigrationMetadata)tt.getT2()).getVersion());
    }

    private static Mono<Long> getDatabaseVersionOrZero(SqlQueries sqlQueries, Connection connection, R2dbcMigrateProperties properties) {
        return R2dbcMigrate.withAutoCommit(connection, connection.createStatement(sqlQueries.getMaxMigration()).execute()).flatMap(o -> Mono.from((Publisher)o.map(R2dbcMigrate.getResultSafely("max", Long.class, 0L)))).switchIfEmpty((Publisher)Mono.just((Object)0L)).last();
    }

    static <ColumnType> BiFunction<Row, RowMetadata, ColumnType> getResultSafely(String resultColumn, Class<ColumnType> ct, ColumnType defaultValue) {
        return (row, rowMetadata) -> {
            if (rowMetadata.contains(resultColumn)) {
                Object value = row.get(resultColumn, ct);
                return value != null ? value : defaultValue;
            }
            return defaultValue;
        };
    }

    static Batch makeBatch(Connection connection, List<String> strings) {
        Batch batch = connection.createBatch();
        strings.forEach(arg_0 -> ((Batch)batch).add(arg_0));
        return batch;
    }

    private static Publisher<? extends Result> getMigrateResultPublisher(R2dbcMigrateProperties properties, Connection connection, MigrateResource resource, MigrationMetadata migrationMetadata) {
        if (migrationMetadata.isSplitByLine()) {
            Flux sequentFlux = FileReader.readChunked(resource, properties.getFileCharset()).buffer(properties.getChunkSize()).concatMap(strings -> {
                LOGGER.debug("Creating batch - for {} processing {} strings", new Object[]{migrationMetadata, strings.size()});
                List<String> substituted = strings.stream().map(s -> R2dbcMigrate.substituteIfNeed(properties, migrationMetadata, s)).toList();
                return R2dbcMigrate.makeBatch(connection, substituted).execute();
            }, 1);
            return sequentFlux;
        }
        String s = FileReader.read(resource, properties.getFileCharset());
        String substituted = R2dbcMigrate.substituteIfNeed(properties, migrationMetadata, s);
        return connection.createStatement(substituted).execute();
    }

    private static String substituteIfNeed(R2dbcMigrateProperties properties, MigrationMetadata migrationMetadata, String input) {
        String t = input;
        if (migrationMetadata.isSubstitute()) {
            if (properties.isUseEnvironmentSubstitutor()) {
                StringSubstitutor environmentSubstitutor = new StringSubstitutor(StringLookupFactory.INSTANCE.environmentVariableStringLookup());
                t = environmentSubstitutor.replace(t);
            }
            if (properties.isUseSystemPropertiesSubstitutor()) {
                StringSubstitutor systemPropertiesSubstitutor = new StringSubstitutor(StringLookupFactory.INSTANCE.systemPropertyStringLookup());
                t = systemPropertiesSubstitutor.replace(t);
            }
        }
        return t;
    }
}

