/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.function.DistributedConsumer;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.impl.util.Util;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

/**
 * Processor that polls a JMS {@link MessageConsumer} without blocking and
 * emits each received {@link Message} after mapping it with
 * {@code projectionFn}.
 *
 * <p>The {@link Connection} is passed in by the {@link Supplier} and is not
 * closed by this processor; each processor instance creates its own
 * {@link Session} and {@link MessageConsumer} in {@link #init} and closes
 * them in {@link #close()}.
 *
 * @param <T> type of the items emitted after projection
 */
public class StreamJmsP<T>
extends AbstractProcessor {
    /**
     * Local parallelism value suggested for this processor. Where it is
     * consumed is not visible in this file.
     */
    public static final int PREFERRED_LOCAL_PARALLELISM = 4;

    private final Connection connection;
    private final DistributedFunction<? super Connection, ? extends Session> sessionFn;
    private final DistributedFunction<? super Session, ? extends MessageConsumer> consumerFn;
    private final DistributedConsumer<? super Session> flushFn;
    private final DistributedFunction<? super Message, ? extends T> projectionFn;

    // Created in init(); both stay null if init() never ran or failed early.
    private Session session;
    private MessageConsumer consumer;
    private Traverser<? extends T> traverser;

    /**
     * @param connection   JMS connection used to create the session
     * @param sessionFn    creates the session from the connection
     * @param consumerFn   creates the message consumer from the session
     * @param flushFn      invoked with the session for every item produced by the traverser
     * @param projectionFn maps a received message to the emitted item
     */
    StreamJmsP(
            Connection connection,
            DistributedFunction<? super Connection, ? extends Session> sessionFn,
            DistributedFunction<? super Session, ? extends MessageConsumer> consumerFn,
            DistributedConsumer<? super Session> flushFn,
            DistributedFunction<? super Message, ? extends T> projectionFn
    ) {
        this.connection = connection;
        this.sessionFn = sessionFn;
        this.consumerFn = consumerFn;
        this.flushFn = flushFn;
        this.projectionFn = projectionFn;
    }

    /**
     * Returns a {@link ProcessorSupplier} that obtains one connection from
     * {@code connectionSupplier} when it is initialized and hands that
     * connection to every {@code StreamJmsP} it creates.
     *
     * @param connectionSupplier provides the JMS connection
     * @param sessionFn          creates a session from the connection
     * @param consumerFn         creates a message consumer from the session
     * @param flushFn            invoked with the session for every item produced
     * @param projectionFn       maps a received message to the emitted item
     * @param <T>                type of the emitted items
     * @return the processor supplier
     */
    @Nonnull
    public static <T> ProcessorSupplier supplier(
            @Nonnull DistributedSupplier<? extends Connection> connectionSupplier,
            @Nonnull DistributedFunction<? super Connection, ? extends Session> sessionFn,
            @Nonnull DistributedFunction<? super Session, ? extends MessageConsumer> consumerFn,
            @Nonnull DistributedConsumer<? super Session> flushFn,
            @Nonnull DistributedFunction<? super Message, ? extends T> projectionFn
    ) {
        return new Supplier<T>(connectionSupplier, sessionFn, consumerFn, flushFn, projectionFn);
    }

    /**
     * Creates the session and the consumer and builds the traverser that
     * polls the consumer with {@code receiveNoWait()}, projects each message
     * and then calls {@code flushFn} with the session.
     */
    @Override
    protected void init(@Nonnull Processor.Context context) {
        session = sessionFn.apply(connection);
        consumer = consumerFn.apply(session);
        // receiveNoWait() declares a checked exception; uncheckCall rethrows it unchecked.
        Traverser<Message> messages = () -> Util.uncheckCall(() -> consumer.receiveNoWait());
        traverser = messages
                .map(projectionFn)
                .peek(item -> flushFn.accept(session));
    }

    /**
     * Emits whatever the traverser currently yields.
     *
     * @return always {@code false}, so this step never reports itself as done
     */
    @Override
    public boolean complete() {
        emitFromTraverser(traverser);
        return false;
    }

    /**
     * Closes the consumer and then the session. Resources that were never
     * created are skipped, and the session is closed even when closing the
     * consumer fails.
     *
     * @throws Exception the first failure raised while closing; a failure from
     *                   the second close is attached to it as suppressed
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        if (consumer != null) {
            try {
                consumer.close();
            } catch (Exception e) {
                failure = e;
            }
        }
        if (session != null) {
            try {
                session.close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Processor supplier that owns the JMS connection: it obtains and starts
     * the connection in {@link #init}, shares it with all processors returned
     * by {@link #get}, and closes it in {@link #close}.
     *
     * @param <T> type of the items emitted by the created processors
     */
    private static final class Supplier<T>
    implements ProcessorSupplier {
        static final long serialVersionUID = 1L;

        private final DistributedSupplier<? extends Connection> connectionSupplier;
        private final DistributedFunction<? super Connection, ? extends Session> sessionFn;
        private final DistributedFunction<? super Session, ? extends MessageConsumer> consumerFn;
        private final DistributedConsumer<? super Session> flushFn;
        private final DistributedFunction<? super Message, ? extends T> projectionFn;

        // Not serialized with the supplier; assigned in init().
        private transient Connection connection;

        private Supplier(
                DistributedSupplier<? extends Connection> connectionSupplier,
                DistributedFunction<? super Connection, ? extends Session> sessionFn,
                DistributedFunction<? super Session, ? extends MessageConsumer> consumerFn,
                DistributedConsumer<? super Session> flushFn,
                DistributedFunction<? super Message, ? extends T> projectionFn
        ) {
            this.connectionSupplier = connectionSupplier;
            this.sessionFn = sessionFn;
            this.consumerFn = consumerFn;
            this.flushFn = flushFn;
            this.projectionFn = projectionFn;
        }

        /** Obtains the connection from the supplier and starts it. */
        @Override
        public void init(@Nonnull ProcessorSupplier.Context context) {
            connection = connectionSupplier.get();
            // The field is assigned before start() so close() can still release
            // the connection if starting it fails.
            Util.uncheckRun(() -> connection.start());
        }

        /** Closes the connection if one was obtained in {@link #init}. */
        @Override
        public void close(@Nullable Throwable error) throws Exception {
            if (connection != null) {
                connection.close();
            }
        }

        /**
         * Creates {@code count} processors that all use the same connection.
         *
         * @param count number of processors to create
         * @return the newly created processors
         */
        @Override
        @Nonnull
        public Collection<? extends Processor> get(int count) {
            return IntStream.range(0, count)
                    .mapToObj(i -> new StreamJmsP<T>(connection, sessionFn, consumerFn, flushFn, projectionFn))
                    .collect(Collectors.toList());
        }
    }
}

