package org.easybatch.core.job;

import org.easybatch.core.listener.BatchListener;
import org.easybatch.core.listener.CompositeBatchListener;
import org.easybatch.core.listener.CompositeJobListener;
import org.easybatch.core.listener.CompositePipelineListener;
import org.easybatch.core.listener.CompositeRecordReaderListener;
import org.easybatch.core.listener.CompositeRecordWriterListener;
import org.easybatch.core.listener.JobListener;
import org.easybatch.core.listener.PipelineListener;
import org.easybatch.core.listener.RecordReaderListener;
import org.easybatch.core.listener.RecordWriterListener;
import org.easybatch.core.processor.CompositeRecordProcessor;
import org.easybatch.core.processor.RecordProcessor;
import org.easybatch.core.reader.RecordReader;
import org.easybatch.core.record.Batch;
import org.easybatch.core.record.Record;
import org.easybatch.core.util.Utils;
import org.easybatch.core.writer.RecordWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/easybatch/core/job/BatchJob.class */
class BatchJob implements Job {
    private static final Logger LOGGER = LoggerFactory.getLogger(BatchJob.class);
    private static final String DEFAULT_JOB_NAME = "job";
    private RecordReader recordReader;
    private RecordWriter recordWriter;
    private RecordProcessor recordProcessor;
    private RecordTracker recordTracker;
    private JobListener jobListener;
    private BatchListener batchListener;
    private RecordReaderListener recordReaderListener;
    private RecordWriterListener recordWriterListener;
    private PipelineListener pipelineListener;
    private JobParameters parameters;
    private JobMonitor monitor;
    private String name = "job";
    private JobMetrics metrics = new JobMetrics();
    private JobReport report = new JobReport();

    /* JADX INFO: Access modifiers changed from: package-private */
    public BatchJob(JobParameters jobParameters) {
        this.parameters = jobParameters;
        this.report.setParameters(jobParameters);
        this.report.setMetrics(this.metrics);
        this.report.setJobName(this.name);
        this.report.setSystemProperties(System.getProperties());
        this.monitor = new JobMonitor(this.report);
        this.recordReader = new NoOpRecordReader();
        this.recordProcessor = new CompositeRecordProcessor();
        this.recordWriter = new NoOpRecordWriter();
        this.recordReaderListener = new CompositeRecordReaderListener();
        this.pipelineListener = new CompositePipelineListener();
        this.recordWriterListener = new CompositeRecordWriterListener();
        this.batchListener = new CompositeBatchListener();
        this.jobListener = new CompositeJobListener();
        this.recordTracker = new RecordTracker();
    }

    @Override // org.easybatch.core.job.Job
    public String getName() {
        return this.name;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public JobReport call() {
        start();
        try {
            openReader();
            openWriter();
            setStatus(JobStatus.STARTED);
            while (moreRecords() && !isInterrupted()) {
                writeBatch(readAndProcessBatch());
            }
            setStatus(JobStatus.STOPPING);
            teardown();
            return this.report;
        } catch (Exception e) {
            fail(e);
            return this.report;
        } finally {
            closeReader();
            closeWriter();
        }
    }

    private void start() {
        setStatus(JobStatus.STARTING);
        this.jobListener.beforeJobStart(this.parameters);
        this.recordTracker = new RecordTracker();
        this.metrics.setStartTime(System.currentTimeMillis());
        LOGGER.info("Batch size: {}", Integer.valueOf(this.parameters.getBatchSize()));
        LOGGER.info("Error threshold: {}", Utils.formatErrorThreshold(this.parameters.getErrorThreshold()));
        LOGGER.info("Jmx monitoring: {}", Boolean.valueOf(this.parameters.isJmxMonitoring()));
        registerJobMonitor();
    }

    private void registerJobMonitor() {
        if (this.parameters.isJmxMonitoring()) {
            this.monitor.registerJmxMBeanFor(this);
        }
    }

    private void openReader() throws RecordReaderOpeningException {
        try {
            LOGGER.debug("Opening record reader");
            this.recordReader.open();
        } catch (Exception e) {
            throw new RecordReaderOpeningException("Unable to open record reader", e);
        }
    }

    private void openWriter() throws RecordWriterOpeningException {
        try {
            LOGGER.debug("Opening record writer");
            this.recordWriter.open();
        } catch (Exception e) {
            throw new RecordWriterOpeningException("Unable to open record writer", e);
        }
    }

    private void setStatus(JobStatus jobStatus) {
        if (isInterrupted()) {
            LOGGER.info("Job '{}' has been interrupted, aborting execution.", this.name);
        }
        LOGGER.info("Job '{}' {}", this.name, jobStatus.name().toLowerCase());
        this.report.setStatus(jobStatus);
    }

    private boolean moreRecords() {
        return this.recordTracker.moreRecords();
    }

    private Batch readAndProcessBatch() throws RecordReadingException, ErrorThresholdExceededException {
        Batch batch = new Batch();
        this.batchListener.beforeBatchReading();
        int i = 0;
        while (true) {
            if (i >= this.parameters.getBatchSize()) {
                break;
            }
            Record readRecord = readRecord();
            if (readRecord == null) {
                this.recordTracker.noMoreRecords();
                break;
            }
            this.metrics.incrementReadCount();
            processRecord(readRecord, batch);
            i++;
        }
        this.batchListener.afterBatchProcessing(batch);
        return batch;
    }

    private Record readRecord() throws RecordReadingException {
        try {
            LOGGER.debug("Reading next record");
            this.recordReaderListener.beforeRecordReading();
            Record readRecord = this.recordReader.readRecord();
            this.recordReaderListener.afterRecordReading(readRecord);
            return readRecord;
        } catch (Exception e) {
            this.recordReaderListener.onRecordReadingException(e);
            throw new RecordReadingException("Unable to read next record", e);
        }
    }

    private void processRecord(Record record, Batch batch) throws ErrorThresholdExceededException {
        Record record2 = null;
        try {
            LOGGER.debug("Processing {}", record);
            notifyJobUpdate();
            Record beforeRecordProcessing = this.pipelineListener.beforeRecordProcessing(record);
            if (beforeRecordProcessing == null) {
                LOGGER.debug("{} has been filtered", record);
                this.metrics.incrementFilterCount();
            } else {
                record2 = this.recordProcessor.processRecord(beforeRecordProcessing);
                if (record2 == null) {
                    LOGGER.debug("{} has been filtered", record);
                    this.metrics.incrementFilterCount();
                } else {
                    batch.addRecord(record2);
                }
            }
            this.pipelineListener.afterRecordProcessing(record, record2);
        } catch (Exception e) {
            LOGGER.error("Unable to process {}", record, e);
            this.pipelineListener.onRecordProcessingException(record, e);
            this.metrics.incrementErrorCount();
            this.report.setLastError(e);
            if (this.metrics.getErrorCount() > this.parameters.getErrorThreshold()) {
                throw new ErrorThresholdExceededException("Error threshold exceeded. Aborting execution", e);
            }
        }
    }

    private void writeBatch(Batch batch) throws BatchWritingException {
        LOGGER.debug("Writing {}", batch);
        try {
            if (!batch.isEmpty()) {
                this.recordWriterListener.beforeRecordWriting(batch);
                this.recordWriter.writeRecords(batch);
                this.recordWriterListener.afterRecordWriting(batch);
                this.batchListener.afterBatchWriting(batch);
                this.metrics.incrementWriteCount(batch.size());
            }
        } catch (Exception e) {
            this.recordWriterListener.onRecordWritingException(batch, e);
            this.batchListener.onBatchWritingException(batch, e);
            throw new BatchWritingException("Unable to write records", e);
        }
    }

    private boolean isInterrupted() {
        return Thread.currentThread().isInterrupted();
    }

    private void teardown() {
        teardown(isInterrupted() ? JobStatus.ABORTED : JobStatus.COMPLETED);
    }

    private void teardown(JobStatus jobStatus) {
        this.report.setStatus(jobStatus);
        this.metrics.setEndTime(System.currentTimeMillis());
        LOGGER.info("Job '{}' finished with status: {}", new Object[]{this.name, this.report.getStatus()});
        notifyJobUpdate();
        this.jobListener.afterJobEnd(this.report);
    }

    private void fail(Exception exc) {
        String message = exc.getMessage();
        Throwable cause = exc.getCause();
        LOGGER.error(message, cause);
        this.report.setLastError(cause);
        teardown(JobStatus.FAILED);
    }

    private void closeReader() {
        try {
            LOGGER.debug("Closing record reader");
            this.recordReader.close();
        } catch (Exception e) {
            LOGGER.warn("Unable to close record reader", e);
            this.report.setLastError(e);
        }
    }

    private void closeWriter() {
        try {
            LOGGER.debug("Closing record writer");
            this.recordWriter.close();
        } catch (Exception e) {
            LOGGER.warn("Unable to close record writer", e);
            this.report.setLastError(e);
        }
    }

    private void notifyJobUpdate() {
        if (this.parameters.isJmxMonitoring()) {
            this.monitor.notifyJobReportUpdate();
        }
    }

    public void setRecordReader(RecordReader recordReader) {
        this.recordReader = recordReader;
    }

    public void setRecordWriter(RecordWriter recordWriter) {
        this.recordWriter = recordWriter;
    }

    public void addRecordProcessor(RecordProcessor recordProcessor) {
        ((CompositeRecordProcessor) this.recordProcessor).addRecordProcessor(recordProcessor);
    }

    public void addBatchListener(BatchListener batchListener) {
        ((CompositeBatchListener) this.batchListener).addBatchListener(batchListener);
    }

    public void addJobListener(JobListener jobListener) {
        ((CompositeJobListener) this.jobListener).addJobListener(jobListener);
    }

    public void addRecordReaderListener(RecordReaderListener recordReaderListener) {
        ((CompositeRecordReaderListener) this.recordReaderListener).addRecordReaderListener(recordReaderListener);
    }

    public void addRecordWriterListener(RecordWriterListener recordWriterListener) {
        ((CompositeRecordWriterListener) this.recordWriterListener).addRecordWriterListener(recordWriterListener);
    }

    public void addPipelineListener(PipelineListener pipelineListener) {
        ((CompositePipelineListener) this.pipelineListener).addPipelineListener(pipelineListener);
    }

    public void setName(String str) {
        this.name = str;
        this.report.setJobName(str);
    }
}
