/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.persistence.jdbc.stringbased;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.transaction.Transaction;
import org.infinispan.commons.configuration.ConfiguredBy;
import org.infinispan.commons.io.ByteBuffer;
import org.infinispan.commons.marshall.StreamingMarshaller;
import org.infinispan.commons.persistence.Store;
import org.infinispan.commons.util.Util;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.executors.ExecutorAllCompletionService;
import org.infinispan.filter.KeyFilter;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.marshall.core.MarshalledEntryFactory;
import org.infinispan.metadata.InternalMetadata;
import org.infinispan.persistence.PersistenceUtil;
import org.infinispan.persistence.TaskContextImpl;
import org.infinispan.persistence.jdbc.JdbcUtil;
import org.infinispan.persistence.jdbc.configuration.JdbcStringBasedStoreConfiguration;
import org.infinispan.persistence.jdbc.connectionfactory.ConnectionFactory;
import org.infinispan.persistence.jdbc.connectionfactory.ManagedConnectionFactory;
import org.infinispan.persistence.jdbc.logging.Log;
import org.infinispan.persistence.jdbc.table.management.TableManager;
import org.infinispan.persistence.jdbc.table.management.TableManagerFactory;
import org.infinispan.persistence.keymappers.Key2StringMapper;
import org.infinispan.persistence.keymappers.TwoWayKey2StringMapper;
import org.infinispan.persistence.keymappers.UnsupportedKeyTypeException;
import org.infinispan.persistence.spi.AdvancedCacheLoader;
import org.infinispan.persistence.spi.AdvancedCacheWriter;
import org.infinispan.persistence.spi.AdvancedLoadWriteStore;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.persistence.spi.PersistenceException;
import org.infinispan.persistence.spi.TransactionalCacheWriter;
import org.infinispan.persistence.support.BatchModification;
import org.infinispan.util.KeyValuePair;
import org.infinispan.util.TimeService;
import org.infinispan.util.logging.LogFactory;

@Store(shared=true)
@ConfiguredBy(value=JdbcStringBasedStoreConfiguration.class)
public class JdbcStringBasedStore<K, V>
implements AdvancedLoadWriteStore<K, V>,
TransactionalCacheWriter<K, V> {
    // Store logger; the trace flag is sampled once when the class is initialized.
    private static final Log log = (Log)LogFactory.getLog(JdbcStringBasedStore.class, Log.class);
    private static final boolean trace = log.isTraceEnabled();
    // Connection bound to each transaction: filled by getTxConnection, emptied by destroyTxConnection.
    private final Map<Transaction, Connection> transactionConnectionMap = new ConcurrentHashMap<Transaction, Connection>();
    // Store configuration, taken from the InitializationContext in init().
    private JdbcStringBasedStoreConfiguration configuration;
    // Cache manager configuration; its class loader is used in start() to load the key mapper class.
    private GlobalConfiguration globalConfiguration;
    // Converts cache keys to their string form; instantiated in start().
    private Key2StringMapper key2StringMapper;
    // Name of the owning cache; handed to the table manager in initializeConnectionFactory().
    private String cacheName;
    // Source of JDBC connections; assigned in initializeConnectionFactory().
    private ConnectionFactory connectionFactory;
    // Builds MarshalledEntry instances for rows read back from the database.
    private MarshalledEntryFactory marshalledEntryFactory;
    // Serializes/deserializes the (value bytes, metadata bytes) pair stored in each row.
    private StreamingMarshaller marshaller;
    // Supplies the SQL statements used by this store; created lazily by getTableManager().
    private TableManager tableManager;
    // Wall-clock source used for expiry checks.
    private TimeService timeService;
    // True when the cache mode is distributed; start() then requires a two-way key mapper.
    private boolean isDistributedCache;

    /**
     * Captures everything this store needs from the initialization context:
     * its configuration, marshalling helpers, time service, the cache name,
     * the global configuration and whether the cache is distributed.
     *
     * @param ctx context supplied by the persistence layer
     */
    public void init(InitializationContext ctx) {
        this.configuration = (JdbcStringBasedStoreConfiguration)ctx.getConfiguration();
        this.marshaller = ctx.getMarshaller();
        this.marshalledEntryFactory = ctx.getMarshalledEntryFactory();
        this.timeService = ctx.getTimeService();
        this.cacheName = ctx.getCache().getName();
        this.globalConfiguration = ctx.getCache().getCacheManager().getCacheManagerConfiguration();

        // A cache without a configuration is treated as non-distributed.
        boolean distributed = false;
        if (ctx.getCache().getCacheConfiguration() != null) {
            distributed = ctx.getCache().getCacheConfiguration().clustering().cacheMode().isDistributed();
        }
        this.isDistributedCache = distributed;
    }

    /**
     * Starts the store.
     * <p>
     * When the configuration says this store manages its connection factory, the
     * factory is created, started and wired in via
     * {@link #initializeConnectionFactory(ConnectionFactory)}. The configured
     * key-to-string mapper is then instantiated, and a two-way mapper is enforced
     * when preload is enabled or the cache is distributed.
     *
     * @throws IllegalStateException if the mapper class cannot be loaded or instantiated
     * @throws PersistenceException  if the configured class is not a {@link Key2StringMapper},
     *                               or a two-way mapper is required but not configured
     */
    public void start() {
        if (this.configuration.manageConnectionFactory()) {
            ConnectionFactory factory = ConnectionFactory.getConnectionFactory(this.configuration.connectionFactory().connectionFactoryClass());
            factory.start(this.configuration.connectionFactory(), factory.getClass().getClassLoader());
            this.initializeConnectionFactory(factory);
        }
        Object mapper;
        try {
            mapper = Util.loadClassStrict((String)this.configuration.key2StringMapper(), (ClassLoader)this.globalConfiguration.classLoader()).newInstance();
        }
        catch (Exception e) {
            log.errorf("Trying to instantiate %s, however it failed due to %s", this.configuration.key2StringMapper(), e.getClass().getName());
            throw new IllegalStateException("This should not happen.", e);
        }
        // Fail fast: silently leaving the mapper null would only surface later as a
        // NullPointerException on the first key conversion.
        if (!(mapper instanceof Key2StringMapper)) {
            throw new PersistenceException(String.format("Invalid key to string mapper : %s", mapper.getClass().getName()));
        }
        this.key2StringMapper = (Key2StringMapper)mapper;
        if (trace) {
            log.tracef("Using key2StringMapper: %s", this.key2StringMapper.getClass().getName());
        }
        if (this.configuration.preload()) {
            this.enforceTwoWayMapper("preload");
        }
        if (this.isDistributedCache) {
            this.enforceTwoWayMapper("distribution/rehashing");
        }
    }

    /**
     * Stops the table manager and, when this store manages its own connection
     * factory (the same condition under which {@link #start()} created it), the
     * connection factory as well.
     * <p>
     * Both steps are always attempted. The first failure becomes the cause of the
     * thrown exception and a later failure is attached to it as suppressed.
     *
     * @throws PersistenceException if either step failed
     */
    public void stop() {
        Throwable cause = null;
        try {
            // Nothing to stop if the store was never fully started.
            if (this.tableManager != null) {
                this.tableManager.stop();
                this.tableManager = null;
            }
        }
        catch (Throwable t) {
            cause = t.getCause();
            if (cause == null) {
                cause = t;
            }
            log.debug("Exception while stopping", t);
        }
        try {
            // Mirror start(): only a factory created by this store is stopped here.
            if (this.configuration.manageConnectionFactory() && this.connectionFactory != null) {
                log.tracef("Stopping mananged connection factory: %s", this.connectionFactory);
                this.connectionFactory.stop();
            }
        }
        catch (Throwable t) {
            if (cause == null) {
                cause = t;
            } else {
                // Keep the second failure visible on the exception that is actually thrown.
                cause.addSuppressed(t);
            }
            log.debug("Exception while stopping", t);
        }
        if (cause != null) {
            throw new PersistenceException("Exceptions occurred while stopping store", cause);
        }
    }

    /**
     * Installs the connection factory, then obtains the table manager, gives it
     * the cache name and starts it.
     *
     * @param connectionFactory factory this store will take connections from
     */
    void initializeConnectionFactory(ConnectionFactory connectionFactory) throws PersistenceException {
        this.connectionFactory = connectionFactory;
        TableManager manager = this.getTableManager();
        this.tableManager = manager;
        manager.setCacheName(this.cacheName);
        manager.start();
    }

    /**
     * @return the connection factory currently used by this store, or
     *         {@code null} if none has been installed yet
     */
    public ConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stores a single entry, using an upsert when the table manager supports it
     * and a select-then-insert/update otherwise.
     * <p>
     * If the thread is interrupted while marshalling, the interrupt flag is
     * restored and the method returns without raising an exception.
     *
     * @param entry entry to persist
     * @throws PersistenceException if the database reports an error
     */
    public void write(MarshalledEntry entry) {
        final String keyStr = this.key2Str(entry.getKey());
        Connection conn = null;
        try {
            conn = this.connectionFactory.getConnection();
            if (this.tableManager.isUpsertSupported()) {
                this.executeUpsert(conn, entry, keyStr);
            } else {
                this.executeLegacyUpdate(conn, entry, keyStr);
            }
        }
        catch (InterruptedException interrupted) {
            if (trace) {
                log.trace("Interrupted while marshalling to store");
            }
            Thread.currentThread().interrupt();
        }
        catch (SQLException sqlFailure) {
            log.sqlFailureStoringKey(keyStr, sqlFailure);
            String message = String.format("Error while storing string key to database; key: '%s'", keyStr);
            throw new PersistenceException(message, (Throwable)sqlFailure);
        }
        finally {
            this.connectionFactory.releaseConnection(conn);
        }
    }

    /**
     * Writes the entry on the given connection after converting its key to the
     * string form used in the table.
     */
    private void write(MarshalledEntry entry, Connection connection) throws SQLException, InterruptedException {
        String keyStr = this.key2Str(entry.getKey());
        this.write(entry, connection, keyStr);
    }

    /**
     * Writes the entry under the already-converted key, choosing the upsert
     * statement when the table manager supports it and the legacy
     * select-then-write path otherwise.
     */
    private void write(MarshalledEntry entry, Connection connection, String keyStr) throws SQLException, InterruptedException {
        if (!this.tableManager.isUpsertSupported()) {
            this.executeLegacyUpdate(connection, entry, keyStr);
            return;
        }
        this.executeUpsert(connection, entry, keyStr);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs the table manager's upsert statement for one entry. The statement
     * is always closed; the connection is left to the caller.
     */
    private void executeUpsert(Connection connection, MarshalledEntry entry, String keyStr) throws InterruptedException, SQLException {
        final String upsertSql = this.tableManager.getUpsertRowSql();
        if (trace) {
            log.tracef("Running sql '%s'. Key string is '%s'", upsertSql, keyStr);
        }
        PreparedStatement statement = null;
        try {
            statement = connection.prepareStatement(upsertSql);
            this.prepareUpdateStatement(entry, keyStr, statement);
            statement.executeUpdate();
        }
        finally {
            JdbcUtil.safeClose(statement);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Write path for databases without upsert support: first checks whether a
     * row with this key exists, then runs either the update or the insert
     * statement. The connection is left to the caller.
     */
    private void executeLegacyUpdate(Connection connection, MarshalledEntry entry, String keyStr) throws InterruptedException, SQLException {
        final String lookupSql = this.tableManager.getSelectIdRowSql();
        if (trace) {
            log.tracef("Running sql '%s'. Key string is '%s'", lookupSql, keyStr);
        }
        PreparedStatement statement = null;
        try {
            // Phase 1: does the row already exist?
            statement = connection.prepareStatement(lookupSql);
            statement.setString(1, keyStr);
            ResultSet existing = statement.executeQuery();
            String writeSql;
            if (existing.next()) {
                writeSql = this.tableManager.getUpdateRowSql();
            } else {
                writeSql = this.tableManager.getInsertRowSql();
            }
            JdbcUtil.safeClose(existing);
            JdbcUtil.safeClose(statement);

            // Phase 2: update or insert accordingly.
            if (trace) {
                log.tracef("Running sql '%s'. Key string is '%s'", writeSql, keyStr);
            }
            statement = connection.prepareStatement(writeSql);
            this.prepareUpdateStatement(entry, keyStr, statement);
            statement.executeUpdate();
        }
        finally {
            JdbcUtil.safeClose(statement);
        }
    }

    /**
     * Persists several entries at once.
     * <p>
     * Without upsert support each entry is written individually through
     * {@link #write(MarshalledEntry)}. Otherwise all entries are added to one
     * upsert batch that is executed on a single connection.
     *
     * @param marshalledEntries entries to persist
     * @throws PersistenceException if the batch fails or the thread is interrupted
     *                              while marshalling (the interrupt flag is restored)
     */
    public void writeBatch(Iterable<MarshalledEntry<? extends K, ? extends V>> marshalledEntries) {
        if (!this.tableManager.isUpsertSupported()) {
            marshalledEntries.forEach(this::write);
            return;
        }
        Connection connection = null;
        try {
            connection = this.connectionFactory.getConnection();
            try (PreparedStatement upsertBatch = connection.prepareStatement(this.tableManager.getUpsertRowSql());){
                // Element type must match the iterable's wildcard type.
                for (MarshalledEntry<? extends K, ? extends V> marshalledEntry : marshalledEntries) {
                    String keyStr = this.key2Str(marshalledEntry.getKey());
                    this.prepareUpdateStatement(marshalledEntry, keyStr, upsertBatch);
                    upsertBatch.addBatch();
                }
                upsertBatch.executeBatch();
            }
        }
        catch (InterruptedException | SQLException e) {
            if (e instanceof InterruptedException) {
                // Do not lose the interrupt when converting it to a PersistenceException.
                Thread.currentThread().interrupt();
            }
            throw log.sqlFailureWritingBatch(e);
        }
        finally {
            this.connectionFactory.releaseConnection(connection);
        }
    }

    /**
     * Removes the rows for all given keys using one batched delete statement.
     *
     * @param keys keys whose rows should be removed
     */
    public void deleteBatch(Iterable<Object> keys) {
        Connection conn = null;
        try {
            conn = this.connectionFactory.getConnection();
            String deleteSql = this.tableManager.getDeleteRowSql();
            try (PreparedStatement batch = conn.prepareStatement(deleteSql)) {
                for (Object key : keys) {
                    batch.setString(1, this.key2Str(key));
                    batch.addBatch();
                }
                batch.executeBatch();
            }
        }
        catch (SQLException sqlFailure) {
            throw log.sqlFailureDeletingBatch(keys, sqlFailure);
        }
        finally {
            this.connectionFactory.releaseConnection(conn);
        }
    }

    /**
     * Loads the entry stored under the given key.
     * <p>
     * The result set, statement and connection are released in a
     * {@code finally} block, so they are returned even when unmarshalling or
     * any other non-SQL step fails.
     *
     * @param key cache key to look up
     * @return the stored entry, or {@code null} if no row exists or the entry's
     *         metadata reports it as expired
     * @throws PersistenceException if the query or unmarshalling fails
     */
    public MarshalledEntry load(Object key) {
        String lockingKey = this.key2Str(key);
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        MarshalledEntry storedValue = null;
        try {
            String sql = this.tableManager.getSelectRowSql();
            conn = this.connectionFactory.getConnection();
            ps = conn.prepareStatement(sql);
            ps.setString(1, lockingKey);
            rs = ps.executeQuery();
            if (rs.next()) {
                // Column 2 holds the marshalled (value bytes, metadata bytes) pair.
                InputStream inputStream = rs.getBinaryStream(2);
                KeyValuePair icv = (KeyValuePair)this.unmarshall(inputStream);
                storedValue = this.marshalledEntryFactory.newMarshalledEntry(key, (ByteBuffer)icv.getKey(), (ByteBuffer)icv.getValue());
            }
        }
        catch (SQLException e) {
            log.sqlFailureReadingKey(key, lockingKey, e);
            throw new PersistenceException(String.format("SQL error while fetching stored entry with key: %s, lockingKey: %s", key, lockingKey), (Throwable)e);
        }
        finally {
            JdbcUtil.safeClose(rs);
            JdbcUtil.safeClose(ps);
            this.connectionFactory.releaseConnection(conn);
        }
        if (storedValue != null && storedValue.getMetadata() != null && storedValue.getMetadata().isExpired(this.timeService.wallClockTime())) {
            return null;
        }
        return storedValue;
    }

    /**
     * Deletes every row of the store's table.
     * <p>
     * The statement and connection are released in a {@code finally} block so
     * they are returned on every exit path, not only after a
     * {@link SQLException}.
     *
     * @throws PersistenceException if the delete statement fails
     */
    public void clear() {
        Connection conn = null;
        Statement statement = null;
        try {
            String sql = this.tableManager.getDeleteAllRowsSql();
            conn = this.connectionFactory.getConnection();
            statement = conn.createStatement();
            int result = statement.executeUpdate(sql);
            if (log.isTraceEnabled()) {
                log.tracef("Successfully removed %d rows.", result);
            }
        }
        catch (SQLException ex) {
            log.failedClearingJdbcCacheStore(ex);
            throw new PersistenceException("Failed clearing cache store", (Throwable)ex);
        }
        finally {
            JdbcUtil.safeClose(statement);
            this.connectionFactory.releaseConnection(conn);
        }
    }

    /**
     * Deletes the row stored under the given key.
     * <p>
     * The statement and connection are released in a {@code finally} block so
     * they are returned on every exit path.
     *
     * @param key cache key to remove
     * @return {@code true} if exactly one row was deleted
     * @throws PersistenceException if the delete statement fails
     */
    public boolean delete(Object key) {
        Connection connection = null;
        PreparedStatement ps = null;
        String keyStr = this.key2Str(key);
        try {
            String sql = this.tableManager.getDeleteRowSql();
            if (trace) {
                log.tracef("Running sql '%s' on %s", sql, keyStr);
            }
            connection = this.connectionFactory.getConnection();
            ps = connection.prepareStatement(sql);
            ps.setString(1, keyStr);
            return ps.executeUpdate() == 1;
        }
        catch (SQLException ex) {
            log.sqlFailureRemovingKeys(ex);
            throw new PersistenceException("Error while removing string keys from database", (Throwable)ex);
        }
        finally {
            JdbcUtil.safeClose(ps);
            this.connectionFactory.releaseConnection(connection);
        }
    }

    /**
     * Removes all rows whose expiry time has passed.
     * <p>
     * Expired rows are selected using the current wall-clock time and deleted
     * in one batch. When a two-way key mapper is configured and a listener is
     * supplied, the listener is told about each purged key. Without a two-way
     * mapper a warning is logged on every call, because keys cannot be
     * reconstructed from their string form.
     * <p>
     * All JDBC resources are released in a {@code finally} block, so a failure
     * in the listener or the key mapper no longer leaks the connection.
     *
     * @param executor      not used by this implementation
     * @param purgeListener listener to notify about purged keys; may be {@code null}
     * @throws PersistenceException if a SQL statement fails
     */
    public void purge(Executor executor, AdvancedCacheWriter.PurgeListener purgeListener) {
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            String sql = this.tableManager.getSelectOnlyExpiredRowsSql();
            conn = this.connectionFactory.getConnection();
            ps = conn.prepareStatement(sql);
            ps.setLong(1, this.timeService.wallClockTime());
            rs = ps.executeQuery();
            try (PreparedStatement batchDelete = conn.prepareStatement(this.tableManager.getDeleteRowSql());){
                int affectedRows = 0;
                boolean twoWayMapperExists = this.key2StringMapper instanceof TwoWayKey2StringMapper;
                while (rs.next()) {
                    ++affectedRows;
                    // Column 2 of the expired-rows query is the string form of the key.
                    String keyStr = rs.getString(2);
                    batchDelete.setString(1, keyStr);
                    batchDelete.addBatch();
                    if (!twoWayMapperExists || purgeListener == null) continue;
                    Object key = ((TwoWayKey2StringMapper)this.key2StringMapper).getKeyMapping(keyStr);
                    purgeListener.entryPurged(key);
                }
                if (!twoWayMapperExists) {
                    log.twoWayKey2StringMapperIsMissing(TwoWayKey2StringMapper.class.getSimpleName());
                }
                if (affectedRows > 0) {
                    int[] result = batchDelete.executeBatch();
                    if (trace) {
                        log.tracef("Successfully purged %d rows.", result.length);
                    }
                }
            }
        }
        catch (SQLException ex) {
            log.failedClearingJdbcCacheStore(ex);
            throw new PersistenceException("Failed clearing string based JDBC store", (Throwable)ex);
        }
        finally {
            JdbcUtil.safeClose(rs);
            JdbcUtil.safeClose(ps);
            this.connectionFactory.releaseConnection(conn);
        }
    }

    /**
     * Reports whether a non-expired entry exists for the key. This performs a
     * full {@link #load(Object)} and checks the result.
     */
    public boolean contains(Object key) {
        MarshalledEntry loaded = this.load(key);
        return loaded != null;
    }

    /**
     * Iterates over all non-expired rows and hands each accepted entry to the
     * task through the supplied executor.
     * <p>
     * Keys are rebuilt from their string form, so the configured mapper must be
     * a {@link TwoWayKey2StringMapper}. Iteration stops early once the task
     * context is stopped. The method waits for all submitted work before
     * returning.
     * <p>
     * All JDBC resources are released in a {@code finally} block. Previously
     * the "Execution exception!" failure and any other non-SQL exception
     * bypassed cleanup and leaked the connection.
     *
     * @param filter        optional key filter; {@code null} accepts every key
     * @param task          task invoked for each accepted entry
     * @param executor      executor the per-entry work is submitted to
     * @param fetchValue    whether entries should carry their value bytes
     * @param fetchMetadata whether entries should carry their metadata bytes
     * @throws PersistenceException if the query fails or a submitted task threw
     */
    public void process(KeyFilter filter, AdvancedCacheLoader.CacheLoaderTask task, Executor executor, boolean fetchValue, boolean fetchMetadata) {
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            String sql = this.tableManager.getLoadNonExpiredAllRowsSql();
            if (trace) {
                log.tracef("Running sql %s", sql);
            }
            conn = this.connectionFactory.getConnection();
            ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            ps.setLong(1, this.timeService.wallClockTime());
            ps.setFetchSize(this.tableManager.getFetchSize());
            rs = ps.executeQuery();
            TaskContextImpl taskContext = new TaskContextImpl();
            ExecutorAllCompletionService ecs = new ExecutorAllCompletionService(executor);
            while (rs.next()) {
                // Column 2 is the key string, column 1 the marshalled payload.
                String keyStr = rs.getString(2);
                Object key = ((TwoWayKey2StringMapper)this.key2StringMapper).getKeyMapping(keyStr);
                if (taskContext.isStopped()) break;
                if (filter != null && !filter.accept(key)) continue;
                // NOTE(review): the stream is consumed later on an executor thread, after the
                // cursor may have advanced; confirm the JDBC driver in use tolerates that.
                InputStream inputStream = rs.getBinaryStream(1);
                ecs.submit(() -> this.lambda$process$0((AdvancedCacheLoader.TaskContext)taskContext, fetchValue, fetchMetadata, inputStream, key, task));
            }
            ecs.waitUntilAllCompleted();
            if (ecs.isExceptionThrown()) {
                throw new PersistenceException("Execution exception!", (Throwable)ecs.getFirstException());
            }
        }
        catch (SQLException e) {
            log.sqlFailureFetchingAllStoredEntries(e);
            throw new PersistenceException("SQL error while fetching all StoredEntries", (Throwable)e);
        }
        finally {
            JdbcUtil.safeClose(rs);
            JdbcUtil.safeClose(ps);
            this.connectionFactory.releaseConnection(conn);
        }
    }

    /**
     * Applies a transaction's modifications on the connection bound to that
     * transaction, with auto-commit disabled so that {@link #commit(Transaction)}
     * or {@link #rollback(Transaction)} decides the outcome.
     * <p>
     * Writes are batched through the upsert statement when supported and
     * otherwise written one by one on the same connection. Removals are batched
     * and run with {@code executeBatch()}, so every queued key is deleted.
     *
     * @param transaction       transaction the modifications belong to
     * @param batchModification entries to store and keys to remove
     * @throws PersistenceException if a statement fails or the thread is interrupted
     *                              while marshalling (the interrupt flag is restored)
     */
    public void prepareWithModifications(Transaction transaction, BatchModification batchModification) throws PersistenceException {
        try {
            Connection connection = this.getTxConnection(transaction);
            connection.setAutoCommit(false);
            boolean upsertSupported = this.tableManager.isUpsertSupported();
            try (PreparedStatement upsertBatch = upsertSupported ? connection.prepareStatement(this.tableManager.getUpsertRowSql()) : null;
                 PreparedStatement deleteBatch = connection.prepareStatement(this.tableManager.getDeleteRowSql());){
                for (MarshalledEntry entry : batchModification.getMarshalledEntries()) {
                    if (upsertSupported) {
                        String keyStr = this.key2Str(entry.getKey());
                        this.prepareUpdateStatement(entry, keyStr, upsertBatch);
                        upsertBatch.addBatch();
                    } else {
                        this.write(entry, connection);
                    }
                }
                for (Object key : batchModification.getKeysToRemove()) {
                    deleteBatch.setString(1, this.key2Str(key));
                    deleteBatch.addBatch();
                }
                if (upsertSupported && !batchModification.getMarshalledEntries().isEmpty()) {
                    upsertBatch.executeBatch();
                }
                if (!batchModification.getKeysToRemove().isEmpty()) {
                    // The keys were queued with addBatch(), so the whole batch must be executed.
                    deleteBatch.executeBatch();
                }
            }
        }
        catch (InterruptedException | SQLException e) {
            if (e instanceof InterruptedException) {
                // Do not lose the interrupt when converting it to a PersistenceException.
                Thread.currentThread().interrupt();
            }
            throw log.prepareTxFailure(e);
        }
    }

    /**
     * Commits the JDBC connection bound to the transaction. The connection is
     * unbound and released afterwards whether or not the commit succeeded.
     *
     * @throws PersistenceException if the JDBC commit fails
     */
    public void commit(Transaction tx) {
        try {
            this.getTxConnection(tx).commit();
        }
        catch (SQLException sqlFailure) {
            log.sqlFailureTxCommit(sqlFailure);
            String message = String.format("Error during commit of JDBC transaction (%s)", tx);
            throw new PersistenceException(message, (Throwable)sqlFailure);
        }
        finally {
            this.destroyTxConnection(tx);
        }
    }

    /**
     * Rolls back the JDBC connection bound to the transaction. The connection
     * is unbound and released afterwards whether or not the rollback succeeded.
     *
     * @throws PersistenceException if the JDBC rollback fails
     */
    public void rollback(Transaction tx) {
        try {
            this.getTxConnection(tx).rollback();
        }
        catch (SQLException sqlFailure) {
            log.sqlFailureTxRollback(sqlFailure);
            String message = String.format("Error during rollback of JDBC transaction (%s)", tx);
            throw new PersistenceException(message, (Throwable)sqlFailure);
        }
        finally {
            this.destroyTxConnection(tx);
        }
    }

    /**
     * Returns the connection bound to the transaction, obtaining one from the
     * connection factory and binding it on first use.
     */
    private Connection getTxConnection(Transaction tx) {
        Connection bound = this.transactionConnectionMap.get(tx);
        if (bound != null) {
            return bound;
        }
        Connection fresh = this.connectionFactory.getConnection();
        this.transactionConnectionMap.put(tx, fresh);
        return fresh;
    }

    /**
     * Unbinds the transaction's connection, if any, and hands it back to the
     * connection factory.
     */
    private void destroyTxConnection(Transaction tx) {
        Connection bound = this.transactionConnectionMap.remove(tx);
        if (bound == null) {
            return;
        }
        this.connectionFactory.releaseConnection(bound);
    }

    /**
     * Counts the rows in the store's table using the table manager's count
     * query.
     * <p>
     * All JDBC resources are released in a {@code finally} block so they are
     * returned on every exit path.
     *
     * @return the row count, or 0 if the count query yields no row
     * @throws PersistenceException if the count query fails
     */
    public int size() {
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            conn = this.connectionFactory.getConnection();
            String sql = this.tableManager.getCountRowsSql();
            ps = conn.prepareStatement(sql);
            rs = ps.executeQuery();
            // Only read the column when the cursor is actually positioned on a row.
            return rs.next() ? rs.getInt(1) : 0;
        }
        catch (SQLException e) {
            log.sqlFailureIntegratingState(e);
            // The previous message spoke of integrating state, which this method does not do.
            throw new PersistenceException("SQL failure while counting rows in store", (Throwable)e);
        }
        finally {
            JdbcUtil.safeClose(rs);
            JdbcUtil.safeClose(ps);
            this.connectionFactory.releaseConnection(conn);
        }
    }

    /**
     * Binds the three parameters shared by the insert, update and upsert
     * statements: (1) the marshalled value/metadata pair as a binary stream,
     * (2) the entry's expiry time, (3) the key string.
     */
    private void prepareUpdateStatement(MarshalledEntry entry, String key, PreparedStatement ps) throws InterruptedException, SQLException {
        KeyValuePair payload = new KeyValuePair((Object)entry.getValueBytes(), (Object)entry.getMetadataBytes());
        ByteBuffer marshalled = this.marshall(payload);
        InputStream body = new ByteArrayInputStream(marshalled.getBuf(), marshalled.getOffset(), marshalled.getLength());
        ps.setBinaryStream(1, body, marshalled.getLength());
        ps.setLong(2, PersistenceUtil.getExpiryTime((InternalMetadata)entry.getMetadata()));
        ps.setString(3, key);
    }

    /**
     * Converts a cache key to the string stored in the key column, applying
     * the table manager's string encoding when it is required.
     *
     * @throws UnsupportedKeyTypeException if the mapper does not support the key's type
     */
    private String key2Str(Object key) throws PersistenceException {
        if (!this.key2StringMapper.isSupportedType(key.getClass())) {
            throw new UnsupportedKeyTypeException(key);
        }
        String mapped = this.key2StringMapper.getStringMapping(key);
        if (this.tableManager.isStringEncodingRequired()) {
            return this.tableManager.encodeString(mapped);
        }
        return mapped;
    }

    /**
     * Returns the table manager, creating it from the current connection
     * factory and configuration on first access.
     */
    public TableManager getTableManager() {
        if (this.tableManager != null) {
            return this.tableManager;
        }
        this.tableManager = TableManagerFactory.getManager(this.connectionFactory, this.configuration);
        return this.tableManager;
    }

    /**
     * Ensures the configured mapper can convert strings back to keys.
     *
     * @param where name of the feature requiring the two-way mapper, used in the log message
     * @throws PersistenceException if the mapper is not a {@link TwoWayKey2StringMapper}
     */
    private void enforceTwoWayMapper(String where) throws PersistenceException {
        if (this.key2StringMapper instanceof TwoWayKey2StringMapper) {
            return;
        }
        log.invalidKey2StringMapper(where, this.key2StringMapper.getClass().getName());
        String message = String.format("Invalid key to string mapper : %s", this.key2StringMapper.getClass().getName());
        throw new PersistenceException(message);
    }

    /**
     * Serializes the object with the store's marshaller.
     *
     * @throws PersistenceException if the marshaller reports an I/O failure
     */
    private ByteBuffer marshall(Object obj) throws PersistenceException, InterruptedException {
        ByteBuffer marshalled;
        try {
            marshalled = this.marshaller.objectToBuffer(obj);
        }
        catch (IOException ioFailure) {
            log.errorMarshallingObject(ioFailure, obj);
            throw new PersistenceException("I/O failure while marshalling object: " + obj, (Throwable)ioFailure);
        }
        return marshalled;
    }

    /**
     * Deserializes one object from the stream with the store's marshaller.
     *
     * @throws PersistenceException if reading fails or a required class cannot be found
     */
    private <T> T unmarshall(InputStream inputStream) throws PersistenceException {
        try {
            return (T)this.marshaller.objectFromInputStream(inputStream);
        }
        catch (ClassNotFoundException missingClass) {
            log.unexpectedClassNotFoundException(missingClass);
            throw new PersistenceException("*UNEXPECTED* ClassNotFoundException. This should not happen as Bucket class exists", (Throwable)missingClass);
        }
        catch (IOException ioFailure) {
            log.ioErrorUnmarshalling(ioFailure);
            throw new PersistenceException("I/O error while unmarshalling from stream", (Throwable)ioFailure);
        }
    }

    /**
     * Per-row work submitted by {@code process}: builds the entry for one key,
     * unmarshalling the payload only when value or metadata was requested, and
     * passes it to the task. Does nothing once the task context is stopped.
     */
    private /* synthetic */ Void lambda$process$0(AdvancedCacheLoader.TaskContext taskContext, boolean fetchValue, boolean fetchMetadata, InputStream inputStream, Object key, AdvancedCacheLoader.CacheLoaderTask task) throws Exception {
        if (taskContext.isStopped()) {
            return null;
        }
        MarshalledEntry entry;
        if (!fetchValue && !fetchMetadata) {
            // Key-only entry: the payload stream is never read.
            entry = this.marshalledEntryFactory.newMarshalledEntry(key, null, null);
        } else {
            KeyValuePair kvp = (KeyValuePair)this.unmarshall(inputStream);
            entry = this.marshalledEntryFactory.newMarshalledEntry(key, fetchValue ? (ByteBuffer)kvp.getKey() : null, fetchMetadata ? (ByteBuffer)kvp.getValue() : null);
        }
        task.processEntry(entry, taskContext);
        return null;
    }
}

