/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.function;

import io.micrometer.context.ContextExecutorService;
import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.observation.ObservationRegistry;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.core.FunctionInvocationHelper;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.BinderWrapper;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.binding.DefaultPartitioningInterceptor;
import org.springframework.cloud.stream.binding.NewDestinationBindingCallback;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper;
import org.springframework.cloud.stream.function.StreamOperations;
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
import org.springframework.cloud.stream.utils.CacheKeyCreatorUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.ResolvableType;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.AbstractSubscribableChannel;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.config.GlobalChannelInterceptorProcessor;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

public final class StreamBridge
implements StreamOperations,
SmartInitializingSingleton,
DisposableBean,
ApplicationListener<ApplicationEvent> {
    private static final String STREAM_BRIDGE_FUNC_NAME = "streamBridge";
    private final Log logger = LogFactory.getLog(this.getClass());
    private final Map<String, MessageChannel> channelCache;
    private final FunctionCatalog functionCatalog;
    private final NewDestinationBindingCallback destinationBindingCallback;
    private final BindingServiceProperties bindingServiceProperties;
    private final ConfigurableApplicationContext applicationContext;
    private boolean initialized;
    private boolean async;
    private final BindingService bindingService;
    private final Map<Integer, SimpleFunctionRegistry.FunctionInvocationWrapper> streamBridgeFunctionCache;
    private final FunctionInvocationHelper<?> functionInvocationHelper;
    private ExecutorService executorService;
    private static final boolean isContextPropagationPresent = ClassUtils.isPresent((String)"io.micrometer.context.ContextSnapshotFactory", (ClassLoader)StreamBridge.class.getClassLoader());
    private static final ReentrantLock lock = new ReentrantLock();
    private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;

    StreamBridge(FunctionCatalog functionCatalog, final BindingServiceProperties bindingServiceProperties, ConfigurableApplicationContext applicationContext, @Nullable NewDestinationBindingCallback destinationBindingCallback, ObjectProvider<ObservationRegistry> observationRegistries) {
        this.executorService = Executors.newCachedThreadPool();
        Assert.notNull((Object)functionCatalog, (String)"'functionCatalog' must not be null");
        Assert.notNull((Object)applicationContext, (String)"'applicationContext' must not be null");
        Assert.notNull((Object)bindingServiceProperties, (String)"'bindingServiceProperties' must not be null");
        this.bindingService = (BindingService)applicationContext.getBean(BindingService.class);
        this.functionCatalog = functionCatalog;
        this.applicationContext = applicationContext;
        this.bindingServiceProperties = bindingServiceProperties;
        this.destinationBindingCallback = destinationBindingCallback;
        this.channelCache = new LinkedHashMap<String, MessageChannel>(){

            @Override
            protected boolean removeEldestEntry(Map.Entry<String, MessageChannel> eldest) {
                boolean remove;
                boolean bl = remove = this.size() > bindingServiceProperties.getDynamicDestinationCacheSize();
                if (remove) {
                    if (StreamBridge.this.logger.isDebugEnabled()) {
                        StreamBridge.this.logger.debug((Object)("Removing message channel from cache " + eldest.getKey()));
                    }
                    StreamBridge.this.bindingService.unbindProducers(eldest.getKey());
                }
                return remove;
            }
        };
        this.functionInvocationHelper = (FunctionInvocationHelper)applicationContext.getBean(FunctionInvocationHelper.class);
        this.streamBridgeFunctionCache = new ConcurrentHashMap<Integer, SimpleFunctionRegistry.FunctionInvocationWrapper>();
        observationRegistries.ifAvailable(registry -> {
            this.observationRegistry = registry;
        });
    }

    @Override
    public boolean send(String bindingName, Object data) {
        MimeType contentType = StreamBridge.determineContentType(bindingName, this.bindingServiceProperties);
        return this.send(bindingName, data, contentType);
    }

    @Override
    public boolean send(String bindingName, Object data, MimeType outputContentType) {
        return this.send(bindingName, null, data, outputContentType);
    }

    @Override
    public boolean send(String bindingName, @Nullable String binderName, Object data) {
        MimeType contentType = StreamBridge.determineContentType(bindingName, this.bindingServiceProperties);
        return this.send(bindingName, binderName, data, contentType);
    }

    private static MimeType determineContentType(String bindingName, BindingServiceProperties bindingServiceProperties) {
        BindingProperties bindingProperties = bindingServiceProperties.getBindingProperties(bindingName);
        return StringUtils.hasText((String)bindingProperties.getContentType()) ? MimeType.valueOf((String)bindingProperties.getContentType()) : MimeTypeUtils.APPLICATION_JSON;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean send(String bindingName, @Nullable String binderName, Object data, MimeType outputContentType) {
        Message resultMessage;
        GenericMessage genericMessage;
        if (!this.initialized) {
            this.afterSingletonsInstantiated();
        }
        ProducerProperties producerProperties = this.bindingServiceProperties.getProducerProperties(bindingName);
        MessageChannel messageChannel = this.resolveDestination(bindingName, producerProperties, binderName);
        Object functionToInvoke = this.getStreamBridgeFunction(outputContentType.toString(), producerProperties);
        if (producerProperties != null && producerProperties.isPartitioned()) {
            functionToInvoke = new PartitionAwareFunctionWrapper((Function<?, ?>)functionToInvoke, this.applicationContext, producerProperties);
        }
        String targetType = this.resolveBinderTargetType(bindingName, binderName, MessageChannel.class, (BinderFactory)this.applicationContext.getBean(BinderFactory.class));
        if (data instanceof Message) {
            Message messageData = (Message)data;
            genericMessage = MessageBuilder.fromMessage((Message)messageData).setHeaderIfAbsent(MessageUtils.TARGET_PROTOCOL, (Object)targetType).build();
        } else {
            genericMessage = new GenericMessage(data, Collections.singletonMap(MessageUtils.TARGET_PROTOCOL, targetType));
        }
        GenericMessage messageToSend = genericMessage;
        lock.lock();
        try {
            resultMessage = (Message)functionToInvoke.apply(messageToSend);
        }
        finally {
            lock.unlock();
        }
        if (resultMessage == null) {
            if (messageToSend.getPayload().getClass().getName().equals("org.springframework.kafka.support.KafkaNull")) {
                resultMessage = messageToSend;
            } else {
                throw new RuntimeException(functionToInvoke.getClass().getName() + " returned null");
            }
        }
        resultMessage = (Message)this.functionInvocationHelper.postProcessResult((Object)resultMessage, null);
        return messageChannel.send(resultMessage);
    }

    private int hashProducerProperties(ProducerProperties producerProperties, String outputContentType) {
        int hash = outputContentType.hashCode() + Boolean.hashCode(producerProperties.isUseNativeEncoding()) + Boolean.hashCode(producerProperties.isPartitioned()) + producerProperties.getPartitionCount();
        if (producerProperties.getPartitionKeyExpression() != null && producerProperties.getBindingName() != null) {
            hash += producerProperties.getBindingName().hashCode();
        }
        return hash;
    }

    private SimpleFunctionRegistry.FunctionInvocationWrapper getStreamBridgeFunction(String outputContentType, ProducerProperties producerProperties) {
        int streamBridgeFunctionKey = this.hashProducerProperties(producerProperties, outputContentType);
        return this.streamBridgeFunctionCache.computeIfAbsent(streamBridgeFunctionKey, key -> {
            SimpleFunctionRegistry.FunctionInvocationWrapper functionToInvoke = (SimpleFunctionRegistry.FunctionInvocationWrapper)this.functionCatalog.lookup(STREAM_BRIDGE_FUNC_NAME, new String[]{outputContentType.toString()});
            functionToInvoke.setSkipOutputConversion(producerProperties.isUseNativeEncoding());
            return functionToInvoke;
        });
    }

    public void afterSingletonsInstantiated() {
        if (this.initialized) {
            return;
        }
        FunctionRegistration fr = new FunctionRegistration((Object)new SimpleFunctionRegistry.PassThruFunction(), new String[]{STREAM_BRIDGE_FUNC_NAME});
        fr.getProperties().put("singleton", "false");
        Type functionType = ResolvableType.forClassWithGenerics(Function.class, (Class[])new Class[]{Object.class, Object.class}).getType();
        ((FunctionRegistry)this.functionCatalog).register(fr.type(functionType));
        this.initialized = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    MessageChannel resolveDestination(String destinationName, ProducerProperties producerProperties, String binderName) {
        lock.lock();
        try {
            MessageChannel messageChannel = this.channelCache.get(CacheKeyCreatorUtils.createChannelCacheKey(binderName, destinationName, this.bindingServiceProperties));
            if (messageChannel == null) {
                if (this.applicationContext.containsBean(destinationName)) {
                    messageChannel = (MessageChannel)this.applicationContext.getBean(destinationName, MessageChannel.class);
                    Object[] consumerBindingNames = this.bindingService.getConsumerBindingNames();
                    if (messageChannel instanceof AbstractMessageChannel) {
                        this.addPartitioningInterceptorIfNeedBe(producerProperties, destinationName, (AbstractMessageChannel)messageChannel);
                    }
                    if (ObjectUtils.containsElement((Object[])consumerBindingNames, (Object)destinationName)) {
                        this.logger.warn((Object)"You seem to be sending data to the input binding.  It is not recommended, since you are bypassing the binder and this the messaging system exposed by the binder.");
                    }
                } else {
                    messageChannel = this.isAsync() ? new ExecutorChannel((Executor)this.executorService) : new DirectWithAttributesChannel();
                    ((AbstractSubscribableChannel)messageChannel).setApplicationContext((ApplicationContext)this.applicationContext);
                    ((AbstractSubscribableChannel)messageChannel).setComponentName(destinationName);
                    BinderWrapper binderWrapper = this.bindingService.createBinderWrapper(binderName, destinationName, messageChannel.getClass());
                    ((AbstractSubscribableChannel)messageChannel).registerObservationRegistry(this.observationRegistry);
                    if (this.destinationBindingCallback != null) {
                        Object extendedProducerProperties = this.bindingService.getExtendedProducerProperties(binderWrapper.binder(), destinationName);
                        this.destinationBindingCallback.configure(destinationName, messageChannel, producerProperties, extendedProducerProperties);
                    }
                    this.addPartitioningInterceptorIfNeedBe(producerProperties, destinationName, (AbstractMessageChannel)messageChannel);
                    this.addGlobalChannelInterceptorProcessor((AbstractMessageChannel)messageChannel, destinationName);
                    this.bindingService.bindProducer(messageChannel, true, binderWrapper);
                    this.channelCache.put(binderWrapper.cacheKey(), messageChannel);
                }
            }
            MessageChannel messageChannel2 = messageChannel;
            return messageChannel2;
        }
        finally {
            lock.unlock();
        }
    }

    private void addPartitioningInterceptorIfNeedBe(ProducerProperties producerProperties, String destinationName, AbstractMessageChannel messageChannel) {
        if (producerProperties != null && producerProperties.isPartitioned() && producerProperties.isUseNativeEncoding()) {
            BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(destinationName);
            messageChannel.addInterceptor((ChannelInterceptor)new DefaultPartitioningInterceptor(bindingProperties, this.applicationContext.getBeanFactory()));
        }
    }

    private String resolveBinderTargetType(String channelName, String binderName, Class<?> bindableType, BinderFactory binderFactory) {
        String binderConfigurationName = binderName != null ? binderName : this.bindingServiceProperties.getBinder(channelName);
        Binder<?, ConsumerProperties, ProducerProperties> binder = binderFactory.getBinder(binderConfigurationName, bindableType);
        String targetProtocol = binder.getClass().getSimpleName().startsWith("Rabbit") ? "amqp" : "kafka";
        return targetProtocol;
    }

    private void addGlobalChannelInterceptorProcessor(AbstractMessageChannel messageChannel, String destinationName) {
        GlobalChannelInterceptorProcessor globalChannelInterceptorProcessor = (GlobalChannelInterceptorProcessor)this.applicationContext.getBean(GlobalChannelInterceptorProcessor.class);
        globalChannelInterceptorProcessor.postProcessAfterInitialization((Object)messageChannel, destinationName);
    }

    public void destroy() throws Exception {
        this.executorService.shutdown();
        if (!this.executorService.awaitTermination(10000L, TimeUnit.MILLISECONDS)) {
            this.logger.warn((Object)"Failed to terminate executor. Terminating current tasks.");
            this.executorService.shutdownNow();
        }
        this.executorService = null;
        this.async = false;
        this.channelCache.keySet().forEach(this.bindingService::unbindProducers);
        this.channelCache.clear();
    }

    public boolean isAsync() {
        return this.async;
    }

    public void setAsync(boolean async) {
        if (isContextPropagationPresent) {
            this.executorService = ContextPropagationHelper.wrap(this.executorService);
        }
        this.async = async;
    }

    public void onApplicationEvent(ApplicationEvent event) {
        if (event.getClass().getName().equals("org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent")) {
            this.channelCache.clear();
        }
    }

    private static final class ContextPropagationHelper {
        private ContextPropagationHelper() {
        }

        static ExecutorService wrap(ExecutorService executorService) {
            return ContextExecutorService.wrap((ExecutorService)executorService, () -> ContextSnapshotFactory.builder().build().captureAll(new Object[0]));
        }
    }
}

