package org.apache.kafka.server.share.persister;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
import org.apache.kafka.server.share.persister.PersisterStateManager;
import org.apache.kafka.server.share.persister.ReadShareGroupStateResult;
import org.apache.kafka.server.share.persister.WriteShareGroupStateResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/server/share/persister/DefaultStatePersister.class */
public class DefaultStatePersister implements Persister {
    private final PersisterStateManager stateManager;
    private static final Logger log = LoggerFactory.getLogger(DefaultStatePersister.class);

    public DefaultStatePersister(PersisterStateManager persisterStateManager) {
        this.stateManager = persisterStateManager;
        this.stateManager.start();
    }

    @Override // org.apache.kafka.server.share.persister.Persister
    public void stop() {
        try {
            if (this.stateManager != null) {
                this.stateManager.stop();
            }
        } catch (Exception e) {
            log.error("Unable to stop state manager", e);
        }
    }

    @Override // org.apache.kafka.server.share.persister.Persister
    public CompletableFuture<InitializeShareGroupStateResult> initializeState(InitializeShareGroupStateParameters initializeShareGroupStateParameters) {
        throw new RuntimeException("not implemented");
    }

    @Override // org.apache.kafka.server.share.persister.Persister
    public CompletableFuture<WriteShareGroupStateResult> writeState(WriteShareGroupStateParameters writeShareGroupStateParameters) {
        try {
            validate(writeShareGroupStateParameters);
            GroupTopicPartitionData<PartitionStateBatchData> groupTopicPartitionData = writeShareGroupStateParameters.groupTopicPartitionData();
            String groupId = groupTopicPartitionData.groupId();
            HashMap hashMap = new HashMap();
            ArrayList arrayList = new ArrayList();
            groupTopicPartitionData.topicsData().forEach(topicData -> {
                topicData.partitions().forEach(partitionStateBatchData -> {
                    CompletableFuture completableFuture = (CompletableFuture) ((Map) hashMap.computeIfAbsent(topicData.topicId(), uuid -> {
                        return new HashMap();
                    })).computeIfAbsent(Integer.valueOf(partitionStateBatchData.partition()), num -> {
                        return new CompletableFuture();
                    });
                    PersisterStateManager persisterStateManager = this.stateManager;
                    Objects.requireNonNull(persisterStateManager);
                    arrayList.add(new PersisterStateManager.WriteStateHandler(persisterStateManager, groupId, topicData.topicId(), partitionStateBatchData.partition(), partitionStateBatchData.stateEpoch(), partitionStateBatchData.leaderEpoch(), partitionStateBatchData.startOffset(), partitionStateBatchData.stateBatches(), completableFuture, null));
                });
            });
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                this.stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler) it.next());
            }
            return CompletableFuture.allOf((CompletableFuture[]) arrayList.stream().map((v0) -> {
                return v0.result();
            }).toArray(i -> {
                return new CompletableFuture[i];
            })).thenApply(r5 -> {
                return writeResponsesToResult(hashMap);
            });
        } catch (Exception e) {
            log.error("Unable to validate write state request", e);
            return CompletableFuture.failedFuture(e);
        }
    }

    WriteShareGroupStateResult writeResponsesToResult(Map<Uuid, Map<Integer, CompletableFuture<WriteShareGroupStateResponse>>> map) {
        return new WriteShareGroupStateResult.Builder().setTopicsData((List) map.keySet().stream().map(uuid -> {
            return new TopicData(uuid, (List) ((Map) map.get(uuid)).entrySet().stream().map(entry -> {
                int intValue = ((Integer) entry.getKey()).intValue();
                try {
                    return (List) ((WriteShareGroupStateResponseData.WriteStateResult) ((WriteShareGroupStateResponse) ((CompletableFuture) entry.getValue()).join()).data().results().get(0)).partitions().stream().map(partitionResult -> {
                        return PartitionFactory.newPartitionErrorData(partitionResult.partition(), partitionResult.errorCode(), partitionResult.errorMessage());
                    }).collect(Collectors.toList());
                } catch (Exception e) {
                    log.error("Unexpected exception while writing data to share coordinator", e);
                    return Collections.singletonList(PartitionFactory.newPartitionErrorData(intValue, Errors.UNKNOWN_SERVER_ERROR.code(), "Error writing state to share coordinator: " + e.getMessage()));
                }
            }).flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toList()));
        }).collect(Collectors.toList())).build();
    }

    @Override // org.apache.kafka.server.share.persister.Persister
    public CompletableFuture<ReadShareGroupStateResult> readState(ReadShareGroupStateParameters readShareGroupStateParameters) {
        try {
            validate(readShareGroupStateParameters);
            GroupTopicPartitionData<PartitionIdLeaderEpochData> groupTopicPartitionData = readShareGroupStateParameters.groupTopicPartitionData();
            String groupId = groupTopicPartitionData.groupId();
            HashMap hashMap = new HashMap();
            ArrayList arrayList = new ArrayList();
            groupTopicPartitionData.topicsData().forEach(topicData -> {
                topicData.partitions().forEach(partitionIdLeaderEpochData -> {
                    CompletableFuture completableFuture = (CompletableFuture) ((Map) hashMap.computeIfAbsent(topicData.topicId(), uuid -> {
                        return new HashMap();
                    })).computeIfAbsent(Integer.valueOf(partitionIdLeaderEpochData.partition()), num -> {
                        return new CompletableFuture();
                    });
                    PersisterStateManager persisterStateManager = this.stateManager;
                    Objects.requireNonNull(persisterStateManager);
                    arrayList.add(new PersisterStateManager.ReadStateHandler(persisterStateManager, groupId, topicData.topicId(), partitionIdLeaderEpochData.partition(), partitionIdLeaderEpochData.leaderEpoch(), completableFuture, null));
                });
            });
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                this.stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler) it.next());
            }
            return CompletableFuture.allOf((CompletableFuture[]) arrayList.stream().map((v0) -> {
                return v0.result();
            }).toArray(i -> {
                return new CompletableFuture[i];
            })).thenApply(r5 -> {
                return readResponsesToResult(hashMap);
            });
        } catch (Exception e) {
            log.error("Unable to validate read state request", e);
            return CompletableFuture.failedFuture(e);
        }
    }

    ReadShareGroupStateResult readResponsesToResult(Map<Uuid, Map<Integer, CompletableFuture<ReadShareGroupStateResponse>>> map) {
        return new ReadShareGroupStateResult.Builder().setTopicsData((List) map.keySet().stream().map(uuid -> {
            return new TopicData(uuid, (List) ((Map) map.get(uuid)).entrySet().stream().map(entry -> {
                int intValue = ((Integer) entry.getKey()).intValue();
                try {
                    return (List) ((ReadShareGroupStateResponseData.ReadStateResult) ((ReadShareGroupStateResponse) ((CompletableFuture) entry.getValue()).join()).data().results().get(0)).partitions().stream().map(partitionResult -> {
                        return PartitionFactory.newPartitionAllData(partitionResult.partition(), partitionResult.stateEpoch(), partitionResult.startOffset(), partitionResult.errorCode(), partitionResult.errorMessage(), (List) partitionResult.stateBatches().stream().map(PersisterStateBatch::from).collect(Collectors.toList()));
                    }).collect(Collectors.toList());
                } catch (Exception e) {
                    log.error("Unexpected exception while getting data from share coordinator", e);
                    return Collections.singletonList(PartitionFactory.newPartitionAllData(intValue, -1, -1L, Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share coordinator: " + e.getMessage(), Collections.emptyList()));
                }
            }).flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toList()));
        }).collect(Collectors.toList())).build();
    }

    @Override // org.apache.kafka.server.share.persister.Persister
    public CompletableFuture<DeleteShareGroupStateResult> deleteState(DeleteShareGroupStateParameters deleteShareGroupStateParameters) {
        throw new RuntimeException("not implemented");
    }

    @Override // org.apache.kafka.server.share.persister.Persister
    public CompletableFuture<ReadShareGroupStateSummaryResult> readSummary(ReadShareGroupStateSummaryParameters readShareGroupStateSummaryParameters) {
        throw new RuntimeException("not implemented");
    }

    private static void validate(WriteShareGroupStateParameters writeShareGroupStateParameters) {
        if (writeShareGroupStateParameters == null) {
            throw new IllegalArgumentException("Write share group parameters" + " cannot be null.");
        }
        if (writeShareGroupStateParameters.groupTopicPartitionData() == null) {
            throw new IllegalArgumentException("Write share group parameters" + " data cannot be null.");
        }
        validateGroupTopicPartitionData("Write share group parameters", writeShareGroupStateParameters.groupTopicPartitionData());
    }

    private static void validate(ReadShareGroupStateParameters readShareGroupStateParameters) {
        if (readShareGroupStateParameters == null) {
            throw new IllegalArgumentException("Read share group parameters" + " cannot be null.");
        }
        if (readShareGroupStateParameters.groupTopicPartitionData() == null) {
            throw new IllegalArgumentException("Read share group parameters" + " data cannot be null.");
        }
        validateGroupTopicPartitionData("Read share group parameters", readShareGroupStateParameters.groupTopicPartitionData());
    }

    private static void validateGroupTopicPartitionData(String str, GroupTopicPartitionData<? extends PartitionIdData> groupTopicPartitionData) {
        String groupId = groupTopicPartitionData.groupId();
        if (groupId == null || groupId.isEmpty()) {
            throw new IllegalArgumentException(str + " groupId cannot be null or empty.");
        }
        List<TopicData<? extends PartitionIdData>> list = groupTopicPartitionData.topicsData();
        if (isEmpty(list)) {
            throw new IllegalArgumentException(str + " topics data cannot be null or empty.");
        }
        for (TopicData<? extends PartitionIdData> topicData : list) {
            if (topicData.topicId() == null) {
                throw new IllegalArgumentException(str + " topicId cannot be null.");
            }
            if (isEmpty(topicData.partitions())) {
                throw new IllegalArgumentException(str + " partitions cannot be null or empty.");
            }
            for (PartitionIdData partitionIdData : topicData.partitions()) {
                if (partitionIdData.partition() < 0) {
                    throw new IllegalArgumentException(String.format("%s has invalid partitionId - %s %s %d", str, groupId, topicData.topicId(), Integer.valueOf(partitionIdData.partition())));
                }
            }
        }
    }

    private static boolean isEmpty(List<?> list) {
        return list == null || list.isEmpty();
    }
}
