/*
 * Decompiled with CFR 0.152.
 */
package fish.payara.cloud.connectors.kafka.inbound;

import fish.payara.cloud.connectors.kafka.api.OnRecord;
import fish.payara.cloud.connectors.kafka.api.OnRecords;
import fish.payara.cloud.connectors.kafka.inbound.EndpointKey;
import fish.payara.cloud.connectors.kafka.inbound.KafkaResourceAdapter;
import fish.payara.cloud.connectors.kafka.inbound.KafkaWorker;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.resource.ResourceException;
import javax.resource.spi.UnavailableException;
import javax.resource.spi.endpoint.MessageEndpoint;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaSynchWorker
implements KafkaWorker {
    private static final Logger LOGGER = Logger.getLogger(KafkaSynchWorker.class.getName());
    private final EndpointKey key;
    private KafkaConsumer consumer;
    private AtomicBoolean ok = new AtomicBoolean(true);
    private final List<Method> onRecordMethods = new ArrayList<Method>();
    private final List<Method> onRecordsMethods = new ArrayList<Method>();

    public KafkaSynchWorker(EndpointKey key) {
        this.key = key;
        Class mdbClass = key.getMef().getEndpointClass();
        for (Method m : mdbClass.getMethods()) {
            if (m.isAnnotationPresent(OnRecord.class)) {
                if (m.getParameterCount() == 1 && ConsumerRecord.class.isAssignableFrom(m.getParameterTypes()[0])) {
                    this.onRecordMethods.add(m);
                } else {
                    LOGGER.log(Level.WARNING, "@{0} annotated MDBs must have only one parameter of type {1}. {2}#{3} endpoint will be ignored.", new Object[]{OnRecord.class.getSimpleName(), ConsumerRecord.class.getSimpleName(), mdbClass.getName(), m.getName()});
                }
            }
            if (!m.isAnnotationPresent(OnRecords.class)) continue;
            if (m.getParameterCount() == 1 && ConsumerRecords.class.isAssignableFrom(m.getParameterTypes()[0])) {
                this.onRecordsMethods.add(m);
                continue;
            }
            LOGGER.log(Level.WARNING, "@{0} annotated MDBs must have only one parameter of type {1}. {2}#{3} endpoint will be ignored.", new Object[]{OnRecords.class.getSimpleName(), ConsumerRecords.class.getSimpleName(), mdbClass.getName(), m.getName()});
        }
    }

    public void run() {
        try {
            Thread.sleep(this.key.getSpec().getInitialPollDelay());
        }
        catch (InterruptedException e) {
            LOGGER.log(Level.WARNING, "Interrupt Exception in wait for start");
        }
        try {
            this.consumer = new KafkaConsumer(this.key.getSpec().getConsumerProperties());
            this.consumer.subscribe(Arrays.asList(this.key.getSpec().getTopics().split(",")));
            MessageEndpoint endpoint = this.key.getMef().createEndpoint(null);
            while (this.ok.get()) {
                ConsumerRecords records = this.consumer.poll(Duration.of(this.key.getSpec().getPollInterval(), ChronoUnit.MILLIS));
                if (records.isEmpty()) continue;
                for (Method m : this.onRecordsMethods) {
                    OnRecords recordsAnnt = m.getAnnotation(OnRecords.class);
                    try {
                        this.deliverRecords(endpoint, m, (ConsumerRecords<Object, Object>)records);
                    }
                    catch (UnavailableException ex) {
                        Logger.getLogger(KafkaSynchWorker.class.getName()).log(Level.SEVERE, null, ex);
                    }
                    if (recordsAnnt.matchOtherMethods()) continue;
                    return;
                }
                block10: for (ConsumerRecord record : records) {
                    for (Method m : this.onRecordMethods) {
                        OnRecord recordAnnt = m.getAnnotation(OnRecord.class);
                        String[] topics = recordAnnt.topics();
                        if (topics.length != 0 && Arrays.binarySearch(recordAnnt.topics(), record.topic()) < 0) continue;
                        try {
                            this.deliverRecord(endpoint, m, (ConsumerRecord<Object, Object>)record);
                        }
                        catch (UnavailableException ex) {
                            Logger.getLogger(KafkaSynchWorker.class.getName()).log(Level.SEVERE, null, ex);
                        }
                        if (recordAnnt.matchOtherMethods()) continue;
                        continue block10;
                    }
                }
                if (!this.key.getSpec().getCommitEachPoll().booleanValue()) continue;
                this.consumer.commitSync();
            }
            this.consumer.close();
            endpoint.release();
        }
        catch (UnavailableException ex) {
            Logger.getLogger(KafkaSynchWorker.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    private void deliverRecords(MessageEndpoint endpoint, Method m, ConsumerRecords<Object, Object> records) throws UnavailableException {
        try {
            endpoint.beforeDelivery(m);
            m.invoke((Object)endpoint, records);
            endpoint.afterDelivery();
        }
        catch (IllegalAccessException | IllegalArgumentException | NoSuchMethodException | InvocationTargetException | ResourceException ex) {
            Logger.getLogger(KafkaResourceAdapter.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    private void deliverRecord(MessageEndpoint endpoint, Method m, ConsumerRecord<Object, Object> record) throws UnavailableException {
        try {
            endpoint.beforeDelivery(m);
            m.invoke((Object)endpoint, record);
            endpoint.afterDelivery();
        }
        catch (IllegalAccessException | IllegalArgumentException | NoSuchMethodException | InvocationTargetException | ResourceException ex) {
            Logger.getLogger(KafkaResourceAdapter.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    @Override
    public void stop() {
        this.ok.set(false);
    }

    @Override
    public boolean isStopped() {
        return !this.ok.get();
    }

    public void release() {
        this.stop();
    }
}

