package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableRow;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.class */
/**
 * {@link PTransform} that writes (destination, {@link TableRow}) pairs to BigQuery using load jobs.
 *
 * <p>Rows are first written to files under a temporary location (see {@link #validate}). The
 * resulting file lists are split by {@link WritePartition} into two outputs:
 *
 * <ul>
 *   <li>"multiPartitionsTag": loaded with {@code WRITE_EMPTY} / {@code CREATE_IF_NEEDED} and the
 *       results are then handed to {@link WriteRename} together with the user's write and create
 *       dispositions;
 *   <li>"singlePartitionTag": loaded directly using the user's write and create dispositions.
 * </ul>
 *
 * <p>If a triggering frequency has been set via {@link #setTriggeringFrequency}, {@link #expand}
 * builds a periodically-triggered pipeline; otherwise it builds a single-round pipeline in the
 * global window with the default trigger.
 *
 * @param <DestinationT> user-defined destination type, interpreted by the supplied
 *     {@link DynamicDestinations}
 */
public class BatchLoads<DestinationT> extends PTransform<PCollection<KV<DestinationT, TableRow>>, WriteResult> {
    static final Logger LOG = LoggerFactory.getLogger(BatchLoads.class);

    /** Default for {@link #setMaxNumWritersPerBundle}; passed on to {@link WriteBundlesToFiles}. */
    @VisibleForTesting
    static final int DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE = 20;

    // Not referenced in this class; presumably a limit consumed elsewhere in the package
    // (e.g. when partitioning files into load jobs) -- confirm against WritePartition.
    @VisibleForTesting
    static final int MAX_NUM_FILES = 10000;

    // 11 TiB. Not referenced in this class; presumably a per-load-job size limit -- confirm.
    @VisibleForTesting
    static final long MAX_SIZE_BYTES = 11 * (1L << 40);

    /** Default for {@link #setMaxFileSize}: 4 TiB. */
    static final long DEFAULT_MAX_FILE_SIZE = 4 * (1L << 40);

    /**
     * Default for {@link #setNumFileShards}. Zero selects dynamic sharding in the untriggered path;
     * the triggered path requires a positive value.
     */
    static final int DEFAULT_NUM_FILE_SHARDS = 0;

    /**
     * In the triggered path, a pane of input rows fires once this many elements have arrived, or
     * once the triggering frequency has elapsed, whichever happens first.
     */
    static final int FILE_TRIGGERING_RECORD_COUNT = 500000;

    // Not referenced in this class; presumably retry limits for the load-job steps -- confirm.
    static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
    static final int MAX_RETRY_JOBS = 3;

    private final BigQueryIO.Write.WriteDisposition writeDisposition;
    private final BigQueryIO.Write.CreateDisposition createDisposition;
    private final boolean singletonTable;
    private final DynamicDestinations<?, DestinationT> dynamicDestinations;
    private final Coder<DestinationT> destinationCoder;
    @Nullable private final ValueProvider<String> customGcsTempLocation;
    @Nullable private final ValueProvider<String> loadJobProjectId;
    private BigQueryServices bigQueryServices = new BigQueryServicesImpl();
    private int maxNumWritersPerBundle = DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE;
    private long maxFileSize = DEFAULT_MAX_FILE_SIZE;
    private int numFileShards = DEFAULT_NUM_FILE_SHARDS;
    @Nullable private Duration triggeringFrequency;

    /**
     * Creates the transform.
     *
     * @param writeDisposition disposition used when loading into / renaming onto final tables
     * @param createDisposition disposition used when loading into / renaming onto final tables
     * @param singletonTable passed through to {@link WritePartition}
     * @param dynamicDestinations passed through to {@link WritePartition} and {@link WriteTables};
     *     its side inputs are added to the load steps
     * @param destinationCoder coder for {@code DestinationT}
     * @param customGcsTempLocation temp location used instead of
     *     {@link PipelineOptions#getTempLocation()} when non-null
     * @param loadJobProjectId passed through to {@link WriteTables}; may be null
     */
    public BatchLoads(
            BigQueryIO.Write.WriteDisposition writeDisposition,
            BigQueryIO.Write.CreateDisposition createDisposition,
            boolean singletonTable,
            DynamicDestinations<?, DestinationT> dynamicDestinations,
            Coder<DestinationT> destinationCoder,
            @Nullable ValueProvider<String> customGcsTempLocation,
            @Nullable ValueProvider<String> loadJobProjectId) {
        this.writeDisposition = writeDisposition;
        this.createDisposition = createDisposition;
        this.singletonTable = singletonTable;
        this.dynamicDestinations = dynamicDestinations;
        this.destinationCoder = destinationCoder;
        this.customGcsTempLocation = customGcsTempLocation;
        this.loadJobProjectId = loadJobProjectId;
    }

    /** Replaces the {@link BigQueryServices} implementation (used by tests). */
    public void setTestServices(BigQueryServices bigQueryServices) {
        this.bigQueryServices = bigQueryServices;
    }

    /** Returns the writer limit passed to {@link WriteBundlesToFiles}. */
    public int getMaxNumWritersPerBundle() {
        return maxNumWritersPerBundle;
    }

    /** Sets the writer limit passed to {@link WriteBundlesToFiles}. */
    public void setMaxNumWritersPerBundle(int maxNumWritersPerBundle) {
        this.maxNumWritersPerBundle = maxNumWritersPerBundle;
    }

    /** Sets the triggering frequency; a non-null value selects the triggered expansion. */
    public void setTriggeringFrequency(Duration triggeringFrequency) {
        this.triggeringFrequency = triggeringFrequency;
    }

    /**
     * Sets the number of file shards. Zero means dynamic sharding (untriggered path only); the
     * triggered path requires a positive value.
     */
    public void setNumFileShards(int numFileShards) {
        this.numFileShards = numFileShards;
    }

    /** Sets the file-size limit passed to the file-writing steps. */
    @VisibleForTesting
    public void setMaxFileSize(long maxFileSize) {
        this.maxFileSize = maxFileSize;
    }

    /**
     * Checks that a temp location is configured.
     *
     * @throws IllegalArgumentException if the resolved temp location is null or empty, or (only
     *     when {@code bigQueryServices} is null) if it is not a valid {@code gs://} path
     */
    @Override
    public void validate(PipelineOptions options) {
        String tempLocation;
        if (customGcsTempLocation == null) {
            tempLocation = options.getTempLocation();
        } else if (!customGcsTempLocation.isAccessible()) {
            // The value is only available at execution time, so nothing can be checked here.
            return;
        } else {
            tempLocation = customGcsTempLocation.get();
        }
        Preconditions.checkArgument(
                !Strings.isNullOrEmpty(tempLocation),
                "BigQueryIO.Write needs a GCS temp location to store temp files.");
        // NOTE(review): bigQueryServices is initialized to a non-null BigQueryServicesImpl and is
        // only replaced via setTestServices, so this path check appears to run only if null is
        // passed there -- confirm whether this condition is intended.
        if (bigQueryServices == null) {
            try {
                GcsPath.fromUri(tempLocation);
            } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException(
                        String.format(
                                "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
                                tempLocation),
                        e);
            }
        }
    }

    /** Expansion used when a triggering frequency is set: load jobs are issued periodically. */
    private WriteResult expandTriggered(PCollection<KV<DestinationT, TableRow>> input) {
        Preconditions.checkArgument(
                numFileShards > 0,
                "A positive number of file shards is required when a triggering frequency is set,"
                        + " but numFileShards was %s",
                numFileShards);
        Pipeline p = input.getPipeline();
        PCollectionView<String> loadJobIdPrefixView = createLoadJobIdPrefixView(p);
        PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(p, loadJobIdPrefixView);

        // Files are written when either the triggering frequency elapses or enough records have
        // arrived, so file writing can happen before the user's trigger fires.
        PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow =
                input.apply(
                        "rewindowIntoGlobal",
                        Window.<KV<DestinationT, TableRow>>into(new GlobalWindows())
                                .triggering(
                                        Repeatedly.forever(
                                                AfterFirst.of(
                                                        AfterProcessingTime.pastFirstElementInPane()
                                                                .plusDelayOf(triggeringFrequency),
                                                        AfterPane.elementCountAtLeast(
                                                                FILE_TRIGGERING_RECORD_COUNT))))
                                .discardingFiredPanes());

        // Load-job generation itself only follows the user's triggering frequency.
        PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
                writeShardedFiles(inputInGlobalWindow, tempFilePrefixView)
                        .apply(
                                "applyUserTrigger",
                                Window.<WriteBundlesToFiles.Result<DestinationT>>into(new GlobalWindows())
                                        .triggering(
                                                Repeatedly.forever(
                                                        AfterProcessingTime.pastFirstElementInPane()
                                                                .plusDelayOf(triggeringFrequency)))
                                        .discardingFiredPanes());

        TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
                new TupleTag<>("multiPartitionsTag");
        TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
                new TupleTag<>("singlePartitionTag");

        // Group every file result onto one null key so WritePartition receives them together as
        // a single Iterable main input.
        PCollectionTuple partitions =
                results
                        .apply(
                                "AttachSingletonKey",
                                WithKeys.<Void, WriteBundlesToFiles.Result<DestinationT>>of((Void) null))
                        .setCoder(
                                KvCoder.of(
                                        VoidCoder.of(),
                                        WriteBundlesToFiles.ResultCoder.of(destinationCoder)))
                        .apply(
                                "GroupOntoSingleton",
                                GroupByKey.<Void, WriteBundlesToFiles.Result<DestinationT>>create())
                        .apply(
                                "ExtractResultValues",
                                Values.<Iterable<WriteBundlesToFiles.Result<DestinationT>>>create())
                        .apply(
                                "WritePartitionTriggered",
                                ParDo.of(
                                                new WritePartition<DestinationT>(
                                                        singletonTable,
                                                        dynamicDestinations,
                                                        tempFilePrefixView,
                                                        multiPartitionsTag,
                                                        singlePartitionTag))
                                        .withSideInputs(tempFilePrefixView)
                                        .withOutputTags(
                                                multiPartitionsTag, TupleTagList.of(singlePartitionTag)));

        writeTempTables(partitions.get(multiPartitionsTag), loadJobIdPrefixView)
                // Fire the rename step as soon as any temp-table load result is available.
                .apply(
                        Window.<KV<TableDestination, String>>into(new GlobalWindows())
                                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
                .apply(WithKeys.<Void, KV<TableDestination, String>>of((Void) null))
                .setCoder(
                        KvCoder.of(
                                VoidCoder.of(),
                                KvCoder.of(TableDestinationCoderV2.of(), StringUtf8Coder.of())))
                .apply(GroupByKey.<Void, KV<TableDestination, String>>create())
                .apply(Values.<Iterable<KV<TableDestination, String>>>create())
                .apply(
                        "WriteRenameTriggered",
                        ParDo.of(
                                        new WriteRename(
                                                bigQueryServices,
                                                loadJobIdPrefixView,
                                                writeDisposition,
                                                createDisposition))
                                .withSideInputs(loadJobIdPrefixView));
        writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView);
        return writeResult(p);
    }

    /**
     * Expansion used when no triggering frequency is set: a single round of file writes and load
     * jobs in the global window with the default trigger.
     */
    public WriteResult expandUntriggered(PCollection<KV<DestinationT, TableRow>> input) {
        Pipeline p = input.getPipeline();
        PCollectionView<String> loadJobIdPrefixView = createLoadJobIdPrefixView(p);
        PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(p, loadJobIdPrefixView);
        PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow =
                input.apply(
                        "rewindowIntoGlobal",
                        Window.<KV<DestinationT, TableRow>>into(new GlobalWindows())
                                .triggering(DefaultTrigger.of())
                                .discardingFiredPanes());
        PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
                (numFileShards == 0)
                        ? writeDynamicallyShardedFiles(inputInGlobalWindow, tempFilePrefixView)
                        : writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);

        TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
                new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {};
        TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
                new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag") {};

        PCollectionTuple partitions =
                results
                        .apply("ReifyResults", new ReifyAsIterable<WriteBundlesToFiles.Result<DestinationT>>())
                        .setCoder(IterableCoder.of(WriteBundlesToFiles.ResultCoder.of(destinationCoder)))
                        .apply(
                                "WritePartitionUntriggered",
                                ParDo.of(
                                                new WritePartition<DestinationT>(
                                                        singletonTable,
                                                        dynamicDestinations,
                                                        tempFilePrefixView,
                                                        multiPartitionsTag,
                                                        singlePartitionTag))
                                        .withSideInputs(tempFilePrefixView)
                                        .withOutputTags(
                                                multiPartitionsTag, TupleTagList.of(singlePartitionTag)));

        writeTempTables(partitions.get(multiPartitionsTag), loadJobIdPrefixView)
                .apply("ReifyRenameInput", new ReifyAsIterable<KV<TableDestination, String>>())
                .setCoder(
                        IterableCoder.of(KvCoder.of(TableDestinationCoderV2.of(), StringUtf8Coder.of())))
                .apply(
                        "WriteRenameUntriggered",
                        ParDo.of(
                                        new WriteRename(
                                                bigQueryServices,
                                                loadJobIdPrefixView,
                                                writeDisposition,
                                                createDisposition))
                                .withSideInputs(loadJobIdPrefixView));
        writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView);
        return writeResult(p);
    }

    /**
     * Creates a singleton view holding a load-job ID prefix of the form
     * {@code beam_load_<jobName with '-' removed>_<BigQueryHelpers.randomUUIDString()>}, computed
     * from a single-element input at execution time.
     */
    private PCollectionView<String> createLoadJobIdPrefixView(Pipeline p) {
        return p.apply("JobIdCreationRoot", Create.of((Void) null))
                .apply(
                        "CreateJobId",
                        ParDo.of(
                                new DoFn<Void, String>() {
                                    @DoFn.ProcessElement
                                    public void process(ProcessContext c) {
                                        c.output(
                                                String.format(
                                                        "beam_load_%s_%s",
                                                        c.getPipelineOptions()
                                                                .getJobName()
                                                                .replaceAll("-", ""),
                                                        BigQueryHelpers.randomUUIDString()));
                                    }
                                }))
                .apply(View.<String>asSingleton());
    }

    /**
     * Creates a singleton view holding the temp-file prefix, resolved from the custom temp
     * location (if set) or the pipeline's temp location, the "BigQueryWriteTemp" component, and
     * the load-job ID prefix read from {@code jobIdView}.
     */
    private PCollectionView<String> createTempFilePrefixView(
            Pipeline p, final PCollectionView<String> jobIdView) {
        return p.apply(Create.of(""))
                .apply(
                        "GetTempFilePrefix",
                        ParDo.of(
                                        new DoFn<String, String>() {
                                            @DoFn.ProcessElement
                                            public void getTempFilePrefix(ProcessContext c) {
                                                String tempLocationRoot =
                                                        BatchLoads.this.customGcsTempLocation != null
                                                                ? BatchLoads.this.customGcsTempLocation.get()
                                                                : c.getPipelineOptions().getTempLocation();
                                                String tempLocation =
                                                        BigQueryHelpers.resolveTempLocation(
                                                                tempLocationRoot,
                                                                "BigQueryWriteTemp",
                                                                c.sideInput(jobIdView));
                                                BatchLoads.LOG.info(
                                                        "Writing BigQuery temporary files to {} before loading them.",
                                                        tempLocation);
                                                c.output(tempLocation);
                                            }
                                        })
                                .withSideInputs(jobIdView))
                .apply("TempFilePrefixView", View.<String>asSingleton());
    }

    /**
     * Writes rows to files with {@link WriteBundlesToFiles}. Records emitted on its
     * "unwrittenRecords" output are grouped by sharded destination and written via
     * {@link #writeShardedRecords}; both sets of file results are flattened together.
     */
    PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFiles(
            PCollection<KV<DestinationT, TableRow>> input, PCollectionView<String> tempFilePrefix) {
        TupleTag<WriteBundlesToFiles.Result<DestinationT>> writtenFilesTag =
                new TupleTag<WriteBundlesToFiles.Result<DestinationT>>("writtenFiles") {};
        TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittenRecordsTag =
                new TupleTag<KV<ShardedKey<DestinationT>, TableRow>>("unwrittenRecords") {};
        PCollectionTuple writeBundlesTuple =
                input.apply(
                        "WriteBundlesToFiles",
                        ParDo.of(
                                        new WriteBundlesToFiles<DestinationT>(
                                                tempFilePrefix,
                                                unwrittenRecordsTag,
                                                maxNumWritersPerBundle,
                                                maxFileSize))
                                .withSideInputs(tempFilePrefix)
                                .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittenRecordsTag)));
        PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFiles =
                writeBundlesTuple
                        .get(writtenFilesTag)
                        .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
        PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFromUnwritten =
                writeShardedRecords(
                        writeBundlesTuple
                                .get(unwrittenRecordsTag)
                                .setCoder(
                                        KvCoder.of(
                                                ShardedKeyCoder.of(destinationCoder),
                                                TableRowJsonCoder.of())),
                        tempFilePrefix);
        return PCollectionList.of(writtenFiles)
                .and(writtenFromUnwritten)
                .apply("FlattenFiles", Flatten.<WriteBundlesToFiles.Result<DestinationT>>pCollections())
                .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
    }

    /**
     * Assigns each row a shard number in {@code [0, numFileShards)} round-robin per DoFn instance
     * (starting from a random offset chosen in {@code @Setup}), then groups and writes the rows via
     * {@link #writeShardedRecords}.
     *
     * @throws IllegalStateException if {@code numFileShards} is not positive
     */
    PCollection<WriteBundlesToFiles.Result<DestinationT>> writeShardedFiles(
            PCollection<KV<DestinationT, TableRow>> input, PCollectionView<String> tempFilePrefix) {
        Preconditions.checkState(numFileShards > 0);
        PCollection<KV<ShardedKey<DestinationT>, TableRow>> shardedRecords =
                input.apply(
                                "AddShard",
                                ParDo.of(
                                        new DoFn<KV<DestinationT, TableRow>, KV<ShardedKey<DestinationT>, TableRow>>() {
                                            int shardNumber;

                                            @DoFn.Setup
                                            public void setup() {
                                                shardNumber =
                                                        ThreadLocalRandom.current()
                                                                .nextInt(BatchLoads.this.numFileShards);
                                            }

                                            @DoFn.ProcessElement
                                            public void processElement(ProcessContext c) {
                                                DestinationT destination = c.element().getKey();
                                                TableRow tableRow = c.element().getValue();
                                                // Keep the counter inside [0, numFileShards) so it can
                                                // never overflow into negative shard numbers on a
                                                // long-lived instance.
                                                shardNumber =
                                                        (shardNumber + 1) % BatchLoads.this.numFileShards;
                                                c.output(
                                                        KV.of(
                                                                ShardedKey.of(destination, shardNumber),
                                                                tableRow));
                                            }
                                        }))
                        .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), TableRowJsonCoder.of()));
        return writeShardedRecords(shardedRecords, tempFilePrefix);
    }

    /**
     * Groups rows by sharded destination and writes each group with
     * {@link WriteGroupedRecordsToFiles}.
     */
    private PCollection<WriteBundlesToFiles.Result<DestinationT>> writeShardedRecords(
            PCollection<KV<ShardedKey<DestinationT>, TableRow>> shardedRecords,
            PCollectionView<String> tempFilePrefix) {
        return shardedRecords
                .apply("GroupByDestination", GroupByKey.<ShardedKey<DestinationT>, TableRow>create())
                .apply(
                        "WriteGroupedRecords",
                        ParDo.of(new WriteGroupedRecordsToFiles<DestinationT>(tempFilePrefix, maxFileSize))
                                .withSideInputs(tempFilePrefix))
                .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
    }

    /**
     * Reshuffles multi-partition file groups and loads them with {@link WriteTables}
     * ({@code singlePartition = false}, {@code WRITE_EMPTY}, {@code CREATE_IF_NEEDED}).
     *
     * @return the {@link WriteTables} output, consumed by the rename step
     */
    private PCollection<KV<TableDestination, String>> writeTempTables(
            PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
            PCollectionView<String> loadJobIdPrefixView) {
        List<PCollectionView<?>> sideInputs = Lists.<PCollectionView<?>>newArrayList(loadJobIdPrefixView);
        sideInputs.addAll(dynamicDestinations.getSideInputs());
        return input
                .setCoder(
                        KvCoder.of(
                                ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
                                ListCoder.of(StringUtf8Coder.of())))
                .apply("MultiPartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>, List<String>>of())
                .apply(
                        "MultiPartitionsWriteTables",
                        new WriteTables<DestinationT>(
                                false,
                                bigQueryServices,
                                loadJobIdPrefixView,
                                BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
                                BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED,
                                sideInputs,
                                dynamicDestinations,
                                loadJobProjectId));
    }

    /**
     * Reshuffles single-partition file groups and loads them with {@link WriteTables}
     * ({@code singlePartition = true}) using the user's write and create dispositions.
     */
    void writeSinglePartition(
            PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
            PCollectionView<String> loadJobIdPrefixView) {
        List<PCollectionView<?>> sideInputs = Lists.<PCollectionView<?>>newArrayList(loadJobIdPrefixView);
        sideInputs.addAll(dynamicDestinations.getSideInputs());
        input
                .setCoder(
                        KvCoder.of(
                                ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
                                ListCoder.of(StringUtf8Coder.of())))
                .apply("SinglePartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>, List<String>>of())
                .apply(
                        "SinglePartitionWriteTables",
                        new WriteTables<DestinationT>(
                                true,
                                bigQueryServices,
                                loadJobIdPrefixView,
                                writeDisposition,
                                createDisposition,
                                sideInputs,
                                dynamicDestinations,
                                loadJobProjectId));
    }

    /** Builds the {@link WriteResult}; its failed-inserts collection is always empty here. */
    private WriteResult writeResult(Pipeline p) {
        return WriteResult.in(
                p,
                new TupleTag<TableRow>("failedInserts"),
                p.apply("CreateEmptyFailedInserts", Create.empty(TypeDescriptor.of(TableRow.class))));
    }

    /** Uses the triggered expansion if a triggering frequency was set, else the untriggered one. */
    @Override
    public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
        return triggeringFrequency != null ? expandTriggered(input) : expandUntriggered(input);
    }
}
