package com.hazelcast.map.impl.mapstore.writebehind;

import com.hazelcast.map.impl.MapServiceContext;
import com.hazelcast.map.impl.mapstore.MapStoreContext;
import com.hazelcast.map.impl.mapstore.writebehind.entry.DelayedEntry;
import com.hazelcast.map.impl.recordstore.RecordStore;
import com.hazelcast.spi.ExecutionService;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.partition.IPartitionService;
import com.hazelcast.spi.properties.GroupProperty;
import com.hazelcast.util.Clock;
import com.hazelcast.util.CollectionUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:lib/hazelcast-3.12.jar:com/hazelcast/map/impl/mapstore/writebehind/StoreWorker.class */
/**
 * Periodic task that inspects the write-behind queues of one map on every partition of this
 * node and hands the due entries over to a {@link WriteBehindProcessor}.
 * <p>
 * Once {@link #start() started}, the worker re-submits itself to the node's
 * {@link ExecutionService} with a one second delay after every run until {@link #stop()} is
 * called.
 * <p>
 * Entries of partitions that the partition service reports as local are passed to
 * {@link WriteBehindProcessor#process}; entries of all other partitions are only removed from
 * their queues (with the before/after store listeners invoked), using a cut-off time that is
 * shifted back by {@code backupDelayMillis}.
 */
public class StoreWorker implements Runnable {
    private final String mapName;
    private final MapServiceContext mapServiceContext;
    private final IPartitionService partitionService;
    private final ExecutionService executionService;
    private final WriteBehindProcessor writeBehindProcessor;
    /** Value of {@link GroupProperty#MAP_REPLICA_SCHEDULED_TASK_DELAY_SECONDS} in milliseconds. */
    private final long backupDelayMillis;
    /** Configured map-store write delay, converted from seconds to milliseconds. */
    private final long writeDelayMillis;
    private final int partitionCount;
    /** Cut-off store time used by the previous run; initialised to the construction time. */
    private long lastHighestStoreTime;
    private volatile boolean running;

    /**
     * Creates a worker for the map described by the given context. The worker does nothing
     * until {@link #start()} is called.
     *
     * @param mapStoreContext      source of the map name, the service context and the
     *                             map-store configuration (write delay)
     * @param writeBehindProcessor processor that receives the selected entries
     */
    public StoreWorker(MapStoreContext mapStoreContext, WriteBehindProcessor writeBehindProcessor) {
        this.mapName = mapStoreContext.getMapName();
        this.mapServiceContext = mapStoreContext.getMapServiceContext();
        NodeEngine nodeEngine = this.mapServiceContext.getNodeEngine();
        this.partitionService = nodeEngine.getPartitionService();
        this.executionService = nodeEngine.getExecutionService();
        this.writeBehindProcessor = writeBehindProcessor;
        this.backupDelayMillis = getReplicaWaitTimeMillis();
        this.lastHighestStoreTime = Clock.currentTimeMillis();
        this.writeDelayMillis = TimeUnit.SECONDS.toMillis(getWriteDelaySeconds(mapStoreContext));
        this.partitionCount = this.partitionService.getPartitionCount();
    }

    /**
     * Marks the worker as running and schedules its first execution. Calling this method on a
     * worker that is already running has no effect.
     */
    public synchronized void start() {
        if (running) {
            return;
        }
        running = true;
        schedule();
    }

    /**
     * Clears the running flag so that the worker is not re-scheduled after its current or next
     * execution. An execution that is already scheduled is not cancelled here.
     */
    public synchronized void stop() {
        running = false;
    }

    /**
     * Performs one pass over all partitions and, while the worker is still running, schedules
     * the next pass. Re-scheduling happens in a {@code finally} block, so it also takes place
     * when the pass ends with an exception.
     */
    @Override
    public void run() {
        try {
            runInternal();
        } finally {
            if (running) {
                schedule();
            }
        }
    }

    /** Submits this worker to the execution service with a delay of one second. */
    private void schedule() {
        executionService.schedule(this, 1L, TimeUnit.SECONDS);
    }

    /**
     * Collects the due entries of every partition, processes those of local partitions, drops
     * those of non-local partitions from their queues and finally notifies every existing
     * record store of the flush.
     */
    private void runInternal() {
        long now = Clock.currentTimeMillis();
        // Cut-off for local partitions; non-local partitions lag behind by backupDelayMillis.
        long ownerHighestStoreTime = calculateHighestStoreTime(lastHighestStoreTime, now);
        long backupHighestStoreTime = ownerHighestStoreTime - backupDelayMillis;
        lastHighestStoreTime = ownerHighestStoreTime;

        // Both lists are created lazily so that an idle pass allocates nothing.
        List<DelayedEntry> ownerEntries = null;
        List<DelayedEntry> backupEntries = null;

        for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
            // Stop collecting as soon as the executing thread has been interrupted.
            if (Thread.currentThread().isInterrupted()) {
                break;
            }
            RecordStore recordStore = getRecordStoreOrNull(mapName, partitionId);
            if (!hasEntryInWriteBehindQueue(recordStore)) {
                continue;
            }
            if (isPartitionLocal(partitionId)) {
                ownerEntries = initListIfNull(ownerEntries, partitionCount);
                selectEntriesToStore(recordStore, ownerEntries, ownerHighestStoreTime);
            } else {
                backupEntries = initListIfNull(backupEntries, partitionCount);
                selectEntriesToStore(recordStore, backupEntries, backupHighestStoreTime);
            }
        }

        if (!CollectionUtil.isEmpty(ownerEntries)) {
            Map<Integer, List<DelayedEntry>> failedByPartition = writeBehindProcessor.process(ownerEntries);
            // Remove everything that was handed to the processor, then put the entries it
            // returned back at the front of their partition's queue.
            removeFinishedStoreOperationsFromQueues(mapName, ownerEntries);
            reAddFailedStoreOperationsToQueues(mapName, failedByPartition);
        }
        if (!CollectionUtil.isEmpty(backupEntries)) {
            doInBackup(backupEntries);
        }
        notifyFlush();
    }

    /**
     * Returns {@code list} unchanged when it is non-null, otherwise a new empty list.
     *
     * @param list            the list to reuse, may be {@code null}
     * @param initialCapacity initial capacity of a newly created list
     */
    private static List<DelayedEntry> initListIfNull(List<DelayedEntry> list, int initialCapacity) {
        if (list == null) {
            list = new ArrayList<DelayedEntry>(initialCapacity);
        }
        return list;
    }

    /**
     * Returns {@code now} when at least {@code writeDelayMillis} have passed since
     * {@code lastHighestStoreTime}; otherwise returns {@code lastHighestStoreTime} unchanged.
     */
    private long calculateHighestStoreTime(long lastHighestStoreTime, long now) {
        return now >= lastHighestStoreTime + writeDelayMillis ? now : lastHighestStoreTime;
    }

    /** Tells whether the record store exists and its write-behind queue has a non-zero size. */
    private boolean hasEntryInWriteBehindQueue(RecordStore recordStore) {
        return recordStore != null && getWriteBehindQueue(recordStore).size() != 0;
    }

    /** Calls {@code notifyFlush()} on the write-behind store of every existing record store. */
    private void notifyFlush() {
        for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
            RecordStore recordStore = getRecordStoreOrNull(mapName, partitionId);
            if (recordStore != null) {
                ((WriteBehindStore) recordStore.getMapDataStore()).notifyFlush();
            }
        }
    }

    /** Asks the partition service whether the given partition is local to this node. */
    private boolean isPartitionLocal(int partitionId) {
        return partitionService.getPartition(partitionId, false).isLocal();
    }

    /**
     * Selects entries from the record store's write-behind queue into {@code selected}, using
     * the store's current sequence-to-flush and the given cut-off time.
     */
    private void selectEntriesToStore(RecordStore recordStore, List<DelayedEntry> selected, long highestStoreTime) {
        WriteBehindQueue<DelayedEntry> queue = getWriteBehindQueue(recordStore);
        long sequenceToFlush = getSequenceToFlush(recordStore);
        filterWriteBehindQueue(highestStoreTime, sequenceToFlush, selected, queue);
    }

    /**
     * Filters the queue into {@code selected}. When {@code sequenceToFlush} is positive the
     * predicate accepts entries whose sequence is not greater than it; otherwise it accepts
     * entries whose store time is not later than {@code highestStoreTime}.
     * <p>
     * NOTE(review): {@code WriteBehindQueue.filter} is presumably what adds the accepted
     * entries to {@code selected}; confirm against the queue implementation.
     */
    private void filterWriteBehindQueue(final long highestStoreTime, final long sequenceToFlush,
                                        Collection<DelayedEntry> selected,
                                        WriteBehindQueue<DelayedEntry> queue) {
        if (sequenceToFlush > 0) {
            queue.filter(new IPredicate<DelayedEntry>() {
                @Override
                public boolean test(DelayedEntry delayedEntry) {
                    return delayedEntry.getSequence() <= sequenceToFlush;
                }
            }, selected);
        } else {
            queue.filter(new IPredicate<DelayedEntry>() {
                @Override
                public boolean test(DelayedEntry delayedEntry) {
                    return delayedEntry.getStoreTime() <= highestStoreTime;
                }
            }, selected);
        }
    }

    /**
     * Removes the first occurrence of each given entry from the write-behind queue of the
     * entry's partition. Entries whose partition has no record store for the map are skipped.
     */
    private void removeFinishedStoreOperationsFromQueues(String name, List<DelayedEntry> entries) {
        for (DelayedEntry entry : entries) {
            RecordStore recordStore = getRecordStoreOrNull(name, entry.getPartitionId());
            if (recordStore != null) {
                getWriteBehindQueue(recordStore).removeFirstOccurrence(entry);
            }
        }
    }

    /**
     * Puts the given entries back at the front of their partition's write-behind queue.
     *
     * @param name              name of the map whose record stores are looked up
     * @param failedByPartition entries keyed by the partition id used for the record store
     *                          lookup; empty or {@code null} value lists are ignored
     */
    private void reAddFailedStoreOperationsToQueues(String name, Map<Integer, List<DelayedEntry>> failedByPartition) {
        for (Map.Entry<Integer, List<DelayedEntry>> partitionEntry : failedByPartition.entrySet()) {
            List<DelayedEntry> failedEntries = partitionEntry.getValue();
            if (CollectionUtil.isEmpty(failedEntries)) {
                continue;
            }
            RecordStore recordStore = getRecordStoreOrNull(name, partitionEntry.getKey().intValue());
            if (recordStore != null) {
                getWriteBehindQueue(recordStore).addFirst(failedEntries);
            }
        }
    }

    /**
     * Handles entries of non-local partitions: the entries are not passed to
     * {@link WriteBehindProcessor#process}; they are only removed from their queues, with the
     * before-store listeners called first and the after-store listeners called afterwards.
     */
    private void doInBackup(List<DelayedEntry> entries) {
        writeBehindProcessor.callBeforeStoreListeners(entries);
        removeFinishedStoreOperationsFromQueues(mapName, entries);
        writeBehindProcessor.callAfterStoreListeners(entries);
    }

    /** Reads {@link GroupProperty#MAP_REPLICA_SCHEDULED_TASK_DELAY_SECONDS} as milliseconds. */
    private long getReplicaWaitTimeMillis() {
        return mapServiceContext.getNodeEngine().getProperties()
                .getMillis(GroupProperty.MAP_REPLICA_SCHEDULED_TASK_DELAY_SECONDS);
    }

    /**
     * Looks up the already existing record store of the map in the given partition.
     * Callers treat a {@code null} result as "no record store for this partition".
     */
    private RecordStore getRecordStoreOrNull(String name, int partitionId) {
        return mapServiceContext.getPartitionContainer(partitionId).getExistingRecordStore(name);
    }

    /** Returns the write-behind queue of the record store's {@link WriteBehindStore}. */
    private WriteBehindQueue<DelayedEntry> getWriteBehindQueue(RecordStore recordStore) {
        return ((WriteBehindStore) recordStore.getMapDataStore()).getWriteBehindQueue();
    }

    /** Returns the sequence-to-flush of the record store's {@link WriteBehindStore}. */
    private long getSequenceToFlush(RecordStore recordStore) {
        return ((WriteBehindStore) recordStore.getMapDataStore()).getSequenceToFlush();
    }

    /** Reads the write delay, in seconds, from the map-store configuration. */
    private static int getWriteDelaySeconds(MapStoreContext mapStoreContext) {
        return mapStoreContext.getMapStoreConfig().getWriteDelaySeconds();
    }

    @Override
    public String toString() {
        return "StoreWorker{mapName='" + mapName + "'}";
    }
}
