/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.collect;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.FlinkCompletableFutureAssert;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
import org.apache.flink.streaming.api.operators.collect.CollectSinkAddressEvent;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator;
import org.apache.flink.streaming.api.operators.collect.SocketConnection;
import org.apache.flink.streaming.api.operators.collect.utils.CollectTestUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.CompletableFutureAssert;
import org.junit.jupiter.api.Test;

class CollectSinkOperatorCoordinatorTest {
    /**
     * Serializer for rows with an INT column followed by a STRING column; used by this test to
     * encode the rows a sink function sends and to decode the rows of a received response.
     */
    private static final TypeSerializer<Row> serializer =
            new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
                    .createSerializer(new ExecutionConfig());

    /** Package-private no-argument constructor; it performs no initialization. */
    CollectSinkOperatorCoordinatorTest() {}

    /**
     * Sends a request to a started coordinator that was never given a sink address and asserts
     * that the returned future yields an empty response carrying the request's version.
     */
    @Test
    void testNoAddress() throws Exception {
        try (CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator()) {
            coordinator.start();

            final String requestVersion = "version";
            final CompletableFuture<CoordinationResponse> response =
                    coordinator.handleCoordinationRequest(
                            createRequestForCoordinatorGeneratedResponse(requestVersion));

            assertEmptyResponseGeneratedFromCoordinator(response, requestVersion);
        }
    }

    /**
     * Closes the coordinator while a request that the sink function has read but not answered is
     * still pending, and asserts that the pending request then completes with an empty response
     * carrying the request's version.
     */
    @Test
    void testClosingTheCoordinatorAfterRequestWasReceivedBySinkFunction() throws Exception {
        try (TestingSinkFunction sinkFunction = new TestingSinkFunction()) {
            final String expectedVersion = "version";

            // Closing the coordinator is the action under test, so it is closed explicitly below
            // instead of being managed by try-with-resources.
            final CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator();
            coordinator.start();
            sinkFunction.registerSinkFunctionWith(coordinator);

            final CompletableFuture<Void> sinkFunctionProcessing =
                    sinkFunction.handleRequestWithoutResponse();
            Assertions.assertThat(sinkFunctionProcessing)
                    .as("The SocketServer waits for the request to be sent.")
                    .isNotDone();

            final CompletableFuture<CoordinationResponse> responseFuture =
                    coordinator.handleCoordinationRequest(
                            createRequestForCoordinatorGeneratedResponse(expectedVersion));
            Assertions.assertThat(responseFuture)
                    .as("The response shouldn't complete because the SinkFunction doesn't send any response.")
                    .isNotDone();

            FlinkAssertions.assertThatFuture(sinkFunctionProcessing)
                    .as("The SocketServer should eventually have handled the request without sending a response back.")
                    .eventuallySucceeds();

            coordinator.close();

            assertEmptyResponseGeneratedFromCoordinator(responseFuture, expectedVersion);
        }
    }

    /**
     * Lets the sink function answer a request with two rows and asserts that the coordinator's
     * response future, which is not done before the sink function answers, yields exactly those
     * rows together with the sink function's default version and offset.
     */
    @Test
    void testSuccessfulResponse() throws Exception {
        try (CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator();
                TestingSinkFunction sinkFunction =
                        TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(
                                coordinator)) {
            coordinator.start();

            final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"), Row.of(2, "bbb"));

            final CompletableFuture<CoordinationResponse> responseFuture =
                    coordinator.handleCoordinationRequest(
                            createRequestForSinkFunctionGeneratedResponse());
            Assertions.assertThat(responseFuture).isNotDone();

            sinkFunction.handleRequest(expectedData);

            assertResponseWithDefaultMetadataFromSinkFunction(responseFuture, expectedData);
        }
    }

    /**
     * Closes the whole sink function (accepted connection and server socket) after it has read a
     * request without answering it, and asserts that the pending request completes with an empty
     * response.
     */
    @Test
    void testClosingTheServerSocketOfTheSinkFunction() throws Exception {
        try (CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator()) {
            coordinator.start();

            // Managed by try-with-resources so that the sockets are released even if an assertion
            // fails before the explicit close() below. The second close() issued when leaving the
            // block finds the connection future already cleared and the server socket already
            // closed, so it has nothing left to release.
            try (TestingSinkFunction sinkFunction =
                    TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(coordinator)) {
                final String version = "version";
                final CompletableFuture<CoordinationResponse> responseFuture =
                        coordinator.handleCoordinationRequest(
                                createRequestForCoordinatorGeneratedResponse(version));
                Assertions.assertThat(responseFuture).isNotDone();

                FlinkAssertions.assertThatFuture(sinkFunction.handleRequestWithoutResponse())
                        .eventuallySucceeds();

                // Closing the sink function is the action under test.
                sinkFunction.close();

                assertEmptyResponseGeneratedFromSinkFunction(responseFuture);
            }
        }
    }

    /**
     * Closes only the sink function's accepted connection (the server socket stays open) once the
     * connection has been established, and asserts that the pending request completes with an
     * empty response carrying the request's version.
     */
    @Test
    void testClosingTheListeningSocketInTheSinkFunction() throws Exception {
        try (CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator()) {
            coordinator.start();

            try (TestingSinkFunction sinkFunction =
                    TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(coordinator)) {
                final String version = "version";
                final CompletableFuture<CoordinationResponse> responseFuture =
                        coordinator.handleCoordinationRequest(
                                createRequestForCoordinatorGeneratedResponse(version));
                Assertions.assertThat(responseFuture).isNotDone();

                // The returned future is intentionally not awaited; the test only waits for the
                // connection itself before closing it.
                sinkFunction.handleRequestWithoutResponse();
                sinkFunction.waitForConnectionToBeEstablished();

                sinkFunction.closeAcceptingSocket();

                assertEmptyResponseGeneratedFromCoordinator(responseFuture, version);
            }
        }
    }

    /**
     * Registers a sink function that never accepts connections, sends a request, closes the
     * coordinator and asserts that the pending request completes with an empty response carrying
     * the request's version.
     */
    @Test
    void testCoordinatorNotConnectingToTheSinkFunctionSocket() throws Exception {
        try (TestingSinkFunction sinkFunction =
                TestingSinkFunction.createTestingSinkFunctionWithoutConnection()) {
            // Closing the coordinator is the action under test, so it is closed explicitly below
            // instead of being managed by try-with-resources.
            final CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator();
            coordinator.start();
            sinkFunction.registerSinkFunctionWith(coordinator);

            final String expectedVersion = "version";
            final CompletableFuture<CoordinationResponse> responseFuture =
                    coordinator.handleCoordinationRequest(
                            createRequestForCoordinatorGeneratedResponse(expectedVersion));
            Assertions.assertThat(responseFuture).isNotDone();

            coordinator.close();

            assertEmptyResponseGeneratedFromCoordinator(responseFuture, expectedVersion);
        }
    }

    /**
     * Exercises three phases against one coordinator:
     *
     * <ol>
     *   <li>a request is pending against a first sink function whose execution attempt is then
     *       reported as failed and whose accepted connection is closed; the request must complete
     *       with an empty response;
     *   <li>a further request sent before any new sink function registers must also complete with
     *       an empty response;
     *   <li>after a second sink function registers, a request must be answered with that sink
     *       function's rows.
     * </ol>
     */
    @Test
    void testReconnectAfterSinkFunctionSocketDisconnect() throws Exception {
        try (CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator()) {
            coordinator.start();

            // Both sink functions are managed by try-with-resources so that their server sockets
            // are released on every path. The first one stays open until the end of the test; only
            // its accepted connection is closed midway.
            try (TestingSinkFunction sinkFunction =
                    TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(coordinator)) {
                final String expectedVersion = "version";
                final CompletableFuture<CoordinationResponse> responseFuture =
                        coordinator.handleCoordinationRequest(
                                createRequestForCoordinatorGeneratedResponse(expectedVersion));

                sinkFunction.waitForConnectionToBeEstablished();

                coordinator.executionAttemptFailed(0, 0, null);
                sinkFunction.closeAcceptingSocket();

                assertEmptyResponseGeneratedFromCoordinator(responseFuture, expectedVersion);

                final String anotherVersion = "another-version";
                assertEmptyResponseGeneratedFromCoordinator(
                        coordinator.handleCoordinationRequest(
                                createRequestForCoordinatorGeneratedResponse(anotherVersion)),
                        anotherVersion);

                try (TestingSinkFunction anotherSinkFunction =
                        TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(
                                coordinator)) {
                    final CompletableFuture<CoordinationResponse> anotherResponseFuture =
                            coordinator.handleCoordinationRequest(
                                    createRequestForSinkFunctionGeneratedResponse());

                    final List<Row> expectedData =
                            Arrays.asList(Row.of(1, "aaa"), Row.of(2, "bbb"));
                    anotherSinkFunction.handleRequest(expectedData);

                    assertResponseWithDefaultMetadataFromSinkFunction(
                            anotherResponseFuture, expectedData);
                }
            }
        }
    }

    /**
     * Creates a request with the placeholder version {@code "random-version"}. Callers in this
     * class use it when they assert on the version sent back by the sink function rather than on
     * the request's own version.
     */
    private static CoordinationRequest createRequestForSinkFunctionGeneratedResponse() {
        final String unusedVersion = "random-version";
        return createRequestForCoordinatorGeneratedResponse(unusedVersion);
    }

    /**
     * Creates a request with the given version and a fixed placeholder offset of 123.
     *
     * @param version version to put into the request
     * @return the new request
     */
    private static CoordinationRequest createRequestForCoordinatorGeneratedResponse(String version) {
        final long unusedOffset = 123L;
        return new CollectCoordinationRequest(version, unusedOffset);
    }

    /**
     * Asserts that the response is empty and carries the version {@code "version"}; delegates to
     * {@code assertEmptyResponseGeneratedFromCoordinator} with that fixed version.
     *
     * @param responseFuture future of the response to check; this call blocks until it completes
     */
    private static void assertEmptyResponseGeneratedFromSinkFunction(
            CompletableFuture<CoordinationResponse> responseFuture) throws Exception {
        final String expectedVersion = "version";
        assertEmptyResponseGeneratedFromCoordinator(responseFuture, expectedVersion);
    }

    /**
     * Asserts that the response carries the expected version, a last checkpointed offset of
     * {@code -1} and no result rows.
     *
     * @param responseFuture future of the response to check; this call blocks until it completes
     * @param expectedVersion version the response must carry
     */
    private static void assertEmptyResponseGeneratedFromCoordinator(
            CompletableFuture<CoordinationResponse> responseFuture, String expectedVersion)
            throws Exception {
        final long expectedOffset = -1L;
        final List<Row> noResults = Collections.emptyList();
        assertResponse(responseFuture, expectedVersion, expectedOffset, noResults);
    }

    /**
     * Asserts that the response carries the given rows together with the version and offset that
     * {@code TestingSinkFunction#handleRequest(List)} sends by default.
     *
     * @param responseFuture future of the response to check; this call blocks until it completes
     * @param expectedData rows the response must contain, in order
     */
    private static void assertResponseWithDefaultMetadataFromSinkFunction(
            CompletableFuture<CoordinationResponse> responseFuture, List<Row> expectedData)
            throws Exception {
        // Reference the sink function's constants instead of repeating their values so that the
        // expectation cannot drift from what the sink function actually sends.
        assertResponse(
                responseFuture,
                TestingSinkFunction.DEFAULT_SINK_FUNCTION_RESPONSE_VERSION,
                TestingSinkFunction.DEFAULT_SINK_FUNCTION_RESPONSE_OFFSET,
                expectedData);
    }

    /**
     * Waits for the response and asserts its version, last checkpointed offset and result rows.
     * Rows are compared field by field after checking that both rows have the same arity.
     *
     * @param responseFuture future of the response to check; this call blocks until it completes
     * @param expectedVersion version the response must carry
     * @param expectedOffset last checkpointed offset the response must carry
     * @param expectedResults rows the response must contain, in order
     * @throws Exception if waiting for the future fails or the results cannot be deserialized
     */
    private static void assertResponse(
            CompletableFuture<CoordinationResponse> responseFuture,
            String expectedVersion,
            long expectedOffset,
            List<Row> expectedResults)
            throws Exception {
        final CollectCoordinationResponse response =
                (CollectCoordinationResponse) responseFuture.get();
        Assertions.assertThat(response.getVersion()).isEqualTo(expectedVersion);
        Assertions.assertThat(response.getLastCheckpointedOffset()).isEqualTo(expectedOffset);

        final List<Row> actualResult = response.getResults(serializer);
        Assertions.assertThat(actualResult).hasSize(expectedResults.size());

        for (int rowId = 0; rowId < actualResult.size(); ++rowId) {
            final Row expectedRow = expectedResults.get(rowId);
            final Row actualRow = actualResult.get(rowId);
            Assertions.assertThat(actualRow.getArity()).isEqualTo(expectedRow.getArity());

            for (int columnId = 0; columnId < actualRow.getArity(); ++columnId) {
                final Object actualField = actualRow.getField(columnId);
                final Object expectedField = expectedRow.getField(columnId);
                Assertions.assertThat(actualField).isEqualTo(expectedField);
            }
        }
    }

    /**
     * Test stand-in for the sink function's end of the socket connection. It owns a {@link
     * ServerSocket} on an automatically allocated port and, unless created through {@link
     * #createTestingSinkFunctionWithoutConnection()}, asynchronously accepts one incoming
     * connection on it.
     */
    private static class TestingSinkFunction implements AutoCloseable {

        /** Version sent by {@link #handleRequest(List)}. */
        static final String DEFAULT_SINK_FUNCTION_RESPONSE_VERSION = "version";

        /** Offset sent by {@link #handleRequest(List)}. */
        static final int DEFAULT_SINK_FUNCTION_RESPONSE_OFFSET = 2;

        private final ServerSocket serverSocket = new ServerSocket(0);

        /**
         * Future of the accepted connection. {@code null} if this instance never accepts
         * connections or after the connection has been released.
         */
        @Nullable private CompletableFuture<SocketConnection> connectionFuture;

        /**
         * Creates a sink function that accepts a connection and announces its address to the given
         * coordinator.
         */
        public static TestingSinkFunction createSinkFunctionAndInitializeCoordinator(
                CollectSinkOperatorCoordinator coordinator) throws Exception {
            final TestingSinkFunction sinkFunction = new TestingSinkFunction();
            sinkFunction.registerSinkFunctionWith(coordinator);
            return sinkFunction;
        }

        /**
         * Creates a sink function that opens its server socket but never calls accept on it, i.e.
         * it has no connection future.
         */
        public static TestingSinkFunction createTestingSinkFunctionWithoutConnection()
                throws IOException {
            return new TestingSinkFunction(ignoredServerSocket -> null);
        }

        /** Creates a sink function that asynchronously accepts one connection. */
        public TestingSinkFunction() throws IOException {
            this(TestingSinkFunction::acceptSocketAsync);
        }

        private TestingSinkFunction(
                Function<ServerSocket, CompletableFuture<SocketConnection>> socketListenerFactory)
                throws IOException {
            this.connectionFuture = socketListenerFactory.apply(serverSocket);
        }

        /**
         * Accepts one connection on the given server socket in an asynchronous task. An {@link
         * IOException} raised while accepting completes the future exceptionally.
         */
        private static CompletableFuture<SocketConnection> acceptSocketAsync(
                ServerSocket serverSocket) {
            return CompletableFuture.supplyAsync(
                    () -> {
                        try {
                            return new SocketConnection(
                                    NetUtils.acceptWithoutTimeout(serverSocket));
                        } catch (IOException e) {
                            throw new CompletionException(e);
                        }
                    });
        }

        private CompletableFuture<SocketConnection> getConnectionFuture() {
            Preconditions.checkState(
                    connectionFuture != null,
                    "The accepting Socket is already closed. The calling operation is not possible anymore.");
            return connectionFuture;
        }

        private InetSocketAddress getSocketAddress() {
            return new InetSocketAddress(
                    InetAddress.getLoopbackAddress(), serverSocket.getLocalPort());
        }

        /** Sends this sink function's loopback address to the coordinator as an operator event. */
        public void registerSinkFunctionWith(CollectSinkOperatorCoordinator coordinator)
                throws Exception {
            coordinator.handleEventFromOperator(
                    0, 0, new CollectSinkAddressEvent(getSocketAddress()));
        }

        /**
         * Releases the accepted connection (if any) and the server socket. Calling this method
         * again after it returned normally has no further effect.
         *
         * <p>If no peer has connected yet, the server socket is closed first so that the pending
         * accept is aborted; otherwise waiting for the connection could block forever. The server
         * socket is closed even if releasing the connection fails.
         */
        @Override
        public void close() throws Exception {
            final CompletableFuture<SocketConnection> pendingConnection = connectionFuture;
            connectionFuture = null;
            try {
                if (pendingConnection != null) {
                    releaseConnection(pendingConnection);
                }
            } finally {
                serverSocket.close();
            }
        }

        /** Closes the connection behind the future, aborting the accept if it is still pending. */
        private void releaseConnection(CompletableFuture<SocketConnection> pendingConnection)
                throws Exception {
            final boolean acceptStillPending = !pendingConnection.isDone();
            if (acceptStillPending) {
                // Closing the server socket makes a blocked accept fail, which completes the
                // future exceptionally instead of leaving it pending.
                serverSocket.close();
            }

            final SocketConnection connection;
            try {
                // A peer may still have connected between the isDone() check and the close above;
                // in that case the future yields the connection and it is closed below.
                connection = pendingConnection.get();
            } catch (ExecutionException e) {
                if (acceptStillPending) {
                    // Expected: the accept was aborted by the close above, nothing to release.
                    return;
                }
                throw e;
            }
            connection.close();
        }

        /**
         * Waits for the connection to be accepted and closes it, leaving the server socket open.
         * Afterwards, operations that need the connection fail with an {@link
         * IllegalStateException}. Does nothing if there is no connection future.
         */
        public void closeAcceptingSocket() throws Exception {
            if (connectionFuture != null) {
                connectionFuture.get().close();
                connectionFuture = null;
            }
        }

        /** Blocks until a peer has connected to the server socket. */
        public void waitForConnectionToBeEstablished()
                throws ExecutionException, InterruptedException {
            getConnectionFuture().get();
        }

        /** Answers one request with the default version and offset and the given rows. */
        public void handleRequest(List<Row> actualData) {
            handleRequest(
                    DEFAULT_SINK_FUNCTION_RESPONSE_VERSION,
                    DEFAULT_SINK_FUNCTION_RESPONSE_OFFSET,
                    actualData);
        }

        /** Answers one request with the given version, offset and rows; blocks until sent. */
        public void handleRequest(String actualVersion, int actualOffset, List<Row> actualData) {
            handleRequestAsync(
                            actualVersion,
                            actualOffset,
                            CompletableFuture.completedFuture(actualData))
                    .join();
        }

        /**
         * Reads one request from the connection without sending anything back.
         *
         * @return future that completes once the request has been read
         */
        public CompletableFuture<Void> handleRequestWithoutResponse() {
            return internalConnectWithRequestHandlingAsync().thenApply(socketConnection -> null);
        }

        /**
         * Once the connection is available, constructs a {@link CollectCoordinationRequest} from
         * its input view (presumably deserializing the incoming request - verify against that
         * constructor) and discards it.
         */
        private CompletableFuture<SocketConnection> internalConnectWithRequestHandlingAsync() {
            return getConnectionFuture()
                    .thenApply(
                            socketConnection -> {
                                try {
                                    new CollectCoordinationRequest(
                                            socketConnection.getDataInputView());
                                } catch (IOException e) {
                                    throw new CompletionException(e);
                                }
                                return socketConnection;
                            });
        }

        /**
         * Reads one request and, once the rows are available, serializes a response with the given
         * version, offset and rows to the connection.
         *
         * @return future that completes once the response has been written
         */
        public CompletableFuture<Void> handleRequestAsync(
                String actualVersion,
                int actualOffset,
                CompletableFuture<List<Row>> actualDataAsync) {
            return internalConnectWithRequestHandlingAsync()
                    .thenCombineAsync(
                            actualDataAsync,
                            (socketConnection, data) -> {
                                if (socketConnection == null) {
                                    throw new CompletionException(
                                            new IllegalStateException(
                                                    "No SocketConnection established."));
                                }
                                try {
                                    new CollectCoordinationResponse(
                                                    actualVersion,
                                                    (long) actualOffset,
                                                    CollectTestUtils.toBytesList(data, serializer))
                                            .serialize(socketConnection.getDataOutputView());
                                } catch (IOException e) {
                                    throw new CompletionException(e);
                                }
                                return null;
                            });
        }
    }
}

