/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.contract.verifier.messaging.kafka;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.contract.verifier.converter.YamlContract;
import org.springframework.cloud.contract.verifier.messaging.MessageVerifier;
import org.springframework.cloud.contract.verifier.messaging.kafka.KafkaStubMessagesInitializer;
import org.springframework.cloud.contract.verifier.messaging.kafka.Receiver;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;

class KafkaStubMessages
implements MessageVerifier<Message<?>> {
    private static final Log log = LogFactory.getLog(KafkaStubMessages.class);
    final KafkaTemplate kafkaTemplate;
    private final Receiver receiver;

    KafkaStubMessages(KafkaTemplate kafkaTemplate, EmbeddedKafkaBroker broker, KafkaProperties kafkaProperties, KafkaStubMessagesInitializer initializer) {
        this.kafkaTemplate = kafkaTemplate;
        Map<String, Consumer> topicToConsumer = initializer.initialize(broker, kafkaProperties);
        this.receiver = new Receiver(topicToConsumer);
    }

    @Override
    public void send(Message<?> message, String destination, YamlContract contract) {
        String defaultTopic = this.kafkaTemplate.getDefaultTopic();
        try {
            this.kafkaTemplate.setDefaultTopic(destination);
            if (log.isDebugEnabled()) {
                log.debug((Object)("Will send a message [" + message + "] to destination [" + destination + "]"));
            }
            this.kafkaTemplate.send(message).get(5L, TimeUnit.SECONDS);
            this.kafkaTemplate.flush();
        }
        catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
        finally {
            this.kafkaTemplate.setDefaultTopic(defaultTopic);
        }
    }

    @Override
    public Message receive(String destination, long timeout, TimeUnit timeUnit, YamlContract contract) {
        return this.receiver.receive(destination, timeout, timeUnit, contract);
    }

    @Override
    public Message receive(String destination, YamlContract contract) {
        return this.receive(destination, 5L, TimeUnit.SECONDS, contract);
    }

    @Override
    public void send(Object payload, Map headers, String destination, YamlContract contract) {
        Message message = MessageBuilder.createMessage((Object)payload, (MessageHeaders)new MessageHeaders(headers));
        this.send(message, destination, contract);
    }
}

