package com.chutneytesting.action.amqp.consumer;

import com.chutneytesting.action.amqp.utils.AmqpUtils;
import com.chutneytesting.action.amqp.utils.JsonPathEvaluator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Stopwatch;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/chutneytesting/action/amqp/consumer/QueueingConsumer.class */
/**
 * Consumes messages from an AMQP queue until either an expected number of messages has been
 * collected or a maximum waiting time has elapsed.
 *
 * <p>Each delivery is turned into a map with two entries, {@code "headers"} and {@code "payload"}.
 * When a selector is configured, only deliveries for which
 * {@link JsonPathEvaluator#evaluate} returns {@code true} on the JSON form of that map are kept.
 *
 * <p>NOTE(review): deliveries are handled through callbacks registered on the channel; this class
 * presumably relies on the client library invoking them one at a time for a given consumer —
 * confirm before sharing an instance between threads.
 */
public class QueueingConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueueingConsumer.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final long maxAwait;
    private final Channel channel;
    private final String queueName;
    private final String selector;
    private final boolean ackIfMatch;
    private final CountDownLatch messageCounter;
    private final Stopwatch stopwatch = Stopwatch.createUnstarted();
    private final Result result = new Result();

    /**
     * Outcome of a {@link QueueingConsumer#consume()} call.
     *
     * <p>The three lists are filled in parallel: the n-th element of {@link #headers} and
     * {@link #payloads} are the {@code "headers"} and {@code "payload"} entries of the n-th
     * element of {@link #messages}.
     */
    public static class Result {
        public final List<Map<String, Object>> messages = new ArrayList<>();
        public final List<Object> payloads = new ArrayList<>();
        public final List<Map<String, Object>> headers = new ArrayList<>();
        /** Elapsed consumption time formatted as {@code "<millis> ms"}; set by {@code consume()}. */
        public String consumeDuration;

        /** Records one message and its {@code "headers"} / {@code "payload"} entries. */
        private void handleMessage(Map<String, Object> message) {
            messages.add(message);
            // The "headers" entry is always stored by handleDelivery as a Map<String, Object>.
            @SuppressWarnings("unchecked")
            Map<String, Object> messageHeaders = (Map<String, Object>) message.get("headers");
            headers.add(messageHeaders);
            payloads.add(message.get("payload"));
        }
    }

    /**
     * Creates a consumer bound to a queue.
     *
     * @param channel     channel used to consume, cancel and acknowledge
     * @param queueName   name of the queue to consume from
     * @param nbMessages  number of messages to collect before {@link #consume()} stops waiting
     * @param selector    expression passed to {@link JsonPathEvaluator#evaluate}; when blank,
     *                    every delivery is kept
     * @param maxAwait    maximum time to wait for the expected messages, in milliseconds
     * @param ackIfMatch  whether a delivery matching the selector is acknowledged
     * @throws IllegalArgumentException if {@code nbMessages} is negative
     */
    public QueueingConsumer(Channel channel, String queueName, int nbMessages, String selector, long maxAwait, boolean ackIfMatch) {
        this.selector = selector;
        this.maxAwait = maxAwait;
        this.channel = channel;
        this.queueName = queueName;
        this.ackIfMatch = ackIfMatch;
        this.messageCounter = new CountDownLatch(nbMessages);
    }

    /**
     * Registers a consumer on the queue, waits until the expected number of messages has been
     * collected or {@code maxAwait} milliseconds have elapsed, then cancels the consumer.
     *
     * @return the collected messages together with the elapsed consumption time
     * @throws IOException          if registering or cancelling the consumer fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Result consume() throws IOException, InterruptedException {
        stopwatch.start();
        try {
            String consumerTag = channel.basicConsume(queueName, this::deliveryCallback, this::cancelCallback);
            try {
                messageCounter.await(maxAwait, TimeUnit.MILLISECONDS);
            } finally {
                // Cancel even when the wait is interrupted, so the consumer is not left registered.
                channel.basicCancel(consumerTag);
            }
        } finally {
            // Always stop the stopwatch so it is never left running after a failure.
            stopwatch.stop();
            result.consumeDuration = stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms";
        }
        return result;
    }

    /** Delivery callback registered on the channel; the consumer tag is not used. */
    private void deliveryCallback(String consumerTag, Delivery delivery) {
        handleDelivery(delivery);
    }

    /**
     * Converts a delivery into a headers/payload map and keeps it when it passes the selector.
     * Deliveries received once the expected number of messages is reached are ignored.
     */
    private void handleDelivery(Delivery delivery) {
        if (messageCounter.getCount() <= 0) {
            return;
        }
        Map<String, Object> message = new HashMap<>();
        message.put("headers", AmqpUtils.convertMapLongStringToString(delivery.getProperties().getHeaders()));
        message.put("payload", extractPayload(delivery));

        if (StringUtils.isBlank(selector)) {
            // NOTE(review): no acknowledgement is sent on this path, even when ackIfMatch is true.
            // Kept as-is to preserve existing behavior — confirm whether this is intended.
            addMessageToResultAndCountDown(message);
            return;
        }

        boolean matches;
        try {
            matches = JsonPathEvaluator.evaluate(OBJECT_MAPPER.writeValueAsString(message), selector);
        } catch (IOException e) {
            LOGGER.warn("Received a message, however cannot read process it as json, Ignoring message selection.", e);
            return;
        }
        if (!matches) {
            return;
        }

        addMessageToResultAndCountDown(message);
        try {
            acknowledgeMessage(delivery);
        } catch (IOException e) {
            // Reported separately so an ack failure is not mistaken for a JSON processing failure.
            LOGGER.warn("Message matched the selector but could not be acknowledged.", e);
        }
    }

    /** Acknowledges the single given delivery when {@code ackIfMatch} is enabled. */
    private void acknowledgeMessage(Delivery delivery) throws IOException {
        if (ackIfMatch) {
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

    /** Stores the message in the result and decrements the count of messages still expected. */
    private void addMessageToResultAndCountDown(Map<String, Object> message) {
        result.handleMessage(message);
        messageCounter.countDown();
    }

    /**
     * Reads the delivery body as a JSON object; when that fails, returns the body as plain text.
     * The body is decoded as UTF-8 rather than with the platform default charset.
     */
    private Object extractPayload(Delivery delivery) {
        String body = new String(delivery.getBody(), StandardCharsets.UTF_8);
        try {
            return OBJECT_MAPPER.readValue(body, Map.class);
        } catch (IOException e) {
            LOGGER.warn("Received a message, however cannot read it as Json fallback as String.", e);
            return body;
        }
    }

    /** Cancel callback registered on the channel; intentionally does nothing. */
    private void cancelCallback(String consumerTag) {
    }
}
