/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.Bucket;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketFactory;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets;
import org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl;
import org.apache.flink.streaming.api.functions.sink.filesystem.FileLifeCycleListener;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.hamcrest.Description;
import org.hamcrest.MatcherAssert;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class BucketsTest {
    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    @Test
    public void testSnapshotAndRestore() throws Exception {
        File outDir = TEMP_FOLDER.newFolder();
        Path path = new Path(outDir.toURI());
        OnCheckpointRollingPolicy onCheckpointRollingPolicy = OnCheckpointRollingPolicy.build();
        Buckets<String, String> buckets = BucketsTest.createBuckets(path, (RollingPolicy<String, String>)onCheckpointRollingPolicy, 0);
        TestUtils.MockListState<byte[]> bucketStateContainer = new TestUtils.MockListState<byte[]>();
        TestUtils.MockListState<Long> partCounterContainer = new TestUtils.MockListState<Long>();
        buckets.onElement((Object)"test1", (SinkFunction.Context)new TestUtils.MockSinkContext(null, 1L, 2L));
        buckets.snapshotState(0L, bucketStateContainer, partCounterContainer);
        MatcherAssert.assertThat(buckets.getActiveBuckets().get("test1"), BucketsTest.hasSinglePartFileToBeCommittedOnCheckpointAck(path, "test1"));
        buckets.onElement((Object)"test2", (SinkFunction.Context)new TestUtils.MockSinkContext(null, 1L, 2L));
        buckets.snapshotState(1L, bucketStateContainer, partCounterContainer);
        MatcherAssert.assertThat(buckets.getActiveBuckets().get("test1"), BucketsTest.hasSinglePartFileToBeCommittedOnCheckpointAck(path, "test1"));
        MatcherAssert.assertThat(buckets.getActiveBuckets().get("test2"), BucketsTest.hasSinglePartFileToBeCommittedOnCheckpointAck(path, "test2"));
        Buckets<String, String> restoredBuckets = BucketsTest.restoreBuckets(path, (RollingPolicy<String, String>)onCheckpointRollingPolicy, 0, bucketStateContainer, partCounterContainer);
        Map activeBuckets = restoredBuckets.getActiveBuckets();
        Assert.assertTrue((boolean)activeBuckets.isEmpty());
    }

    private static TypeSafeMatcher<Bucket<String, String>> hasSinglePartFileToBeCommittedOnCheckpointAck(final Path testTmpPath, final String bucketId) {
        return new TypeSafeMatcher<Bucket<String, String>>(){

            protected boolean matchesSafely(Bucket<String, String> bucket) {
                return ((String)bucket.getBucketId()).equals(bucketId) && bucket.getBucketPath().equals((Object)new Path(testTmpPath, bucketId)) && bucket.getInProgressPart() == null && bucket.getPendingFileRecoverablesForCurrentCheckpoint().isEmpty() && bucket.getPendingFileRecoverablesPerCheckpoint().size() == 1;
            }

            public void describeTo(Description description) {
                description.appendText("a Bucket with a single pending part file @ ").appendValue((Object)new Path(testTmpPath, bucketId)).appendText("'");
            }
        };
    }

    @Test
    public void testMergeAtScaleInAndMaxCounterAtRecovery() throws Exception {
        File outDir = TEMP_FOLDER.newFolder();
        Path path = new Path(outDir.toURI());
        DefaultRollingPolicy onCheckpointRP = DefaultRollingPolicy.builder().withMaxPartSize(new MemorySize(7L)).build();
        TestUtils.MockListState bucketStateContainerOne = new TestUtils.MockListState();
        TestUtils.MockListState bucketStateContainerTwo = new TestUtils.MockListState();
        TestUtils.MockListState partCounterContainerOne = new TestUtils.MockListState();
        TestUtils.MockListState partCounterContainerTwo = new TestUtils.MockListState();
        Buckets<String, String> bucketsOne = BucketsTest.createBuckets(path, (RollingPolicy<String, String>)onCheckpointRP, 0);
        Buckets<String, String> bucketsTwo = BucketsTest.createBuckets(path, (RollingPolicy<String, String>)onCheckpointRP, 1);
        bucketsOne.onElement((Object)"test1", (SinkFunction.Context)new TestUtils.MockSinkContext(null, 1L, 2L));
        bucketsOne.snapshotState(0L, bucketStateContainerOne, partCounterContainerOne);
        Assert.assertEquals((long)1L, (long)bucketsOne.getMaxPartCounter());
        Assert.assertNotNull((Object)((Bucket)bucketsOne.getActiveBuckets().get("test1")).getInProgressPart());
        bucketsTwo.onElement((Object)"test1", (SinkFunction.Context)new TestUtils.MockSinkContext(null, 1L, 2L));
        bucketsTwo.onElement((Object)"test1", (SinkFunction.Context)new TestUtils.MockSinkContext(null, 1L, 2L));
        bucketsTwo.onElement((Object)"test1", (SinkFunction.Context)new TestUtils.MockSinkContext(null, 1L, 2L));
        bucketsTwo.snapshotState(0L, bucketStateContainerTwo, partCounterContainerTwo);
        Assert.assertEquals((long)2L, (long)bucketsTwo.getMaxPartCounter());
        Assert.assertEquals((long)1L, (long)((Bucket)bucketsTwo.getActiveBuckets().get("test1")).getPendingFileRecoverablesPerCheckpoint().size());
        Assert.assertNotNull((Object)((Bucket)bucketsTwo.getActiveBuckets().get("test1")).getInProgressPart());
        TestUtils.MockListState<byte[]> mergedBucketStateContainer = new TestUtils.MockListState<byte[]>();
        TestUtils.MockListState<Long> mergedPartCounterContainer = new TestUtils.MockListState<Long>();
        mergedBucketStateContainer.addAll(bucketStateContainerOne.getBackingList());
        mergedBucketStateContainer.addAll(bucketStateContainerTwo.getBackingList());
        mergedPartCounterContainer.addAll(partCounterContainerOne.getBackingList());
        mergedPartCounterContainer.addAll(partCounterContainerTwo.getBackingList());
        Buckets<String, String> restoredBuckets = BucketsTest.restoreBuckets(path, (RollingPolicy<String, String>)onCheckpointRP, 0, mergedBucketStateContainer, mergedPartCounterContainer);
        Assert.assertEquals((long)2L, (long)restoredBuckets.getMaxPartCounter());
        Map activeBuckets = restoredBuckets.getActiveBuckets();
        Assert.assertEquals((long)1L, (long)activeBuckets.size());
        Assert.assertTrue((boolean)activeBuckets.keySet().contains("test1"));
        Bucket bucket = (Bucket)activeBuckets.get("test1");
        Assert.assertEquals((Object)"test1", (Object)bucket.getBucketId());
        Assert.assertEquals((Object)new Path(path, "test1"), (Object)bucket.getBucketPath());
        Assert.assertNotNull((Object)bucket.getInProgressPart());
        Assert.assertEquals((long)1L, (long)bucket.getPendingFileRecoverablesForCurrentCheckpoint().size());
        Assert.assertTrue((boolean)bucket.getPendingFileRecoverablesPerCheckpoint().isEmpty());
    }

    @Test
    public void testOnProcessingTime() throws Exception {
        File outDir = TEMP_FOLDER.newFolder();
        Path path = new Path(outDir.toURI());
        OnProcessingTimePolicy<String, String> rollOnProcessingTimeCountingPolicy = new OnProcessingTimePolicy<String, String>(2L);
        Buckets<String, String> buckets = BucketsTest.createBuckets(path, rollOnProcessingTimeCountingPolicy, 0);
        buckets.onElement((Object)"test", (SinkFunction.Context)new TestUtils.MockSinkContext(1L, 2L, 3L));
        buckets.onProcessingTime(7L);
        Assert.assertEquals((long)1L, (long)rollOnProcessingTimeCountingPolicy.getOnProcessingTimeRollCounter());
        Map activeBuckets = buckets.getActiveBuckets();
        Assert.assertEquals((long)1L, (long)activeBuckets.size());
        Assert.assertTrue((boolean)activeBuckets.keySet().contains("test"));
        Bucket bucket = (Bucket)activeBuckets.get("test");
        Assert.assertEquals((Object)"test", (Object)bucket.getBucketId());
        Assert.assertEquals((Object)new Path(path, "test"), (Object)bucket.getBucketPath());
        Assert.assertEquals((Object)"test", (Object)bucket.getBucketId());
        Assert.assertNull((Object)bucket.getInProgressPart());
        Assert.assertEquals((long)1L, (long)bucket.getPendingFileRecoverablesForCurrentCheckpoint().size());
        Assert.assertTrue((boolean)bucket.getPendingFileRecoverablesPerCheckpoint().isEmpty());
    }

    @Test
    public void testBucketIsRemovedWhenNotActive() throws Exception {
        File outDir = TEMP_FOLDER.newFolder();
        Path path = new Path(outDir.toURI());
        OnProcessingTimePolicy<String, String> rollOnProcessingTimeCountingPolicy = new OnProcessingTimePolicy<String, String>(2L);
        Buckets<String, String> buckets = BucketsTest.createBuckets(path, rollOnProcessingTimeCountingPolicy, 0);
        buckets.onElement((Object)"test", (SinkFunction.Context)new TestUtils.MockSinkContext(1L, 2L, 3L));
        buckets.onProcessingTime(7L);
        Assert.assertEquals((long)1L, (long)rollOnProcessingTimeCountingPolicy.getOnProcessingTimeRollCounter());
        buckets.snapshotState(0L, new TestUtils.MockListState(), new TestUtils.MockListState());
        buckets.commitUpToCheckpoint(0L);
        Assert.assertTrue((boolean)buckets.getActiveBuckets().isEmpty());
    }

    @Test
    public void testPartCounterAfterBucketResurrection() throws Exception {
        File outDir = TEMP_FOLDER.newFolder();
        Path path = new Path(outDir.toURI());
        OnProcessingTimePolicy<String, String> rollOnProcessingTimeCountingPolicy = new OnProcessingTimePolicy<String, String>(2L);
        Buckets<String, String> buckets = BucketsTest.createBuckets(path, rollOnProcessingTimeCountingPolicy, 0);
        buckets.onElement((Object)"test", (SinkFunction.Context)new TestUtils.MockSinkContext(1L, 2L, 3L));
        Assert.assertEquals((long)1L, (long)((Bucket)buckets.getActiveBuckets().get("test")).getPartCounter());
        buckets.onProcessingTime(7L);
        Assert.assertEquals((long)1L, (long)rollOnProcessingTimeCountingPolicy.getOnProcessingTimeRollCounter());
        Assert.assertEquals((long)1L, (long)((Bucket)buckets.getActiveBuckets().get("test")).getPartCounter());
        buckets.snapshotState(0L, new TestUtils.MockListState(), new TestUtils.MockListState());
        buckets.commitUpToCheckpoint(0L);
        Assert.assertTrue((boolean)buckets.getActiveBuckets().isEmpty());
        buckets.onElement((Object)"test", (SinkFunction.Context)new TestUtils.MockSinkContext(2L, 3L, 4L));
        Assert.assertEquals((long)2L, (long)((Bucket)buckets.getActiveBuckets().get("test")).getPartCounter());
    }

    @Test
    public void testContextPassingNormalExecution() throws Exception {
        this.testCorrectTimestampPassingInContext(1L, 2L, 3L);
    }

    @Test
    public void testContextPassingNullTimestamp() throws Exception {
        this.testCorrectTimestampPassingInContext(null, 2L, 3L);
    }

    private void testCorrectTimestampPassingInContext(Long timestamp, long watermark, long processingTime) throws Exception {
        File outDir = TEMP_FOLDER.newFolder();
        Path path = new Path(outDir.toURI());
        Buckets buckets = new Buckets(path, (BucketAssigner)new VerifyingBucketAssigner(timestamp, watermark, processingTime), (BucketFactory)new DefaultBucketFactoryImpl(), (BucketWriter)new RowWiseBucketWriter(FileSystem.get((URI)path.toUri()).createRecoverableWriter(), (Encoder)new SimpleStringEncoder()), (RollingPolicy)DefaultRollingPolicy.builder().build(), 2, OutputFileConfig.builder().build());
        buckets.onElement((Object)"test", (SinkFunction.Context)new TestUtils.MockSinkContext(timestamp, watermark, processingTime));
    }

    @Test
    public void testBucketLifeCycleListenerOnCreatingAndInactive() throws Exception {
        File outDir = TEMP_FOLDER.newFolder();
        Path path = new Path(outDir.toURI());
        OnProcessingTimePolicy<String, String> rollOnProcessingTimeCountingPolicy = new OnProcessingTimePolicy<String, String>(2L);
        RecordBucketLifeCycleListener bucketLifeCycleListener = new RecordBucketLifeCycleListener();
        Buckets<String, String> buckets = BucketsTest.createBuckets(path, rollOnProcessingTimeCountingPolicy, bucketLifeCycleListener, null, 0, OutputFileConfig.builder().build());
        TestUtils.MockListState bucketStateContainer = new TestUtils.MockListState();
        TestUtils.MockListState partCounterContainer = new TestUtils.MockListState();
        buckets.onElement((Object)"test1", (SinkFunction.Context)new TestUtils.MockSinkContext(null, 1L, 2L));
        buckets.onElement((Object)"test2", (SinkFunction.Context)new TestUtils.MockSinkContext(null, 1L, 3L));
        buckets.onProcessingTime(4L);
        buckets.snapshotState(0L, bucketStateContainer, partCounterContainer);
        buckets.commitUpToCheckpoint(0L);
        buckets.onProcessingTime(6L);
        buckets.snapshotState(1L, bucketStateContainer, partCounterContainer);
        buckets.commitUpToCheckpoint(1L);
        List<Tuple2> expectedEvents = Arrays.asList(new Tuple2((Object)RecordBucketLifeCycleListener.EventType.CREATED, (Object)"test1"), new Tuple2((Object)RecordBucketLifeCycleListener.EventType.CREATED, (Object)"test2"), new Tuple2((Object)RecordBucketLifeCycleListener.EventType.INACTIVE, (Object)"test1"), new Tuple2((Object)RecordBucketLifeCycleListener.EventType.INACTIVE, (Object)"test2"));
        Assert.assertEquals(expectedEvents, bucketLifeCycleListener.getEvents());
    }

    @Test
    public void testBucketLifeCycleListenerOnRestoring() throws Exception {
        File outDir = TEMP_FOLDER.newFolder();
        Path path = new Path(outDir.toURI());
        OnProcessingTimePolicy<String, String> rollOnProcessingTimeCountingPolicy = new OnProcessingTimePolicy<String, String>(2L);
        RecordBucketLifeCycleListener bucketLifeCycleListener = new RecordBucketLifeCycleListener();
        Buckets<String, String> buckets = BucketsTest.createBuckets(path, rollOnProcessingTimeCountingPolicy, bucketLifeCycleListener, null, 0, OutputFileConfig.builder().build());
        TestUtils.MockListState<byte[]> bucketStateContainer = new TestUtils.MockListState<byte[]>();
        TestUtils.MockListState<Long> partCounterContainer = new TestUtils.MockListState<Long>();
        buckets.onElement((Object)"test1", (SinkFunction.Context)new TestUtils.MockSinkContext(null, 1L, 2L));
        buckets.onElement((Object)"test2", (SinkFunction.Context)new TestUtils.MockSinkContext(null, 1L, 3L));
        buckets.onProcessingTime(4L);
        buckets.snapshotState(0L, bucketStateContainer, partCounterContainer);
        buckets = BucketsTest.restoreBuckets(path, rollOnProcessingTimeCountingPolicy, bucketLifeCycleListener, null, 0, bucketStateContainer, partCounterContainer, OutputFileConfig.builder().build());
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("test2")), buckets.getActiveBuckets().keySet());
        List<Tuple2> expectedEvents = Arrays.asList(new Tuple2((Object)RecordBucketLifeCycleListener.EventType.CREATED, (Object)"test1"), new Tuple2((Object)RecordBucketLifeCycleListener.EventType.CREATED, (Object)"test2"), new Tuple2((Object)RecordBucketLifeCycleListener.EventType.INACTIVE, (Object)"test1"));
        Assert.assertEquals(expectedEvents, bucketLifeCycleListener.getEvents());
    }

    @Test
    public void testFileLifeCycleListener() throws Exception {
        File outDir = TEMP_FOLDER.newFolder();
        Path path = new Path(outDir.toURI());
        OnProcessingTimePolicy<String, String> rollOnProcessingTimeCountingPolicy = new OnProcessingTimePolicy<String, String>(2L);
        TestFileLifeCycleListener fileLifeCycleListener = new TestFileLifeCycleListener();
        Buckets<String, String> buckets = BucketsTest.createBuckets(path, rollOnProcessingTimeCountingPolicy, null, fileLifeCycleListener, 0, OutputFileConfig.builder().build());
        buckets.onElement((Object)"test1", (SinkFunction.Context)new TestUtils.MockSinkContext(null, 1L, 2L));
        buckets.onElement((Object)"test2", (SinkFunction.Context)new TestUtils.MockSinkContext(null, 1L, 3L));
        buckets.onProcessingTime(4L);
        buckets.onElement((Object)"test1", (SinkFunction.Context)new TestUtils.MockSinkContext(null, 1L, 5L));
        buckets.onElement((Object)"test2", (SinkFunction.Context)new TestUtils.MockSinkContext(null, 1L, 6L));
        Assert.assertEquals((long)2L, (long)fileLifeCycleListener.files.size());
        Assert.assertEquals(Arrays.asList("part-0-0", "part-0-1"), fileLifeCycleListener.files.get("test1"));
        Assert.assertEquals(Collections.singletonList("part-0-1"), fileLifeCycleListener.files.get("test2"));
    }

    private static Buckets<String, String> createBuckets(Path basePath, RollingPolicy<String, String> rollingPolicy, int subtaskIdx) throws IOException {
        return BucketsTest.createBuckets(basePath, rollingPolicy, null, null, subtaskIdx, OutputFileConfig.builder().build());
    }

    private static Buckets<String, String> createBuckets(Path basePath, RollingPolicy<String, String> rollingPolicy, BucketLifeCycleListener<String, String> bucketLifeCycleListener, FileLifeCycleListener<String> fileLifeCycleListener, int subtaskIdx, OutputFileConfig outputFileConfig) throws IOException {
        Buckets buckets = new Buckets(basePath, (BucketAssigner)new TestUtils.StringIdentityBucketAssigner(), (BucketFactory)new DefaultBucketFactoryImpl(), (BucketWriter)new RowWiseBucketWriter(FileSystem.get((URI)basePath.toUri()).createRecoverableWriter(), (Encoder)new SimpleStringEncoder()), rollingPolicy, subtaskIdx, outputFileConfig);
        if (bucketLifeCycleListener != null) {
            buckets.setBucketLifeCycleListener(bucketLifeCycleListener);
        }
        if (fileLifeCycleListener != null) {
            buckets.setFileLifeCycleListener(fileLifeCycleListener);
        }
        return buckets;
    }

    private static Buckets<String, String> restoreBuckets(Path basePath, RollingPolicy<String, String> rollingPolicy, int subtaskIdx, ListState<byte[]> bucketState, ListState<Long> partCounterState) throws Exception {
        return BucketsTest.restoreBuckets(basePath, rollingPolicy, null, null, subtaskIdx, bucketState, partCounterState, OutputFileConfig.builder().build());
    }

    private static Buckets<String, String> restoreBuckets(Path basePath, RollingPolicy<String, String> rollingPolicy, BucketLifeCycleListener<String, String> bucketLifeCycleListener, FileLifeCycleListener<String> fileLifeCycleListener, int subtaskIdx, ListState<byte[]> bucketState, ListState<Long> partCounterState, OutputFileConfig outputFileConfig) throws Exception {
        Buckets<String, String> restoredBuckets = BucketsTest.createBuckets(basePath, rollingPolicy, bucketLifeCycleListener, fileLifeCycleListener, subtaskIdx, outputFileConfig);
        restoredBuckets.initializeState(bucketState, partCounterState);
        return restoredBuckets;
    }

    private static class TestFileLifeCycleListener
    implements FileLifeCycleListener<String> {
        private final Map<String, List<String>> files = new HashMap<String, List<String>>();

        private TestFileLifeCycleListener() {
        }

        public void onPartFileOpened(String bucket, Path newPath) {
            this.files.computeIfAbsent(bucket, k -> new ArrayList()).add(newPath.getName());
        }
    }

    private static class RecordBucketLifeCycleListener
    implements BucketLifeCycleListener<String, String> {
        private List<Tuple2<EventType, String>> events = new ArrayList<Tuple2<EventType, String>>();

        private RecordBucketLifeCycleListener() {
        }

        public void bucketCreated(Bucket<String, String> bucket) {
            this.events.add((Tuple2<EventType, String>)new Tuple2((Object)EventType.CREATED, bucket.getBucketId()));
        }

        public void bucketInactive(Bucket<String, String> bucket) {
            this.events.add((Tuple2<EventType, String>)new Tuple2((Object)EventType.INACTIVE, bucket.getBucketId()));
        }

        public List<Tuple2<EventType, String>> getEvents() {
            return this.events;
        }

        public static enum EventType {
            CREATED,
            INACTIVE;

        }
    }

    private static class VerifyingBucketAssigner
    implements BucketAssigner<String, String> {
        private static final long serialVersionUID = 7729086510972377578L;
        private final Long expectedTimestamp;
        private final long expectedWatermark;
        private final long expectedProcessingTime;

        VerifyingBucketAssigner(Long expectedTimestamp, long expectedWatermark, long expectedProcessingTime) {
            this.expectedTimestamp = expectedTimestamp;
            this.expectedWatermark = expectedWatermark;
            this.expectedProcessingTime = expectedProcessingTime;
        }

        public String getBucketId(String element, BucketAssigner.Context context) {
            Long elementTimestamp = context.timestamp();
            long watermark = context.currentWatermark();
            long processingTime = context.currentProcessingTime();
            Assert.assertEquals((Object)this.expectedTimestamp, (Object)elementTimestamp);
            Assert.assertEquals((long)this.expectedProcessingTime, (long)processingTime);
            Assert.assertEquals((long)this.expectedWatermark, (long)watermark);
            return element;
        }

        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    private static class OnProcessingTimePolicy<IN, BucketID>
    implements RollingPolicy<IN, BucketID> {
        private static final long serialVersionUID = 1L;
        private int onProcessingTimeRollCounter = 0;
        private final long rolloverInterval;

        OnProcessingTimePolicy(long rolloverInterval) {
            this.rolloverInterval = rolloverInterval;
        }

        public int getOnProcessingTimeRollCounter() {
            return this.onProcessingTimeRollCounter;
        }

        public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
            return false;
        }

        public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) {
            return false;
        }

        public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
            boolean result;
            boolean bl = result = currentTime - partFileState.getCreationTime() >= this.rolloverInterval;
            if (result) {
                ++this.onProcessingTimeRollCounter;
            }
            return result;
        }
    }
}

