/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.rest;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.rest.InternalRestServer;
import io.confluent.rest.KafkaRestorePartitionHandle;
import io.confluent.rest.KafkaRestoreRestApiReturnStatus;
import io.confluent.rest.ResponseContainer;
import io.confluent.rest.TierPartitionStateResponse;
import io.confluent.rest.TierRecordMetadataResponse;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Optional;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Jetty handler that serves the tier-partition restore endpoints.
 *
 * <p>Requests are dispatched on the suffix of the request path. Each endpoint reads a request
 * body with Jackson, delegates to the {@link KafkaRestorePartitionHandle} supplied at
 * construction, and hands either a data or an error {@link ResponseContainer} to
 * {@code ResponseContainer.write}. The {@code /ftps} endpoint streams a file instead of a
 * container on success.
 */
public class RestoreHandler
extends Handler.Abstract {
    // NOTE(review): the logger is created for InternalRestServer, not RestoreHandler. This looks
    // like a copy/paste leftover, but changing it would change the logger category - confirm.
    private static final Logger log = LoggerFactory.getLogger(InternalRestServer.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final KafkaRestorePartitionHandle kafkaRestorePartitionHandle;
    /** Size in bytes of the copy buffer used when streaming the ftps file. */
    private static final int BUFFER_SIZE = 4096;

    /**
     * @param restoreHandle the object every endpoint delegates its work to
     */
    public RestoreHandler(KafkaRestorePartitionHandle restoreHandle) {
        this.kafkaRestorePartitionHandle = restoreHandle;
    }

    /**
     * Builds the error entry used by every endpoint.
     *
     * @param message text carried by the error entry
     * @return an {@code ErrorResponse} created with the fixed values {@code 0} and {@code 500}
     *         (presumably an id and an HTTP-style status - confirm against ResponseContainer)
     */
    private static ResponseContainer.ErrorResponse genericErrorResponse(String message) {
        return new ResponseContainer.ErrorResponse(0, 500, message);
    }

    /**
     * Writes an error container holding a single generic error entry.
     *
     * @param response the response to write to
     * @param message  text of the error entry
     * @throws IOException if writing the container fails
     */
    private static void writeError(Response response, String message) throws IOException {
        ResponseContainer.ErrorResponse error = RestoreHandler.genericErrorResponse(message);
        ResponseContainer.errorResponse(Collections.singletonList(error)).write(OBJECT_MAPPER, response);
    }

    /**
     * Dispatches the request to the endpoint whose suffix matches the request path.
     *
     * <p>NOTE(review): {@code callback} is never completed in this class. Confirm that
     * completion happens elsewhere (e.g. a wrapping handler), since Jetty expects a handler
     * that returns {@code true} to complete the callback eventually.
     *
     * @param request  the incoming request
     * @param response the response the endpoint writes to
     * @param callback the Jetty completion callback (not used here, see note above)
     * @return {@code true} when an endpoint handled the request; {@code false} when the path is
     *         missing or matches no endpoint, so that the server can produce its own
     *         not-found response instead of leaving the request without any response
     * @throws Exception if an endpoint fails while writing its error response
     */
    public boolean handle(Request request, Response response, Callback callback) throws Exception {
        String target = request.getHttpURI().getPath();
        log.debug("Handling {}", request.getHttpURI());
        if (target == null) {
            // No path to match against; treat it like any other unsupported URI.
            log.error("Not supported URI: {}", request.getHttpURI());
            return false;
        }
        if (target.endsWith("/leader-replica")) {
            this.handleLeaderReplicaQuery(request, response);
        } else if (target.endsWith("/tier-state")) {
            this.handleTierStateQuery(request, response);
        } else if (target.endsWith("/ftps")) {
            this.handleFtpsDownload(request, response);
        } else if (target.endsWith("/fence")) {
            this.handleFenceTierPartitionState(request, response);
        } else if (target.endsWith("/restore")) {
            this.handleRestoreTierPartitionState(request, response);
        } else if (target.endsWith("/unfreeze")) {
            this.handleUnfreezeTierPartition(request, response);
        } else if (target.endsWith("/validate-log-range")) {
            this.handleValidateLogRange(request, response);
        } else if (target.endsWith("/pre-checks")) {
            this.handlePreChecksForRestore(request, response);
        } else {
            // Nothing was written, so report "not handled" rather than claiming the request.
            log.error("Not supported URI: {}", request.getHttpURI());
            return false;
        }
        return true;
    }

    /**
     * {@code /leader-replica}: looks up the leader broker of a partition.
     *
     * <p>Writes a {@link LeaderReplicaResponse} on success, or an error container when the
     * handle returns {@code KafkaRestoreRestApiReturnStatus.invalid} or anything throws.
     * Exactly one container is written per successful pass through the try block.
     *
     * @throws IOException if writing the error response in the catch block fails
     */
    private void handleLeaderReplicaQuery(Request request, Response response) throws IOException {
        try (InputStream inputStream = Content.Source.asInputStream(request)) {
            LeaderReplicaRequest leaderReplicaRequest = OBJECT_MAPPER.readValue(inputStream, LeaderReplicaRequest.class);
            int brokerId = this.kafkaRestorePartitionHandle.leaderReplica(leaderReplicaRequest.topicName, leaderReplicaRequest.partition);
            if (brokerId == KafkaRestoreRestApiReturnStatus.invalid) {
                RestoreHandler.writeError(response, "Failed to retrieve leader for partition");
            } else {
                LeaderReplicaResponse leaderReplicaResponse = new LeaderReplicaResponse(brokerId);
                ResponseContainer.dataResponse(leaderReplicaResponse).write(OBJECT_MAPPER, response);
            }
        }
        catch (Exception e) {
            String errorMessage = "Failed to retrieve leader for partition";
            log.error(errorMessage, e);
            RestoreHandler.writeError(response, errorMessage);
        }
    }

    /**
     * {@code /tier-state}: returns the tier partition status of a partition.
     *
     * <p>A {@code null} status from the handle is reported as an error container.
     *
     * @throws IOException if writing the error response in the catch block fails
     */
    private void handleTierStateQuery(Request request, Response response) throws IOException {
        try (InputStream inputStream = Content.Source.asInputStream(request)) {
            TierPartitionStatusRequest tierPartitionStateRequest = OBJECT_MAPPER.readValue(inputStream, TierPartitionStatusRequest.class);
            TierPartitionStateResponse status = this.kafkaRestorePartitionHandle.tierPartitionStatus(tierPartitionStateRequest.topicName, tierPartitionStateRequest.partition);
            if (status == null) {
                RestoreHandler.writeError(response, "Failed to retrieve tier partition status");
            } else {
                ResponseContainer.dataResponse(status).write(OBJECT_MAPPER, response);
            }
        }
        catch (Exception e) {
            String errorMessage = "Failed to retrieve tier partition state for partition";
            log.error(errorMessage, e);
            RestoreHandler.writeError(response, errorMessage);
        }
    }

    /**
     * {@code /ftps}: streams the ftps file of a partition as an attachment.
     *
     * <p>When the handle returns no file an error container is written instead. On success
     * the content type, content length and content disposition headers are set and the file
     * bytes are copied to the response.
     *
     * @throws IOException if writing the error response in the catch block fails
     */
    private void handleFtpsDownload(Request request, Response response) throws IOException {
        try (InputStream inputStream = Content.Source.asInputStream(request)) {
            FtpsDownloadRequest ftpsDownloadRequest = OBJECT_MAPPER.readValue(inputStream, FtpsDownloadRequest.class);
            log.debug("input request: {}", ftpsDownloadRequest);
            Optional<File> ftpsFile = this.kafkaRestorePartitionHandle.ftpsFile(ftpsDownloadRequest.topicName, ftpsDownloadRequest.partition);
            if (!ftpsFile.isPresent()) {
                String errorMessage = String.format("Ftps file not found for partition: %s", ftpsDownloadRequest);
                log.error(errorMessage);
                RestoreHandler.writeError(response, errorMessage);
                return;
            }
            File file = ftpsFile.get();
            response.getHeaders().put(HttpHeader.CONTENT_TYPE, "application/octet-stream");
            // Pass the length as a long: narrowing it to int corrupts the header for files of 2 GiB or more.
            response.getHeaders().put(HttpHeader.CONTENT_LENGTH, file.length());
            response.getHeaders().put(HttpHeader.CONTENT_DISPOSITION, String.format("attachment; filename=\"%s\"", file.getName()));
            try (FileInputStream in = new FileInputStream(file);
                 OutputStream out = Content.Sink.asOutputStream(response)) {
                int numBytesRead;
                byte[] buffer = new byte[BUFFER_SIZE];
                while ((numBytesRead = in.read(buffer)) > 0) {
                    out.write(buffer, 0, numBytesRead);
                }
            }
        }
        catch (Exception e) {
            // NOTE(review): if the failure happens after streaming started, this error container
            // is written after part of the file body - confirm how ResponseContainer.write copes.
            String errorMessage = "Failed to handle ftps file download";
            log.error(errorMessage, e);
            RestoreHandler.writeError(response, errorMessage);
        }
    }

    /**
     * {@code /fence}: asks the handle to fence a tier topic partition.
     *
     * <p>A {@code null} record metadata result is reported as an error container.
     *
     * @throws IOException if writing the error response in the catch block fails
     */
    private void handleFenceTierPartitionState(Request request, Response response) throws IOException {
        try (InputStream inputStream = Content.Source.asInputStream(request)) {
            TierPartitionFenceRequest tierPartitionFenceRequest = OBJECT_MAPPER.readValue(inputStream, TierPartitionFenceRequest.class);
            TierRecordMetadataResponse recordMetadata = this.kafkaRestorePartitionHandle.setFenceTierTopicPartition(tierPartitionFenceRequest.topicName, tierPartitionFenceRequest.partition);
            if (recordMetadata == null) {
                RestoreHandler.writeError(response, "Failed to fence topic partition " + tierPartitionFenceRequest.topicName + "_" + tierPartitionFenceRequest.partition);
            } else {
                ResponseContainer.dataResponse(recordMetadata).write(OBJECT_MAPPER, response);
            }
        }
        catch (Exception e) {
            String errorMessage = "Failed to send fence tier partition event for partition";
            log.error(errorMessage, e);
            RestoreHandler.writeError(response, errorMessage);
        }
    }

    /**
     * {@code /restore}: asks the handle to force-restore a tier partition for the given offset
     * range, content hash and checksum algorithm.
     *
     * <p>A {@code null} record metadata result is reported as an error container.
     *
     * @throws IOException if writing the error response in the catch block fails
     */
    private void handleRestoreTierPartitionState(Request request, Response response) throws IOException {
        try (InputStream inputStream = Content.Source.asInputStream(request)) {
            TierPartitionRestoreRequest tierPartitionRestoreRequest = OBJECT_MAPPER.readValue(inputStream, TierPartitionRestoreRequest.class);
            TierRecordMetadataResponse recordMetadata = this.kafkaRestorePartitionHandle.setForceRestoreTierPartition(tierPartitionRestoreRequest.topicName, tierPartitionRestoreRequest.partition, tierPartitionRestoreRequest.logStartOffset, tierPartitionRestoreRequest.logEndOffset, tierPartitionRestoreRequest.contentHash, tierPartitionRestoreRequest.checksumAlgorithm);
            if (recordMetadata == null) {
                RestoreHandler.writeError(response, "Failed to restore topic partition " + tierPartitionRestoreRequest.topicName + "_" + tierPartitionRestoreRequest.partition);
            } else {
                ResponseContainer.dataResponse(recordMetadata).write(OBJECT_MAPPER, response);
            }
        }
        catch (Exception e) {
            String errorMessage = "Failed to send restore tier partition event for partition";
            log.error(errorMessage, e);
            RestoreHandler.writeError(response, errorMessage);
        }
    }

    /**
     * {@code /unfreeze}: asks the handle to unfreeze the log start offset of a partition.
     *
     * <p>A {@code null} record metadata result is reported as an error container.
     *
     * @throws IOException if writing the error response in the catch block fails
     */
    private void handleUnfreezeTierPartition(Request request, Response response) throws IOException {
        try (InputStream inputStream = Content.Source.asInputStream(request)) {
            TierPartitionUnfreezeRequest tierPartitionUnfreezeRequest = OBJECT_MAPPER.readValue(inputStream, TierPartitionUnfreezeRequest.class);
            TierRecordMetadataResponse recordMetadata = this.kafkaRestorePartitionHandle.setUnfreezeLogStartOffset(tierPartitionUnfreezeRequest.topicName, tierPartitionUnfreezeRequest.partition);
            if (recordMetadata == null) {
                RestoreHandler.writeError(response, "Failed to unfreeze topic partition " + tierPartitionUnfreezeRequest.topicName + "_" + tierPartitionUnfreezeRequest.partition);
            } else {
                ResponseContainer.dataResponse(recordMetadata).write(OBJECT_MAPPER, response);
            }
        }
        catch (Exception e) {
            String errorMessage = "Failed to send unfreeze tier partition event for partition";
            log.error(errorMessage, e);
            RestoreHandler.writeError(response, errorMessage);
        }
    }

    /**
     * {@code /validate-log-range}: asks the handle to validate an offset range of a partition.
     *
     * <p>Writes a {@link TierPartitionStateSetResponse} when the handle returns
     * {@code KafkaRestoreRestApiReturnStatus.success}, otherwise an error container.
     *
     * @throws IOException if writing the error response in the catch block fails
     */
    private void handleValidateLogRange(Request request, Response response) throws IOException {
        try (InputStream inputStream = Content.Source.asInputStream(request)) {
            ValidateLogRangeRequest validateLogRangeRequest = OBJECT_MAPPER.readValue(inputStream, ValidateLogRangeRequest.class);
            int status = this.kafkaRestorePartitionHandle.validateLogRange(validateLogRangeRequest.topicName, validateLogRangeRequest.partition, validateLogRangeRequest.logStartOffset, validateLogRangeRequest.logEndOffset);
            if (status == KafkaRestoreRestApiReturnStatus.success) {
                TierPartitionStateSetResponse cmdResponse = new TierPartitionStateSetResponse(status);
                ResponseContainer.dataResponse(cmdResponse).write(OBJECT_MAPPER, response);
            } else {
                RestoreHandler.writeError(response, "Failed to validate log range");
            }
        }
        catch (Exception e) {
            String errorMessage = "Failed to validate log range";
            log.error(errorMessage, e);
            RestoreHandler.writeError(response, errorMessage);
        }
    }

    /**
     * {@code /pre-checks}: runs the handle's restore pre-checks for a partition.
     *
     * <p>Writes a {@link TierPartitionStateSetResponse} when the handle returns
     * {@code KafkaRestoreRestApiReturnStatus.success}, otherwise an error container. Both
     * outcomes are logged at info level.
     *
     * @throws IOException if writing the error response in the catch block fails
     */
    private void handlePreChecksForRestore(Request request, Response response) throws IOException {
        try (InputStream inputStream = Content.Source.asInputStream(request)) {
            PreChecksRequest preChecksRequest = OBJECT_MAPPER.readValue(inputStream, PreChecksRequest.class);
            int status = this.kafkaRestorePartitionHandle.preChecksForRestore(preChecksRequest.topicName, preChecksRequest.partition);
            if (status == KafkaRestoreRestApiReturnStatus.success) {
                TierPartitionStateSetResponse cmdResponse = new TierPartitionStateSetResponse(status);
                ResponseContainer.dataResponse(cmdResponse).write(OBJECT_MAPPER, response);
                log.info("[{}-{}]:  pre-check passed", preChecksRequest.topicName, preChecksRequest.partition);
            } else {
                String message = "[" + preChecksRequest.topicName + "-" + preChecksRequest.partition + "]:  pre-check failed";
                log.info(message);
                RestoreHandler.writeError(response, message);
            }
        }
        catch (Exception e) {
            String errorMessage = "Failed at restore pre-check: " + e.getMessage();
            log.error(errorMessage, e);
            RestoreHandler.writeError(response, errorMessage);
        }
    }

    /** Request body of {@code /leader-replica}: {@code topic_name} and {@code partition}. */
    static final class LeaderReplicaRequest {
        @JsonProperty(value="topic_name")
        final String topicName;
        @JsonProperty(value="partition")
        final int partition;

        @JsonCreator
        public LeaderReplicaRequest(@JsonProperty(value="topic_name", required=true) String topicName, @JsonProperty(value="partition", required=true) int partition) {
            this.topicName = topicName;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return "LeaderReplicaRequest{topic_name=" + this.topicName + ", partition=" + this.partition + "}";
        }
    }

    /** Success payload of {@code /leader-replica}: the {@code broker} id returned by the handle. */
    static final class LeaderReplicaResponse {
        @JsonProperty(value="broker")
        final int broker;

        @JsonCreator
        public LeaderReplicaResponse(@JsonProperty(value="broker", required=true) int broker) {
            this.broker = broker;
        }

        @Override
        public String toString() {
            return "LeaderReplicaResponse{broker=" + this.broker + "}";
        }
    }

    /** Request body of {@code /tier-state}: {@code topic_name} and {@code partition}. */
    static final class TierPartitionStatusRequest {
        @JsonProperty(value="topic_name")
        final String topicName;
        @JsonProperty(value="partition")
        final int partition;

        @JsonCreator
        public TierPartitionStatusRequest(@JsonProperty(value="topic_name", required=true) String topicName, @JsonProperty(value="partition", required=true) int partition) {
            this.topicName = topicName;
            this.partition = partition;
        }

        // The label below differs from the class name; it is kept as-is because it is runtime text.
        @Override
        public String toString() {
            return "TierPartitionStateRequest{topic_name=" + this.topicName + ", partition=" + this.partition + "}";
        }
    }

    /** Request body of {@code /ftps}: {@code topic_name} and {@code partition}. */
    static final class FtpsDownloadRequest {
        @JsonProperty(value="topic_name")
        final String topicName;
        @JsonProperty(value="partition")
        final int partition;

        @JsonCreator
        public FtpsDownloadRequest(@JsonProperty(value="topic_name", required=true) String topicName, @JsonProperty(value="partition", required=true) int partition) {
            this.topicName = topicName;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return "FtpsDownloadRequest{topic_name=" + this.topicName + ", partition=" + this.partition + "}";
        }
    }

    /** Request body of {@code /fence}: {@code topic_name} and {@code partition}. */
    static final class TierPartitionFenceRequest {
        @JsonProperty(value="topic_name")
        final String topicName;
        @JsonProperty(value="partition")
        final int partition;

        @JsonCreator
        public TierPartitionFenceRequest(@JsonProperty(value="topic_name", required=true) String topicName, @JsonProperty(value="partition", required=true) int partition) {
            this.topicName = topicName;
            this.partition = partition;
        }

        // The label below differs from the class name; it is kept as-is because it is runtime text.
        @Override
        public String toString() {
            return "TierPartitionFence{topic_name=" + this.topicName + ", partition=" + this.partition + "}";
        }
    }

    /**
     * Request body of {@code /restore}: {@code topic_name}, {@code partition},
     * {@code log_start_offset}, {@code log_end_offset}, {@code content_hash} and
     * {@code checksum_algorithm}.
     */
    static final class TierPartitionRestoreRequest {
        @JsonProperty(value="topic_name")
        final String topicName;
        @JsonProperty(value="partition")
        final int partition;
        @JsonProperty(value="log_start_offset")
        final long logStartOffset;
        @JsonProperty(value="log_end_offset")
        final long logEndOffset;
        @JsonProperty(value="content_hash")
        final String contentHash;
        @JsonProperty(value="checksum_algorithm")
        final byte checksumAlgorithm;

        @JsonCreator
        public TierPartitionRestoreRequest(@JsonProperty(value="topic_name", required=true) String topicName, @JsonProperty(value="partition", required=true) int partition, @JsonProperty(value="log_start_offset", required=true) long logStartOffset, @JsonProperty(value="log_end_offset", required=true) long logEndOffset, @JsonProperty(value="content_hash", required=true) String contentHash, @JsonProperty(value="checksum_algorithm", required=true) byte checksumAlgorithm) {
            this.topicName = topicName;
            this.partition = partition;
            this.contentHash = contentHash;
            this.logStartOffset = logStartOffset;
            this.logEndOffset = logEndOffset;
            this.checksumAlgorithm = checksumAlgorithm;
        }

        @Override
        public String toString() {
            return "TierPartitionRestoreRequest{topic_name=" + this.topicName + ", partition=" + this.partition + ", log_start_offset=" + this.logStartOffset + ", log_end_offset=" + this.logEndOffset + ", content_hash=" + this.contentHash + ", checksum_algorithm=" + this.checksumAlgorithm + "}";
        }
    }

    /** Request body of {@code /unfreeze}: {@code topic_name} and {@code partition}. */
    static final class TierPartitionUnfreezeRequest {
        @JsonProperty(value="topic_name")
        final String topicName;
        @JsonProperty(value="partition")
        final int partition;

        @JsonCreator
        public TierPartitionUnfreezeRequest(@JsonProperty(value="topic_name", required=true) String topicName, @JsonProperty(value="partition", required=true) int partition) {
            this.topicName = topicName;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return "TierPartitionUnfreezeRequest{topic_name=" + this.topicName + ", partition=" + this.partition + "}";
        }
    }

    /**
     * Request body of {@code /validate-log-range}: {@code topic_name}, {@code partition},
     * {@code log_start_offset} and {@code log_end_offset}.
     */
    static final class ValidateLogRangeRequest {
        @JsonProperty(value="topic_name")
        final String topicName;
        @JsonProperty(value="partition")
        final int partition;
        @JsonProperty(value="log_start_offset")
        final long logStartOffset;
        @JsonProperty(value="log_end_offset")
        final long logEndOffset;

        @JsonCreator
        public ValidateLogRangeRequest(@JsonProperty(value="topic_name", required=true) String topicName, @JsonProperty(value="partition", required=true) int partition, @JsonProperty(value="log_start_offset", required=true) long logStartOffset, @JsonProperty(value="log_end_offset", required=true) long logEndOffset) {
            this.topicName = topicName;
            this.partition = partition;
            this.logStartOffset = logStartOffset;
            this.logEndOffset = logEndOffset;
        }

        @Override
        public String toString() {
            return "ValidateLogRangeRequest{topic_name=" + this.topicName + ", partition=" + this.partition + ", log_start_offset=" + this.logStartOffset + ", log_end_offset=" + this.logEndOffset + "}";
        }
    }

    /**
     * Success payload of {@code /validate-log-range} and {@code /pre-checks}: the integer
     * status returned by the handle, exposed as {@code response}.
     */
    static final class TierPartitionStateSetResponse {
        @JsonProperty(value="response")
        final int response;

        @JsonCreator
        public TierPartitionStateSetResponse(@JsonProperty(value="response", required=true) int response) {
            this.response = response;
        }

        @Override
        public String toString() {
            return "TierPartitionStateSetResponse{response=" + this.response + "}";
        }
    }

    /** Request body of {@code /pre-checks}: {@code topic_name} and {@code partition}. */
    static final class PreChecksRequest {
        @JsonProperty(value="topic_name")
        final String topicName;
        @JsonProperty(value="partition")
        final int partition;

        @JsonCreator
        public PreChecksRequest(@JsonProperty(value="topic_name", required=true) String topicName, @JsonProperty(value="partition", required=true) int partition) {
            this.topicName = topicName;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return "PreChecksRequest{topic_name=" + this.topicName + ", partition=" + this.partition + "}";
        }
    }
}

