/*
 * Decompiled with CFR 0.152.
 */
package fish.payara.cloud.connectors.kafka.inbound;

import fish.payara.cloud.connectors.kafka.inbound.EndpointKey;
import fish.payara.cloud.connectors.kafka.inbound.KafkaActivationSpec;
import fish.payara.cloud.connectors.kafka.inbound.KafkaAsynchWorker;
import fish.payara.cloud.connectors.kafka.inbound.KafkaSynchWorker;
import fish.payara.cloud.connectors.kafka.inbound.KafkaWorker;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.Connector;
import javax.resource.spi.ResourceAdapter;
import javax.resource.spi.ResourceAdapterInternalException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkEvent;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;

/**
 * JCA resource adapter that runs one {@link KafkaWorker} per activated message endpoint.
 *
 * <p>On endpoint activation a worker is created for the endpoint and handed to the
 * container's {@link WorkManager}. Depending on
 * {@link KafkaActivationSpec#getUseSynchMode()} this is either a {@link KafkaSynchWorker}
 * (scheduled once, without a listener) or a {@link KafkaAsynchWorker} (scheduled with this
 * adapter as its {@link WorkListener}, and rescheduled from {@link #workCompleted(WorkEvent)}
 * each time it completes).
 *
 * <p>Workers are tracked in a concurrent map keyed by {@link EndpointKey} so they can be
 * stopped on endpoint deactivation or adapter shutdown.
 */
@Connector(displayName={"Apache Kafka Resource Adapter"}, vendorName="Payara Services Limited", version="1.0")
public class KafkaResourceAdapter implements ResourceAdapter, Serializable, WorkListener {
    private static final Logger LOGGER = Logger.getLogger(KafkaResourceAdapter.class.getName());

    /**
     * Value passed as the second ({@code startTimeout}) argument of
     * {@code WorkManager.scheduleWork} when a completed worker is rescheduled.
     */
    private static final long RESCHEDULE_START_TIMEOUT_MILLIS = 1000L;

    /** Workers for the currently activated endpoints. */
    private final Map<EndpointKey, KafkaWorker> registeredWorkers = new ConcurrentHashMap<>();
    private BootstrapContext context;
    private WorkManager workManager;

    /**
     * {@code true} between {@link #start(BootstrapContext)} and {@link #stop()}.
     * Volatile because it is written by the lifecycle methods and read in the
     * {@link #workCompleted(WorkEvent)} listener callback.
     */
    private volatile boolean running;

    /**
     * Stores the bootstrap context and its work manager and marks the adapter as running.
     *
     * @param ctx bootstrap context supplied by the container
     * @throws ResourceAdapterInternalException declared by the SPI; not thrown by this implementation
     */
    public void start(BootstrapContext ctx) throws ResourceAdapterInternalException {
        LOGGER.info("Kafka Resource Adapter Started..");
        this.context = ctx;
        this.workManager = this.context.getWorkManager();
        this.running = true;
    }

    /**
     * Marks the adapter as stopped and stops every registered worker.
     *
     * <p>The running flag is cleared <em>before</em> the workers are stopped so that a worker
     * which completes while shutdown is in progress is not rescheduled by
     * {@link #workCompleted(WorkEvent)}.
     */
    public void stop() {
        LOGGER.info("Kafka Resource Adapter Stopped");
        this.running = false;
        for (KafkaWorker work : this.registeredWorkers.values()) {
            work.stop();
        }
    }

    /**
     * Creates, registers and schedules a worker for the given endpoint.
     *
     * <p>Activation specs that are not {@link KafkaActivationSpec} instances are ignored with
     * a warning. If scheduling fails, the worker is removed from the registry again before
     * the failure is propagated, so no never-started worker is left behind.
     *
     * @param endpointFactory factory for the message endpoint being activated
     * @param spec            activation configuration; expected to be a {@link KafkaActivationSpec}
     * @throws ResourceException if the work manager refuses to schedule the worker
     */
    public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec spec) throws ResourceException {
        if (!(spec instanceof KafkaActivationSpec)) {
            // spec may be null here; avoid a NullPointerException while building the message.
            String specClass = spec == null ? "null" : spec.getClass().getName();
            LOGGER.warning("Got endpoint activation for an ActivationSpec of unknown class " + specClass);
            return;
        }

        KafkaActivationSpec kafkaSpec = (KafkaActivationSpec) spec;
        EndpointKey endpointKey = new EndpointKey(endpointFactory, kafkaSpec);
        boolean synchronous = kafkaSpec.getUseSynchMode().booleanValue();
        KafkaWorker kafkaWork = synchronous
                ? new KafkaSynchWorker(endpointKey)
                : new KafkaAsynchWorker(endpointKey, this.workManager);

        this.registeredWorkers.put(endpointKey, kafkaWork);
        try {
            if (synchronous) {
                this.workManager.scheduleWork((Work) kafkaWork);
            } else {
                // NOTE(review): the poll interval is passed in the startTimeout position of
                // scheduleWork; confirm this is the intended use of that parameter.
                this.workManager.scheduleWork((Work) kafkaWork, kafkaSpec.getPollInterval().longValue(), null, this);
            }
        } catch (WorkException | RuntimeException ex) {
            // Roll back the registration: the worker was never accepted by the work manager.
            this.registeredWorkers.remove(endpointKey, kafkaWork);
            throw ex;
        }
    }

    /**
     * Unregisters and stops the worker that belongs to the given endpoint, if there is one.
     *
     * <p>Activation specs that are not {@link KafkaActivationSpec} instances are ignored:
     * {@link #endpointActivation(MessageEndpointFactory, ActivationSpec)} never registers a
     * worker for them, so there is nothing to stop.
     *
     * @param endpointFactory factory for the message endpoint being deactivated
     * @param spec            activation configuration that was used on activation
     */
    public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec spec) {
        if (!(spec instanceof KafkaActivationSpec)) {
            return;
        }
        KafkaWorker work = this.registeredWorkers.remove(new EndpointKey(endpointFactory, (KafkaActivationSpec) spec));
        if (work != null) {
            work.stop();
        }
    }

    /**
     * This adapter exposes no XA resources.
     *
     * @param specs activation specs; ignored
     * @return always {@code null}
     * @throws ResourceException declared by the SPI; not thrown by this implementation
     */
    public XAResource[] getXAResources(ActivationSpec[] specs) throws ResourceException {
        return null;
    }

    /** Identity-based equality, delegated to {@link Object#equals(Object)}. */
    @Override
    public boolean equals(Object o) {
        return super.equals(o);
    }

    /** Identity-based hash code, delegated to {@link Object#hashCode()}. */
    @Override
    public int hashCode() {
        return super.hashCode();
    }

    /** No-op. */
    public void workAccepted(WorkEvent we) {
    }

    /** No-op. */
    public void workRejected(WorkEvent we) {
    }

    /** No-op. */
    public void workStarted(WorkEvent we) {
    }

    /**
     * Reschedules a completed worker while the adapter is running and the worker has not
     * been stopped. Scheduling failures are logged and not propagated.
     *
     * @param we event describing the completed work
     */
    public void workCompleted(WorkEvent we) {
        Work completed = we.getWork();
        if (!(completed instanceof KafkaWorker)) {
            // Only Kafka workers are rescheduled by this listener.
            return;
        }
        KafkaWorker worker = (KafkaWorker) completed;
        if (!this.running || worker.isStopped()) {
            return;
        }
        try {
            this.workManager.scheduleWork(completed, RESCHEDULE_START_TIMEOUT_MILLIS, null, this);
        } catch (WorkException ex) {
            LOGGER.log(Level.SEVERE, "Unable to reschedule Kafka worker", ex);
        }
    }
}

