package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigtable.common.Status;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.CloseStream;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamContinuationTokenHelper;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.SizeEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.class */
public class ReadChangeStreamPartitionAction {
    private static final Logger LOG = LoggerFactory.getLogger(ReadChangeStreamPartitionAction.class);
    private final MetadataTableDao metadataTableDao;
    private final ChangeStreamDao changeStreamDao;
    private final ChangeStreamMetrics metrics;
    private final ChangeStreamAction changeStreamAction;
    private final Duration heartbeatDuration;
    private final SizeEstimator<KV<ByteString, ChangeStreamRecord>> sizeEstimator;

    public ReadChangeStreamPartitionAction(MetadataTableDao metadataTableDao, ChangeStreamDao changeStreamDao, ChangeStreamMetrics changeStreamMetrics, ChangeStreamAction changeStreamAction, Duration duration, SizeEstimator<KV<ByteString, ChangeStreamRecord>> sizeEstimator) {
        this.metadataTableDao = metadataTableDao;
        this.changeStreamDao = changeStreamDao;
        this.metrics = changeStreamMetrics;
        this.changeStreamAction = changeStreamAction;
        this.heartbeatDuration = duration;
        this.sizeEstimator = sizeEstimator;
    }

    /* JADX WARN: Finally extract failed */
    public DoFn.ProcessContinuation run(PartitionRecord partitionRecord, RestrictionTracker<StreamProgress, StreamProgress> restrictionTracker, DoFn.OutputReceiver<KV<ByteString, ChangeStreamRecord>> outputReceiver, ManualWatermarkEstimator<Instant> manualWatermarkEstimator) throws IOException {
        BytesThroughputEstimator<KV<ByteString, ChangeStreamRecord>> bytesThroughputEstimator = new BytesThroughputEstimator<>(this.sizeEstimator, Instant.now());
        if (((StreamProgress) restrictionTracker.currentRestriction()).isEmpty()) {
            boolean lockAndRecordPartition = this.metadataTableDao.lockAndRecordPartition(partitionRecord);
            Iterator<NewPartition> it = partitionRecord.getParentPartitions().iterator();
            while (it.hasNext()) {
                this.metadataTableDao.deleteNewPartition(it.next());
            }
            if (!lockAndRecordPartition) {
                LOG.info("RCSP  {} : Could not acquire lock with uid: {}, because this is a duplicate and another worker is working  on this partition already.", ByteStringRangeHelper.formatByteStringRange(partitionRecord.getPartition()), partitionRecord.getUuid());
                StreamProgress streamProgress = new StreamProgress();
                streamProgress.setFailToLock(true);
                this.metrics.decPartitionStreamCount();
                restrictionTracker.tryClaim(streamProgress);
                return DoFn.ProcessContinuation.stop();
            }
        } else if (((StreamProgress) restrictionTracker.currentRestriction()).getCloseStream() == null && !this.metadataTableDao.doHoldLock(partitionRecord.getPartition(), partitionRecord.getUuid())) {
            LOG.warn("RCSP  {} : Subsequent run that doesn't hold the lock {}. This is not unexpected and should probably be reviewed.", ByteStringRangeHelper.formatByteStringRange(partitionRecord.getPartition()), partitionRecord.getUuid());
            StreamProgress streamProgress2 = new StreamProgress();
            streamProgress2.setFailToLock(true);
            this.metrics.decPartitionStreamCount();
            restrictionTracker.tryClaim(streamProgress2);
            return DoFn.ProcessContinuation.stop();
        }
        CloseStream closeStream = ((StreamProgress) restrictionTracker.currentRestriction()).getCloseStream();
        if (closeStream == null) {
            this.metadataTableDao.updateWatermark(partitionRecord.getPartition(), (Instant) manualWatermarkEstimator.getState(), ((StreamProgress) restrictionTracker.currentRestriction()).getCurrentToken());
            ServerStream<ChangeStreamRecord> serverStream = null;
            try {
                try {
                    serverStream = this.changeStreamDao.readChangeStreamPartition(partitionRecord, (StreamProgress) restrictionTracker.currentRestriction(), partitionRecord.getEndTime(), this.heartbeatDuration);
                    Iterator it2 = serverStream.iterator();
                    while (it2.hasNext()) {
                        Optional<DoFn.ProcessContinuation> run = this.changeStreamAction.run(partitionRecord, (ChangeStreamRecord) it2.next(), restrictionTracker, outputReceiver, manualWatermarkEstimator, bytesThroughputEstimator);
                        if (run.isPresent()) {
                            DoFn.ProcessContinuation processContinuation = run.get();
                            if (serverStream != null) {
                                serverStream.cancel();
                            }
                            return processContinuation;
                        }
                    }
                    if (serverStream != null) {
                        serverStream.cancel();
                    }
                    return DoFn.ProcessContinuation.resume();
                } catch (Exception e) {
                    throw e;
                }
            } catch (Throwable th) {
                if (serverStream != null) {
                    serverStream.cancel();
                }
                throw th;
            }
        }
        LOG.debug("RCSP: Processing CloseStream");
        this.metrics.decPartitionStreamCount();
        if (closeStream.getStatus().getCode() == Status.Code.OK) {
            Instant ofEpochMilli = Instant.ofEpochMilli(Long.MAX_VALUE);
            Instant endTime = partitionRecord.getEndTime();
            if (endTime != null) {
                ofEpochMilli = endTime;
            }
            manualWatermarkEstimator.setWatermark(ofEpochMilli);
            this.metadataTableDao.updateWatermark(partitionRecord.getPartition(), manualWatermarkEstimator.currentWatermark(), null);
            LOG.info("RCSP {}: Reached end time, terminating...", ByteStringRangeHelper.formatByteStringRange(partitionRecord.getPartition()));
            return DoFn.ProcessContinuation.stop();
        }
        if (closeStream.getStatus().getCode() != Status.Code.OUT_OF_RANGE) {
            LOG.error("RCSP {}: Reached unexpected terminal state: {}", ByteStringRangeHelper.formatByteStringRange(partitionRecord.getPartition()), closeStream.getStatus());
            return DoFn.ProcessContinuation.stop();
        }
        this.metadataTableDao.releaseStreamPartitionLockForDeletion(partitionRecord.getPartition(), partitionRecord.getUuid());
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        boolean z = closeStream.getNewPartitions().size() == closeStream.getChangeStreamContinuationTokens().size();
        for (int i = 0; i < closeStream.getChangeStreamContinuationTokens().size(); i++) {
            Range.ByteStringRange partition = z ? (Range.ByteStringRange) closeStream.getNewPartitions().get(i) : ((ChangeStreamContinuationToken) closeStream.getChangeStreamContinuationTokens().get(i)).getPartition();
            arrayList.add(partition);
            ChangeStreamContinuationToken tokenWithCorrectPartition = ChangeStreamContinuationTokenHelper.getTokenWithCorrectPartition(partitionRecord.getPartition(), (ChangeStreamContinuationToken) closeStream.getChangeStreamContinuationTokens().get(i));
            arrayList2.add(tokenWithCorrectPartition.getPartition());
            this.metadataTableDao.writeNewPartition(new NewPartition(partition, Collections.singletonList(tokenWithCorrectPartition), (Instant) manualWatermarkEstimator.getState()));
        }
        LOG.info("RCSP {}: Split/Merge into {}", ByteStringRangeHelper.formatByteStringRange(partitionRecord.getPartition()), ByteStringRangeHelper.partitionsToString(arrayList));
        if (!ByteStringRangeHelper.coverSameKeySpace(arrayList2, partitionRecord.getPartition())) {
            LOG.warn("RCSP {}: CloseStream has tokens {} that don't cover the entire keyspace", ByteStringRangeHelper.formatByteStringRange(partitionRecord.getPartition()), ByteStringRangeHelper.partitionsToString(arrayList2));
        }
        this.metadataTableDao.deleteStreamPartitionRow(partitionRecord.getPartition());
        return DoFn.ProcessContinuation.stop();
    }
}
