package org.springframework.messaging.simp.stomp;

import java.nio.ByteBuffer;
import java.util.Iterator;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;

/* loaded from: input_file:WEB-INF/lib/spring-messaging-4.3.26.RELEASE.jar:org/springframework/messaging/simp/stomp/Reactor2StompCodec.class */
/**
 * Reactor {@link Codec} that converts between raw {@link Buffer} content and
 * {@code Message<byte[]>} instances by delegating to a {@link StompEncoder}
 * and a {@link StompDecoder}.
 */
public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<byte[]>> {

    /** Decoder handed to every decoding function created by {@link #decoder(Consumer)}. */
    private final StompDecoder decoderDelegate;

    /** Single encoding function instance, created once and returned by {@link #encoder()}. */
    private final Function<Message<byte[]>, Buffer> messageEncoder;

    /**
     * Creates a codec backed by a new {@link StompEncoder} and a new
     * {@link StompDecoder}.
     */
    public Reactor2StompCodec() {
        this(new StompEncoder(), new StompDecoder());
    }

    /**
     * Creates a codec backed by the given encoder and decoder.
     *
     * @param stompEncoder the encoder to delegate to; must not be {@code null}
     * @param stompDecoder the decoder to delegate to; must not be {@code null}
     */
    public Reactor2StompCodec(StompEncoder stompEncoder, StompDecoder stompDecoder) {
        Assert.notNull(stompEncoder, "StompEncoder is required");
        Assert.notNull(stompDecoder, "StompDecoder is required");
        this.decoderDelegate = stompDecoder;
        this.messageEncoder = new EncodingFunction(stompEncoder);
    }

    /**
     * Returns a new decoding function that passes every decoded message to the
     * given consumer.
     *
     * @param consumer receives each message decoded from a buffer
     * @return a new function instance on every call
     */
    public Function<Buffer, Message<byte[]>> decoder(Consumer<Message<byte[]>> consumer) {
        return new DecodingFunction(this.decoderDelegate, consumer);
    }

    /**
     * Returns the encoding function created at construction time.
     *
     * @return the same function instance on every call
     */
    public Function<Message<byte[]>, Buffer> encoder() {
        return this.messageEncoder;
    }

    /**
     * Encodes a single message by applying the encoding function held by this codec.
     *
     * @param message the message to encode
     * @return a buffer wrapping the encoded bytes
     */
    public Buffer apply(Message<byte[]> message) {
        return this.messageEncoder.apply(message);
    }

    /**
     * Function that encodes a message with a {@link StompEncoder} and wraps the
     * resulting bytes in a {@link Buffer}.
     */
    private static class EncodingFunction implements Function<Message<byte[]>, Buffer> {
        private final StompEncoder encoder;

        public EncodingFunction(StompEncoder delegate) {
            this.encoder = delegate;
        }

        public Buffer apply(Message<byte[]> message) {
            byte[] encoded = this.encoder.encode(message);
            return new Buffer(ByteBuffer.wrap(encoded));
        }
    }

    /**
     * Function that decodes the content of a {@link Buffer} with a
     * {@link StompDecoder} and hands each resulting message to a consumer.
     */
    private static class DecodingFunction implements Function<Buffer, Message<byte[]>> {
        private final StompDecoder decoder;
        private final Consumer<Message<byte[]>> messageConsumer;

        public DecodingFunction(StompDecoder delegate, Consumer<Message<byte[]>> sink) {
            this.decoder = delegate;
            this.messageConsumer = sink;
        }

        /**
         * Decodes the buffer and forwards every decoded message to the consumer,
         * in the order the decoder returned them.
         *
         * @return always {@code null}; decoded messages are delivered only
         *         through the consumer
         */
        public Message<byte[]> apply(Buffer buffer) {
            for (Message<byte[]> decoded : this.decoder.decode(buffer.byteBuffer())) {
                this.messageConsumer.accept(decoded);
            }
            return null;
        }
    }
}
