/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.streams;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.cloud.stream.binder.AbstractBinder;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBindingInformationCatalogue;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsDlqDispatch;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate;
import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolver;
import org.springframework.cloud.stream.binder.kafka.streams.SendToDlqAndContinue;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
import org.springframework.util.StringUtils;

/**
 * Binder for {@link KStream} binding targets.
 *
 * <p>For inbound bindings it registers the consumer properties with the
 * {@link KafkaStreamsBindingInformationCatalogue}, provisions the consumer destination and,
 * when DLQ is enabled, registers a {@link KafkaStreamsDlqDispatch} for the destination.
 * For outbound bindings it provisions the producer destination and sends the stream to it
 * using the key/value serdes resolved by the {@link KeyValueSerdeResolver}.
 */
class KStreamBinder
        extends AbstractBinder<KStream<Object, Object>, ExtendedConsumerProperties<KafkaStreamsConsumerProperties>, ExtendedProducerProperties<KafkaStreamsProducerProperties>>
        implements ExtendedPropertiesBinder<KStream<Object, Object>, KafkaStreamsConsumerProperties, KafkaStreamsProducerProperties> {

    private static final Log LOG = LogFactory.getLog(KStreamBinder.class);

    private final KafkaTopicProvisioner kafkaTopicProvisioner;
    private final KafkaStreamsBinderConfigurationProperties binderConfigurationProperties;
    private final KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate;
    private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
    private final KeyValueSerdeResolver keyValueSerdeResolver;

    /** Per-binding extended properties; starts as an empty instance until the setter replaces it. */
    private KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties = new KafkaStreamsExtendedBindingProperties();

    /**
     * Creates the binder with its collaborators.
     *
     * @param binderConfigurationProperties binder-level configuration (serde error policy, application id)
     * @param kafkaTopicProvisioner provisions consumer and producer destinations
     * @param kafkaStreamsMessageConversionDelegate applied to outbound streams when native encoding is disabled
     * @param kafkaStreamsBindingInformationCatalogue catalogue used to register consumer properties and look up the streams config
     * @param keyValueSerdeResolver resolves the outbound key and value serdes
     */
    KStreamBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
            KafkaTopicProvisioner kafkaTopicProvisioner,
            KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
            KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
            KeyValueSerdeResolver keyValueSerdeResolver) {
        this.binderConfigurationProperties = binderConfigurationProperties;
        this.kafkaTopicProvisioner = kafkaTopicProvisioner;
        this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
        this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
        this.keyValueSerdeResolver = keyValueSerdeResolver;
    }

    /**
     * Binds an inbound {@link KStream}.
     *
     * <p>DLQ is switched on for the binding when the binder-level serde error policy is
     * {@code sendToDlq}. A blank {@code group} falls back to the binder's application id.
     * When DLQ is enabled and no DLQ name is configured, the name defaults to
     * {@code "error." + name + "." + group}.
     *
     * @param name destination name
     * @param group consumer group; may be blank
     * @param inputTarget the stream being bound
     * @param properties consumer properties of the binding
     * @return a binding for {@code inputTarget} carrying the resolved group and no lifecycle
     */
    @Override
    protected Binding<KStream<Object, Object>> doBindConsumer(String name, String group,
            KStream<Object, Object> inputTarget,
            ExtendedConsumerProperties<KafkaStreamsConsumerProperties> properties) {
        this.kafkaStreamsBindingInformationCatalogue.registerConsumerProperties(inputTarget, properties.getExtension());

        // Re-wrap the extension under the type the provisioner and the DLQ dispatch accept.
        ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties =
                new ExtendedConsumerProperties<>(properties.getExtension());
        if (this.binderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.sendToDlq) {
            extendedConsumerProperties.getExtension().setEnableDlq(true);
        }

        String consumerGroup = StringUtils.hasText(group) ? group : this.binderConfigurationProperties.getApplicationId();
        this.kafkaTopicProvisioner.provisionConsumerDestination(name, consumerGroup, extendedConsumerProperties);

        StreamsConfig streamsConfig = this.kafkaStreamsBindingInformationCatalogue.getStreamsConfig(inputTarget);
        if (extendedConsumerProperties.getExtension().isEnableDlq()) {
            String configuredDlqName = extendedConsumerProperties.getExtension().getDlqName();
            String dlqName = StringUtils.isEmpty(configuredDlqName)
                    ? "error." + name + "." + consumerGroup
                    : configuredDlqName;
            KafkaStreamsDlqDispatch kafkaStreamsDlqDispatch = new KafkaStreamsDlqDispatch(dlqName,
                    this.binderConfigurationProperties, extendedConsumerProperties.getExtension());

            // Register the dispatch with the SendToDlqAndContinue bean from the application context ...
            SendToDlqAndContinue sendToDlqAndContinue = this.getApplicationContext().getBean(SendToDlqAndContinue.class);
            sendToDlqAndContinue.addKStreamDlqDispatch(name, kafkaStreamsDlqDispatch);

            // ... and with the handler obtained from the streams config, if it is of that type.
            DeserializationExceptionHandler deserializationExceptionHandler = streamsConfig.defaultDeserializationExceptionHandler();
            if (deserializationExceptionHandler instanceof SendToDlqAndContinue) {
                ((SendToDlqAndContinue) deserializationExceptionHandler).addKStreamDlqDispatch(name, kafkaStreamsDlqDispatch);
            }
        }
        return new DefaultBinding<>(name, consumerGroup, inputTarget, null);
    }

    /**
     * Binds an outbound {@link KStream}.
     *
     * <p>The destination is provisioned with a freshly constructed {@link KafkaProducerProperties}
     * instance. The key and value serdes are resolved from the binding's properties and the
     * stream is then sent to {@code name}.
     *
     * @param name destination name
     * @param outboundBindTarget the stream being bound
     * @param properties producer properties of the binding
     * @return a binding for {@code outboundBindTarget} with no group and no lifecycle
     */
    @Override
    protected Binding<KStream<Object, Object>> doBindProducer(String name,
            KStream<Object, Object> outboundBindTarget,
            ExtendedProducerProperties<KafkaStreamsProducerProperties> properties) {
        ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties =
                new ExtendedProducerProperties<>(new KafkaProducerProperties());
        this.kafkaTopicProvisioner.provisionProducerDestination(name, extendedProducerProperties);

        // The resolver yields wildcard-typed serdes while the bound stream is KStream<Object, Object>;
        // the unchecked casts bridge the two and are confined to these two declarations.
        @SuppressWarnings("unchecked")
        Serde<Object> keySerde = (Serde<Object>) this.keyValueSerdeResolver.getOuboundKeySerde(properties.getExtension());
        @SuppressWarnings("unchecked")
        Serde<Object> valueSerde = (Serde<Object>) this.keyValueSerdeResolver.getOutboundValueSerde(properties, properties.getExtension());

        this.to(properties.isUseNativeEncoding(), name, outboundBindTarget, keySerde, valueSerde);
        return new DefaultBinding<>(name, null, outboundBindTarget, null);
    }

    /**
     * Sends the outbound stream to {@code name} with the given serdes. When native encoding is
     * disabled the stream is first passed through the message conversion delegate.
     */
    private void to(boolean isNativeEncoding, String name, KStream<Object, Object> outboundBindTarget,
            Serde<Object> keySerde, Serde<Object> valueSerde) {
        if (isNativeEncoding) {
            LOG.info("Native encoding is enabled for " + name + ". Outbound serialization done at the broker.");
            outboundBindTarget.to(name, Produced.with(keySerde, valueSerde));
        } else {
            LOG.info("Native encoding is disabled for " + name + ". Outbound message conversion done by Spring Cloud Stream.");
            this.kafkaStreamsMessageConversionDelegate.serializeOnOutbound(outboundBindTarget)
                    .to(name, Produced.with(keySerde, valueSerde));
        }
    }

    /**
     * Returns the extended consumer properties for the given channel, as held by the
     * configured {@link KafkaStreamsExtendedBindingProperties}.
     */
    @Override
    public KafkaStreamsConsumerProperties getExtendedConsumerProperties(String channelName) {
        return this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(channelName);
    }

    /**
     * Returns the extended producer properties for the given channel, as held by the
     * configured {@link KafkaStreamsExtendedBindingProperties}.
     */
    @Override
    public KafkaStreamsProducerProperties getExtendedProducerProperties(String channelName) {
        return this.kafkaStreamsExtendedBindingProperties.getExtendedProducerProperties(channelName);
    }

    /** Replaces the extended binding properties used by the two lookup methods above. */
    public void setKafkaStreamsExtendedBindingProperties(KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) {
        this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
    }
}

