/*
 * Decompiled with CFR 0.152.
 */
package io.streamthoughts.kafka.connect.filepulse.filter;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
import io.streamthoughts.kafka.connect.filepulse.filter.FilterContext;
import io.streamthoughts.kafka.connect.filepulse.filter.FilterContextBuilder;
import io.streamthoughts.kafka.connect.filepulse.filter.FilterError;
import io.streamthoughts.kafka.connect.filepulse.filter.FilterException;
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilter;
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilterPipeline;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectContext;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectOffset;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecordOffset;
import io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Objects;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link RecordFilterPipeline} implementation that links the configured {@link RecordFilter}s
 * into a chain of nodes and pushes every record through that chain.
 *
 * <p>The pipeline must be given a {@link FileObjectContext} through
 * {@link #init(FileObjectContext)} before the batch-level {@code apply} method is used.
 */
public class DefaultRecordFilterPipeline implements RecordFilterPipeline<FileRecord<TypedStruct>> {

    private static final Logger LOG = LoggerFactory.getLogger(DefaultRecordFilterPipeline.class);

    /** Head of the filter chain; {@code null} when the pipeline was built from an empty list. */
    private final FilterNode rootNode;

    /** Context of the current file object; assigned by {@link #init(FileObjectContext)}. */
    private FileObjectContext fileObjectObject;

    /**
     * Builds the chain of filter nodes, preserving the order of the given list.
     *
     * @param filters the filters to chain, first filter to apply first.
     * @throws NullPointerException if {@code filters} is {@code null}.
     */
    public DefaultRecordFilterPipeline(List<RecordFilter> filters) {
        Objects.requireNonNull(filters, "filters can't be null");
        // Walk the list backwards so that each new node can point at its already-built successor.
        FilterNode head = null;
        for (ListIterator<RecordFilter> cursor = filters.listIterator(filters.size()); cursor.hasPrevious(); ) {
            head = new FilterNode(cursor.previous(), head);
        }
        this.rootNode = head;
    }

    /**
     * Stores the given context, then, for each filter of the chain, initializes its
     * on-failure pipeline (when it has one) with the same context and clears the filter.
     *
     * @param fileObjectObject the context of the file object about to be processed.
     */
    @Override
    public void init(FileObjectContext fileObjectObject) {
        this.fileObjectObject = fileObjectObject;
        for (FilterNode current = this.rootNode; current != null; current = current.onSuccess) {
            RecordFilterPipeline<FileRecord<TypedStruct>> failurePipeline = current.filter.onFailure();
            if (failurePipeline != null) {
                failurePipeline.init(fileObjectObject);
            }
            current.filter.clear();
        }
    }

    /**
     * Runs every record of the batch through the filter chain.
     *
     * <p>When the chain is empty the input is returned as is. When {@code hasNext} is
     * {@code false} and the batch is empty, every node of the chain is flushed, in order,
     * and the flushed records are returned.
     *
     * @param records the batch of records to filter.
     * @param hasNext whether more records are expected after this batch.
     * @return the records produced by the chain.
     * @throws IllegalStateException if no context has been set through {@link #init(FileObjectContext)}.
     */
    @Override
    public RecordsIterable<FileRecord<TypedStruct>> apply(RecordsIterable<FileRecord<TypedStruct>> records, boolean hasNext) throws FilterException {
        checkState();
        if (this.rootNode == null) {
            return records;
        }

        List<FileRecord<TypedStruct>> output = new LinkedList<>();
        for (Iterator<FileRecord<TypedStruct>> cursor = records.iterator(); cursor.hasNext(); ) {
            FileRecord<TypedStruct> current = cursor.next();
            // A record is the last one only if neither the caller nor this batch has more to offer.
            boolean moreExpected = hasNext || cursor.hasNext();
            FilterContext context = newContextFor(current.offset(), this.fileObjectObject.metadata());
            output.addAll(apply(context, current.value(), moreExpected));
        }

        if (hasNext || !records.isEmpty()) {
            return new RecordsIterable<FileRecord<TypedStruct>>(output);
        }

        // Final empty batch: drain whatever each filter still holds.
        for (FilterNode current = this.rootNode; current != null; current = current.onSuccess) {
            FilterContext flushContext = newContextFor(FileObjectOffset::empty, this.fileObjectObject.metadata());
            output.addAll(current.flush(flushContext));
        }
        return new RecordsIterable<FileRecord<TypedStruct>>(output);
    }

    /**
     * Creates a fresh filter context carrying the given metadata and offset.
     */
    private static FilterContext newContextFor(FileRecordOffset offset, FileObjectMeta metadata) {
        return FilterContextBuilder.newBuilder()
                .withMetadata(metadata)
                .withOffset(offset)
                .build();
    }

    /**
     * Fails fast when the pipeline is used before a context was provided.
     */
    private void checkState() {
        if (this.fileObjectObject != null) {
            return;
        }
        throw new IllegalStateException("Cannot apply this pipeline, no context initialized");
    }

    /**
     * Runs a single record through the filter chain.
     *
     * @param context the context to apply the filters with.
     * @param record  the value to filter.
     * @param hasNext whether more records are expected after this one.
     * @return the records produced by the chain; a single record wrapping {@code record}
     *         at the context's offset when the chain is empty.
     */
    @Override
    public List<FileRecord<TypedStruct>> apply(FilterContext context, TypedStruct record, boolean hasNext) {
        if (this.rootNode != null) {
            return this.rootNode.apply(context, record, hasNext);
        }
        return Collections.singletonList(new TypedFileRecord(context.offset(), record));
    }

    /**
     * One link of the filter chain: a filter plus the node to run after it.
     */
    private static final class FilterNode {

        private final RecordFilter filter;

        /** Next node of the chain, or {@code null} for the last node. */
        private final FilterNode onSuccess;

        private FilterNode(RecordFilter filter, FilterNode onSuccess) {
            this.filter = filter;
            this.onSuccess = onSuccess;
        }

        /**
         * Applies this node's filter to the record and hands the outcome to the rest of the chain.
         *
         * <ul>
         *   <li>Record not accepted by the filter: it is passed untouched to the next node, or
         *       emitted as is when this is the last node.</li>
         *   <li>Filter throws: the exception is rethrown unless the filter has an on-failure
         *       pipeline or ignores failures; otherwise the failure is handled locally.</li>
         *   <li>Filter succeeds: each resulting value becomes a record which is emitted, or
         *       passed to the next node when there is one.</li>
         * </ul>
         */
        public List<FileRecord<TypedStruct>> apply(FilterContext context, TypedStruct record, boolean hasNext) {
            if (!this.filter.accept(context, record)) {
                return skip(context, record, hasNext);
            }

            RecordsIterable<TypedStruct> data;
            try {
                data = this.filter.apply(context, record, hasNext);
            }
            catch (Exception e) {
                RecordFilterPipeline<FileRecord<TypedStruct>> onFailure = this.filter.onFailure();
                boolean mustPropagate = onFailure == null && !this.filter.ignoreFailure();
                if (mustPropagate) {
                    LOG.error("Failed to execute filter '{}' on record at offset '{}' from object-file {}. Error: {}", this.filter.label(), context.offset(), context.metadata(), e.getLocalizedMessage());
                    throw e;
                }
                LOG.debug("Failed to execute filter '{}' on record at offset '{}' from object-file {}. Error: {} Exception will be either ignored or handled by a dedicated filter chain.", this.filter.label(), context.offset(), context.metadata(), e.getLocalizedMessage());
                return recover(context, record, hasNext, e, onFailure);
            }

            List<FileRecord<TypedStruct>> produced = data.stream()
                    .<FileRecord<TypedStruct>>map(value -> newRecordFor(context, value))
                    .collect(Collectors.toCollection(LinkedList::new));
            if (this.onSuccess == null) {
                return produced;
            }
            // Each produced record goes down the chain with its own copy of the context.
            return produced.stream()
                    .flatMap(item -> this.onSuccess.apply(FilterContextBuilder.newBuilder(context).build(), item.value(), hasNext).stream())
                    .collect(Collectors.toList());
        }

        /**
         * Handles a record this node's filter does not accept: forwards it to the next node,
         * or, on the last node, emits it (after flushing this filter when no more records
         * are expected).
         */
        private List<FileRecord<TypedStruct>> skip(FilterContext context, TypedStruct record, boolean hasNext) {
            List<FileRecord<TypedStruct>> out = new LinkedList<>();
            if (this.onSuccess != null) {
                out.addAll(this.onSuccess.apply(context, record, hasNext));
                return out;
            }
            if (!hasNext) {
                out.addAll(flush(context));
            }
            out.add(newRecordFor(context, record));
            return out;
        }

        /**
         * Handles a tolerated filter failure: flushes this filter, then routes the original
         * record to the on-failure pipeline if any, else to the next node, else emits it as is.
         */
        private List<FileRecord<TypedStruct>> recover(FilterContext context,
                                                      TypedStruct record,
                                                      boolean hasNext,
                                                      Exception error,
                                                      RecordFilterPipeline<FileRecord<TypedStruct>> onFailure) {
            List<FileRecord<TypedStruct>> out = new LinkedList<>();
            out.addAll(flush(context));
            if (onFailure != null) {
                FilterContext errorContext = FilterContextBuilder.newBuilder(context)
                        .withError(FilterError.of(error, this.filter.label()))
                        .build();
                out.addAll(onFailure.apply(errorContext, record, hasNext));
                return out;
            }
            if (this.onSuccess != null) {
                out.addAll(this.onSuccess.apply(context, record, hasNext));
                return out;
            }
            out.add(newRecordFor(context, record));
            return out;
        }

        /**
         * Wraps a value into a record populated from the context's offset, topic, partition,
         * timestamp, headers and key.
         */
        private TypedFileRecord newRecordFor(FilterContext context, TypedStruct object) {
            return new TypedFileRecord(context.offset(), object)
                    .withTopic(context.topic())
                    .withPartition(context.partition())
                    .withTimestamp(context.timestamp())
                    .withHeaders(context.headers())
                    .withKey(TypedValue.string(context.key()));
        }

        /**
         * Flushes this node's filter. On the last node the flushed records are returned
         * directly; otherwise each one is pushed through the next node with a context rebuilt
         * from the record's own offset and the given context's metadata.
         */
        List<FileRecord<TypedStruct>> flush(FilterContext context) {
            RecordsIterable<FileRecord<TypedStruct>> buffered = this.filter.flush();
            List<FileRecord<TypedStruct>> drained = new LinkedList<>();
            if (this.onSuccess == null) {
                drained.addAll(buffered.collect());
                return drained;
            }
            for (Iterator<FileRecord<TypedStruct>> cursor = buffered.iterator(); cursor.hasNext(); ) {
                FileRecord<TypedStruct> pending = cursor.next();
                FilterContext renewedContext = newContextFor(pending.offset(), context.metadata());
                // hasNext is false only for the last flushed record.
                drained.addAll(this.onSuccess.apply(renewedContext, pending.value(), cursor.hasNext()));
            }
            return drained;
        }
    }
}

