package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.common.collect.ImmutableSet;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.class */
public class TestPubsubSignal implements TestRule {
    /** Format for the result topic path; filled with the project id and a topic name in {@code initializePubsub}. */
    private static final String TOPIC_FORMAT = "projects/%s/topics/%s-result1";
    /** Format for the subscription path; filled with the project id and a random subscription name in {@code waitForSuccess}. */
    private static final String SUBSCRIPTION_FORMAT = "projects/%s/subscriptions/%s";
    // Passed as the second argument to PubsubGrpcClient.FACTORY.newClient; presumably null means
    // "no id attribute" -- confirm against the client factory.
    private static final String NO_ID_ATTRIBUTE = null;
    // Passed as the first argument to PubsubGrpcClient.FACTORY.newClient; presumably null means
    // "no timestamp attribute" -- confirm against the client factory.
    private static final String NO_TIMESTAMP_ATTRIBUTE = null;
    // Client created in initializePubsub and closed/reset to null in tearDown. A non-null value
    // at the start of a test is reported by apply() as a client that was not shut down.
    PubsubClient pubsub;
    // Options supplied to the constructor; provides the project id used to build topic and subscription paths.
    private TestPubsubOptions pipelineOptions;
    // Full path of the result topic; assigned after the topic is created and reset to null in tearDown.
    private String resultTopicPath;

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal$PublishSuccessWhen.class */
    static class PublishSuccessWhen<T> extends PTransform<PCollection<? extends T>, POutput> {
        private Coder<T> coder;
        private SerializableFunction<Set<T>, Boolean> successPredicate;
        private String resultTopicPath;

        PublishSuccessWhen(Coder<T> coder, SerializableFunction<Set<T>, Boolean> serializableFunction, String str) {
            this.coder = coder;
            this.successPredicate = serializableFunction;
            this.resultTopicPath = str;
        }

        public POutput expand(PCollection<? extends T> pCollection) {
            return pCollection.apply(Window.into(new GlobalWindows())).apply(WithKeys.of("dummyKey")).apply("checkAllEventsForSuccess", ParDo.of(new StatefulPredicateCheck(this.coder, this.successPredicate))).apply("publishSuccess", PubsubIO.writeStrings().to(this.resultTopicPath));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal$StatefulPredicateCheck.class */
    /**
     * Stateful {@link DoFn} that records every value it sees in a set state and, after each
     * element, evaluates the success predicate against the values seen so far.
     *
     * <p>Emits {@code "SUCCESS"} whenever the predicate returns true, and
     * {@code "FAILURE: <detail>"} whenever evaluating it throws.
     *
     * @param <T> type of the observed values
     */
    public static class StatefulPredicateCheck<T> extends DoFn<KV<String, ? extends T>, String> {
        private final SerializableFunction<Set<T>, Boolean> successPredicate;

        @DoFn.StateId("seenEvents")
        private final StateSpec<SetState<T>> seenEvents;

        /**
         * @param coder coder for the values kept in the {@code seenEvents} set state
         * @param successPredicate predicate evaluated against the set of values seen so far
         */
        StatefulPredicateCheck(Coder<T> coder, SerializableFunction<Set<T>, Boolean> successPredicate) {
            this.seenEvents = StateSpecs.set(coder);
            this.successPredicate = successPredicate;
        }

        /**
         * Adds the element's value to the state, then evaluates the predicate on an immutable
         * snapshot of the state.
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext processContext, @DoFn.StateId("seenEvents") SetState<T> setState) {
            setState.add(processContext.element().getValue());
            try {
                Set<T> seenSoFar = ImmutableSet.copyOf(setState.read());
                if (this.successPredicate.apply(seenSoFar)) {
                    processContext.output("SUCCESS");
                }
            } catch (Throwable th) {
                // Throwable on purpose: assertion failures raised by the predicate are Errors and
                // must be published as a failure rather than crash the bundle.
                String detail = th.getMessage() != null ? th.getMessage() : th.toString();
                processContext.output("FAILURE: " + detail);
            }
        }
    }

    /**
     * Creates a signal rule backed by the testing pipeline options viewed as
     * {@link TestPubsubOptions}.
     */
    public static TestPubsubSignal create() {
        TestPubsubOptions options = TestPipeline.testingPipelineOptions().as(TestPubsubOptions.class);
        return new TestPubsubSignal(options);
    }

    /**
     * Stores the options that supply the project id; no Pubsub resources are created here.
     *
     * @param options pipeline options used for the Pubsub client and path construction
     */
    private TestPubsubSignal(TestPubsubOptions options) {
        pipelineOptions = options;
    }

    /**
     * Wraps the test statement so that the Pubsub client and result topic are created before the
     * test runs and torn down afterwards, even when the test or the setup fails.
     *
     * @param statement the test body to run
     * @param description description of the running test; used for topic naming and error messages
     * @return a statement that performs setup, runs the test and always tears down
     */
    public Statement apply(final Statement statement, final Description description) {
        return new Statement() {
            public void evaluate() throws Throwable {
                failIfClientStillOpen();
                try {
                    initializePubsub(description);
                    statement.evaluate();
                } finally {
                    tearDown();
                }
            }

            // A leftover client means the previous test never reached tearDown.
            private void failIfClientStillOpen() {
                if (pubsub == null) {
                    return;
                }
                String message = "Pubsub client was not shutdown in previous test. Topic path is'" + resultTopicPath + "'. Current test: " + description.getDisplayName();
                throw new AssertionError(message);
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the Pubsub client and the result topic for the given test.
     *
     * <p>The topic path is only recorded in {@code resultTopicPath} after the topic has been
     * created, so a failed creation leaves the field unset.
     *
     * @param description description of the running test, used to derive the topic name
     * @throws IOException if the client or the topic cannot be created
     */
    public void initializePubsub(Description description) throws IOException {
        pubsub = PubsubGrpcClient.FACTORY.newClient(NO_TIMESTAMP_ATTRIBUTE, NO_ID_ATTRIBUTE, pipelineOptions);

        String project = pipelineOptions.getProject();
        String topicName = TestPubsub.createTopicName(description);
        String topicPath = String.format(TOPIC_FORMAT, project, topicName);

        pubsub.createTopic(new PubsubClient.TopicPath(topicPath));
        resultTopicPath = topicPath;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Deletes the result topic, if one was recorded, and closes the client.
     *
     * <p>Does nothing when no client exists. The client is closed and both fields are reset even
     * if deleting the topic fails.
     *
     * @throws IOException if deleting the topic or closing the client fails
     */
    public void tearDown() throws IOException {
        if (pubsub != null) {
            try {
                String topicPath = resultTopicPath;
                if (topicPath != null) {
                    pubsub.deleteTopic(new PubsubClient.TopicPath(topicPath));
                }
            } finally {
                pubsub.close();
                pubsub = null;
                resultTopicPath = null;
            }
        }
    }

    /**
     * Builds a transform that publishes to the result topic once the predicate holds for the set
     * of elements seen so far.
     *
     * <p>The current value of {@code resultTopicPath} is captured when this method is called.
     *
     * @param coder coder for the observed elements
     * @param serializableFunction predicate evaluated against the set of elements seen so far
     * @param <T> element type of the observed collection
     * @return the transform to apply to the collection under observation
     */
    public <T> PTransform<PCollection<? extends T>, POutput> signalSuccessWhen(Coder<T> coder, SerializableFunction<Set<T>, Boolean> serializableFunction) {
        // Diamond instead of a raw type, so the element type is checked at compile time.
        return new PublishSuccessWhen<>(coder, serializableFunction, this.resultTopicPath);
    }

    /**
     * Subscribes to the result topic under a randomly named subscription and waits for a result
     * message, failing unless that message is exactly {@code "SUCCESS"}.
     *
     * <p>The subscription is deleted before returning, whether or not the wait succeeded, so
     * repeated calls do not leave subscriptions behind.
     *
     * @param duration how long to poll; its whole seconds are also passed as the third argument
     *     of {@code createSubscription} (NOTE(review): presumably the ack deadline -- confirm
     *     that the service accepts the values used by callers)
     * @throws IOException if a Pubsub call fails
     * @throws AssertionError if no message arrives in time or the message is not {@code "SUCCESS"}
     */
    public void waitForSuccess(Duration duration) throws IOException {
        PubsubClient.SubscriptionPath subscriptionPath = new PubsubClient.SubscriptionPath(String.format(SUBSCRIPTION_FORMAT, this.pipelineOptions.getProject(), "subscription-" + String.valueOf(ThreadLocalRandom.current().nextLong())));
        this.pubsub.createSubscription(new PubsubClient.TopicPath(this.resultTopicPath), subscriptionPath, (int) duration.getStandardSeconds());

        Throwable primaryFailure = null;
        try {
            String result = pollForResultForDuration(subscriptionPath, duration);
            if (!"SUCCESS".equals(result)) {
                throw new AssertionError(result);
            }
        } catch (Throwable t) {
            primaryFailure = t;
            throw t;
        } finally {
            try {
                this.pubsub.deleteSubscription(subscriptionPath);
            } catch (IOException | RuntimeException cleanupFailure) {
                // Never let a cleanup problem hide the reason the wait itself failed.
                if (primaryFailure == null) {
                    throw cleanupFailure;
                }
                primaryFailure.addSuppressed(cleanupFailure);
            }
        }
    }

    /**
     * Polls the subscription until one message has been pulled and acknowledged, or until the
     * duration has elapsed.
     *
     * <p>A pull that returns no messages is retried rather than treated as a result, so a
     * timeout surfaces as the "Did not receive success" assertion and not as an
     * {@code IndexOutOfBoundsException}. A {@link StatusRuntimeException} from the client is
     * printed and retried after a short pause.
     *
     * @param subscriptionPath subscription to pull from
     * @param duration maximum time to keep polling
     * @return the payload of the first received message, decoded as UTF-8
     * @throws IOException if a Pubsub call fails with an I/O error
     * @throws AssertionError if no message was received before the deadline
     */
    private String pollForResultForDuration(PubsubClient.SubscriptionPath subscriptionPath, Duration duration) throws IOException {
        List<PubsubClient.IncomingMessage> received = null;
        DateTime deadline = DateTime.now().plus(duration.getMillis());
        do {
            try {
                List<PubsubClient.IncomingMessage> pulled = this.pubsub.pull(DateTime.now().getMillis(), subscriptionPath, 1, false);
                if (pulled == null || pulled.isEmpty()) {
                    // Nothing published yet; back off briefly instead of spinning on the service.
                    sleep(500L);
                    continue;
                }
                // Remember the message before acknowledging: if the ack fails we still hold the
                // result even when later pulls come back empty.
                received = pulled;
                List<String> ackIds = pulled.stream().map(incomingMessage -> incomingMessage.ackId).collect(Collectors.toList());
                this.pubsub.acknowledge(subscriptionPath, ackIds);
                break;
            } catch (StatusRuntimeException e) {
                System.out.println("Error while polling for result: " + e.getStatus());
                sleep(500L);
            }
        } while (DateTime.now().isBefore(deadline));
        if (received == null) {
            throw new AssertionError("Did not receive success in " + duration.getStandardSeconds() + "s");
        }
        return new String(received.get(0).elementBytes, StandardCharsets.UTF_8);
    }

    /**
     * Sleeps for the given number of milliseconds.
     *
     * @param j time to sleep, in milliseconds
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
     *     interrupted; the interrupt flag is restored first so callers can still observe it
     */
    private void sleep(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
            // Thread.sleep clears the flag when it throws; set it again before propagating.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
