/*
 * Decompiled with CFR 0.152.
 */
package org.jeasy.batch.core.job;

import java.time.LocalDateTime;
import java.util.Locale;
import org.jeasy.batch.core.jmx.JobMonitor;
import org.jeasy.batch.core.job.ErrorThresholdExceededException;
import org.jeasy.batch.core.job.Job;
import org.jeasy.batch.core.job.JobMetrics;
import org.jeasy.batch.core.job.JobParameters;
import org.jeasy.batch.core.job.JobReport;
import org.jeasy.batch.core.job.JobStatus;
import org.jeasy.batch.core.job.RecordTracker;
import org.jeasy.batch.core.listener.BatchListener;
import org.jeasy.batch.core.listener.CompositeBatchListener;
import org.jeasy.batch.core.listener.CompositeJobListener;
import org.jeasy.batch.core.listener.CompositePipelineListener;
import org.jeasy.batch.core.listener.CompositeRecordReaderListener;
import org.jeasy.batch.core.listener.CompositeRecordWriterListener;
import org.jeasy.batch.core.listener.JobListener;
import org.jeasy.batch.core.listener.PipelineListener;
import org.jeasy.batch.core.listener.RecordReaderListener;
import org.jeasy.batch.core.listener.RecordWriterListener;
import org.jeasy.batch.core.processor.CompositeRecordProcessor;
import org.jeasy.batch.core.processor.RecordProcessor;
import org.jeasy.batch.core.reader.RecordReader;
import org.jeasy.batch.core.record.Batch;
import org.jeasy.batch.core.record.Record;
import org.jeasy.batch.core.util.Utils;
import org.jeasy.batch.core.writer.RecordWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Job} implementation that reads records one batch at a time from a
 * {@link RecordReader}, runs every record through the registered processors and
 * hands the resulting batch to a {@link RecordWriter}.
 *
 * <p>Reader, writer, pipeline, batch and job listeners are notified at each step.
 * Progress and the final outcome are recorded in the {@link JobReport} returned
 * by {@link #call()}.
 *
 * @param <I> payload type of the records produced by the reader
 * @param <O> payload type of the records handed to the writer
 */
class BatchJob<I, O> implements Job {
    private static final Logger LOGGER = LoggerFactory.getLogger(BatchJob.class);
    private static final String DEFAULT_JOB_NAME = "job";

    private String name;
    private RecordReader<I> recordReader;
    private RecordWriter<O> recordWriter;
    private final RecordProcessor<I, O> recordProcessor;
    private RecordTracker recordTracker;
    private final JobListener jobListener;
    private final BatchListener<O> batchListener;
    private final RecordReaderListener<I> recordReaderListener;
    private final RecordWriterListener<O> recordWriterListener;
    private final PipelineListener pipelineListener;
    private final JobParameters parameters;
    private final JobMetrics metrics;
    private final JobReport report;
    private final JobMonitor monitor;

    /**
     * Creates a job named {@code "job"} with a no-op reader, a no-op writer, an
     * empty processor chain and empty composite listeners.
     *
     * @param parameters the job parameters; also stored in the job report
     */
    BatchJob(JobParameters parameters) {
        this.parameters = parameters;
        this.name = DEFAULT_JOB_NAME;
        this.metrics = new JobMetrics();
        this.report = new JobReport();
        this.report.setParameters(parameters);
        this.report.setMetrics(this.metrics);
        this.report.setJobName(this.name);
        this.report.setSystemProperties(System.getProperties());
        this.monitor = new JobMonitor(this.report);
        this.recordReader = new NoOpRecordReader<I>();
        this.recordProcessor = new CompositeRecordProcessor();
        this.recordWriter = new NoOpRecordWriter<O>();
        this.recordReaderListener = new CompositeRecordReaderListener<I>();
        this.pipelineListener = new CompositePipelineListener();
        this.recordWriterListener = new CompositeRecordWriterListener<O>();
        this.batchListener = new CompositeBatchListener<O>();
        this.jobListener = new CompositeJobListener();
        this.recordTracker = new RecordTracker();
    }

    /**
     * @return the current job name
     */
    @Override
    public String getName() {
        return this.name;
    }

    /**
     * Runs the job: opens the reader and the writer, then reads, processes and
     * writes batches until the reader is exhausted or the current thread is
     * interrupted.
     *
     * <p>Any exception escaping the main loop marks the job as
     * {@link JobStatus#FAILED}. The reader and the writer are closed in all cases.
     * Note that on failure the job listeners are notified before the reader and
     * the writer are closed, whereas on normal termination they are notified
     * afterwards.
     *
     * @return the job report
     */
    @Override
    public JobReport call() {
        this.start();
        try {
            this.openReader();
            this.openWriter();
            this.setStatus(JobStatus.STARTED);
            while (this.moreRecords() && !this.isInterrupted()) {
                Batch<O> batch = this.readAndProcessBatch();
                this.writeBatch(batch);
            }
            this.setStatus(JobStatus.STOPPING);
        } catch (Exception exception) {
            this.fail(exception);
            return this.report;
        } finally {
            this.closeReader();
            this.closeWriter();
        }
        this.teardown();
        return this.report;
    }

    /**
     * Marks the job as starting, notifies job listeners, resets the record
     * tracker, records the start time and registers the JMX monitor if enabled.
     */
    private void start() {
        this.setStatus(JobStatus.STARTING);
        this.jobListener.beforeJob(this.parameters);
        this.recordTracker = new RecordTracker();
        this.metrics.setStartTime(LocalDateTime.now());
        LOGGER.debug("Batch size: {}", this.parameters.getBatchSize());
        LOGGER.debug("Error threshold: {}", Utils.formatErrorThreshold(this.parameters.getErrorThreshold()));
        LOGGER.debug("Jmx monitoring: {}", this.parameters.isJmxMonitoring());
        LOGGER.debug("Batch scanning: {}", this.parameters.isBatchScanningEnabled());
        this.registerJobMonitor();
    }

    /** Registers the JMX MBean for this job when JMX monitoring is enabled. */
    private void registerJobMonitor() {
        if (this.parameters.isJmxMonitoring()) {
            this.monitor.registerJmxMBeanFor(this);
        }
    }

    /**
     * Opens the record reader.
     *
     * @throws Exception whatever the reader throws, after it has been logged
     */
    private void openReader() throws Exception {
        try {
            LOGGER.debug("Opening record reader");
            this.recordReader.open();
        } catch (Exception e) {
            LOGGER.error("Unable to open record reader", e);
            throw e;
        }
    }

    /**
     * Opens the record writer.
     *
     * @throws Exception whatever the writer throws, after it has been logged
     */
    private void openWriter() throws Exception {
        try {
            LOGGER.debug("Opening record writer");
            this.recordWriter.open();
        } catch (Exception e) {
            LOGGER.error("Unable to open record writer", e);
            throw e;
        }
    }

    /**
     * Logs the new status and stores it in the report. An additional message is
     * logged when the current thread has been interrupted.
     *
     * @param status the status to record
     */
    private void setStatus(JobStatus status) {
        if (this.isInterrupted()) {
            LOGGER.info("Job '{}' has been interrupted, aborting execution.", this.name);
        }
        // Locale.ROOT keeps the logged status stable whatever the default locale is
        // (e.g. the Turkish locale would turn "STARTING" into "startıng").
        LOGGER.info("Job '{}' {}", this.name, status.name().toLowerCase(Locale.ROOT));
        this.report.setStatus(status);
    }

    /**
     * @return whether the record tracker still expects records from the reader
     */
    private boolean moreRecords() {
        return this.recordTracker.moreRecords();
    }

    /**
     * Reads up to {@code batchSize} records and processes each of them into a new
     * batch. Reading stops early when the reader returns {@code null}, in which
     * case the record tracker is told that there are no more records.
     *
     * @return the batch of successfully processed records (possibly empty)
     * @throws Exception if reading fails or the error threshold is exceeded
     */
    private Batch<O> readAndProcessBatch() throws Exception {
        Batch<O> batch = new Batch<>();
        this.batchListener.beforeBatchReading();
        for (int i = 0; i < this.parameters.getBatchSize(); ++i) {
            Record<I> record = this.readRecord();
            if (record == null) {
                LOGGER.debug("No more records");
                this.recordTracker.noMoreRecords();
                break;
            }
            this.metrics.incrementReadCount();
            this.processRecord(record, batch);
        }
        this.batchListener.afterBatchProcessing(batch);
        return batch;
    }

    /**
     * Reads the next record, notifying reader listeners before and after.
     *
     * @return the record returned by the reader, possibly {@code null}
     * @throws Exception whatever the reader or a listener throws, after the
     *                   reader listeners have been notified and the error logged
     */
    private Record<I> readRecord() throws Exception {
        try {
            LOGGER.debug("Reading next record");
            this.recordReaderListener.beforeRecordReading();
            Record<I> record = this.recordReader.readRecord();
            this.recordReaderListener.afterRecordReading(record);
            return record;
        } catch (Exception e) {
            this.recordReaderListener.onRecordReadingException(e);
            LOGGER.error("Unable to read next record", e);
            throw e;
        }
    }

    /**
     * Runs one record through the pipeline listener and the processor chain and
     * adds the result to {@code batch}. A record is counted as filtered when
     * either the pipeline listener or the processor returns {@code null}.
     *
     * <p>A processing failure is counted as an error and stored as the last error
     * of the report; the job only aborts once the error count is strictly greater
     * than the configured error threshold.
     *
     * @param record the record to process
     * @param batch  the batch receiving the processed record
     * @throws ErrorThresholdExceededException if the error count exceeds the threshold
     */
    private void processRecord(Record<I> record, Batch<O> batch) throws ErrorThresholdExceededException {
        Record<O> processedRecord = null;
        try {
            LOGGER.debug("Processing record {}", record);
            this.notifyJobUpdate();
            Record<I> preProcessedRecord = this.pipelineListener.beforeRecordProcessing(record);
            if (preProcessedRecord == null) {
                LOGGER.debug("Record {} has been filtered", record);
                this.metrics.incrementFilterCount();
            } else {
                processedRecord = this.recordProcessor.processRecord(preProcessedRecord);
                if (processedRecord == null) {
                    LOGGER.debug("Record {} has been filtered", record);
                    this.metrics.incrementFilterCount();
                } else {
                    batch.addRecord(processedRecord);
                }
            }
            this.pipelineListener.afterRecordProcessing(record, processedRecord);
        } catch (Exception e) {
            // The trailing Throwable argument is logged by SLF4J as the exception.
            LOGGER.error("Unable to process record {}", record, e);
            this.pipelineListener.onRecordProcessingException(record, e);
            this.metrics.incrementErrorCount();
            this.report.setLastError(e);
            if (this.metrics.getErrorCount() > this.parameters.getErrorThreshold()) {
                String errorMessage = "Error threshold exceeded. Aborting execution";
                LOGGER.error(errorMessage, e);
                throw new ErrorThresholdExceededException(errorMessage, e);
            }
        }
    }

    /**
     * Writes a non-empty batch and notifies writer and batch listeners.
     *
     * <p>When writing fails, listeners are notified and the exception is stored
     * as the last error of the report. If batch scanning is enabled, the records
     * are then rewritten one by one via {@link #scan(Batch)} and the job goes on;
     * otherwise the failure is logged and rethrown.
     *
     * @param batch the batch to write
     * @throws Exception the write failure, only when batch scanning is disabled
     */
    private void writeBatch(Batch<O> batch) throws Exception {
        try {
            if (!batch.isEmpty()) {
                LOGGER.debug("Writing records {}", batch);
                this.recordWriterListener.beforeRecordWriting(batch);
                this.recordWriter.writeRecords(batch);
                this.recordWriterListener.afterRecordWriting(batch);
                this.batchListener.afterBatchWriting(batch);
                this.metrics.incrementWriteCount(batch.size());
            }
        } catch (Exception e) {
            this.recordWriterListener.onRecordWritingException(batch, e);
            this.batchListener.onBatchWritingException(batch, e);
            this.report.setLastError(e);
            if (this.parameters.isBatchScanningEnabled()) {
                // Scanning is the recovery path: rethrowing here would fail the
                // job even though the records have just been retried one by one.
                this.scan(batch);
            } else {
                LOGGER.error("Unable to write records", e);
                throw e;
            }
        }
    }

    /**
     * Rewrites each record of a failed batch as its own single-record batch. Every
     * record header is flagged as scanned. A record that fails again is counted
     * as an error and stored as the last error of the report; scanning continues
     * with the next record.
     *
     * @param batch the batch whose write failed
     */
    private void scan(Batch<O> batch) {
        LOGGER.debug("Scanning records {}", batch);
        for (Record<O> record : batch) {
            record.getHeader().setScanned(true);
            Batch<O> scannedBatch = new Batch<>(record);
            try {
                this.recordWriterListener.beforeRecordWriting(scannedBatch);
                this.recordWriter.writeRecords(scannedBatch);
                this.recordWriterListener.afterRecordWriting(scannedBatch);
                this.metrics.incrementWriteCount(scannedBatch.size());
            } catch (Exception exception) {
                this.recordWriterListener.onRecordWritingException(scannedBatch, exception);
                this.metrics.incrementErrorCount();
                this.report.setLastError(exception);
            }
        }
        LOGGER.debug("End of records scanning");
    }

    /**
     * @return whether the current thread's interrupt flag is set (the flag is not cleared)
     */
    private boolean isInterrupted() {
        return Thread.currentThread().isInterrupted();
    }

    /**
     * Finishes the job as {@link JobStatus#ABORTED} when the current thread is
     * interrupted, as {@link JobStatus#COMPLETED} otherwise.
     */
    private void teardown() {
        JobStatus jobStatus = this.isInterrupted() ? JobStatus.ABORTED : JobStatus.COMPLETED;
        this.teardown(jobStatus);
    }

    /**
     * Stores the final status and the end time, logs the outcome, pushes a JMX
     * update if enabled and notifies job listeners.
     *
     * @param status the final job status
     */
    private void teardown(JobStatus status) {
        this.report.setStatus(status);
        this.metrics.setEndTime(LocalDateTime.now());
        LOGGER.info("Job '{}' finished with status {} in {}",
                this.name, this.report.getStatus(), Utils.formatDuration(this.report.getMetrics().getDuration()));
        this.notifyJobUpdate();
        this.jobListener.afterJob(this.report);
    }

    /**
     * Stores the exception as the last error and finishes the job as
     * {@link JobStatus#FAILED}.
     *
     * @param exception the exception that caused the failure
     */
    private void fail(Exception exception) {
        this.report.setLastError(exception);
        this.teardown(JobStatus.FAILED);
    }

    /**
     * Closes the record reader. A failure is logged and stored as the last error
     * of the report but is not propagated.
     */
    private void closeReader() {
        try {
            LOGGER.debug("Closing record reader");
            this.recordReader.close();
        } catch (Exception e) {
            LOGGER.error("Unable to close record reader", e);
            this.report.setLastError(e);
        }
    }

    /**
     * Closes the record writer. A failure is logged and stored as the last error
     * of the report but is not propagated.
     */
    private void closeWriter() {
        try {
            LOGGER.debug("Closing record writer");
            this.recordWriter.close();
        } catch (Exception e) {
            LOGGER.error("Unable to close record writer", e);
            this.report.setLastError(e);
        }
    }

    /** Asks the JMX monitor to publish a report update when JMX monitoring is enabled. */
    private void notifyJobUpdate() {
        if (this.parameters.isJmxMonitoring()) {
            this.monitor.notifyJobReportUpdate();
        }
    }

    /**
     * Replaces the record reader.
     *
     * @param recordReader the reader to use
     */
    public void setRecordReader(RecordReader<I> recordReader) {
        this.recordReader = recordReader;
    }

    /**
     * Replaces the record writer.
     *
     * @param recordWriter the writer to use
     */
    public void setRecordWriter(RecordWriter<O> recordWriter) {
        this.recordWriter = recordWriter;
    }

    /**
     * Adds a processor to the composite processor of this job.
     *
     * @param recordProcessor the processor to add
     */
    public void addRecordProcessor(RecordProcessor<?, ?> recordProcessor) {
        ((CompositeRecordProcessor) this.recordProcessor).addRecordProcessor(recordProcessor);
    }

    /**
     * Adds a batch listener to the composite batch listener of this job.
     *
     * @param batchListener the listener to add
     */
    public void addBatchListener(BatchListener<O> batchListener) {
        ((CompositeBatchListener<O>) this.batchListener).addBatchListener(batchListener);
    }

    /**
     * Adds a job listener to the composite job listener of this job.
     *
     * @param jobListener the listener to add
     */
    public void addJobListener(JobListener jobListener) {
        ((CompositeJobListener) this.jobListener).addJobListener(jobListener);
    }

    /**
     * Adds a reader listener to the composite reader listener of this job.
     *
     * @param recordReaderListener the listener to add
     */
    public void addRecordReaderListener(RecordReaderListener<I> recordReaderListener) {
        ((CompositeRecordReaderListener<I>) this.recordReaderListener).addRecordReaderListener(recordReaderListener);
    }

    /**
     * Adds a writer listener to the composite writer listener of this job.
     *
     * @param recordWriterListener the listener to add
     */
    public void addRecordWriterListener(RecordWriterListener<O> recordWriterListener) {
        ((CompositeRecordWriterListener<O>) this.recordWriterListener).addRecordWriterListener(recordWriterListener);
    }

    /**
     * Adds a pipeline listener to the composite pipeline listener of this job.
     *
     * @param pipelineListener the listener to add
     */
    public void addPipelineListener(PipelineListener pipelineListener) {
        ((CompositePipelineListener) this.pipelineListener).addPipelineListener(pipelineListener);
    }

    /**
     * Renames the job and propagates the new name to the job report.
     *
     * @param name the new job name
     */
    public void setName(String name) {
        this.name = name;
        this.report.setJobName(name);
    }

    /**
     * Default writer: discards every batch it is given.
     *
     * @param <P> payload type of the records
     */
    private static class NoOpRecordWriter<P> implements RecordWriter<P> {
        private NoOpRecordWriter() {
        }

        @Override
        public void writeRecords(Batch<P> batch) {
            // intentionally empty
        }
    }

    /**
     * Default reader: always returns {@code null}, which the job treats as the end
     * of the data source.
     *
     * @param <P> payload type of the records
     */
    private static class NoOpRecordReader<P> implements RecordReader<P> {
        private NoOpRecordReader() {
        }

        @Override
        public Record<P> readRecord() {
            return null;
        }
    }
}

