package io.confluent.kafka.replication.push.buffer;

import io.confluent.kafka.replication.push.PushSession;
import io.confluent.kafka.replication.push.PushSessionEndReason;
import io.confluent.kafka.replication.push.ReplicationConfig;
import io.confluent.kafka.replication.push.buffer.PushReplicationEvent;
import io.confluent.kafka.replication.push.metrics.PushReplicationManagerMetrics;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AppendRecordsRequestData;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.TopicIdPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/replication/push/buffer/BufferingAppendRecordsBuilder.class */
public final class BufferingAppendRecordsBuilder {
    private static final Logger log = LoggerFactory.getLogger(BufferingAppendRecordsBuilder.class);
    private static final long NO_RECORDS_ADDED_TIME_MS = -1;
    private final int destinationBrokerId;
    private final long destinationBrokerEpoch;
    private final long maxRequestSizeBytes;
    private final long maxRequestPartitionSizeBytes;
    private final RefCountingMemoryTracker<MemoryRecords> tracker;
    private final Time time;
    private final PushReplicationManagerMetrics pushReplicationManagerMetrics;
    private final long creationTimeMs;
    private final long lingerMs;
    private final long maxWaitMs;
    private final Map<Uuid, Map<Integer, BufferingPartitionDataBuilder>> topicData = new HashMap();
    private long totalSizeInBytes = 0;
    private long recordsAddedTimeMs = -1;

    /* renamed from: io.confluent.kafka.replication.push.buffer.BufferingAppendRecordsBuilder$1, reason: invalid class name */
    /* loaded from: input_file:io/confluent/kafka/replication/push/buffer/BufferingAppendRecordsBuilder$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$confluent$kafka$replication$push$buffer$PushReplicationEvent$Type = new int[PushReplicationEvent.Type.values().length];

        static {
            try {
                $SwitchMap$io$confluent$kafka$replication$push$buffer$PushReplicationEvent$Type[PushReplicationEvent.Type.HWM_UPDATE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$confluent$kafka$replication$push$buffer$PushReplicationEvent$Type[PushReplicationEvent.Type.LSO_UPDATE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$confluent$kafka$replication$push$buffer$PushReplicationEvent$Type[PushReplicationEvent.Type.MEMORY_RECORDS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$confluent$kafka$replication$push$buffer$PushReplicationEvent$Type[PushReplicationEvent.Type.STOP_PUSH.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public BufferingAppendRecordsBuilder(int i, long j, ReplicationConfig replicationConfig, RefCountingMemoryTracker<MemoryRecords> refCountingMemoryTracker, Time time, PushReplicationManagerMetrics pushReplicationManagerMetrics) {
        this.destinationBrokerId = i;
        this.destinationBrokerEpoch = j;
        this.maxRequestSizeBytes = replicationConfig.maxRequestSizeBytes();
        this.maxRequestPartitionSizeBytes = replicationConfig.maxRequestPartitionSizeBytes();
        this.tracker = refCountingMemoryTracker;
        this.time = time;
        this.pushReplicationManagerMetrics = pushReplicationManagerMetrics;
        this.creationTimeMs = time.hiResClockMs();
        this.lingerMs = replicationConfig.lingerMs();
        this.maxWaitMs = replicationConfig.maxWaitMs();
    }

    public boolean processEvent(PushReplicationEvent<?> pushReplicationEvent, PushSession pushSession) {
        TopicIdPartition topicIdPartition = pushReplicationEvent.topicIdPartition();
        BufferingPartitionDataBuilder computeIfAbsent = this.topicData.computeIfAbsent(topicIdPartition.topicId(), uuid -> {
            return new HashMap();
        }).computeIfAbsent(Integer.valueOf(topicIdPartition.partitionId()), num -> {
            return new BufferingPartitionDataBuilder(topicIdPartition, pushSession, this.maxRequestPartitionSizeBytes);
        });
        if (pushSession.replicaEpoch() != this.destinationBrokerEpoch) {
            log.warn("Received event {} for push session {} with incompatible replica epoch (different than {})", new Object[]{pushReplicationEvent, pushSession, Long.valueOf(this.destinationBrokerEpoch)});
            return false;
        }
        switch (AnonymousClass1.$SwitchMap$io$confluent$kafka$replication$push$buffer$PushReplicationEvent$Type[pushReplicationEvent.type().ordinal()]) {
            case 1:
                return computeIfAbsent.addHighWatermarkUpdate(pushSession, ((PushReplicationEvent.OffsetPayload) pushReplicationEvent.payload()).offset());
            case 2:
                return computeIfAbsent.addLogStartOffsetUpdate(pushSession, ((PushReplicationEvent.OffsetPayload) pushReplicationEvent.payload()).offset());
            case 3:
                PushReplicationEvent.RecordsPayload recordsPayload = (PushReplicationEvent.RecordsPayload) pushReplicationEvent.payload();
                MemoryRecords memoryRecords = (MemoryRecords) recordsPayload.records();
                long sizeInBytes = this.totalSizeInBytes + memoryRecords.sizeInBytes();
                boolean z = !hasBufferedRecords();
                if (sizeInBytes > this.maxRequestSizeBytes && !z) {
                    return false;
                }
                boolean addMemoryRecords = computeIfAbsent.addMemoryRecords(pushSession, memoryRecords, recordsPayload.appendOffset());
                if (addMemoryRecords) {
                    if (this.recordsAddedTimeMs == -1) {
                        this.recordsAddedTimeMs = this.time.hiResClockMs();
                    }
                    this.totalSizeInBytes = sizeInBytes;
                }
                return addMemoryRecords;
            case ReplicationConfig.MAX_PUSHERS_DEFAULT /* 4 */:
                boolean z2 = ((PushSessionEndReason) pushReplicationEvent.payload()).sendEndSessionRequest;
                handleEvictedRecords(computeIfAbsent.stopPushAndDiscardState(z2));
                if (z2) {
                    return true;
                }
                removePartitionEntry(topicIdPartition.topicId(), topicIdPartition.partitionId());
                return true;
            default:
                log.warn("{} events not expected to be handled here", pushReplicationEvent.type());
                return false;
        }
    }

    public AppendRecordsRequestData build() {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<Uuid, Map<Integer, BufferingPartitionDataBuilder>> entry : this.topicData.entrySet()) {
            AppendRecordsRequestData.TopicData topicData = new AppendRecordsRequestData.TopicData();
            topicData.setTopicId(entry.getKey());
            ArrayList arrayList2 = new ArrayList();
            Iterator<BufferingPartitionDataBuilder> it = entry.getValue().values().iterator();
            while (it.hasNext()) {
                AppendRecordsRequestData.PartitionData build = it.next().build();
                long sizeInBytes = build.records().sizeInBytes();
                if (sizeInBytes > 0) {
                    this.pushReplicationManagerMetrics.recordPartitionReplicationBytesOut(topicData.topicId(), build.partitionIndex(), sizeInBytes);
                }
                arrayList2.add(build);
            }
            topicData.setPartitions(arrayList2);
            arrayList.add(topicData);
        }
        AppendRecordsRequestData appendRecordsRequestData = new AppendRecordsRequestData();
        appendRecordsRequestData.setReplicaEpoch(this.destinationBrokerEpoch);
        appendRecordsRequestData.setTopics(arrayList);
        if (this.totalSizeInBytes > 0) {
            this.pushReplicationManagerMetrics.recordReplicationBytesOut(this.totalSizeInBytes);
        }
        return appendRecordsRequestData;
    }

    public void clear() {
        Iterator<Map<Integer, BufferingPartitionDataBuilder>> it = this.topicData.values().iterator();
        while (it.hasNext()) {
            Iterator<BufferingPartitionDataBuilder> it2 = it.next().values().iterator();
            while (it2.hasNext()) {
                handleEvictedRecords(it2.next().discardState());
            }
        }
    }

    public long destinationBrokerEpoch() {
        return this.destinationBrokerEpoch;
    }

    public int destinationBrokerId() {
        return this.destinationBrokerId;
    }

    public boolean isRequestReady() {
        if (this.topicData.isEmpty()) {
            return false;
        }
        return hasBufferedRecords() ? this.time.hiResClockMs() - this.recordsAddedTimeMs >= this.lingerMs : this.time.hiResClockMs() - this.creationTimeMs >= this.maxWaitMs;
    }

    private void removePartitionEntry(Uuid uuid, int i) {
        Map<Integer, BufferingPartitionDataBuilder> orDefault = this.topicData.getOrDefault(uuid, Collections.emptyMap());
        orDefault.remove(Integer.valueOf(i));
        if (orDefault.isEmpty()) {
            this.topicData.remove(uuid);
        }
    }

    private void handleEvictedRecords(List<AbstractRecords> list) {
        Iterator<AbstractRecords> it = list.iterator();
        while (it.hasNext()) {
            MemoryRecords memoryRecords = (AbstractRecords) it.next();
            if (memoryRecords instanceof MemoryRecords) {
                this.totalSizeInBytes -= r0.sizeInBytes();
                this.tracker.countDown(memoryRecords);
            }
        }
        if (hasBufferedRecords()) {
            return;
        }
        this.recordsAddedTimeMs = -1L;
    }

    private boolean hasBufferedRecords() {
        return this.totalSizeInBytes > 0;
    }
}
