package com.hazelcast.jet.impl.util;

import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.internal.nio.Bits;
import com.hazelcast.internal.partition.IPartitionService;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.serialization.impl.HeapData;
import com.hazelcast.jet.impl.JetServiceBackend;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.execution.SnapshotContext;
import com.hazelcast.jet.impl.execution.init.JetInitDataSerializerHook;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.partition.PartitionAware;
import com.hazelcast.spi.impl.NodeEngine;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nonnull;

/* loaded from: input_file:lib/hazelcast-5.5.0.jar:com/hazelcast/jet/impl/util/AsyncSnapshotWriterImpl.class */
public class AsyncSnapshotWriterImpl implements AsyncSnapshotWriter {
    /** Default chunk size in bytes (131072 = 128 KiB); the public constructor delegates with this same value. */
    public static final int DEFAULT_CHUNK_SIZE = 131072;
    /** Chunk size minus the header length and the terminator length; entries larger than this are sent in a chunk of their own. */
    final int usableChunkCapacity;
    /**
     * 12-byte prefix written at the start of every buffer and every chunk. The int at offset 4 is set once in the
     * constructor; the int at offset 8 is overwritten with the payload length when a chunk is produced.
     */
    final byte[] serializedByteArrayHeader;
    /** Serialized form of {@link SnapshotDataValueTerminator#INSTANCE} without its first 4 bytes; appended to every chunk. */
    final byte[] valueTerminator;
    /** Counter obtained from {@code JetServiceBackend}; incremented before each async put and decremented in the put callback. */
    final AtomicInteger numConcurrentAsyncOps;
    private final IPartitionService partitionService;
    /** One buffer per partition, indexed by partition id. */
    private final CustomByteArrayOutputStream[] buffers;
    /** Values obtained from {@code JetServiceBackend.getSharedPartitionKeys()}, indexed by partition id when building a {@link SnapshotDataKey}. */
    private final int[] partitionKeys;
    /** Sequence used for the next {@link SnapshotDataKey}; advanced by {@link #memberCount} after every put. */
    private int partitionSequence;
    private final ILogger logger;
    private final NodeEngine nodeEngine;
    /** {@code true} when the serialization service reports {@code BIG_ENDIAN}; used when writing the payload length. */
    private final boolean useBigEndian;
    private final SnapshotContext snapshotContext;
    private final String vertexName;
    /** Step by which {@link #partitionSequence} grows after each put. */
    private final int memberCount;
    /** Map currently written to; resolved lazily from the snapshot context and set to {@code null} by {@code flushAndResetMap()}. */
    private IMap<SnapshotDataKey, Object> currentMap;
    /** Snapshot id read from the snapshot context in the constructor and again whenever {@link #currentMap} is resolved. */
    private long currentSnapshotId;
    /** First error reported by a put callback; cleared when read through {@code getError()}. */
    private final AtomicReference<Throwable> firstError;
    /** Number of puts issued by this writer whose callback has not run yet. */
    private final AtomicInteger numActiveFlushes;
    /** Statistics: number of entries accepted. */
    private long totalKeys;
    /** Statistics: number of chunks handed to the map. */
    private long totalChunks;
    /** Statistics: sum of {@code dataSize()} of all chunks handed to the map. */
    private long totalPayloadBytes;
    /** Callback attached to every put future; bound to {@code consumePutResponse} in the constructor. */
    private final BiConsumer<Object, Throwable> putResponseConsumer;
    /** Decompiler-exposed assertion flag; assigned in the static initializer of this class. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:lib/hazelcast-5.5.0.jar:com/hazelcast/jet/impl/util/AsyncSnapshotWriterImpl$CustomByteArrayOutputStream.class */
    /**
     * Growable in-memory byte sink whose backing array is never allowed to exceed a fixed
     * capacity limit. The backing array grows by doubling; a write that would need an array
     * larger than the limit fails with {@link IllegalStateException}.
     */
    public static class CustomByteArrayOutputStream extends OutputStream {
        private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
        /** Backing storage; only the first {@link #size} bytes are valid. */
        private byte[] data = EMPTY_BYTE_ARRAY;
        /** Number of valid bytes in {@link #data}. */
        private int size;
        /** Largest backing-array length this stream may allocate. */
        private int capacityLimit;

        /**
         * @param i the capacity limit, i.e. the maximum length of the backing array
         */
        CustomByteArrayOutputStream(int i) {
            capacityLimit = i;
        }

        /**
         * Appends the low-order byte of {@code i}.
         *
         * @throws IllegalStateException if the buffer would have to grow beyond the capacity limit
         */
        @Override
        public void write(int i) {
            final int position = size;
            ensureCapacity(position + 1);
            data[position] = (byte) i;
            size = position + 1;
        }

        /**
         * Appends {@code i2} bytes of {@code bArr} starting at offset {@code i}.
         *
         * @throws IndexOutOfBoundsException if the offset/length pair does not lie within {@code bArr}
         * @throws IllegalStateException     if the buffer would have to grow beyond the capacity limit
         */
        @Override
        public void write(@Nonnull byte[] bArr, int i, int i2) {
            // The subtraction form keeps the range check correct even if i + i2 overflows.
            if (!(i >= 0 && i <= bArr.length && i2 >= 0 && (i + i2) - bArr.length <= 0)) {
                throw new IndexOutOfBoundsException("off=" + i + ", len=" + i2);
            }
            ensureCapacity(size + i2);
            System.arraycopy(bArr, i, data, size, i2);
            size += i2;
        }

        /**
         * Makes sure the backing array can hold at least {@code i} bytes, doubling its length
         * as often as needed (starting from 1 for an empty array).
         */
        private void ensureCapacity(int i) {
            if (i - data.length <= 0) {
                // already large enough
                return;
            }
            int grown = data.length;
            do {
                grown = Math.max(1, grown << 1);
            } while (grown - i < 0);
            if (grown - capacityLimit > 0) {
                throw new IllegalStateException("buffer full");
            }
            data = Arrays.copyOf(data, grown);
        }

        /** Discards the content; the backing array is kept for reuse. */
        void reset() {
            size = 0;
        }

        /** Returns a copy of the bytes written since the last reset. */
        @Nonnull
        byte[] toByteArray() {
            return Arrays.copyOf(data, size);
        }

        /** Returns the number of bytes written since the last reset. */
        int size() {
            return size;
        }
    }

    /* loaded from: input_file:lib/hazelcast-5.5.0.jar:com/hazelcast/jet/impl/util/AsyncSnapshotWriterImpl$SnapshotDataKey.class */
    /**
     * Key under which one snapshot chunk is stored. It is made up of a partition key (also
     * reported through {@link PartitionAware#getPartitionKey()}), the snapshot id, the vertex
     * name and a sequence number. All four fields take part in serialization, equality and
     * hashing.
     */
    public static final class SnapshotDataKey implements IdentifiedDataSerializable, PartitionAware {
        private int partitionKey;
        private long snapshotId;
        private String vertexName;
        private int sequence;

        /** No-arg constructor; fields are populated afterwards by {@link #readData(ObjectDataInput)}. */
        public SnapshotDataKey() {
        }

        /**
         * @param i   the partition key
         * @param j   the snapshot id
         * @param str the vertex name
         * @param i2  the sequence number
         */
        public SnapshotDataKey(int i, long j, String str, int i2) {
            this.partitionKey = i;
            this.snapshotId = j;
            this.vertexName = str;
            this.sequence = i2;
        }

        /** Returns the partition key, boxed as an {@link Integer}. */
        @Override // com.hazelcast.partition.PartitionAware
        public Object getPartitionKey() {
            return Integer.valueOf(this.partitionKey);
        }

        /** Returns the snapshot id this key belongs to. */
        public long snapshotId() {
            return this.snapshotId;
        }

        /** Returns the vertex name this key belongs to. */
        public String vertexName() {
            return this.vertexName;
        }

        /**
         * Renders all four fields. Each label is paired with its own field: previously the
         * partition key was printed as the vertex name, the vertex name as the sequence, and
         * the sequence not at all.
         */
        @Override
        public String toString() {
            return "SnapshotDataKey{partitionKey=" + this.partitionKey
                    + ", snapshotId=" + this.snapshotId
                    + ", vertexName='" + this.vertexName
                    + "', sequence=" + this.sequence + "}";
        }

        @Override // com.hazelcast.nio.serialization.IdentifiedDataSerializable
        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        @Override // com.hazelcast.nio.serialization.IdentifiedDataSerializable
        public int getClassId() {
            return 23;
        }

        /** Writes the fields in the order partitionKey, snapshotId, vertexName, sequence. */
        @Override // com.hazelcast.nio.serialization.DataSerializable
        public void writeData(ObjectDataOutput objectDataOutput) throws IOException {
            objectDataOutput.writeInt(this.partitionKey);
            objectDataOutput.writeLong(this.snapshotId);
            objectDataOutput.writeString(this.vertexName);
            objectDataOutput.writeInt(this.sequence);
        }

        /** Reads the fields in the same order {@link #writeData(ObjectDataOutput)} wrote them. */
        @Override // com.hazelcast.nio.serialization.DataSerializable
        public void readData(ObjectDataInput objectDataInput) throws IOException {
            this.partitionKey = objectDataInput.readInt();
            this.snapshotId = objectDataInput.readLong();
            this.vertexName = objectDataInput.readString();
            this.sequence = objectDataInput.readInt();
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            SnapshotDataKey that = (SnapshotDataKey) obj;
            return this.partitionKey == that.partitionKey
                    && this.snapshotId == that.snapshotId
                    && this.sequence == that.sequence
                    && Objects.equals(this.vertexName, that.vertexName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.partitionKey, this.snapshotId, this.vertexName, this.sequence);
        }
    }

    /* loaded from: input_file:lib/hazelcast-5.5.0.jar:com/hazelcast/jet/impl/util/AsyncSnapshotWriterImpl$SnapshotDataValueTerminator.class */
    /**
     * Stateless marker object with a single shared instance. Its serialized form carries no
     * fields: both {@code writeData} and {@code readData} are no-ops.
     */
    public static final class SnapshotDataValueTerminator implements IdentifiedDataSerializable {
        /** The only instance of this class. */
        public static final IdentifiedDataSerializable INSTANCE = new SnapshotDataValueTerminator();

        private SnapshotDataValueTerminator() {
            // not instantiable from outside; use INSTANCE
        }

        @Override
        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        @Override
        public int getClassId() {
            return 24;
        }

        @Override
        public void writeData(ObjectDataOutput objectDataOutput) {
            // nothing to write: the object has no state
        }

        @Override
        public void readData(ObjectDataInput objectDataInput) {
            // nothing to read: the object has no state
        }
    }

    /**
     * Creates a writer that uses {@link #DEFAULT_CHUNK_SIZE} as its chunk size.
     *
     * @param nodeEngine                   source of the partition service, logger and Jet backend
     * @param snapshotContext              supplies the current snapshot id and map name
     * @param str                          the vertex name stored in every snapshot key
     * @param i                            the initial key sequence of this writer
     * @param i2                           the amount by which the key sequence grows after each put
     * @param internalSerializationService used for the byte order and to serialize the terminator
     */
    public AsyncSnapshotWriterImpl(NodeEngine nodeEngine, SnapshotContext snapshotContext, String str, int i, int i2, InternalSerializationService internalSerializationService) {
        this(DEFAULT_CHUNK_SIZE, nodeEngine, snapshotContext, str, i, i2, internalSerializationService);
    }

    /**
     * Creates a writer with an explicit chunk size.
     *
     * @param i                            the chunk size in bytes; must be a power of two
     * @param nodeEngine                   source of the partition service, logger and Jet backend
     * @param snapshotContext              supplies the current snapshot id and map name
     * @param str                          the vertex name stored in every snapshot key
     * @param i2                           the initial key sequence of this writer
     * @param i3                           the amount by which the key sequence grows after each put
     * @param internalSerializationService used for the byte order and to serialize the terminator
     * @throws IllegalArgumentException if the chunk size is not a power of two, or leaves no room
     *                                  for payload once header and terminator are subtracted
     */
    AsyncSnapshotWriterImpl(int i, NodeEngine nodeEngine, SnapshotContext snapshotContext, String str, int i2, int i3, InternalSerializationService internalSerializationService) {
        this.serializedByteArrayHeader = new byte[12];
        this.firstError = new AtomicReference<>();
        this.numActiveFlushes = new AtomicInteger();
        this.putResponseConsumer = this::consumePutResponse;

        final int chunkSize = i;
        final boolean powerOfTwo = Integer.bitCount(chunkSize) == 1;
        if (!powerOfTwo) {
            throw new IllegalArgumentException("chunkSize must be a power of two, but is " + chunkSize);
        }

        this.nodeEngine = nodeEngine;
        this.partitionService = nodeEngine.getPartitionService();
        this.logger = nodeEngine.getLogger(getClass());
        this.snapshotContext = snapshotContext;
        this.vertexName = str;
        this.memberCount = i3;
        this.currentSnapshotId = snapshotContext.currentSnapshotId();
        this.useBigEndian = internalSerializationService.getByteOrder().equals(ByteOrder.BIG_ENDIAN);

        // Fixed part of the header: the int at offset 4, always written big-endian.
        // NOTE(review): -12 is presumably the serialization type id of a byte array - confirm.
        Bits.writeInt(this.serializedByteArrayHeader, 4, -12, true);

        // Every per-partition buffer starts out containing just the header.
        final int partitionCount = this.partitionService.getPartitionCount();
        this.buffers = createAndInitBuffers(chunkSize, partitionCount, this.serializedByteArrayHeader);

        final JetServiceBackend jetBackend = (JetServiceBackend) nodeEngine.getService(JetServiceBackend.SERVICE_NAME);
        this.partitionKeys = jetBackend.getSharedPartitionKeys();
        this.partitionSequence = i2;
        this.numConcurrentAsyncOps = jetBackend.numConcurrentAsyncOps();

        // The terminator is stored without the first 4 bytes of its serialized form.
        final Data serializedTerminator = internalSerializationService.toData(SnapshotDataValueTerminator.INSTANCE);
        final byte[] terminatorBytes = serializedTerminator.toByteArray();
        this.valueTerminator = Arrays.copyOfRange(terminatorBytes, 4, terminatorBytes.length);

        this.usableChunkCapacity = chunkSize - this.valueTerminator.length - this.serializedByteArrayHeader.length;
        if (this.usableChunkCapacity <= 0) {
            throw new IllegalArgumentException("too small chunk size: " + chunkSize);
        }
    }

    /**
     * Allocates one buffer per partition and pre-fills each with the header bytes.
     *
     * @param i    the capacity limit given to every buffer
     * @param i2   the number of buffers to create
     * @param bArr the header bytes written at the start of every buffer
     * @return the initialized buffers
     */
    private static CustomByteArrayOutputStream[] createAndInitBuffers(int i, int i2, byte[] bArr) {
        final CustomByteArrayOutputStream[] result = new CustomByteArrayOutputStream[i2];
        for (int index = 0; index < i2; index++) {
            final CustomByteArrayOutputStream buffer = new CustomByteArrayOutputStream(i);
            buffer.write(bArr, 0, bArr.length);
            result[index] = buffer;
        }
        return result;
    }

    /**
     * Completion callback for every async put issued by this writer.
     *
     * <p>With assertions enabled, a non-null response (a previous value was overwritten) is
     * turned into an {@link AssertionError} and handled like any other failure. The check has
     * to sit inside the {@code try}: if it were outside, the error would escape the callback,
     * it would not be recorded, and neither counter would be decremented.
     *
     * @param obj the value returned by the put; expected to be {@code null}
     * @param th  the failure of the put, or {@code null} if it succeeded
     */
    private void consumePutResponse(Object obj, Throwable th) {
        try {
            if (!$assertionsDisabled && obj != null) {
                throw new AssertionError("put operation overwrote a previous value: " + obj);
            }
        } catch (AssertionError e) {
            // Treat the failed assertion as the error of this put.
            th = e;
        }
        if (th != null) {
            this.logger.severe("Error writing to snapshot map", th);
            // Only the first error is kept.
            this.firstError.compareAndSet(null, th);
        }
        // Always release both counters, whether the put succeeded or failed.
        this.numActiveFlushes.decrementAndGet();
        this.numConcurrentAsyncOps.decrementAndGet();
    }

    /**
     * Accepts one serialized key/value pair.
     *
     * <p>An entry that does not fit into the usable chunk capacity is sent at once in a chunk of
     * its own. Any other entry is appended to the buffer of the key's partition; if the buffer
     * could not also hold the entry plus the terminator, the buffer is flushed first.
     *
     * @param entry the serialized key and value
     * @return {@code true} if the entry was accepted; {@code false} if the required put could not
     *         be started, in which case nothing was buffered
     */
    @Override
    @CheckReturnValue
    public boolean offer(Map.Entry<? extends Data, ? extends Data> entry) {
        final Data key = entry.getKey();
        final Data value = entry.getValue();
        final int partitionId = this.partitionService.getPartitionId(key);
        // Combined length of key and value without the 4 leading bytes of each.
        final int entryLength = key.totalSize() + value.totalSize() - 8;

        if (entryLength > this.usableChunkCapacity) {
            // Too large for a shared buffer: bypass it so the buffer never outgrows its limit.
            return putAsyncToMap(partitionId, () -> serializeAsSingleChunk(key, value, entryLength));
        }

        final CustomByteArrayOutputStream buffer = this.buffers[partitionId];
        final boolean wouldOverflow =
                buffer.size() + entryLength + this.valueTerminator.length > buffer.capacityLimit;
        if (wouldOverflow && !flushPartition(partitionId)) {
            return false;
        }

        writeWithoutHeader(key, buffer);
        writeWithoutHeader(value, buffer);
        this.totalKeys++;
        return true;
    }

    /**
     * Builds a chunk holding exactly one entry, laid out as: header (its last 4 bytes carry
     * the payload length), key bytes, value bytes, terminator.
     */
    private Data serializeAsSingleChunk(Data key, Data value, int entryLength) {
        final int headerLength = this.serializedByteArrayHeader.length;
        final int terminatorLength = this.valueTerminator.length;
        final byte[] chunk = new byte[headerLength + entryLength + terminatorLength];
        this.totalKeys++;

        System.arraycopy(this.serializedByteArrayHeader, 0, chunk, 0, headerLength);
        int offset = headerLength - 4;
        Bits.writeInt(chunk, offset, entryLength + terminatorLength, this.useBigEndian);
        offset += 4;

        copyWithoutHeader(key, chunk, offset);
        offset += key.totalSize() - 4;

        copyWithoutHeader(value, chunk, offset);
        offset += value.totalSize() - 4;

        System.arraycopy(this.valueTerminator, 0, chunk, offset, terminatorLength);
        return new HeapData(chunk);
    }

    /**
     * Copies the bytes of {@code data}, minus the first 4, into {@code bArr}.
     *
     * @param data the serialized object to copy from
     * @param bArr the destination array
     * @param i    the position in {@code bArr} at which to start writing
     */
    private void copyWithoutHeader(Data data, byte[] bArr, int i) {
        final byte[] source = data.toByteArray();
        final int bytesToCopy = source.length - 4;
        System.arraycopy(source, 4, bArr, i, bytesToCopy);
    }

    /**
     * Writes the bytes of {@code data}, minus the first 4, to {@code outputStream}.
     *
     * @throws RuntimeException wrapping any {@link IOException} raised by the stream
     */
    private void writeWithoutHeader(Data data, OutputStream outputStream) {
        final byte[] source = data.toByteArray();
        final int bytesToWrite = source.length - 4;
        try {
            outputStream.write(source, 4, bytesToWrite);
        } catch (IOException ioFailure) {
            throw new RuntimeException(ioFailure);
        }
    }

    /**
     * Sends the buffered content of one partition to the map, unless the buffer holds nothing
     * but the header.
     *
     * @param i the partition id
     * @return {@code true} if there was nothing to flush or the put was started; {@code false}
     *         if the put could not be started
     */
    @CheckReturnValue
    private boolean flushPartition(int i) {
        if (containsOnlyHeader(this.buffers[i])) {
            return true;
        }
        return putAsyncToMap(i, () -> getBufferContentsAndClear(this.buffers[i]));
    }

    /** Tells whether the buffer holds exactly as many bytes as the header, i.e. no entries. */
    private boolean containsOnlyHeader(CustomByteArrayOutputStream customByteArrayOutputStream) {
        final int headerLength = this.serializedByteArrayHeader.length;
        return headerLength == customByteArrayOutputStream.size();
    }

    /**
     * Finishes the chunk held in the buffer and readies the buffer for new entries.
     *
     * @return the finished chunk: header (with the payload length filled in), entries, terminator
     */
    private Data getBufferContentsAndClear(CustomByteArrayOutputStream customByteArrayOutputStream) {
        final byte[] terminator = this.valueTerminator;
        final byte[] header = this.serializedByteArrayHeader;

        // 1. Close the chunk and take a private copy of it.
        customByteArrayOutputStream.write(terminator, 0, terminator.length);
        final byte[] chunk = customByteArrayOutputStream.toByteArray();
        updateSerializedBytesLength(chunk);

        // 2. Start the next chunk: empty the buffer and put the pristine header back in.
        customByteArrayOutputStream.reset();
        customByteArrayOutputStream.write(header, 0, header.length);

        return new HeapData(chunk);
    }

    /**
     * Stores the payload length (total length minus header length) as an int at offset 8 of
     * {@code bArr}, using the configured byte order.
     */
    private void updateSerializedBytesLength(byte[] bArr) {
        final int payloadLength = bArr.length - this.serializedByteArrayHeader.length;
        Bits.writeInt(bArr, 8, payloadLength, this.useBigEndian);
    }

    /**
     * Starts an asynchronous put of one chunk into the current snapshot map.
     *
     * <p>The supplier is only invoked once the map is known and
     * {@code Util.tryIncrement(numConcurrentAsyncOps, 1, 1000)} has succeeded.
     *
     * @param i        the partition id; selects the partition key stored in the map key
     * @param supplier produces the chunk to store
     * @return {@code true} if the put was started; {@code false} if the map is not known yet,
     *         the increment was refused, or the instance is no longer active
     */
    @CheckReturnValue
    private boolean putAsyncToMap(int i, Supplier<Data> supplier) {
        if (!initCurrentMap()) {
            return false;
        }
        if (!Util.tryIncrement(this.numConcurrentAsyncOps, 1, 1000)) {
            return false;
        }
        try {
            final Data chunk = supplier.get();
            this.totalPayloadBytes += chunk.dataSize();
            this.totalChunks++;

            final SnapshotDataKey chunkKey =
                    new SnapshotDataKey(this.partitionKeys[i], this.currentSnapshotId, this.vertexName, this.partitionSequence);
            final CompletableFuture<Object> pendingPut = this.currentMap.putAsync(chunkKey, chunk).toCompletableFuture();

            // Advance the sequence only after the put has been issued.
            this.partitionSequence += this.memberCount;
            pendingPut.whenComplete(this.putResponseConsumer);
            this.numActiveFlushes.incrementAndGet();
        } catch (HazelcastInstanceNotActiveException ignored) {
            // NOTE(review): numConcurrentAsyncOps is not decremented on this path - confirm this is intended.
            return false;
        }
        return true;
    }

    /**
     * Resolves the map to write to, if that has not happened yet, and refreshes the current
     * snapshot id along with it.
     *
     * @return {@code true} if a map is available; {@code false} if the snapshot context does
     *         not report a map name yet
     */
    private boolean initCurrentMap() {
        if (this.currentMap == null) {
            final String mapName = this.snapshotContext.currentMapName();
            if (mapName == null) {
                return false;
            }
            this.currentMap = JobRepository.safeImap(this.nodeEngine.getHazelcastInstance().getMap(mapName));
            this.currentSnapshotId = this.snapshotContext.currentSnapshotId();
        }
        return true;
    }

    /**
     * Flushes every partition buffer and then forgets the current map, so that the next write
     * resolves the map afresh. Logs the collected statistics at fine level.
     *
     * @return {@code true} if all buffers were flushed; {@code false} if the map is not known
     *         yet or a flush could not be started (the call may then be repeated)
     */
    @Override
    @CheckReturnValue
    public boolean flushAndResetMap() {
        if (!initCurrentMap()) {
            return false;
        }
        for (int partitionId = 0; partitionId < this.buffers.length; partitionId++) {
            if (!flushPartition(partitionId)) {
                return false;
            }
        }
        this.currentMap = null;
        if (this.logger.isFineEnabled()) {
            this.logger.fine(String.format("Stats for %s: keys=%,d, chunks=%,d, bytes=%,d",
                    this.vertexName, this.totalKeys, this.totalChunks, this.totalPayloadBytes));
        }
        return true;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r3v0, types: [com.hazelcast.jet.impl.util.AsyncSnapshotWriterImpl] */
    @Override // com.hazelcast.jet.impl.util.AsyncSnapshotWriter
    public void resetStats() {
        ?? r3 = 0;
        this.totalPayloadBytes = 0L;
        this.totalChunks = 0L;
        r3.totalKeys = this;
    }

    /** Tells whether at least one put started by this writer has not completed yet. */
    @Override
    public boolean hasPendingAsyncOps() {
        final int inFlight = numActiveFlushes.get();
        return inFlight > 0;
    }

    /**
     * Returns the first recorded put error and clears it, so the same error is reported once.
     *
     * @return the recorded error, or {@code null} if there is none
     */
    @Override
    public Throwable getError() {
        final Throwable recorded = firstError.getAndSet(null);
        return recorded;
    }

    /**
     * Tells whether the writer holds no data: no put is in flight and every partition buffer
     * contains nothing but the header.
     */
    @Override
    public boolean isEmpty() {
        if (numActiveFlushes.get() != 0) {
            return false;
        }
        for (CustomByteArrayOutputStream buffer : buffers) {
            if (!containsOnlyHeader(buffer)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Looks up the partition key stored for a partition.
     *
     * @param i the partition id
     * @return the entry of the partition-key table at index {@code i}
     */
    int partitionKey(int i) {
        final int[] keys = partitionKeys;
        return keys[i];
    }

    /** Returns the sum of the data sizes of all chunks handed to the map since the last stats reset. */
    @Override
    public long getTotalPayloadBytes() {
        return totalPayloadBytes;
    }

    /** Returns the current value of the entry counter. */
    @Override
    public long getTotalKeys() {
        return totalKeys;
    }

    /** Returns the current value of the chunk counter. */
    @Override
    public long getTotalChunks() {
        return totalChunks;
    }

    // Caches whether assertions are disabled for this class; consulted before the
    // hand-expanded assertion check in consumePutResponse.
    // NOTE(review): presumably the decompiled form of the compiler's `assert` support - confirm.
    static {
        $assertionsDisabled = !AsyncSnapshotWriterImpl.class.desiredAssertionStatus();
    }
}
