package io.pravega.client.segment.impl;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.segment.impl.EndOfSegmentException;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.CircularBuffer;
import io.pravega.shared.protocol.netty.WireCommands;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/segment/impl/SegmentInputStreamImpl.class */
class SegmentInputStreamImpl implements SegmentInputStream {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(SegmentInputStreamImpl.class);

    @SuppressFBWarnings(justification = "generated code")
    private final Object $lock;
    static final int DEFAULT_BUFFER_SIZE = 1048576;
    private static final int DEFAULT_READ_LENGTH = 262144;
    private static final long UNBOUNDED_END_OFFSET = Long.MAX_VALUE;
    private final AsyncSegmentInputStream asyncInput;
    private final int minReadLength;

    @GuardedBy("$lock")
    private final CircularBuffer buffer;

    @GuardedBy("$lock")
    private long offset;

    @GuardedBy("$lock")
    private final long endOffset;

    @GuardedBy("$lock")
    private boolean receivedEndOfSegment;

    @GuardedBy("$lock")
    private boolean receivedTruncated;

    @GuardedBy("$lock")
    private CompletableFuture<WireCommands.SegmentRead> outstandingRequest;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SegmentInputStreamImpl(AsyncSegmentInputStream asyncSegmentInputStream, long j) {
        this(asyncSegmentInputStream, j, UNBOUNDED_END_OFFSET, 1048576);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SegmentInputStreamImpl(AsyncSegmentInputStream asyncSegmentInputStream, long j, long j2, int i) {
        this.$lock = new Object[0];
        this.receivedEndOfSegment = false;
        this.receivedTruncated = false;
        this.outstandingRequest = null;
        Preconditions.checkArgument(j >= 0);
        Preconditions.checkNotNull(asyncSegmentInputStream);
        Preconditions.checkNotNull(Long.valueOf(j2), "endOffset");
        Preconditions.checkArgument(j2 >= j, "Invalid end offset.");
        this.asyncInput = asyncSegmentInputStream;
        this.offset = j;
        this.endOffset = j2;
        this.minReadLength = Math.min(DEFAULT_READ_LENGTH, i);
        this.buffer = new CircularBuffer(i);
        issueRequestIfNeeded();
    }

    @Override // io.pravega.client.segment.impl.SegmentInputStream
    public void setOffset(long j) {
        synchronized (this.$lock) {
            log.trace("SetOffset {}", Long.valueOf(j));
            Preconditions.checkArgument(j >= 0);
            Exceptions.checkNotClosed(this.asyncInput.isClosed(), this);
            if (j > this.offset) {
                this.receivedTruncated = false;
            }
            if (j != this.offset) {
                this.offset = j;
                this.buffer.clear();
                this.receivedEndOfSegment = false;
                this.outstandingRequest = null;
            }
        }
    }

    @Override // io.pravega.client.segment.impl.SegmentInputStream
    public long getOffset() {
        long j;
        synchronized (this.$lock) {
            j = this.offset;
        }
        return j;
    }

    @Override // io.pravega.client.segment.impl.SegmentInputStream
    public int read(ByteBuffer byteBuffer, long j) throws EndOfSegmentException, SegmentTruncatedException {
        synchronized (this.$lock) {
            Exceptions.checkNotClosed(this.asyncInput.isClosed(), this);
            if (this.offset >= this.endOffset) {
                log.debug("All events up to the configured end offset:{} have been read", Long.valueOf(this.endOffset));
                throw new EndOfSegmentException(EndOfSegmentException.ErrorType.END_OFFSET_REACHED);
            }
            if (this.outstandingRequest == null) {
                fillBuffer();
            }
            if (this.receivedTruncated) {
                throw new SegmentTruncatedException();
            }
            while (this.buffer.dataAvailable() == 0) {
                if (this.receivedEndOfSegment) {
                    throw new EndOfSegmentException();
                }
                Futures.await(this.outstandingRequest, j);
                if (!this.outstandingRequest.isDone()) {
                    return 0;
                }
                handleRequest();
            }
            int read = this.buffer.read(byteBuffer);
            this.offset += read;
            return read;
        }
    }

    private boolean dataWaitingToGoInBuffer() {
        return this.outstandingRequest != null && Futures.isSuccessful(this.outstandingRequest) && this.buffer.capacityAvailable() > 0;
    }

    private void handleRequest() throws SegmentTruncatedException {
        try {
            WireCommands.SegmentRead join = this.outstandingRequest.join();
            verifyIsAtCorrectOffset(join);
            if (join.getData().hasRemaining()) {
                this.buffer.fill(join.getData());
            }
            if (join.isEndOfSegment()) {
                this.receivedEndOfSegment = true;
            }
            if (join.getData().hasRemaining()) {
                return;
            }
            this.outstandingRequest = null;
            issueRequestIfNeeded();
        } catch (Exception e) {
            this.outstandingRequest = null;
            if (!(Exceptions.unwrap(e) instanceof SegmentTruncatedException)) {
                throw e;
            }
            this.receivedTruncated = true;
            throw new SegmentTruncatedException(e);
        }
    }

    private void verifyIsAtCorrectOffset(WireCommands.SegmentRead segmentRead) {
        long offset = segmentRead.getOffset() + segmentRead.getData().position();
        long dataAvailable = this.offset + this.buffer.dataAvailable();
        Preconditions.checkState(offset == dataAvailable, "ReadSegment returned data for the wrong offset %s vs %s", offset, dataAvailable);
    }

    private void issueRequestIfNeeded() {
        int computeReadLength = computeReadLength(this.offset + this.buffer.dataAvailable());
        if (this.receivedEndOfSegment || this.receivedTruncated || computeReadLength <= 0 || this.outstandingRequest != null) {
            return;
        }
        log.trace("Issuing read request for segment {} of {} bytes", getSegmentId(), Integer.valueOf(computeReadLength));
        this.outstandingRequest = this.asyncInput.read(this.offset + this.buffer.dataAvailable(), computeReadLength);
    }

    private int computeReadLength(long j) {
        Preconditions.checkState(this.endOffset >= j, "Current offset up to to which events are fetched should be less than the configured end offset");
        int max = Math.max(this.minReadLength, this.buffer.capacityAvailable());
        if (UNBOUNDED_END_OFFSET == this.endOffset) {
            return max;
        }
        return Math.toIntExact(Math.min(max, this.endOffset - j));
    }

    @Override // io.pravega.client.segment.impl.SegmentInputStream, java.lang.AutoCloseable
    public void close() {
        synchronized (this.$lock) {
            log.trace("Closing {}", this);
            if (this.outstandingRequest != null) {
                log.trace("Cancel outstanding read request for segment {}", this.asyncInput.getSegmentId());
                this.outstandingRequest.cancel(true);
            }
            this.asyncInput.close();
        }
    }

    @Override // io.pravega.client.segment.impl.SegmentInputStream
    public CompletableFuture<?> fillBuffer() {
        CompletableFuture<?> completedFuture;
        synchronized (this.$lock) {
            log.trace("Filling buffer {}", this);
            Exceptions.checkNotClosed(this.asyncInput.isClosed(), this);
            try {
                issueRequestIfNeeded();
                while (dataWaitingToGoInBuffer()) {
                    handleRequest();
                }
                completedFuture = this.outstandingRequest == null ? CompletableFuture.completedFuture(null) : this.outstandingRequest;
            } catch (SegmentTruncatedException e) {
                log.warn("Encountered exception filling buffer", e);
                return CompletableFuture.completedFuture(null);
            }
        }
        return completedFuture;
    }

    @Override // io.pravega.client.segment.impl.SegmentInputStream
    public int bytesInBuffer() {
        int i;
        synchronized (this.$lock) {
            int dataAvailable = this.buffer.dataAvailable();
            boolean z = this.receivedEndOfSegment || this.receivedTruncated || (this.outstandingRequest != null && this.outstandingRequest.isCompletedExceptionally());
            if (this.outstandingRequest != null && Futures.isSuccessful(this.outstandingRequest)) {
                WireCommands.SegmentRead join = this.outstandingRequest.join();
                dataAvailable += join.getData().remaining();
                z |= join.isEndOfSegment();
            }
            if (dataAvailable <= 0 && z) {
                dataAvailable = -1;
            }
            log.trace("bytesInBuffer {} on segment {} status is {}", new Object[]{Integer.valueOf(dataAvailable), getSegmentId(), this});
            i = dataAvailable;
        }
        return i;
    }

    @Override // io.pravega.client.segment.impl.SegmentInputStream
    public Segment getSegmentId() {
        return this.asyncInput.getSegmentId();
    }

    @SuppressFBWarnings(justification = "generated code")
    public String toString() {
        return "SegmentInputStreamImpl(asyncInput=" + this.asyncInput + ", minReadLength=" + this.minReadLength + ", buffer=" + this.buffer + ", offset=" + getOffset() + ", endOffset=" + this.endOffset + ", receivedEndOfSegment=" + this.receivedEndOfSegment + ", receivedTruncated=" + this.receivedTruncated + ", outstandingRequest=" + this.outstandingRequest + ")";
    }
}
