package io.confluent.connect.hdfs.wal;

import io.confluent.connect.hdfs.FileUtils;
import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.hdfs.wal.WALFile;
import io.confluent.connect.storage.wal.FilePathOffset;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.CannotObtainBlockLengthException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/hdfs/wal/FSWAL.class */
/**
 * {@link WAL} implementation that keeps its entries in a {@link WALFile} whose path is derived
 * from the storage URL, a log directory and a topic partition.
 *
 * <p>Entries are written as key/value pairs of {@link WALEntry}. Pairs whose key is
 * {@code "BEGIN"} or {@code "END"} delimit a block; every other pair inside a block is passed to
 * {@code HdfsStorage.commit(key, value)} when the log is applied.
 *
 * <p>The class keeps at most one open writer and one open reader; both are released by
 * {@link #close()}. No synchronization is performed here.
 */
public class FSWAL implements WAL {
    private static final Logger log = LoggerFactory.getLogger(FSWAL.class);

    /** Suffix appended to the log file name to build the name of the truncated (previous) log. */
    private static final String TRUNCATED_LOG_EXTENSION = ".1";

    /** Key that opens a block of entries in the log. */
    private static final String BEGIN_MARKER = "BEGIN";

    /** Key that closes a block of entries in the log. */
    private static final String END_MARKER = "END";

    /** Remote exception class name that is treated as "lease held elsewhere, retry". */
    private static final String LEASE_EXCEPTION_CLASS_NAME =
            "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException";

    /** First back-off interval between lease attempts, in milliseconds. */
    private static final long INITIAL_SLEEP_INTERVAL_MS = 1000L;

    /** Once the back-off interval reaches this value, lease acquisition gives up. */
    private static final long MAX_SLEEP_INTERVAL_MS = 16000L;

    private final HdfsSinkConnectorConfig conf;
    private final HdfsStorage storage;
    private final String logFile;
    protected WALFile.Writer writer = null;
    private WALFile.Reader reader = null;

    /**
     * Creates a WAL bound to the log file of the given topic partition.
     *
     * @param str            passed to {@code FileUtils.logFileName} together with the storage URL
     *                       (presumably the logs directory -- confirm against callers)
     * @param topicPartition partition this log belongs to
     * @param hdfsStorage    storage used for existence checks, commits and deletes; also the
     *                       source of the connector configuration
     */
    public FSWAL(String str, TopicPartition topicPartition, HdfsStorage hdfsStorage) throws ConnectException {
        this.storage = hdfsStorage;
        // NOTE(review): "m25conf" looks like a decompiler-renamed accessor; confirm the real name.
        this.conf = hdfsStorage.m25conf();
        this.logFile = FileUtils.logFileName(hdfsStorage.url(), str, topicPartition);
    }

    /**
     * Appends one key/value pair to the log and syncs the writer.
     *
     * @param str  key of the entry
     * @param str2 value of the entry
     * @throws DataException    if the append or sync fails with an {@link IOException}; the WAL is
     *                          closed before the exception is thrown
     * @throws ConnectException if the writer cannot be obtained (see {@link #acquireLease()})
     */
    public void append(String str, String str2) throws ConnectException {
        try {
            acquireLease();
            writer.append(new WALEntry(str), new WALEntry(str2));
            writer.hsync();
        } catch (IOException e) {
            log.error("Error appending WAL file: {}, {}", logFile, e);
            close();
            throw new DataException(e);
        }
    }

    /**
     * Ensures a writer for the log file is open, creating one in append mode if necessary.
     *
     * <p>When creation fails with a {@link RemoteException} whose class name is
     * {@code AlreadyBeingCreatedException}, the call sleeps and retries, doubling the interval
     * each time (1s, 2s, 4s, 8s). When the interval reaches 16s the method gives up.
     *
     * @throws DataException    if writer creation fails with any other {@link IOException}
     * @throws ConnectException if a different remote error occurs, the thread is interrupted
     *                          while backing off, or the retries are exhausted
     */
    public void acquireLease() throws ConnectException {
        log.debug("Attempting to acquire lease for WAL file: {}", logFile);
        long sleepIntervalMs = INITIAL_SLEEP_INTERVAL_MS;
        while (sleepIntervalMs < MAX_SLEEP_INTERVAL_MS) {
            try {
                if (writer == null) {
                    writer = WALFile.createWriter(
                            conf,
                            WALFile.Writer.file(new Path(logFile)),
                            WALFile.Writer.appendIfExists(true));
                    log.debug("Successfully acquired lease, {}-{}, file {}",
                            conf.name(), conf.getTaskId(), logFile);
                }
                break;
            } catch (RemoteException e) {
                // RemoteException is an IOException, so it has to be handled before the
                // generic IOException clause or the retry path can never be reached.
                if (!LEASE_EXCEPTION_CLASS_NAME.equals(e.getClassName())) {
                    throw new ConnectException(e);
                }
                log.warn("Cannot acquire lease on WAL, {}-{}, file {}",
                        conf.name(), conf.getTaskId(), logFile);
                try {
                    Thread.sleep(sleepIntervalMs);
                } catch (InterruptedException ie) {
                    // Restore the flag so code further up the stack can still see the interrupt.
                    Thread.currentThread().interrupt();
                    throw new ConnectException(ie);
                }
                sleepIntervalMs *= 2;
            } catch (IOException e) {
                throw new DataException(
                        String.format("Error creating writer for log file, %s-%s, file %s",
                                conf.name(), conf.getTaskId(), logFile),
                        e);
            }
        }
        if (sleepIntervalMs >= MAX_SLEEP_INTERVAL_MS) {
            throw new ConnectException("Cannot acquire lease after timeout, will retry.");
        }
    }

    /**
     * Replays the log: every completed BEGIN/END block is committed to storage.
     *
     * <p>Does nothing when the log file does not exist. A
     * {@link CannotObtainBlockLengthException} or {@link CorruptWalFileException} is logged and
     * the WAL is closed without rethrowing; any other {@link IOException} closes the WAL and is
     * rethrown as a {@link DataException}.
     *
     * @throws ConnectException if the lease cannot be acquired or replay fails
     */
    public void apply() throws ConnectException {
        log.debug("Starting to apply WAL: {}", logFile);
        if (!storage.exists(logFile)) {
            log.debug("WAL file does not exist: {}", logFile);
            return;
        }
        acquireLease();
        log.debug("Lease acquired");
        try {
            if (reader == null) {
                reader = newWalFileReader(logFile);
            }
            commitWalEntriesToStorage();
        } catch (CannotObtainBlockLengthException e) {
            log.error("Error applying WAL file '{}' because the task cannot obtain the block length from HDFS: {}", logFile, e);
            log.warn("Truncating and skipping the WAL file '{}'.", logFile);
            close();
        } catch (CorruptWalFileException e) {
            log.error("Error applying WAL file '{}' because it is corrupted: {}", logFile, e);
            log.warn("Truncating and skipping corrupt WAL file '{}'.", logFile);
            close();
        } catch (IOException e) {
            log.error("Error applying WAL file: {}, {}", logFile, e);
            close();
            throw new DataException(e);
        }
        log.debug("Finished applying WAL: {}", logFile);
    }

    /**
     * Reads the remaining entries from {@link #reader} and commits each block when its END
     * marker is seen. Entries collected after a BEGIN without a matching END are not committed.
     */
    private void commitWalEntriesToStorage() throws IOException {
        Map<WALEntry, WALEntry> pending = new HashMap<>();
        WALEntry key = new WALEntry();
        WALEntry value = new WALEntry();
        while (reader.next(key, value)) {
            String keyName = key.getName();
            if (BEGIN_MARKER.equals(keyName)) {
                pending.clear();
            } else if (END_MARKER.equals(keyName)) {
                commitEntriesToStorage(pending);
            } else {
                // key/value are reused by the reader on every call, so store copies.
                pending.put(new WALEntry(keyName), new WALEntry(value.getName()));
            }
        }
    }

    /**
     * Commits every pair in {@code map} whose value path does not already exist in storage.
     *
     * @param map key name to value name pairs collected from one block of the log
     */
    private void commitEntriesToStorage(Map<WALEntry, WALEntry> map) {
        for (Map.Entry<WALEntry, WALEntry> entry : map.entrySet()) {
            String source = entry.getKey().getName();
            String target = entry.getValue().getName();
            if (!storage.exists(target)) {
                storage.commit(source, target);
            }
        }
    }

    /**
     * Finds the highest offset recorded in the last non-empty completed block of the log.
     *
     * <p>The current log file is consulted first; if it yields nothing and a truncated log
     * ({@code logFile + ".1"}) exists, that file is consulted instead.
     *
     * @return the offset together with the file path it was extracted from, or {@code null} if
     *         none was found or an {@link IOException} occurred (which is logged as a warning)
     */
    public FilePathOffset extractLatestOffset() {
        String truncatedLogFile = logFile + TRUNCATED_LOG_EXTENSION;
        try {
            FilePathOffset latestOffset = null;
            if (storage.exists(logFile)) {
                log.trace("Restoring offset from WAL file: {}", logFile);
                if (reader == null) {
                    reader = newWalFileReader(logFile);
                } else {
                    // The shared reader may already have been advanced; start over.
                    reader.seekToFirstRecord();
                }
                latestOffset = getLatestOffsetFromList(getLastFilledBlockFromWAL(reader));
            }
            if (latestOffset == null && storage.exists(truncatedLogFile)) {
                log.trace("Could not find offset in log file {}. Using {} instead", logFile, truncatedLogFile);
                // The fallback reader is local to this call, so it must be closed on every path.
                try (WALFile.Reader truncatedReader = newWalFileReader(truncatedLogFile)) {
                    latestOffset = getLatestOffsetFromList(getLastFilledBlockFromWAL(truncatedReader));
                }
            }
            return latestOffset;
        } catch (IOException e) {
            log.warn("Error restoring offsets from either {} or {} WAL files: {}",
                    logFile, truncatedLogFile, e.getMessage());
            return null;
        }
    }

    /**
     * Scans the reader to its end and returns the values of the last block that was both
     * properly closed with an END marker and non-empty.
     *
     * @param reader reader positioned where the scan should start
     * @return value names of that block, or an empty list if there is no such block
     */
    private List<String> getLastFilledBlockFromWAL(WALFile.Reader reader) throws IOException {
        List<String> lastFilledBlock = Collections.emptyList();
        List<String> currentBlock = new ArrayList<>();
        WALEntry key = new WALEntry();
        WALEntry value = new WALEntry();
        boolean insideBlock = false;
        while (reader.next(key, value)) {
            String keyName = key.getName();
            if (BEGIN_MARKER.equals(keyName)) {
                currentBlock.clear();
                insideBlock = true;
            } else if (END_MARKER.equals(keyName)) {
                if (insideBlock && !currentBlock.isEmpty()) {
                    lastFilledBlock = new ArrayList<>(currentBlock);
                }
                currentBlock.clear();
                insideBlock = false;
            } else if (insideBlock) {
                currentBlock.add(value.getName());
            }
        }
        if (insideBlock && !currentBlock.isEmpty()) {
            log.warn("The last file block in the WAL is missing an END token");
        }
        return lastFilledBlock;
    }

    /**
     * Picks the path with the greatest extracted offset.
     *
     * @param list file paths to inspect
     * @return offset and path of the winner, or {@code null} if no path yields an offset
     *         greater than -1
     */
    private FilePathOffset getLatestOffsetFromList(List<String> list) {
        FilePathOffset latest = null;
        long maxOffset = -1L;
        for (String filePath : list) {
            long offset = extractOffsetsFromFilePath(filePath);
            if (offset > maxOffset) {
                maxOffset = offset;
                latest = new FilePathOffset(offset, filePath);
            }
        }
        return latest;
    }

    /**
     * Extracts the offset from the last path segment of {@code str} via
     * {@code FileUtils.extractOffset}.
     *
     * @param str file path, may be {@code null}
     * @return the extracted offset, or -1 if {@code str} is {@code null} or an
     *         {@link IllegalArgumentException} is raised while extracting
     */
    static long extractOffsetsFromFilePath(String str) {
        if (str == null) {
            return -1L;
        }
        try {
            return FileUtils.extractOffset(Paths.get(str).getFileName().toString());
        } catch (IllegalArgumentException e) {
            log.warn("Could not extract offsets from file path {}: {}", str, e.getMessage());
            return -1L;
        }
    }

    /** Opens a new reader on {@code str} using the connector's Hadoop configuration. */
    private WALFile.Reader newWalFileReader(String str) throws IOException {
        return new WALFile.Reader(conf.getHadoopConfiguration(), WALFile.Reader.file(new Path(str)));
    }

    /**
     * Replaces the truncated log ({@code logFile + ".1"}) with the current log file, if the
     * current log exists, and then closes this WAL in all cases.
     *
     * @throws ConnectException if closing fails
     */
    public void truncate() throws ConnectException {
        try {
            if (storage.exists(logFile)) {
                log.debug("Truncating WAL file: {}", logFile);
                String truncatedLogFile = logFile + TRUNCATED_LOG_EXTENSION;
                storage.delete(truncatedLogFile);
                storage.commit(logFile, truncatedLogFile);
            }
        } finally {
            close();
        }
    }

    /**
     * Closes the writer and the reader, if open, and clears both references.
     *
     * <p>Both resources are always attempted: a failure while closing the writer no longer
     * prevents the reader from being closed.
     *
     * @throws DataException if closing either resource fails with an {@link IOException}; when
     *                       both fail, the reader's failure is attached as suppressed
     */
    public void close() throws ConnectException {
        log.debug("Closing WAL, {}-{}, file: {}", conf.name(), conf.getTaskId(), logFile);
        try {
            IOException failure = null;
            if (writer != null) {
                try {
                    writer.close();
                } catch (IOException e) {
                    failure = e;
                }
            }
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e) {
                    if (failure == null) {
                        failure = e;
                    } else {
                        failure.addSuppressed(e);
                    }
                }
            }
            if (failure != null) {
                throw new DataException("Error closing " + logFile, failure);
            }
        } finally {
            writer = null;
            reader = null;
        }
    }

    /** Returns the path of the log file this WAL reads from and writes to. */
    public String getLogFile() {
        return logFile;
    }
}
