/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.collect.utils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
import org.apache.flink.streaming.api.operators.collect.CollectSinkFunction;
import org.apache.flink.streaming.api.operators.collect.utils.CollectTestUtils;
import org.apache.flink.util.OptionalFailure;
import org.junit.Assert;

public abstract class AbstractTestCoordinationRequestHandler<T>
implements CoordinationRequestHandler {
    protected static final int BATCH_SIZE = 3;
    protected final TypeSerializer<T> serializer;
    protected final String accumulatorName;
    protected LinkedList<T> buffered;
    protected String version;
    protected long offset;
    protected long checkpointedOffset;
    private final Map<String, OptionalFailure<Object>> accumulatorResults;
    protected final Random random;
    protected boolean closed;

    public AbstractTestCoordinationRequestHandler(TypeSerializer<T> serializer, String accumulatorName) {
        this.serializer = serializer;
        this.accumulatorName = accumulatorName;
        this.buffered = new LinkedList();
        this.version = UUID.randomUUID().toString();
        this.offset = 0L;
        this.checkpointedOffset = 0L;
        this.accumulatorResults = new HashMap<String, OptionalFailure<Object>>();
        this.random = new Random();
        this.closed = false;
    }

    public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {
        if (this.closed) {
            throw new RuntimeException("Handler closed");
        }
        Assert.assertTrue((boolean)(request instanceof CollectCoordinationRequest));
        CollectCoordinationRequest collectRequest = (CollectCoordinationRequest)request;
        this.updateBufferedResults();
        Assert.assertTrue((this.offset <= collectRequest.getOffset() ? 1 : 0) != 0);
        List subList = Collections.emptyList();
        if (collectRequest.getVersion().equals(this.version)) {
            while (this.buffered.size() > 0 && collectRequest.getOffset() > this.offset) {
                this.buffered.removeFirst();
                ++this.offset;
            }
            subList = new ArrayList();
            Iterator iterator = this.buffered.iterator();
            for (int i = 0; i < 3 && iterator.hasNext(); ++i) {
                subList.add(iterator.next());
            }
        }
        List<byte[]> nextBatch = CollectTestUtils.toBytesList(subList, this.serializer);
        CollectCoordinationResponse response = this.random.nextBoolean() ? new CollectCoordinationResponse(this.version, this.checkpointedOffset, nextBatch) : new CollectCoordinationResponse(collectRequest.getVersion(), -1L, Collections.emptyList());
        return CompletableFuture.completedFuture(response);
    }

    protected abstract void updateBufferedResults();

    public boolean isClosed() {
        return this.closed;
    }

    public Map<String, OptionalFailure<Object>> getAccumulatorResults() {
        return this.accumulatorResults;
    }

    protected void buildAccumulatorResults() {
        List<byte[]> finalResults = CollectTestUtils.toBytesList(this.buffered, this.serializer);
        SerializedListAccumulator listAccumulator = new SerializedListAccumulator();
        try {
            byte[] serializedResult = CollectSinkFunction.serializeAccumulatorResult((long)this.offset, (String)this.version, (long)this.checkpointedOffset, finalResults);
            listAccumulator.add((Object)serializedResult, (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        this.accumulatorResults.put(this.accumulatorName, (OptionalFailure<Object>)OptionalFailure.of((Object)listAccumulator.getLocalValue()));
    }
}

