package io.confluent.ksql.planner.plan;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.QualifiedColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp;
import io.confluent.ksql.execution.streams.JoinParamsFactory;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.tree.WithinExpression;
import io.confluent.ksql.planner.Projection;
import io.confluent.ksql.planner.RequiredColumns;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.ColumnNames;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/* loaded from: input_file:io/confluent/ksql/planner/plan/JoinNode.class */
public class JoinNode extends PlanNode implements JoiningNode {
    /** Kind of join requested; the nested joiners switch on this value to pick the join operation. */
    private final JoinType joinType;
    /** Strategy that resolves the join key column name and the key expressions a projection may use. */
    private final JoinKey joinKey;
    /**
     * Checked together with {@code joinKey.isSynthetic()} when deciding whether the synthetic key
     * column is prepended to select-star output and removed from the required columns.
     */
    private final boolean finalJoin;
    /** Left input of the join. */
    private final PlanNode left;
    /** Right input of the join. */
    private final PlanNode right;
    /** The same object as {@link #left}, cast to {@link JoiningNode} in the constructor. */
    private final JoiningNode leftJoining;
    /** The same object as {@link #right}, cast to {@link JoiningNode} in the constructor. */
    private final JoiningNode rightJoining;
    /** Optional WITHIN clause; required by stream-stream joins and rejected by the other joiners. */
    private final Optional<WithinExpression> withinExpression;
    /** Schema produced by {@code JoinParamsFactory.createSchema} from the key name and both input schemas. */
    private final LogicalSchema schema;
    /** Format name used by {@code getDefaultSourceKeyFormat()} when no source topic supplies a non-"NONE" key format. */
    private final String defaultKeyFormat;

    /**
     * Describes the key column of a join: either a column taken from the sources
     * ({@link #sourceColumn}) or a generated, synthetic column ({@link #syntheticColumn}).
     */
    @Immutable
    /* loaded from: input_file:io/confluent/ksql/planner/plan/JoinNode$JoinKey.class */
    public interface JoinKey {
        /**
         * Creates a key backed by a source column.
         *
         * @param columnName name returned by {@link #resolveKeyName}
         * @param collection column references used both as the original and as the current viable keys
         */
        static JoinKey sourceColumn(ColumnName columnName, Collection<QualifiedColumnReferenceExp> collection) {
            return SourceJoinKey.of(columnName, collection);
        }

        /** Creates a key whose column name is generated when {@link #resolveKeyName} is called. */
        static JoinKey syntheticColumn() {
            return SyntheticJoinKey.of();
        }

        /** @return {@code true} for keys created by {@link #syntheticColumn()}, {@code false} otherwise */
        boolean isSynthetic();

        /**
         * Returns every expression accepted as "the key" when a projection is validated.
         *
         * @param logicalSchema schema of the join; the synthetic implementation reads its single key column
         */
        List<? extends Expression> getAllViableKeys(LogicalSchema logicalSchema);

        /**
         * Returns the key expressions before any rewrite; the join node passes these to the
         * "keys not included" error.
         *
         * @param logicalSchema schema of the join; the synthetic implementation reads its single key column
         */
        List<? extends Expression> getOriginalViableKeys(LogicalSchema logicalSchema);

        /**
         * Resolves the name of the join's key column.
         *
         * @param planNode the left input of the join (as passed by {@code buildJoinSchema})
         * @param planNode2 the right input of the join (as passed by {@code buildJoinSchema})
         */
        ColumnName resolveKeyName(PlanNode planNode, PlanNode planNode2);

        /**
         * Returns a key whose current viable key expressions have been passed through the given
         * rewriter plugin; the synthetic implementation returns itself unchanged.
         */
        JoinKey rewriteWith(BiFunction<Expression, ExpressionTreeRewriter.Context<Void>, Optional<Expression>> biFunction);
    }

    /* loaded from: input_file:io/confluent/ksql/planner/plan/JoinNode$JoinType.class */
    /** Join flavours handled by the nested joiners; each joiner switches on this value. */
    public enum JoinType {
        /** Selects the {@code join(...)} operation in every joiner. */
        INNER,
        /** Selects the {@code leftJoin(...)} operation in every joiner. */
        LEFT,
        /**
         * Selects {@code outerJoin(...)} for stream-stream and table-table joins; the stream-table
         * joiner rejects it with a {@code KsqlException}.
         */
        OUTER
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/planner/plan/JoinNode$Joiner.class */
    /**
     * Base class of the join strategies. Holds the build context, the join node being built and
     * the query-context stacker, and offers helpers that build either input as a stream or table.
     *
     * @param <K> type argument passed through to {@link SchemaKStream} and {@link SchemaKTable}
     */
    public abstract static class Joiner<K> {
        final PlanBuildContext buildContext;
        final JoinNode joinNode;
        final QueryContext.Stacker contextStacker;

        Joiner(PlanBuildContext planBuildContext, JoinNode joinNode, QueryContext.Stacker stacker) {
            this.buildContext = Objects.requireNonNull(planBuildContext, "buildContext");
            this.joinNode = Objects.requireNonNull(joinNode, "joinNode");
            this.contextStacker = Objects.requireNonNull(stacker, "contextStacker");
        }

        /** Builds the joined result for {@link #joinNode}. */
        public abstract SchemaKStream<K> join();

        /** Builds {@code planNode} with the unmodified build context. */
        @SuppressWarnings("unchecked")
        SchemaKStream<K> buildStream(PlanNode planNode) {
            return (SchemaKStream<K>) planNode.buildStream(buildContext);
        }

        /**
         * Builds {@code planNode} with {@code auto.offset.reset} overridden to {@code earliest}
         * and insists that the result is a table.
         *
         * @throws RuntimeException if the node builds anything other than a {@link SchemaKTable}
         */
        @SuppressWarnings("unchecked")
        SchemaKTable<K> buildTable(PlanNode planNode) {
            final PlanBuildContext tableContext = buildContext.withKsqlConfig(
                buildContext.getKsqlConfig().cloneWithPropertyOverwrite(
                    Collections.singletonMap("auto.offset.reset", "earliest")));
            final SchemaKStream<?> built = planNode.buildStream(tableContext);
            if (!(built instanceof SchemaKTable)) {
                throw new RuntimeException("Expected to find a Table, found a stream instead.");
            }
            return (SchemaKTable<K>) built;
        }
    }

    /* loaded from: input_file:io/confluent/ksql/planner/plan/JoinNode$JoinerFactory.class */
    /**
     * Maps a (left type, right type) pair of inputs to a supplier of the joiner that handles it.
     * Only stream-stream, stream-table and table-table combinations are registered.
     */
    private static class JoinerFactory {
        private final Map<Pair<DataSource.DataSourceType, DataSource.DataSourceType>, Supplier<Joiner<?>>> joinerMap;

        JoinerFactory(PlanBuildContext planBuildContext, JoinNode joinNode, QueryContext.Stacker stacker) {
            this.joinerMap = ImmutableMap
                .<Pair<DataSource.DataSourceType, DataSource.DataSourceType>, Supplier<Joiner<?>>>builder()
                .put(
                    new Pair<>(DataSource.DataSourceType.KSTREAM, DataSource.DataSourceType.KSTREAM),
                    () -> new StreamToStreamJoiner<>(planBuildContext, joinNode, stacker))
                .put(
                    new Pair<>(DataSource.DataSourceType.KSTREAM, DataSource.DataSourceType.KTABLE),
                    () -> new StreamToTableJoiner<>(planBuildContext, joinNode, stacker))
                .put(
                    new Pair<>(DataSource.DataSourceType.KTABLE, DataSource.DataSourceType.KTABLE),
                    () -> new TableToTableJoiner<>(planBuildContext, joinNode, stacker))
                .build();
        }

        /**
         * Creates the joiner registered for the given input types.
         *
         * @param dataSourceType output type of the left input
         * @param dataSourceType2 output type of the right input
         * @throws KsqlException if no joiner is registered for the combination
         */
        Joiner<?> getJoiner(DataSource.DataSourceType dataSourceType, DataSource.DataSourceType dataSourceType2) {
            final Supplier<Joiner<?>> joinerSupplier = joinerMap.get(new Pair<>(dataSourceType, dataSourceType2));
            if (joinerSupplier == null) {
                throw new KsqlException("Join between invalid operands requested: left type: " + dataSourceType + ", right type: " + dataSourceType2);
            }
            return joinerSupplier.get();
        }
    }

    /* loaded from: input_file:io/confluent/ksql/planner/plan/JoinNode$SourceJoinKey.class */
    /**
     * Join key backed by a source column. Tracks the original viable key column references and
     * the current ones, which {@link #rewriteWith} replaces with rewritten copies.
     */
    private static final class SourceJoinKey implements JoinKey {
        private final ColumnName keyColumn;
        private final ImmutableList<QualifiedColumnReferenceExp> originalViableKeyColumns;
        private final ImmutableList<? extends ColumnReferenceExp> viableKeyColumns;

        /** Creates a key whose current viable columns are the same as the original ones. */
        static JoinKey of(ColumnName columnName, Collection<QualifiedColumnReferenceExp> collection) {
            return new SourceJoinKey(columnName, collection, collection);
        }

        private SourceJoinKey(ColumnName columnName, Collection<QualifiedColumnReferenceExp> collection, Collection<? extends ColumnReferenceExp> collection2) {
            this.keyColumn = Objects.requireNonNull(columnName, "keyColumn");
            this.originalViableKeyColumns = ImmutableList.copyOf(Objects.requireNonNull(collection, "originalViableKeyColumns"));
            this.viableKeyColumns = ImmutableList.copyOf(Objects.requireNonNull(collection2, "viableKeyColumns"));
        }

        @Override
        public boolean isSynthetic() {
            return false;
        }

        /** Returns the current viable columns followed by the original ones; the schema is not used. */
        @Override
        public List<? extends Expression> getAllViableKeys(LogicalSchema logicalSchema) {
            // The explicit type witness is required: an untyped builder() would produce
            // ImmutableList<Object>, which is not a List<? extends Expression>.
            return ImmutableList.<Expression>builder()
                .addAll(viableKeyColumns)
                .addAll(originalViableKeyColumns)
                .build();
        }

        /** Returns the original viable columns; the schema is not used. */
        @Override
        public List<? extends Expression> getOriginalViableKeys(LogicalSchema logicalSchema) {
            return originalViableKeyColumns;
        }

        /** Returns the key column name given at construction; the inputs are not consulted. */
        @Override
        public ColumnName resolveKeyName(PlanNode planNode, PlanNode planNode2) {
            return keyColumn;
        }

        /**
         * Returns a new key whose current viable columns have each been rewritten with
         * {@code biFunction}; the key column and the original viable columns are carried over.
         */
        @Override
        public JoinKey rewriteWith(BiFunction<Expression, ExpressionTreeRewriter.Context<Void>, Optional<Expression>> biFunction) {
            final List<? extends ColumnReferenceExp> rewrittenViable = viableKeyColumns.stream()
                .map(column -> ExpressionTreeRewriter.rewriteWith(biFunction, column))
                .collect(Collectors.toList());
            return new SourceJoinKey(keyColumn, originalViableKeyColumns, rewrittenViable);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/planner/plan/JoinNode$StreamToStreamJoiner.class */
    public static final class StreamToStreamJoiner<K> extends Joiner<K> {
        private StreamToStreamJoiner(PlanBuildContext planBuildContext, JoinNode joinNode, QueryContext.Stacker stacker) {
            super(planBuildContext, joinNode, stacker);
        }

        @Override // io.confluent.ksql.planner.plan.JoinNode.Joiner
        public SchemaKStream<K> join() {
            if (!this.joinNode.withinExpression.isPresent()) {
                throw new KsqlException("Stream-Stream joins must have a WITHIN clause specified. None was provided. To learn about how to specify a WITHIN clause with a stream-stream join, please visit: https://docs.confluent.io/current/ksql/docs/syntax-reference.html#create-stream-as-select");
            }
            SchemaKStream<K> buildStream = buildStream(this.joinNode.getLeft());
            SchemaKStream<K> buildStream2 = buildStream(this.joinNode.getRight());
            switch (this.joinNode.joinType) {
                case LEFT:
                    return buildStream.leftJoin(buildStream2, this.joinNode.getKeyColumnName(), ((WithinExpression) this.joinNode.withinExpression.get()).joinWindow(), JoiningNode.getValueFormatForSource(this.joinNode.left).getFormatInfo(), JoiningNode.getValueFormatForSource(this.joinNode.right).getFormatInfo(), this.contextStacker);
                case OUTER:
                    return buildStream.outerJoin(buildStream2, this.joinNode.getKeyColumnName(), ((WithinExpression) this.joinNode.withinExpression.get()).joinWindow(), JoiningNode.getValueFormatForSource(this.joinNode.left).getFormatInfo(), JoiningNode.getValueFormatForSource(this.joinNode.right).getFormatInfo(), this.contextStacker);
                case INNER:
                    return buildStream.join(buildStream2, this.joinNode.getKeyColumnName(), ((WithinExpression) this.joinNode.withinExpression.get()).joinWindow(), JoiningNode.getValueFormatForSource(this.joinNode.left).getFormatInfo(), JoiningNode.getValueFormatForSource(this.joinNode.right).getFormatInfo(), this.contextStacker);
                default:
                    throw new KsqlException("Invalid join type encountered: " + this.joinNode.joinType);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/planner/plan/JoinNode$StreamToTableJoiner.class */
    public static final class StreamToTableJoiner<K> extends Joiner<K> {
        private StreamToTableJoiner(PlanBuildContext planBuildContext, JoinNode joinNode, QueryContext.Stacker stacker) {
            super(planBuildContext, joinNode, stacker);
        }

        @Override // io.confluent.ksql.planner.plan.JoinNode.Joiner
        public SchemaKStream<K> join() {
            if (this.joinNode.withinExpression.isPresent()) {
                throw new KsqlException("A window definition was provided for a Stream-Table join. These joins are not windowed. Please drop the window definition (ie. the WITHIN clause) and try to execute your join again.");
            }
            SchemaKTable<K> buildTable = buildTable(this.joinNode.getRight());
            SchemaKStream<K> buildStream = buildStream(this.joinNode.getLeft());
            switch (this.joinNode.joinType) {
                case LEFT:
                    return buildStream.leftJoin(buildTable, this.joinNode.getKeyColumnName(), JoiningNode.getValueFormatForSource(this.joinNode.left).getFormatInfo(), this.contextStacker);
                case OUTER:
                    throw new KsqlException("Full outer joins between streams and tables are not supported.");
                case INNER:
                    return buildStream.join(buildTable, this.joinNode.getKeyColumnName(), JoiningNode.getValueFormatForSource(this.joinNode.left).getFormatInfo(), this.contextStacker);
                default:
                    throw new KsqlException("Invalid join type encountered: " + this.joinNode.joinType);
            }
        }
    }

    /* loaded from: input_file:io/confluent/ksql/planner/plan/JoinNode$SyntheticJoinKey.class */
    /**
     * Join key whose column does not come from either source: its name is generated from the
     * schemas of all source nodes beneath the join.
     */
    private static final class SyntheticJoinKey implements JoinKey {
        static JoinKey of() {
            return new SyntheticJoinKey();
        }

        private SyntheticJoinKey() {
        }

        @Override
        public boolean isSynthetic() {
            return true;
        }

        /** Same as {@link #getOriginalViableKeys}: a synthetic key has no rewritten variants. */
        @Override
        public List<? extends Expression> getAllViableKeys(LogicalSchema logicalSchema) {
            return getOriginalViableKeys(logicalSchema);
        }

        /**
         * Returns an unqualified reference to the only key column of {@code logicalSchema}.
         * {@code Iterables.getOnlyElement} throws if the schema does not have exactly one key column.
         */
        @Override
        public List<? extends Expression> getOriginalViableKeys(LogicalSchema logicalSchema) {
            final Column onlyKey = Iterables.getOnlyElement(logicalSchema.key());
            return ImmutableList.of(new UnqualifiedColumnReferenceExp(onlyKey.name()));
        }

        /**
         * Generates the synthetic key name from the schemas of every source node found beneath
         * the two inputs, left input first.
         */
        @Override
        public ColumnName resolveKeyName(PlanNode planNode, PlanNode planNode2) {
            // Pass the streams as typed varargs: a raw Stream[] erases the element type and the
            // mapping lambdas below would no longer type-check.
            return ColumnNames.generateSyntheticJoinKey(
                Streams.concat(planNode.getSourceNodes(), planNode2.getSourceNodes())
                    .map(sourceNode -> sourceNode.getDataSource())
                    .map(dataSource -> dataSource.getSchema()));
        }

        /** A synthetic key holds no expressions, so there is nothing to rewrite. */
        @Override
        public JoinKey rewriteWith(BiFunction<Expression, ExpressionTreeRewriter.Context<Void>, Optional<Expression>> biFunction) {
            return this;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/planner/plan/JoinNode$TableToTableJoiner.class */
    public static final class TableToTableJoiner<K> extends Joiner<K> {
        TableToTableJoiner(PlanBuildContext planBuildContext, JoinNode joinNode, QueryContext.Stacker stacker) {
            super(planBuildContext, joinNode, stacker);
        }

        @Override // io.confluent.ksql.planner.plan.JoinNode.Joiner
        public SchemaKTable<K> join() {
            if (this.joinNode.withinExpression.isPresent()) {
                throw new KsqlException("A window definition was provided for a Table-Table join. These joins are not windowed. Please drop the window definition (i.e. the WITHIN clause) and try to execute your Table-Table join again.");
            }
            SchemaKTable<K> buildTable = buildTable(this.joinNode.getLeft());
            SchemaKTable<K> buildTable2 = buildTable(this.joinNode.getRight());
            switch (this.joinNode.joinType) {
                case LEFT:
                    return buildTable.leftJoin(buildTable2, this.joinNode.getKeyColumnName(), this.contextStacker);
                case OUTER:
                    return buildTable.outerJoin(buildTable2, this.joinNode.getKeyColumnName(), this.contextStacker);
                case INNER:
                    return buildTable.join(buildTable2, this.joinNode.getKeyColumnName(), this.contextStacker);
                default:
                    throw new KsqlException("Invalid join type encountered: " + this.joinNode.joinType);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Creates a join node over two inputs.
     *
     * <p>The node's output type is derived from the output types of both inputs, and its schema
     * is built from the resolved join key name and both input schemas.
     *
     * @param planNodeId id passed to the {@link PlanNode} superclass
     * @param joinType kind of join to perform
     * @param joinKey strategy used to resolve the key column
     * @param z stored as {@code finalJoin}
     * @param planNode left input; must also implement {@link JoiningNode}
     * @param planNode2 right input; must also implement {@link JoiningNode}
     * @param optional optional WITHIN clause
     * @param str name of the default key format
     * @throws NullPointerException if any reference argument is {@code null}
     * @throws ClassCastException if either input is not a {@link JoiningNode}
     */
    public JoinNode(PlanNodeId planNodeId, JoinType joinType, JoinKey joinKey, boolean z, PlanNode planNode, PlanNode planNode2, Optional<WithinExpression> optional, String str) {
        // calculateSinkType dereferences both inputs, so a null input fails here, before the
        // named checks below; super(...) must remain the first statement.
        super(planNodeId, calculateSinkType(planNode, planNode2), Optional.empty());
        this.joinType = Objects.requireNonNull(joinType, "joinType");
        this.joinKey = Objects.requireNonNull(joinKey, "joinKey");
        this.finalJoin = z;
        this.left = Objects.requireNonNull(planNode, "left");
        this.leftJoining = (JoiningNode) planNode;
        this.right = Objects.requireNonNull(planNode2, "right");
        this.rightJoining = (JoiningNode) planNode2;
        this.withinExpression = Objects.requireNonNull(optional, "withinExpression");
        this.defaultKeyFormat = Objects.requireNonNull(str, "defaultKeyFormat");
        // Build the schema only after validation: it dereferences joinKey, so doing it first
        // turned a null joinKey into an unnamed NullPointerException.
        this.schema = buildJoinSchema(joinKey, planNode, planNode2);
    }

    /**
     * Picks the key format for this join (the preferred format if either side has one, otherwise
     * the default derived from the sources) and applies it to both inputs.
     */
    public void resolveKeyFormats() {
        final KeyFormat resolved = getPreferredKeyFormat().orElseGet(this::getDefaultSourceKeyFormat);
        setKeyFormat(resolved);
    }

    /**
     * Returns the left input's preferred key format if it has one, otherwise whatever the right
     * input prefers (possibly empty).
     */
    @Override
    public Optional<KeyFormat> getPreferredKeyFormat() {
        final Optional<KeyFormat> fromLeft = leftJoining.getPreferredKeyFormat();
        if (fromLeft.isPresent()) {
            return fromLeft;
        }
        return rightJoining.getPreferredKeyFormat();
    }

    /** Applies {@code keyFormat} to the left input and then to the right input. */
    @Override
    public void setKeyFormat(KeyFormat keyFormat) {
        leftJoining.setKeyFormat(keyFormat);
        rightJoining.setKeyFormat(keyFormat);
    }

    /** Returns the join schema computed in the constructor. */
    @Override
    public LogicalSchema getSchema() {
        return schema;
    }

    /** Returns the two inputs of the join, left first. */
    @Override
    public List<PlanNode> getSources() {
        return Arrays.asList(left, right);
    }

    /** Returns the left input of the join. */
    public PlanNode getLeft() {
        return left;
    }

    /** Returns the right input of the join. */
    public PlanNode getRight() {
        return right;
    }

    /**
     * Builds the joined stream or table: verifies that both inputs have the same partition
     * count, selects the joiner matching the inputs' output types and runs it.
     *
     * @throws KsqlException if the partition counts differ or the input types cannot be joined
     */
    @Override
    public SchemaKStream<?> buildStream(PlanBuildContext planBuildContext) {
        ensureMatchingPartitionCounts(planBuildContext.getServiceContext().getTopicClient());

        final QueryContext.Stacker nodeContext = planBuildContext.buildNodeContext(getId().toString());
        final JoinerFactory factory = new JoinerFactory(planBuildContext, this, nodeContext);
        final Joiner<?> joiner = factory.getJoiner(left.getNodeOutputType(), right.getNodeOutputType());
        return joiner.join();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Reports the partition count of the right input as the partition count of the join. */
    @Override
    public int getPartitions(KafkaTopicClient kafkaTopicClient) {
        return right.getPartitions(kafkaTopicClient);
    }

    /**
     * Resolves the column names a {@code SELECT *} expands to.
     *
     * <p>Each input that is itself a {@link JoinNode} is replaced by its own two inputs; the
     * resulting nodes are filtered by source name (when one is given) and asked to resolve their
     * columns. When no source name is given, the key is synthetic and this is the final join,
     * the synthetic key column is placed in front of the other columns.
     *
     * @param optional source to restrict the expansion to, or empty for all sources
     * @return the resolved column names
     */
    @Override
    public Stream<ColumnName> resolveSelectStar(Optional<SourceName> optional) {
        // Stream.of(left, right) keeps the PlanNode element type; routing the array through
        // Object[] would erase it and the lambdas below could not call PlanNode methods.
        final Stream<ColumnName> sourceColumns = Stream.of(left, right)
            .flatMap(node -> node instanceof JoinNode ? node.getSources().stream() : Stream.of(node))
            .filter(node -> !optional.isPresent() || optional.equals(node.getSourceName()))
            .flatMap(node -> node.resolveSelectStar(optional));

        if (optional.isPresent() || !joinKey.isSynthetic() || !finalJoin) {
            return sourceColumns;
        }

        final Column syntheticKey = Iterables.getOnlyElement(getSchema().key());
        return Streams.concat(Stream.of(syntheticKey.name()), sourceColumns);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Checks that the projection contains at least one of the join's viable key expressions and
     * reports a "keys not included" error otherwise.
     *
     * @param sourceName name forwarded to the error
     * @param projection projection to inspect
     */
    @Override
    public void validateKeyPresent(SourceName sourceName, Projection projection) {
        final boolean keyProjected = joinKey.getAllViableKeys(schema).stream()
            .anyMatch(projection::containsExpression);
        if (!keyProjected) {
            throwKeysNotIncludedError(sourceName, "join expression", joinKey.getOriginalViableKeys(schema), false, joinKey.isSynthetic());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Validates the required columns against both inputs and returns the intersection of the
     * two results. For a final join with a synthetic key, the synthetic key column is removed
     * from the required columns before the inputs are consulted.
     */
    @Override
    public Set<ColumnReferenceExp> validateColumns(RequiredColumns requiredColumns) {
        final RequiredColumns forInputs;
        if (finalJoin && joinKey.isSynthetic()) {
            forInputs = requiredColumns.asBuilder()
                .remove(new UnqualifiedColumnReferenceExp(Iterables.getOnlyElement(schema.key()).name()))
                .build();
        } else {
            forInputs = requiredColumns;
        }

        final Set<ColumnReferenceExp> fromLeft = left.validateColumns(forInputs);
        final Set<ColumnReferenceExp> fromRight = right.validateColumns(forInputs);
        return Sets.intersection(fromLeft, fromRight);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the name of the join schema's single key column.
     *
     * @throws KsqlException if the schema has more than one key column
     */
    public ColumnName getKeyColumnName() {
        final List<Column> keyColumns = getSchema().key();
        if (keyColumns.size() > 1) {
            throw new KsqlException("JOINs are not supported with multiple key columns: " + keyColumns);
        }
        return Iterables.getOnlyElement(keyColumns).name();
    }

    /**
     * Verifies that both inputs report the same number of partitions.
     *
     * @throws KsqlException naming both sources and their partition counts if they differ
     */
    private void ensureMatchingPartitionCounts(KafkaTopicClient kafkaTopicClient) {
        final int leftPartitions = left.getPartitions(kafkaTopicClient);
        final int rightPartitions = right.getPartitions(kafkaTopicClient);
        if (leftPartitions != rightPartitions) {
            final SourceName leftName = getSourceName(left);
            final SourceName rightName = getSourceName(right);
            throw new KsqlException(
                "Can't join " + leftName + " with " + rightName
                    + " since the number of partitions don't match. "
                    + leftName + " partitions = " + leftPartitions + "; "
                    + rightName + " partitions = " + rightPartitions
                    + ". Please repartition either one so that the number of partitions match.");
        }
    }

    /**
     * Chooses a key format when neither input states a preference: the key format of the first
     * source topic (left input's sources first) whose format is not {@code "NONE"}, or else a
     * non-windowed key format built from {@code defaultKeyFormat} with no serde features.
     */
    private KeyFormat getDefaultSourceKeyFormat() {
        // Stream.of(left, right) keeps the PlanNode element type so every mapping step below is
        // type-checked; routing the array through Object[] would erase it.
        return Stream.of(left, right)
            .flatMap(PlanNode::getSourceNodes)
            .map(sourceNode -> sourceNode.getDataSource())
            .map(dataSource -> dataSource.getKsqlTopic())
            .map(topic -> topic.getKeyFormat())
            .filter(keyFormat -> !keyFormat.getFormatInfo().getFormat().equals("NONE"))
            .findFirst()
            .orElse(KeyFormat.nonWindowed(FormatInfo.of(defaultKeyFormat), SerdeFeatures.of()));
    }

    /**
     * Returns the alias of the leftmost source node beneath {@code planNode}; used to name the
     * inputs in the partition-mismatch error message.
     */
    private static SourceName getSourceName(PlanNode planNode) {
        return planNode.getLeftmostSourceNode().getAlias();
    }

    /**
     * Determines the output type of the join: a table only when both inputs are tables,
     * otherwise a stream.
     */
    private static DataSource.DataSourceType calculateSinkType(PlanNode planNode, PlanNode planNode2) {
        final boolean bothTables = planNode.getNodeOutputType() == DataSource.DataSourceType.KTABLE
            && planNode2.getNodeOutputType() == DataSource.DataSourceType.KTABLE;
        if (bothTables) {
            return DataSource.DataSourceType.KTABLE;
        }
        return DataSource.DataSourceType.KSTREAM;
    }

    /**
     * Builds the schema of the join from the key name resolved by {@code joinKey} and the
     * schemas of the left and right inputs.
     */
    private static LogicalSchema buildJoinSchema(JoinKey joinKey, PlanNode planNode, PlanNode planNode2) {
        final ColumnName keyColumnName = joinKey.resolveKeyName(planNode, planNode2);
        return JoinParamsFactory.createSchema(keyColumnName, planNode.getSchema(), planNode2.getSchema());
    }
}
