package io.confluent.ksql.rest.server.resources.streaming;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import com.google.protobuf.Message;
import com.google.protobuf.TextFormat;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.ksql.serde.json.KsqlJsonDeserializer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.Windowed;

/* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/RecordFormatter.class */
public final class RecordFormatter {
    private static final DateTimeFormatter DATA_FORMATTER = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS z").withZone(ZoneOffset.UTC);
    private static final long DEFAULT_WINDOW_SIZE = 1;
    private final Deserializers keyDeserializers;
    private final Deserializers valueDeserializers;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/RecordFormatter$DeserializationException.class */
    public static final class DeserializationException extends RuntimeException {
        DeserializationException(String str) {
            super(str);
        }

        DeserializationException(String str, Throwable th) {
            super(str, th);
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/RecordFormatter$Deserializers.class */
    static final class Deserializers {
        private final String topicName;
        private final List<NamedDeserializer> deserializers;
        private boolean seenData = false;

        Deserializers(String str, SchemaRegistryClient schemaRegistryClient, boolean z) {
            this.topicName = (String) Objects.requireNonNull(str, "topicName");
            List<NamedDeserializer> list = (List) Arrays.stream(Format.values()).map(format -> {
                return format.getDeserializer(schemaRegistryClient);
            }).collect(Collectors.toList());
            if (z) {
                this.deserializers = (List) list.stream().flatMap(namedDeserializer -> {
                    return namedDeserializer.doNotWrap ? Stream.of(namedDeserializer) : Streams.concat(new Stream[]{Arrays.stream(WindowSchema.values()).map(windowSchema -> {
                        return windowSchema.wrap(namedDeserializer);
                    }), Stream.of(namedDeserializer)});
                }).collect(Collectors.toList());
            } else {
                this.deserializers = list;
            }
        }

        List<String> getPossibleFormats() {
            return !this.seenData ? ImmutableList.of("¯\\_(ツ)_/¯ - no data processed") : (List) this.deserializers.stream().map((v0) -> {
                return v0.toString();
            }).filter(str -> {
                return !str.equals(Format.UNRECOGNISED_BYTES.toString());
            }).collect(Collectors.toList());
        }

        String format(Bytes bytes) {
            if (bytes == null || bytes.get() == null) {
                return "<null>";
            }
            this.seenData = true;
            String str = null;
            Iterator<NamedDeserializer> it = this.deserializers.iterator();
            while (it.hasNext()) {
                Optional<String> tryDeserializer = tryDeserializer(bytes, it.next());
                if (tryDeserializer.isPresent() && str == null) {
                    str = tryDeserializer.get();
                }
                if (!tryDeserializer.isPresent()) {
                    it.remove();
                }
            }
            return str == null ? "<Failed to deserialize>" : str;
        }

        private Optional<String> tryDeserializer(Bytes bytes, NamedDeserializer namedDeserializer) {
            try {
                Object deserialize = namedDeserializer.deserializer.deserialize(this.topicName, bytes.get());
                return Optional.of(deserialize == null ? "<null>" : deserialize.toString());
            } catch (Exception e) {
                return Optional.empty();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/RecordFormatter$Format.class */
    public enum Format {
        AVRO(0, KafkaAvroDeserializer::new),
        PROTOBUF(0, RecordFormatter::newProtobufDeserializer),
        JSON(RecordFormatter::newJsonDeserializer),
        JSON_SR(0, KafkaJsonSchemaDeserializer::new),
        KAFKA_INT(IntegerDeserializer::new),
        KAFKA_BIGINT(LongDeserializer::new),
        KAFKA_DOUBLE(DoubleDeserializer::new),
        KAFKA_STRING(RecordFormatter::newStringDeserializer),
        UNRECOGNISED_BYTES(BytesDeserializer::new);

        private final Function<SchemaRegistryClient, Deserializer<?>> deserializerFactory;

        Format(Supplier supplier) {
            this(1, schemaRegistryClient -> {
                return (Deserializer) supplier.get();
            });
        }

        Format(int i, Function function) {
            this.deserializerFactory = (Function) Objects.requireNonNull(function, "deserializerFactory");
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public NamedDeserializer getDeserializer(SchemaRegistryClient schemaRegistryClient) {
            return new NamedDeserializer(name(), this == UNRECOGNISED_BYTES, this.deserializerFactory.apply(schemaRegistryClient));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/RecordFormatter$NamedDeserializer.class */
    public static final class NamedDeserializer {
        final String name;
        final boolean doNotWrap;
        final Deserializer<?> deserializer;

        private NamedDeserializer(String str, boolean z, Deserializer<?> deserializer) {
            this.name = (String) Objects.requireNonNull(str, "name");
            this.doNotWrap = z;
            this.deserializer = (Deserializer) Objects.requireNonNull(deserializer, "deserializer");
        }

        public String toString() {
            return this.name;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/RecordFormatter$WindowSchema.class */
    public enum WindowSchema {
        SESSION(WindowSchema::newSessionWindowedDeserializer),
        HOPPING(WindowSchema::newTimeWindowedDeserializer),
        TUMBLING(WindowSchema::newTimeWindowedDeserializer);

        private final Function<NamedDeserializer, Deserializer<?>> mapper;

        WindowSchema(Function function) {
            this.mapper = (Function) Objects.requireNonNull(function, "mapper");
        }

        public NamedDeserializer wrap(NamedDeserializer namedDeserializer) {
            return new NamedDeserializer(name() + "(" + namedDeserializer.name + ")", namedDeserializer.doNotWrap, this.mapper.apply(namedDeserializer));
        }

        private static Deserializer<?> newSessionWindowedDeserializer(NamedDeserializer namedDeserializer) {
            SessionWindowedDeserializer sessionWindowedDeserializer = new SessionWindowedDeserializer(namedDeserializer.deserializer);
            return (str, bArr) -> {
                Windowed deserialize = sessionWindowedDeserializer.deserialize(str, bArr);
                String valueOf = String.valueOf(deserialize.key());
                long start = deserialize.window().start();
                deserialize.window().end();
                return "[" + valueOf + "@" + start + "/" + valueOf + "]";
            };
        }

        private static Deserializer<?> newTimeWindowedDeserializer(NamedDeserializer namedDeserializer) {
            TimeWindowedDeserializer timeWindowedDeserializer = new TimeWindowedDeserializer(namedDeserializer.deserializer, Long.valueOf(RecordFormatter.DEFAULT_WINDOW_SIZE));
            return (str, bArr) -> {
                Windowed deserialize = timeWindowedDeserializer.deserialize(str, bArr);
                return "[" + String.valueOf(deserialize.key()) + "@" + deserialize.window().start() + "/-]";
            };
        }
    }

    public RecordFormatter(SchemaRegistryClient schemaRegistryClient, String str) {
        this(new Deserializers(str, schemaRegistryClient, true), new Deserializers(str, schemaRegistryClient, false));
    }

    @VisibleForTesting
    RecordFormatter(Deserializers deserializers, Deserializers deserializers2) {
        this.keyDeserializers = (Deserializers) Objects.requireNonNull(deserializers, "keyDeserializers");
        this.valueDeserializers = (Deserializers) Objects.requireNonNull(deserializers2, "valueDeserializers");
    }

    public List<String> format(Iterable<ConsumerRecord<Bytes, Bytes>> iterable) {
        String str = this.keyDeserializers.getPossibleFormats().get(0);
        String str2 = this.valueDeserializers.getPossibleFormats().get(0);
        List<String> formatRecords = formatRecords(iterable);
        Stream<String> stream = this.keyDeserializers.getPossibleFormats().stream();
        Objects.requireNonNull(str);
        boolean anyMatch = stream.anyMatch((v1) -> {
            return r1.equals(v1);
        });
        Stream<String> stream2 = this.valueDeserializers.getPossibleFormats().stream();
        Objects.requireNonNull(str2);
        return (anyMatch && stream2.anyMatch((v1) -> {
            return r1.equals(v1);
        })) ? formatRecords : formatRecords(iterable);
    }

    public List<String> getPossibleKeyFormats() {
        return this.keyDeserializers.getPossibleFormats();
    }

    public List<String> getPossibleValueFormats() {
        return this.valueDeserializers.getPossibleFormats();
    }

    private List<String> formatRecords(Iterable<ConsumerRecord<Bytes, Bytes>> iterable) {
        return (List) StreamSupport.stream(iterable.spliterator(), false).map(this::formatRecord).collect(Collectors.toList());
    }

    private String formatRecord(ConsumerRecord<Bytes, Bytes> consumerRecord) {
        return "rowtime: " + formatRowTime(consumerRecord.timestamp()) + ", key: " + this.keyDeserializers.format((Bytes) consumerRecord.key()) + ", value: " + this.valueDeserializers.format((Bytes) consumerRecord.value()) + ", partition: " + consumerRecord.partition();
    }

    private static String formatRowTime(long j) {
        return j == -1 ? "N/A" : DATA_FORMATTER.format(Instant.ofEpochMilli(j));
    }

    private static Deserializer<?> newProtobufDeserializer(SchemaRegistryClient schemaRegistryClient) {
        TextFormat.Printer printer = TextFormat.printer();
        KafkaProtobufDeserializer kafkaProtobufDeserializer = new KafkaProtobufDeserializer(schemaRegistryClient);
        return (str, bArr) -> {
            Message deserialize = kafkaProtobufDeserializer.deserialize(str, bArr);
            if (deserialize == null) {
                return null;
            }
            return printer.shortDebugString(deserialize);
        };
    }

    private static Deserializer<?> newJsonDeserializer() {
        String replacement = StandardCharsets.UTF_8.newDecoder().replacement();
        return (str, bArr) -> {
            if (bArr.length == 0) {
                throw new DeserializationException("Empty data");
            }
            String str = new String(bArr, StandardCharsets.UTF_8);
            if (str.contains(replacement)) {
                throw new DeserializationException("String contains replacement char");
            }
            try {
                KsqlJsonDeserializer.jsonReader().readTree(str);
                return str;
            } catch (IOException e) {
                throw new DeserializationException("Failed to deserialize as JSON", e);
            }
        };
    }

    private static Deserializer<?> newStringDeserializer() {
        StringDeserializer stringDeserializer = new StringDeserializer();
        String replacement = StandardCharsets.UTF_8.newDecoder().replacement();
        return (str, bArr) -> {
            if (bArr.length == 0) {
                throw new DeserializationException("Empty data");
            }
            String deserialize = stringDeserializer.deserialize("", bArr);
            if (deserialize.contains(replacement)) {
                throw new DeserializationException("String contains replacement char");
            }
            return deserialize;
        };
    }
}
