/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.flume.sink;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractPollableSource;
import org.apache.pulsar.io.flume.sink.StringSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flume pollable source that reads messages from the queue exposed by
 * {@link StringSink#getQueue()}, wraps each one in a Flume {@link Event} and
 * hands the accumulated events to the channel processor as a batch.
 *
 * <p>A batch is closed when it reaches {@code batchSize} events or when
 * {@code batchDurationMillis} has elapsed, whichever comes first.
 */
public class SourceOfFlume
extends AbstractPollableSource
implements BatchSizeSupported {
    private static final Logger log = LoggerFactory.getLogger(SourceOfFlume.class);

    /** Configuration key for the maximum time, in milliseconds, spent filling one batch. */
    public static final String BATCH_DURATION_MS = "batchDurationMillis";

    /** Configuration key for the maximum number of events per batch. */
    private static final String BATCH_SIZE = "batchSize";
    private static final int DEFAULT_BATCH_SIZE = 1000;
    private static final int DEFAULT_BATCH_DURATION_MS = 1000;

    private long batchSize;
    private int maxBatchDurationMillis;
    private SourceCounter counter;

    /**
     * Events collected but not yet delivered. It is only cleared after a
     * successful delivery, so a failed batch is offered again on the next call.
     */
    private final List<Event> eventList = new ArrayList<Event>();

    /** Creates and starts the metrics counter for this source. */
    public synchronized void doStart() {
        log.info("start source of flume ...");
        this.counter = new SourceCounter("flume-source");
        this.counter.start();
    }

    /** Stops the metrics counter, if one was created by {@link #doStart()}. */
    public void doStop() {
        log.info("stop source of flume ...");
        // Guard against stop being invoked when start never ran (or failed early).
        if (this.counter != null) {
            this.counter.stop();
        }
    }

    /**
     * Reads the batch size and maximum batch duration from the Flume context,
     * defaulting both to 1000 when they are not set.
     *
     * @param context the Flume configuration for this source
     */
    public void doConfigure(Context context) {
        this.batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
        this.maxBatchDurationMillis = context.getInteger(BATCH_DURATION_MS, DEFAULT_BATCH_DURATION_MS);
        log.info("context: {}", context);
    }

    /**
     * Collects up to {@link #getBatchSize()} events within the configured batch
     * duration and delivers them to the channel processor.
     *
     * @return {@code READY} when a batch was delivered; {@code BACKOFF} when
     *         nothing was available or an error occurred
     */
    public PollableSource.Status doProcess() {
        try {
            fillBatch();
            if (this.eventList.isEmpty()) {
                return PollableSource.Status.BACKOFF;
            }
            this.counter.addToEventReceivedCount(this.eventList.size());
            this.getChannelProcessor().processEventBatch(this.eventList);
            // Only reached when delivery did not throw; otherwise the events stay buffered.
            this.eventList.clear();
            return PollableSource.Status.READY;
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interruption so the thread running this source can observe it.
                Thread.currentThread().interrupt();
            }
            log.error("Flume Source EXCEPTION", e);
            this.counter.incrementEventReadOrChannelFail(e);
            return PollableSource.Status.BACKOFF;
        }
    }

    /**
     * Appends events to {@link #eventList} until the batch is full, the batch
     * deadline passes, or the queue stays empty for the remaining time.
     *
     * @throws InterruptedException if interrupted while waiting for a message
     */
    private void fillBatch() throws InterruptedException {
        BlockingQueue<Object> queue = StringSink.getQueue();
        if (queue == null) {
            // No queue to read from on this pass; return at once rather than spin.
            return;
        }
        long deadline = System.currentTimeMillis() + this.maxBatchDurationMillis;
        while (this.eventList.size() < this.getBatchSize()) {
            long remainingMillis = deadline - System.currentTimeMillis();
            if (remainingMillis <= 0) {
                break;
            }
            // A timed poll waits without burning CPU and cannot block past the deadline.
            Object message = queue.poll(remainingMillis, TimeUnit.MILLISECONDS);
            if (message == null) {
                break;
            }
            // Fixed charset so the event bytes do not depend on the host's default encoding.
            byte[] body = message.toString().getBytes(StandardCharsets.UTF_8);
            this.eventList.add(EventBuilder.withBody(body));
        }
    }

    /** @return the maximum number of events delivered in one batch */
    public long getBatchSize() {
        return this.batchSize;
    }
}

