package com.microsoft.azure.spring.integration.eventhub.impl;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import com.microsoft.azure.spring.integration.core.api.CheckpointConfig;
import com.microsoft.azure.spring.integration.core.api.CheckpointMode;
import com.microsoft.azure.spring.integration.eventhub.util.EventDataHelper;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import net.jcip.annotations.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@NotThreadSafe
/* loaded from: input_file:com/microsoft/azure/spring/integration/eventhub/impl/EventHubCheckpointManager.class */
public class EventHubCheckpointManager {
    private static final Logger log = LoggerFactory.getLogger(EventHubCheckpointManager.class);
    private final CheckpointConfig checkpointConfig;
    private final ConcurrentHashMap<String, AtomicInteger> countByPartition = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, EventData> lastEventByPartition = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventHubCheckpointManager(CheckpointConfig checkpointConfig) {
        this.checkpointConfig = checkpointConfig;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onMessage(PartitionContext partitionContext, EventData eventData) {
        if (this.checkpointConfig.getCheckpointMode() == CheckpointMode.RECORD) {
            partitionContext.checkpoint(eventData).whenComplete((r8, th) -> {
                checkpointHandler(partitionContext, eventData, th);
            });
        } else if (this.checkpointConfig.getCheckpointMode() == CheckpointMode.PARTITION_COUNT) {
            String partitionId = partitionContext.getPartitionId();
            this.countByPartition.computeIfAbsent(partitionId, str -> {
                return new AtomicInteger(0);
            });
            AtomicInteger atomicInteger = this.countByPartition.get(partitionId);
            if (atomicInteger.incrementAndGet() >= this.checkpointConfig.getCheckpointCount()) {
                partitionContext.checkpoint(eventData).whenComplete((r82, th2) -> {
                    checkpointHandler(partitionContext, eventData, th2);
                });
                atomicInteger.set(0);
            }
        }
        this.lastEventByPartition.put(partitionContext.getPartitionId(), eventData);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void completeBatch(PartitionContext partitionContext) {
        if (this.checkpointConfig.getCheckpointMode() == CheckpointMode.BATCH) {
            EventData eventData = this.lastEventByPartition.get(partitionContext.getPartitionId());
            partitionContext.checkpoint().whenComplete((r8, th) -> {
                checkpointHandler(partitionContext, eventData, th);
            });
        }
    }

    private String buildCheckpointFailMessage(PartitionContext partitionContext, EventData eventData) {
        return String.format("Consumer group '%s' failed to checkpoint %s on partition %s", partitionContext.getConsumerGroupName(), EventDataHelper.toString(eventData), partitionContext.getPartitionId());
    }

    private String buildCheckpointSuccessMessage(PartitionContext partitionContext, EventData eventData) {
        return String.format("Consumer group '%s' checkpointed %s on partition %s in %s mode", partitionContext.getConsumerGroupName(), EventDataHelper.toString(eventData), partitionContext.getPartitionId(), this.checkpointConfig.getCheckpointMode());
    }

    private void checkpointHandler(PartitionContext partitionContext, EventData eventData, Throwable th) {
        if (th != null) {
            if (log.isWarnEnabled()) {
                log.warn(buildCheckpointFailMessage(partitionContext, eventData), th);
            }
        } else if (log.isDebugEnabled()) {
            log.debug(buildCheckpointSuccessMessage(partitionContext, eventData));
        }
    }
}
