/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateBehavior;
import io.camunda.zeebe.engine.processing.common.EventHandle;
import io.camunda.zeebe.engine.processing.common.EventTriggerBehavior;
import io.camunda.zeebe.engine.processing.message.Subscriptions;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.EventScopeInstanceState;
import io.camunda.zeebe.engine.state.immutable.MessageStartEventSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.MessageState;
import io.camunda.zeebe.engine.state.immutable.MessageSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.ProcessState;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageStartEventSubscriptionRecord;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import io.camunda.zeebe.stream.api.state.KeyGenerator;
import io.camunda.zeebe.util.buffer.BufferUtil;
import org.agrona.DirectBuffer;

/**
 * Processes message publish commands.
 *
 * <p>A command that carries a message id is rejected with {@link RejectionType#ALREADY_EXISTS}
 * when the message state already holds a message with the same name, correlation key, message id
 * and tenant. Otherwise a {@code PUBLISHED} event is written, the message is correlated to the
 * matching message subscriptions and message start event subscriptions, and a correlate command
 * is sent for every collected subscription.
 *
 * <p>The processor keeps per-command working state in mutable fields ({@code messageRecord},
 * {@code messageKey}, {@code correlatingSubscriptions}) which are reset on every call to
 * {@link #processRecord(TypedRecord)}.
 */
public final class MessagePublishProcessor
implements TypedRecordProcessor<MessageRecord> {
    private static final String ALREADY_PUBLISHED_MESSAGE = "Expected to publish a new message with id '%s', but a message with that id was already published";
    private final MessageState messageState;
    private final MessageSubscriptionState subscriptionState;
    private final MessageStartEventSubscriptionState startEventSubscriptionState;
    private final SubscriptionCommandSender commandSender;
    private final KeyGenerator keyGenerator;
    private final StateWriter stateWriter;
    private final EventHandle eventHandle;
    /** Subscriptions the current message is being correlated to; cleared for every command. */
    private final Subscriptions correlatingSubscriptions = new Subscriptions();
    /** Value of the command currently being processed. */
    private MessageRecord messageRecord;
    /** Key generated for the message currently being published. */
    private long messageKey;
    private final TypedResponseWriter responseWriter;
    private final TypedRejectionWriter rejectionWriter;

    /**
     * Creates the processor and wires up the state, writers and the {@link EventHandle} used to
     * trigger message start events.
     */
    public MessagePublishProcessor(MessageState messageState, MessageSubscriptionState subscriptionState, MessageStartEventSubscriptionState startEventSubscriptionState, EventScopeInstanceState eventScopeInstanceState, SubscriptionCommandSender commandSender, KeyGenerator keyGenerator, Writers writers, ProcessState processState, EventTriggerBehavior eventTriggerBehavior, BpmnStateBehavior stateBehavior) {
        this.messageState = messageState;
        this.subscriptionState = subscriptionState;
        this.startEventSubscriptionState = startEventSubscriptionState;
        this.commandSender = commandSender;
        this.keyGenerator = keyGenerator;
        this.stateWriter = writers.state();
        this.responseWriter = writers.response();
        this.rejectionWriter = writers.rejection();
        this.eventHandle = new EventHandle(keyGenerator, eventScopeInstanceState, writers, processState, eventTriggerBehavior, stateBehavior);
    }

    /**
     * Handles a single publish command: rejects it if a message with the same id already exists,
     * otherwise publishes and correlates the message.
     *
     * @param command the publish command to process
     */
    @Override
    public void processRecord(TypedRecord<MessageRecord> command) {
        messageRecord = command.getValue();
        correlatingSubscriptions.clear();

        if (isAlreadyPublished(messageRecord)) {
            String rejectionReason = String.format(ALREADY_PUBLISHED_MESSAGE, BufferUtil.bufferAsString(messageRecord.getMessageIdBuffer()));
            rejectionWriter.appendRejection(command, RejectionType.ALREADY_EXISTS, rejectionReason);
            responseWriter.writeRejectionOnCommand(command, RejectionType.ALREADY_EXISTS, rejectionReason);
            return;
        }
        handleNewMessage(command);
    }

    /**
     * Returns whether the message carries an id and the message state already contains a message
     * with the same name, correlation key, id and tenant. Messages without an id are never
     * considered duplicates.
     */
    private boolean isAlreadyPublished(MessageRecord message) {
        return message.hasMessageId()
            && messageState.exist(message.getNameBuffer(), message.getCorrelationKeyBuffer(), message.getMessageIdBuffer(), message.getTenantId());
    }

    /**
     * Publishes a message that is not a duplicate: assigns a key and deadline, writes the
     * {@code PUBLISHED} event and response, correlates the message and sends the correlate
     * commands. A message with a time-to-live of zero or less is expired in the same step.
     */
    private void handleNewMessage(TypedRecord<MessageRecord> command) {
        messageKey = keyGenerator.nextKey();

        // The deadline is derived from the command's timestamp. The sum is saturated so that a
        // very large time-to-live cannot wrap around into a negative (already elapsed) deadline.
        messageRecord.setDeadline(computeDeadline(command.getTimestamp(), messageRecord.getTimeToLive()));

        stateWriter.appendFollowUpEvent(messageKey, MessageIntent.PUBLISHED, command.getValue());
        responseWriter.writeEventOnCommand(messageKey, MessageIntent.PUBLISHED, command.getValue(), command);

        correlateToSubscriptions(messageKey, messageRecord);
        correlateToMessageStartEvents(messageRecord);
        sendCorrelateCommand();

        if (messageRecord.getTimeToLive() <= 0L) {
            // Non-positive time-to-live: write the EXPIRED event right away as a follow-up.
            stateWriter.appendFollowUpEvent(messageKey, MessageIntent.EXPIRED, messageRecord);
        }
    }

    /**
     * Adds the time-to-live to the timestamp, clamping the result to the {@code long} range
     * instead of wrapping around on overflow.
     *
     * @param timestamp the timestamp of the publish command
     * @param timeToLive the time-to-live of the message
     * @return {@code timestamp + timeToLive}, or {@link Long#MAX_VALUE} / {@link Long#MIN_VALUE}
     *     if the exact sum does not fit into a {@code long}
     */
    private static long computeDeadline(long timestamp, long timeToLive) {
        long deadline = timestamp + timeToLive;
        // Overflow happened iff both operands have the same sign and the sum has the other sign.
        boolean overflowed = ((timestamp ^ deadline) & (timeToLive ^ deadline)) < 0L;
        if (overflowed) {
            return timestamp < 0L ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
        return deadline;
    }

    /**
     * Visits the message subscriptions matching the message's tenant, name and correlation key.
     * Each subscription that is not already correlating, and whose process id has not been
     * collected yet for this message, gets a {@code CORRELATING} event and is remembered in
     * {@code correlatingSubscriptions}.
     */
    private void correlateToSubscriptions(long messageKey, MessageRecord message) {
        subscriptionState.visitSubscriptions(message.getTenantId(), message.getNameBuffer(), message.getCorrelationKeyBuffer(), subscription -> {
            boolean processAlreadyCollected = correlatingSubscriptions.contains(subscription.getRecord().getBpmnProcessIdBuffer());
            if (!subscription.isCorrelating() && !processAlreadyCollected) {
                MessageSubscriptionRecord correlatingSubscription = subscription.getRecord().setMessageKey(messageKey).setVariables(message.getVariablesBuffer());
                stateWriter.appendFollowUpEvent(subscription.getKey(), MessageSubscriptionIntent.CORRELATING, correlatingSubscription);
                correlatingSubscriptions.add(correlatingSubscription);
            }
            // Always true; presumably tells the visitor to keep iterating (confirm against
            // MessageSubscriptionState#visitSubscriptions).
            return true;
        });
    }

    /**
     * Visits the message start event subscriptions for the message's tenant and name and triggers
     * the start event for each process that qualifies. A process is skipped when it was already
     * collected for this message, or when the message has a non-empty correlation key and the
     * message state reports an active process instance for that process and correlation key.
     */
    private void correlateToMessageStartEvents(MessageRecord messageRecord) {
        startEventSubscriptionState.visitSubscriptionsByMessageName(messageRecord.getTenantId(), messageRecord.getNameBuffer(), subscription -> {
            MessageStartEventSubscriptionRecord subscriptionRecord = subscription.getRecord();
            DirectBuffer bpmnProcessIdBuffer = subscriptionRecord.getBpmnProcessIdBuffer();
            DirectBuffer correlationKeyBuffer = messageRecord.getCorrelationKeyBuffer();

            if (correlatingSubscriptions.contains(bpmnProcessIdBuffer)) {
                return;
            }
            boolean hasCorrelationKey = correlationKeyBuffer.capacity() != 0;
            if (hasCorrelationKey && messageState.existActiveProcessInstance(messageRecord.getTenantId(), bpmnProcessIdBuffer, correlationKeyBuffer)) {
                return;
            }

            correlatingSubscriptions.add(subscriptionRecord);
            eventHandle.triggerMessageStartEvent(subscription.getKey(), subscriptionRecord, messageKey, messageRecord);
        });
    }

    /**
     * Sends a correlate command for every subscription collected in
     * {@code correlatingSubscriptions}.
     *
     * @return the result of {@code Subscriptions#visitSubscriptions}
     */
    private boolean sendCorrelateCommand() {
        return correlatingSubscriptions.visitSubscriptions(subscription -> commandSender.correlateProcessMessageSubscription(subscription.getProcessInstanceKey(), subscription.getElementInstanceKey(), subscription.getBpmnProcessId(), messageRecord.getNameBuffer(), messageKey, messageRecord.getVariablesBuffer(), messageRecord.getCorrelationKeyBuffer(), messageRecord.getTenantId()));
    }
}

