/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import com.typesafe.scalalogging.Logger;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.Timer;
import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.metrics.KafkaMetricsGroup;
import kafka.tools.MirrorMaker;
import kafka.utils.CommandLineUtils$;
import kafka.utils.Logging;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Time;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.StringOps;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;
import scala.util.control.ControlThrowable;

public final class MirrorMaker$
implements KafkaMetricsGroup {
    public static MirrorMaker$ MODULE$;
    private MirrorMaker.MirrorMakerProducer producer;
    private Seq<MirrorMaker.MirrorMakerThread> kafka$tools$MirrorMaker$$mirrorMakerThreads;
    private final AtomicBoolean kafka$tools$MirrorMaker$$isShuttingDown;
    private final AtomicInteger kafka$tools$MirrorMaker$$numDroppedMessages;
    private MirrorMaker.MirrorMakerMessageHandler kafka$tools$MirrorMaker$$messageHandler;
    private int kafka$tools$MirrorMaker$$offsetCommitIntervalMs;
    private boolean kafka$tools$MirrorMaker$$abortOnSendFailure;
    private volatile boolean kafka$tools$MirrorMaker$$exitingOnSendFailure;
    private long lastSuccessfulCommitTime;
    private final Time time;
    private Logger logger;
    private String logIdent;
    private volatile boolean bitmap$0;

    static {
        new MirrorMaker$();
    }

    @Override
    public MetricName metricName(String name, Map<String, String> tags) {
        return KafkaMetricsGroup.metricName$(this, name, tags);
    }

    @Override
    public MetricName explicitMetricName(String group, String typeName, String name, Map<String, String> tags) {
        return KafkaMetricsGroup.explicitMetricName$(this, group, typeName, name, tags);
    }

    @Override
    public <T> Gauge<T> newGauge(String name, Gauge<T> metric, Map<String, String> tags) {
        return KafkaMetricsGroup.newGauge$(this, name, metric, tags);
    }

    @Override
    public <T> Map<String, String> newGauge$default$3() {
        return KafkaMetricsGroup.newGauge$default$3$(this);
    }

    @Override
    public Meter newMeter(String name, String eventType, TimeUnit timeUnit, Map<String, String> tags) {
        return KafkaMetricsGroup.newMeter$(this, name, eventType, timeUnit, tags);
    }

    @Override
    public Map<String, String> newMeter$default$4() {
        return KafkaMetricsGroup.newMeter$default$4$(this);
    }

    @Override
    public Histogram newHistogram(String name, boolean biased, Map<String, String> tags) {
        return KafkaMetricsGroup.newHistogram$(this, name, biased, tags);
    }

    @Override
    public boolean newHistogram$default$2() {
        return KafkaMetricsGroup.newHistogram$default$2$(this);
    }

    @Override
    public Map<String, String> newHistogram$default$3() {
        return KafkaMetricsGroup.newHistogram$default$3$(this);
    }

    @Override
    public Timer newTimer(String name, TimeUnit durationUnit, TimeUnit rateUnit, Map<String, String> tags) {
        return KafkaMetricsGroup.newTimer$(this, name, durationUnit, rateUnit, tags);
    }

    @Override
    public Map<String, String> newTimer$default$4() {
        return KafkaMetricsGroup.newTimer$default$4$(this);
    }

    @Override
    public void removeMetric(String name, Map<String, String> tags) {
        KafkaMetricsGroup.removeMetric$(this, name, tags);
    }

    @Override
    public Map<String, String> removeMetric$default$2() {
        return KafkaMetricsGroup.removeMetric$default$2$(this);
    }

    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    private Logger logger$lzycompute() {
        synchronized (this) {
            if (!this.bitmap$0) {
                this.logger = Logging.logger$(this);
                this.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    @Override
    public Logger logger() {
        if (!this.bitmap$0) {
            return this.logger$lzycompute();
        }
        return this.logger;
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    public MirrorMaker.MirrorMakerProducer producer() {
        return this.producer;
    }

    public void producer_$eq(MirrorMaker.MirrorMakerProducer x$1) {
        this.producer = x$1;
    }

    private Seq<MirrorMaker.MirrorMakerThread> kafka$tools$MirrorMaker$$mirrorMakerThreads() {
        return this.kafka$tools$MirrorMaker$$mirrorMakerThreads;
    }

    public void kafka$tools$MirrorMaker$$mirrorMakerThreads_$eq(Seq<MirrorMaker.MirrorMakerThread> x$1) {
        this.kafka$tools$MirrorMaker$$mirrorMakerThreads = x$1;
    }

    public AtomicBoolean kafka$tools$MirrorMaker$$isShuttingDown() {
        return this.kafka$tools$MirrorMaker$$isShuttingDown;
    }

    public AtomicInteger kafka$tools$MirrorMaker$$numDroppedMessages() {
        return this.kafka$tools$MirrorMaker$$numDroppedMessages;
    }

    public MirrorMaker.MirrorMakerMessageHandler kafka$tools$MirrorMaker$$messageHandler() {
        return this.kafka$tools$MirrorMaker$$messageHandler;
    }

    public void kafka$tools$MirrorMaker$$messageHandler_$eq(MirrorMaker.MirrorMakerMessageHandler x$1) {
        this.kafka$tools$MirrorMaker$$messageHandler = x$1;
    }

    public int kafka$tools$MirrorMaker$$offsetCommitIntervalMs() {
        return this.kafka$tools$MirrorMaker$$offsetCommitIntervalMs;
    }

    public void kafka$tools$MirrorMaker$$offsetCommitIntervalMs_$eq(int x$1) {
        this.kafka$tools$MirrorMaker$$offsetCommitIntervalMs = x$1;
    }

    public boolean kafka$tools$MirrorMaker$$abortOnSendFailure() {
        return this.kafka$tools$MirrorMaker$$abortOnSendFailure;
    }

    public void kafka$tools$MirrorMaker$$abortOnSendFailure_$eq(boolean x$1) {
        this.kafka$tools$MirrorMaker$$abortOnSendFailure = x$1;
    }

    public boolean kafka$tools$MirrorMaker$$exitingOnSendFailure() {
        return this.kafka$tools$MirrorMaker$$exitingOnSendFailure;
    }

    public void kafka$tools$MirrorMaker$$exitingOnSendFailure_$eq(boolean x$1) {
        this.kafka$tools$MirrorMaker$$exitingOnSendFailure = x$1;
    }

    private long lastSuccessfulCommitTime() {
        return this.lastSuccessfulCommitTime;
    }

    private void lastSuccessfulCommitTime_$eq(long x$1) {
        this.lastSuccessfulCommitTime = x$1;
    }

    private Time time() {
        return this.time;
    }

    public void main(String[] args) {
        if (this.logger().underlying().isWarnEnabled()) {
            this.logger().underlying().warn(this.msgWithLogIdent(MirrorMaker$.$anonfun$main$1()));
        }
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info(this.msgWithLogIdent(MirrorMaker$.$anonfun$main$2()));
        }
        try {
            MirrorMaker.MirrorMakerOptions opts = new MirrorMaker.MirrorMakerOptions(args);
            CommandLineUtils$.MODULE$.printHelpAndExitIfNeeded(opts, "This tool helps to continuously copy data between two Kafka clusters.");
            opts.checkArgs();
        }
        catch (Throwable throwable) {
            if (throwable instanceof ControlThrowable) {
                throw (Throwable)((ControlThrowable)throwable);
            }
            if (throwable != null) {
                if (this.logger().underlying().isErrorEnabled()) {
                    this.logger().underlying().error(this.msgWithLogIdent(MirrorMaker$.$anonfun$main$3()), throwable);
                }
            }
            throw null;
        }
        this.kafka$tools$MirrorMaker$$mirrorMakerThreads().foreach((Function1 & Serializable & scala.Serializable)x$1 -> {
            x$1.start();
            return BoxedUnit.UNIT;
        });
        this.kafka$tools$MirrorMaker$$mirrorMakerThreads().foreach((Function1 & Serializable & scala.Serializable)x$2 -> {
            x$2.awaitShutdown();
            return BoxedUnit.UNIT;
        });
    }

    public Seq<MirrorMaker.ConsumerWrapper> createConsumers(int numStreams, Properties consumerConfigProps, Option<ConsumerRebalanceListener> customRebalanceListener, Option<String> include) {
        this.kafka$tools$MirrorMaker$$maybeSetDefaultProperty(consumerConfigProps, "enable.auto.commit", "false");
        consumerConfigProps.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerConfigProps.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
        String groupIdString = consumerConfigProps.getProperty("group.id");
        IndexedSeq consumers = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numStreams).map((Function1 & Serializable & scala.Serializable)i -> MirrorMaker$.$anonfun$createConsumers$1(consumerConfigProps, groupIdString, BoxesRunTime.unboxToInt((Object)i)), IndexedSeq$.MODULE$.canBuildFrom());
        include.getOrElse((Function0 & Serializable & scala.Serializable)() -> {
            throw new IllegalArgumentException("include list cannot be empty");
        });
        return (Seq)consumers.map((Function1 & Serializable & scala.Serializable)consumer -> new MirrorMaker.ConsumerWrapper((Consumer<byte[], byte[]>)consumer, customRebalanceListener, include), IndexedSeq$.MODULE$.canBuildFrom());
    }

    public void commitOffsets(MirrorMaker.ConsumerWrapper consumerWrapper) {
        if (!this.kafka$tools$MirrorMaker$$exitingOnSendFailure()) {
            IntRef retry = IntRef.create((int)0);
            boolean retryNeeded = true;
            while (retryNeeded) {
                if (this.logger().underlying().isTraceEnabled()) {
                    this.logger().underlying().trace(this.msgWithLogIdent(MirrorMaker$.$anonfun$commitOffsets$1()));
                }
                try {
                    consumerWrapper.commit();
                    this.lastSuccessfulCommitTime_$eq(this.time().milliseconds());
                    retryNeeded = false;
                }
                catch (WakeupException e) {
                    this.commitOffsets(consumerWrapper);
                    throw e;
                }
                catch (TimeoutException timeoutException) {
                    Try try_ = Try$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> consumerWrapper.consumer().listTopics());
                    if (try_ instanceof Success) {
                        java.util.Map visibleTopics = (java.util.Map)((Success)try_).value();
                        consumerWrapper.offsets().$minus$minus$eq((TraversableOnce)consumerWrapper.offsets().keySet().filter((Function1 & Serializable & scala.Serializable)tp -> BoxesRunTime.boxToBoolean((boolean)MirrorMaker$.$anonfun$commitOffsets$3(visibleTopics, tp))));
                    } else if (try_ instanceof Failure) {
                        Throwable e = ((Failure)try_).exception();
                        if (this.logger().underlying().isWarnEnabled()) {
                            this.logger().underlying().warn(this.msgWithLogIdent(MirrorMaker$.$anonfun$commitOffsets$4()), e);
                        }
                    } else {
                        throw new MatchError((Object)try_);
                    }
                    ++retry.elem;
                    if (this.logger().underlying().isWarnEnabled()) {
                        this.logger().underlying().warn(this.msgWithLogIdent(MirrorMaker$.$anonfun$commitOffsets$6(retry)));
                    }
                    Thread.sleep(100L);
                }
                catch (CommitFailedException commitFailedException) {
                    retryNeeded = false;
                    if (!this.logger().underlying().isWarnEnabled()) continue;
                    this.logger().underlying().warn(this.msgWithLogIdent(MirrorMaker$.$anonfun$commitOffsets$7()));
                }
            }
        } else if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info(this.msgWithLogIdent(MirrorMaker$.$anonfun$commitOffsets$8()));
            return;
        }
    }

    public void cleanShutdown() {
        if (this.kafka$tools$MirrorMaker$$isShuttingDown().compareAndSet(false, true)) {
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info(this.msgWithLogIdent(MirrorMaker$.$anonfun$cleanShutdown$1()));
            }
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info(this.msgWithLogIdent(MirrorMaker$.$anonfun$cleanShutdown$2()));
            }
            if (this.kafka$tools$MirrorMaker$$mirrorMakerThreads() != null) {
                this.kafka$tools$MirrorMaker$$mirrorMakerThreads().foreach((Function1 & Serializable & scala.Serializable)x$3 -> {
                    x$3.shutdown();
                    return BoxedUnit.UNIT;
                });
                this.kafka$tools$MirrorMaker$$mirrorMakerThreads().foreach((Function1 & Serializable & scala.Serializable)x$4 -> {
                    x$4.awaitShutdown();
                    return BoxedUnit.UNIT;
                });
            }
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info(this.msgWithLogIdent(MirrorMaker$.$anonfun$cleanShutdown$5()));
            }
            this.producer().close();
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info(this.msgWithLogIdent(MirrorMaker$.$anonfun$cleanShutdown$6()));
                return;
            }
        }
    }

    public void kafka$tools$MirrorMaker$$maybeSetDefaultProperty(Properties properties, String propertyName, String defaultValue) {
        String propertyValue = properties.getProperty(propertyName);
        properties.setProperty(propertyName, (String)Option$.MODULE$.apply((Object)propertyValue).getOrElse((Function0 & Serializable & scala.Serializable)() -> defaultValue));
        String string = properties.getProperty(propertyName);
        if ((string == null ? defaultValue != null : !string.equals(defaultValue)) && this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info(this.msgWithLogIdent(MirrorMaker$.$anonfun$maybeSetDefaultProperty$2(propertyName, propertyValue)));
            return;
        }
    }

    public static final /* synthetic */ int kafka$tools$MirrorMaker$$$anonfun$new$1() {
        return MODULE$.kafka$tools$MirrorMaker$$numDroppedMessages().get();
    }

    public static final /* synthetic */ String $anonfun$main$1() {
        return "This tool is deprecated and may be removed in a future major release.";
    }

    public static final /* synthetic */ String $anonfun$main$2() {
        return "Starting mirror maker";
    }

    public static final /* synthetic */ String $anonfun$main$3() {
        return "Exception when starting mirror maker.";
    }

    public static final /* synthetic */ KafkaConsumer $anonfun$createConsumers$1(Properties consumerConfigProps$1, String groupIdString$1, int i) {
        consumerConfigProps$1.setProperty("client.id", new StringBuilder(1).append(groupIdString$1).append("-").append(Integer.toString(i)).toString());
        return new KafkaConsumer(consumerConfigProps$1);
    }

    public static final /* synthetic */ String $anonfun$commitOffsets$1() {
        return "Committing offsets.";
    }

    public static final /* synthetic */ boolean $anonfun$commitOffsets$3(java.util.Map visibleTopics$1, TopicPartition tp) {
        return !visibleTopics$1.containsKey(tp.topic());
    }

    public static final /* synthetic */ String $anonfun$commitOffsets$4() {
        return "Failed to list all authorized topics after committing offsets timed out: ";
    }

    public static final /* synthetic */ String $anonfun$commitOffsets$6(IntRef retry$1) {
        return new StringBuilder(263).append("Failed to commit offsets because the offset commit request processing can not be completed in time. ").append("If you see this regularly, it could indicate that you need to increase the consumer's ").append("default.api.timeout.ms").append(" ").append("Last successful offset commit timestamp=").append(MODULE$.lastSuccessfulCommitTime()).append(", retry count=").append(retry$1.elem).toString();
    }

    public static final /* synthetic */ String $anonfun$commitOffsets$7() {
        return new StringBuilder(303).append("Failed to commit offsets because the consumer group has rebalanced and assigned partitions to another instance. If you see this regularly, it could indicate that you need to either increase ").append("the consumer's ").append("session.timeout.ms").append(" or reduce the number of records ").append("handled on each iteration with ").append("max.poll.records").toString();
    }

    public static final /* synthetic */ String $anonfun$commitOffsets$8() {
        return "Exiting on send failure, skip committing offsets.";
    }

    public static final /* synthetic */ String $anonfun$cleanShutdown$1() {
        return "Start clean shutdown.";
    }

    public static final /* synthetic */ String $anonfun$cleanShutdown$2() {
        return "Shutting down consumer threads.";
    }

    public static final /* synthetic */ String $anonfun$cleanShutdown$5() {
        return "Closing producer.";
    }

    public static final /* synthetic */ String $anonfun$cleanShutdown$6() {
        return "Kafka mirror maker shutdown successfully";
    }

    public static final /* synthetic */ String $anonfun$maybeSetDefaultProperty$2(String propertyName$1, String propertyValue$1) {
        return new StringOps(Predef$.MODULE$.augmentString("Property %s is overridden to %s - data loss or message reordering is possible.")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{propertyName$1, propertyValue$1}));
    }

    private MirrorMaker$() {
        MODULE$ = this;
        Logging.$init$(this);
        KafkaMetricsGroup.$init$(this);
        this.producer = null;
        this.kafka$tools$MirrorMaker$$mirrorMakerThreads = null;
        this.kafka$tools$MirrorMaker$$isShuttingDown = new AtomicBoolean(false);
        this.kafka$tools$MirrorMaker$$numDroppedMessages = new AtomicInteger(0);
        this.kafka$tools$MirrorMaker$$messageHandler = null;
        this.kafka$tools$MirrorMaker$$offsetCommitIntervalMs = 0;
        this.kafka$tools$MirrorMaker$$abortOnSendFailure = true;
        this.kafka$tools$MirrorMaker$$exitingOnSendFailure = false;
        this.lastSuccessfulCommitTime = -1L;
        this.time = Time.SYSTEM;
        this.newGauge("MirrorMaker-numDroppedMessages", new Gauge<Object>(){

            public final int value() {
                return MirrorMaker$.kafka$tools$MirrorMaker$$$anonfun$new$1();
            }
        }, this.newGauge$default$3());
    }
}

