/*
 * Decompiled with CFR 0.152.
 */
package io.streamthoughts.kafka.connect.filepulse.storage;

import io.streamthoughts.kafka.connect.filepulse.storage.Callback;
import io.streamthoughts.kafka.connect.filepulse.storage.KafkaBasedLog;
import io.streamthoughts.kafka.connect.filepulse.storage.KafkaBasedLogFactory;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.storage.StateSerde;
import io.streamthoughts.kafka.connect.filepulse.storage.StateSnapshot;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaStateBackingStore<T>
implements StateBackingStore<T> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaStateBackingStore.class);
    private static final Duration DEFAULT_READ_TO_END_TIMEOUT = Duration.ofSeconds(30L);
    private static final String GROUP_STATE_SEPARATOR = ".";
    private final KafkaBasedLog<String, byte[]> kafkaLog;
    private final Object lock = new Object();
    private final String groupId;
    private final AtomicLong offset = new AtomicLong(-1L);
    private final Map<String, T> states = new HashMap<String, T>();
    private final StateSerde<T> serde;
    private final String keyPrefix;
    private final boolean consumerEnabled;
    private volatile Status status = Status.CREATED;
    private StateBackingStore.UpdateListener<T> updateListener;

    public KafkaStateBackingStore(String topic, String keyPrefix, String groupId, Map<String, ?> producerProps, Map<String, ?> consumerProps, StateSerde<T> serde, boolean consumerEnabled) {
        KafkaBasedLogFactory factory = new KafkaBasedLogFactory(producerProps, consumerProps);
        this.kafkaLog = factory.make(topic, new ConsumeCallback());
        this.groupId = KafkaStateBackingStore.sanitizeGroupId(groupId);
        this.serde = serde;
        this.keyPrefix = keyPrefix;
        this.consumerEnabled = consumerEnabled;
    }

    private static String sanitizeGroupId(String groupId) {
        return groupId.replaceAll("\\.", "-");
    }

    Status getState() {
        return this.status;
    }

    @Override
    public synchronized void start() {
        if (this.isStarted()) {
            throw new IllegalStateException("Cannot init again.");
        }
        LOG.info("Starting {}", (Object)this.getBackingStoreName());
        this.kafkaLog.start(this.consumerEnabled);
        this.status = Status.STARTED;
        LOG.info("Started {}", (Object)this.getBackingStoreName());
    }

    @Override
    public boolean isStarted() {
        return this.getState().equals((Object)Status.STARTED);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop() {
        KafkaStateBackingStore kafkaStateBackingStore = this;
        synchronized (kafkaStateBackingStore) {
            LOG.info("Closing {}", (Object)this.getBackingStoreName());
            this.status = Status.PENDING_SHUTDOWN;
            this.kafkaLog.flush();
            this.kafkaLog.stop();
            this.status = Status.SHUTDOWN;
            LOG.info("Closed {}", (Object)this.getBackingStoreName());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public StateSnapshot<T> snapshot() {
        Object object = this.lock;
        synchronized (object) {
            return new StateSnapshot<T>(this.offset.get(), Collections.unmodifiableMap(this.states));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean contains(String name) {
        Object object = this.lock;
        synchronized (object) {
            return this.states.containsKey(name);
        }
    }

    @Override
    public void putAsync(String name, T state) {
        this.put(name, state, false);
    }

    @Override
    public void put(String name, T state) {
        this.put(name, state, true);
    }

    private void put(String name, T state, boolean sync) {
        this.checkStates();
        try {
            this.safeSend(name, this.serde.serialize(state));
        }
        catch (Exception e) {
            LOG.error("Failed to write state to Kafka: ", (Throwable)e);
            throw new RuntimeException("Error writing state to Kafka", e);
        }
        this.mayRefreshState(sync);
    }

    @Override
    public void removeAsync(String name) {
        this.remove(name, false);
    }

    private void remove(String name, boolean sync) {
        this.checkStates();
        LOG.debug("Removing state for name {}", (Object)name);
        try {
            this.safeSend(name, null);
        }
        catch (Exception e) {
            LOG.error("Failed to remove state from Kafka: ", (Throwable)e);
            throw new RuntimeException("Error removing state from Kafka", e);
        }
        this.mayRefreshState(sync);
    }

    private void mayRefreshState(boolean refresh) {
        try {
            if (refresh) {
                this.refresh(DEFAULT_READ_TO_END_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
            }
        }
        catch (TimeoutException e) {
            LOG.error("Failed to synchronize state from Kafka: TimeoutException");
        }
    }

    private void safeSend(final String key, final byte[] value) {
        this.kafkaLog.send(this.newRecordKey(this.groupId, key), value, new org.apache.kafka.clients.producer.Callback(){

            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    return;
                }
                if (exception instanceof RetriableException) {
                    if (value == null) {
                        KafkaStateBackingStore.this.kafkaLog.send(key, null, this);
                    }
                } else {
                    LOG.error("Failed to write state update", (Throwable)exception);
                }
            }
        });
    }

    @Override
    public void remove(String name) {
        this.remove(name, true);
    }

    @Override
    public void refresh(long timeout, TimeUnit unit) throws TimeoutException {
        this.checkStates();
        if (!this.consumerEnabled) {
            LOG.warn("This KafkaStateBackingStore is running in producer mode only. Refresh is ignored.");
            return;
        }
        try {
            this.kafkaLog.readToEnd().get(timeout, unit);
        }
        catch (InterruptedException | ExecutionException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Error trying to read to end of log", e);
        }
    }

    @Override
    public void setUpdateListener(StateBackingStore.UpdateListener<T> listener) {
        this.updateListener = listener;
    }

    private String getBackingStoreName() {
        return this.getClass().getSimpleName();
    }

    private synchronized void checkStates() {
        if (this.status == Status.SHUTDOWN || this.status == Status.PENDING_SHUTDOWN) {
            throw new IllegalStateException("Bad state " + this.getState().name());
        }
    }

    private String newRecordKey(String groupId, String stateName) {
        return this.keyPrefix + groupId + GROUP_STATE_SEPARATOR + stateName;
    }

    public class ConsumeCallback
    implements Callback<ConsumerRecord<String, byte[]>> {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
            if (error != null) {
                LOG.error("Unexpected in consumer callback for KafkaStateBackingStore: ", error);
                return;
            }
            KafkaStateBackingStore.this.offset.set(record.offset() + 1L);
            byte[] value = (byte[])record.value();
            String key = (String)record.key();
            if (key != null && key.startsWith(KafkaStateBackingStore.this.keyPrefix)) {
                String[] groupAndState = key.substring(KafkaStateBackingStore.this.keyPrefix.length()).split("\\.", 2);
                String recordGroup = groupAndState[0];
                String stateName = groupAndState[1];
                if (recordGroup.equals(KafkaStateBackingStore.this.groupId)) {
                    boolean removed = false;
                    Object newState = null;
                    Object object = KafkaStateBackingStore.this.lock;
                    synchronized (object) {
                        if (value == null) {
                            LOG.debug("Removed state {} due to null configuration. This is usually intentional and does not indicate an issue.", (Object)stateName);
                            KafkaStateBackingStore.this.states.remove(stateName);
                            removed = true;
                        } else {
                            try {
                                newState = KafkaStateBackingStore.this.serde.deserialize(value);
                            }
                            catch (Exception e) {
                                LOG.error("Failed to read state : {}", (Object)stateName, (Object)e);
                                return;
                            }
                            LOG.debug("Updating state for name {} : {}", (Object)stateName, newState);
                            KafkaStateBackingStore.this.states.put(stateName, newState);
                        }
                    }
                    if (KafkaStateBackingStore.this.status == Status.STARTED && KafkaStateBackingStore.this.updateListener != null) {
                        if (removed) {
                            KafkaStateBackingStore.this.updateListener.onStateRemove(stateName);
                        } else {
                            KafkaStateBackingStore.this.updateListener.onStateUpdate(stateName, newState);
                        }
                    }
                } else {
                    LOG.trace("Discarding state update value - not belong to group {} : {}", (Object)KafkaStateBackingStore.this.groupId, (Object)key);
                }
            } else {
                LOG.warn("Discarding state update value with invalid key : {}", (Object)key);
            }
        }
    }

    public static enum Status {
        CREATED,
        STARTED,
        PENDING_SHUTDOWN,
        SHUTDOWN;

    }
}

