/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.repositories.s3.async;

import com.jcraft.jzlib.JZlib;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.exception.CorruptFileException;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.util.ByteUtils;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.StatsMetricPublisher;
import org.opensearch.repositories.s3.async.AsyncPartsHandler;
import org.opensearch.repositories.s3.async.TransferSemaphoresHolder;
import org.opensearch.repositories.s3.async.UploadRequest;
import org.opensearch.repositories.s3.io.CheckedContainer;
import org.opensearch.repositories.s3.utils.SseKmsUtil;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.CompletableFutureUtils;

public final class AsyncTransferManager {
    private static final Logger log = LogManager.getLogger(AsyncTransferManager.class);
    private final ExecutorService executorService;
    private final ExecutorService priorityExecutorService;
    private final ExecutorService urgentExecutorService;
    private final long minimumPartSize;
    private final long maxRetryablePartSize;
    private final TransferSemaphoresHolder transferSemaphoresHolder;
    private static final long MAX_UPLOAD_PARTS = 10000L;

    public AsyncTransferManager(long minimumPartSize, ExecutorService executorService, ExecutorService priorityExecutorService, ExecutorService urgentExecutorService, TransferSemaphoresHolder transferSemaphoresHolder) {
        this.executorService = executorService;
        this.priorityExecutorService = priorityExecutorService;
        this.minimumPartSize = minimumPartSize;
        this.maxRetryablePartSize = (long)((double)minimumPartSize + 0.1 * (double)minimumPartSize);
        this.urgentExecutorService = urgentExecutorService;
        this.transferSemaphoresHolder = transferSemaphoresHolder;
    }

    public CompletableFuture<Void> uploadObject(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, StreamContext streamContext, StatsMetricPublisher statsMetricPublisher) {
        CompletableFuture<Void> returnFuture;
        block6: {
            returnFuture = new CompletableFuture<Void>();
            try {
                if (streamContext.getNumberOfParts() == 1) {
                    log.debug(() -> "Starting the upload as a single upload part request");
                    TransferSemaphoresHolder.RequestContext requestContext = this.transferSemaphoresHolder.createRequestContext();
                    Semaphore semaphore = AsyncPartsHandler.maybeAcquireSemaphore(this.transferSemaphoresHolder, requestContext, uploadRequest.getWritePriority(), uploadRequest.getKey());
                    try {
                        this.uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext, returnFuture, statsMetricPublisher, semaphore);
                        break block6;
                    }
                    catch (Exception ex) {
                        if (semaphore != null) {
                            semaphore.release();
                        }
                        throw ex;
                    }
                }
                log.debug(() -> "Starting the upload as multipart upload request");
                this.uploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, statsMetricPublisher);
            }
            catch (Throwable throwable) {
                returnFuture.completeExceptionally(throwable);
            }
        }
        return returnFuture;
    }

    private void uploadInParts(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, StreamContext streamContext, CompletableFuture<Void> returnFuture, StatsMetricPublisher statsMetricPublisher) {
        String uploadId;
        CreateMultipartUploadRequest.Builder createMultipartUploadRequestBuilder = CreateMultipartUploadRequest.builder().bucket(uploadRequest.getBucket()).key(uploadRequest.getKey()).overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector)).expectedBucketOwner(uploadRequest.getExpectedBucketOwner());
        if (CollectionUtils.isNotEmpty(uploadRequest.getMetadata())) {
            createMultipartUploadRequestBuilder.metadata(uploadRequest.getMetadata());
        }
        if (uploadRequest.doRemoteDataIntegrityCheck()) {
            createMultipartUploadRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
        }
        SseKmsUtil.configureEncryptionSettings(createMultipartUploadRequestBuilder, uploadRequest);
        CompletableFuture createMultipartUploadFuture = SocketAccess.doPrivileged(() -> s3AsyncClient.createMultipartUpload((CreateMultipartUploadRequest)createMultipartUploadRequestBuilder.build()));
        CompletableFutureUtils.forwardExceptionTo(returnFuture, (CompletableFuture)createMultipartUploadFuture);
        try {
            CreateMultipartUploadResponse createMultipartUploadResponse = (CreateMultipartUploadResponse)createMultipartUploadFuture.get();
            uploadId = createMultipartUploadResponse.uploadId();
            log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId());
        }
        catch (Exception ex) {
            AsyncTransferManager.handleException(returnFuture, () -> "Failed to initiate multipart upload", ex);
            return;
        }
        this.doUploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, uploadId, statsMetricPublisher);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doUploadInParts(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, StreamContext streamContext, CompletableFuture<Void> returnFuture, String uploadId, StatsMetricPublisher statsMetricPublisher) {
        List<CompletableFuture<CompletedPart>> futures;
        AtomicReferenceArray<CompletedPart> completedParts = new AtomicReferenceArray<CompletedPart>(streamContext.getNumberOfParts());
        AtomicReferenceArray<CheckedContainer> inputStreamContainers = new AtomicReferenceArray<CheckedContainer>(streamContext.getNumberOfParts());
        try {
            futures = AsyncPartsHandler.uploadParts(s3AsyncClient, this.executorService, this.priorityExecutorService, this.urgentExecutorService, uploadRequest, streamContext, uploadId, completedParts, inputStreamContainers, statsMetricPublisher, uploadRequest.isUploadRetryEnabled(), this.transferSemaphoresHolder, this.maxRetryablePartSize);
        }
        catch (Exception ex) {
            try {
                AsyncPartsHandler.cleanUpParts(s3AsyncClient, uploadRequest, uploadId);
            }
            finally {
                returnFuture.completeExceptionally(ex);
            }
            return;
        }
        ((CompletableFuture)((CompletableFuture)((CompletableFuture)((CompletableFuture)CompletableFutureUtils.allOfExceptionForwarded((CompletableFuture[])((CompletableFuture[])futures.toArray(CompletableFuture[]::new))).thenApply(resp -> {
            try {
                uploadRequest.getUploadFinalizer().accept((Object)true);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            return resp;
        })).thenApply(ignore -> {
            if (uploadRequest.doRemoteDataIntegrityCheck()) {
                this.mergeAndVerifyChecksum(inputStreamContainers, uploadRequest.getKey(), uploadRequest.getExpectedChecksum());
            }
            return null;
        })).thenCompose(ignore -> this.completeMultipartUpload(s3AsyncClient, uploadRequest, uploadId, completedParts, statsMetricPublisher))).handle(this.handleExceptionOrResponse(s3AsyncClient, uploadRequest, returnFuture, uploadId))).exceptionally(throwable -> {
            AsyncTransferManager.handleException(returnFuture, () -> "Unexpected exception occurred", throwable);
            return null;
        });
    }

    private void mergeAndVerifyChecksum(AtomicReferenceArray<CheckedContainer> inputStreamContainers, String fileName, long expectedChecksum) {
        long resultantChecksum = AsyncTransferManager.fromBase64String(inputStreamContainers.get(0).getChecksum());
        for (int index = 1; index < inputStreamContainers.length(); ++index) {
            long curChecksum = AsyncTransferManager.fromBase64String(inputStreamContainers.get(index).getChecksum());
            resultantChecksum = JZlib.crc32_combine((long)resultantChecksum, (long)curChecksum, (long)inputStreamContainers.get(index).getContentLength());
        }
        if (resultantChecksum != expectedChecksum) {
            throw new RuntimeException((Throwable)new CorruptFileException("File level checksums didn't match combined part checksums", fileName));
        }
    }

    private BiFunction<CompleteMultipartUploadResponse, Throwable, Void> handleExceptionOrResponse(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, CompletableFuture<Void> returnFuture, String uploadId) {
        return (response, throwable) -> {
            if (throwable != null) {
                AsyncPartsHandler.cleanUpParts(s3AsyncClient, uploadRequest, uploadId);
                AsyncTransferManager.handleException(returnFuture, () -> "Failed to send multipart upload requests.", throwable);
            } else {
                returnFuture.complete(null);
            }
            return null;
        };
    }

    private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUpload(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, String uploadId, AtomicReferenceArray<CompletedPart> completedParts, StatsMetricPublisher statsMetricPublisher) {
        log.debug(() -> new ParameterizedMessage("Sending completeMultipartUploadRequest, uploadId: {}", (Object)uploadId));
        CompletedPart[] parts = (CompletedPart[])IntStream.range(0, completedParts.length()).mapToObj(completedParts::get).toArray(CompletedPart[]::new);
        CompleteMultipartUploadRequest completeMultipartUploadRequest = (CompleteMultipartUploadRequest)CompleteMultipartUploadRequest.builder().bucket(uploadRequest.getBucket()).key(uploadRequest.getKey()).uploadId(uploadId).overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector)).multipartUpload((CompletedMultipartUpload)CompletedMultipartUpload.builder().parts(parts).build()).expectedBucketOwner(uploadRequest.getExpectedBucketOwner()).build();
        return SocketAccess.doPrivileged(() -> s3AsyncClient.completeMultipartUpload(completeMultipartUploadRequest));
    }

    private static String base64StringFromLong(Long val) {
        return Base64.getEncoder().encodeToString(Arrays.copyOfRange(ByteUtils.toByteArrayBE((long)val), 4, 8));
    }

    private static long fromBase64String(String base64String) {
        byte[] decodedBytes = Base64.getDecoder().decode(base64String);
        if (decodedBytes.length != 4) {
            throw new IllegalArgumentException("Invalid Base64 encoded CRC32 checksum");
        }
        long result = 0L;
        for (int i = 0; i < 4; ++i) {
            result <<= 8;
            result |= (long)(decodedBytes[i] & 0xFF);
        }
        return result;
    }

    private static void handleException(CompletableFuture<Void> returnFuture, Supplier<String> message, Throwable throwable) {
        Throwable cause;
        Throwable throwable2 = cause = throwable instanceof CompletionException ? throwable.getCause() : throwable;
        if (cause instanceof Error) {
            returnFuture.completeExceptionally(cause);
        } else {
            SdkClientException exception = SdkClientException.create((String)message.get(), (Throwable)cause);
            returnFuture.completeExceptionally((Throwable)exception);
        }
    }

    public long calculateOptimalPartSize(long contentLengthOfSource, WritePriority writePriority, boolean uploadRetryEnabled) {
        if (contentLengthOfSource < ByteSizeUnit.MB.toBytes(5L)) {
            return contentLengthOfSource;
        }
        if (uploadRetryEnabled && (writePriority == WritePriority.HIGH || writePriority == WritePriority.URGENT)) {
            return new ByteSizeValue(5L, ByteSizeUnit.MB).getBytes();
        }
        double optimalPartSize = (double)contentLengthOfSource / 10000.0;
        optimalPartSize = Math.ceil(optimalPartSize);
        return (long)Math.max(optimalPartSize, (double)this.minimumPartSize);
    }

    private void uploadInOneChunk(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, StreamContext streamContext, CompletableFuture<Void> returnFuture, StatsMetricPublisher statsMetricPublisher, Semaphore semaphore) {
        PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder().bucket(uploadRequest.getBucket()).key(uploadRequest.getKey()).contentLength(Long.valueOf(uploadRequest.getContentLength())).overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.putObjectMetricPublisher)).expectedBucketOwner(uploadRequest.getExpectedBucketOwner());
        if (CollectionUtils.isNotEmpty(uploadRequest.getMetadata())) {
            putObjectRequestBuilder.metadata(uploadRequest.getMetadata());
        }
        if (uploadRequest.doRemoteDataIntegrityCheck()) {
            putObjectRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
            putObjectRequestBuilder.checksumCRC32(AsyncTransferManager.base64StringFromLong(uploadRequest.getExpectedChecksum()));
        }
        SseKmsUtil.configureEncryptionSettings(putObjectRequestBuilder, uploadRequest);
        PutObjectRequest putObjectRequest = (PutObjectRequest)putObjectRequestBuilder.build();
        ExecutorService streamReadExecutor = uploadRequest.getWritePriority() == WritePriority.URGENT ? this.urgentExecutorService : (uploadRequest.getWritePriority() == WritePriority.HIGH ? this.priorityExecutorService : this.executorService);
        CompletableFuture putObjectFuture = SocketAccess.doPrivileged(() -> {
            CompletableFuture putObjectRespFuture;
            InputStream inputStream = null;
            try {
                InputStreamContainer inputStreamContainer = streamContext.provideStream(0);
                inputStream = AsyncPartsHandler.maybeRetryInputStream(inputStreamContainer.getInputStream(), uploadRequest.getWritePriority(), uploadRequest.isUploadRetryEnabled(), uploadRequest.getContentLength(), this.maxRetryablePartSize);
                AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromInputStream((InputStream)inputStream, (Long)inputStreamContainer.getContentLength(), (ExecutorService)streamReadExecutor);
                putObjectRespFuture = s3AsyncClient.putObject(putObjectRequest, asyncRequestBody);
            }
            catch (Exception e) {
                this.releaseResourcesSafely(semaphore, inputStream, uploadRequest.getKey());
                return CompletableFuture.failedFuture(e);
            }
            InputStream finalInputStream = inputStream;
            return ((CompletableFuture)putObjectRespFuture.handle((resp, throwable) -> {
                this.releaseResourcesSafely(semaphore, finalInputStream, uploadRequest.getKey());
                if (throwable != null) {
                    S3Exception s3Exception;
                    Throwable unwrappedThrowable = ExceptionsHelper.unwrap((Throwable)throwable, (Class[])new Class[]{S3Exception.class});
                    if (unwrappedThrowable != null && (s3Exception = (S3Exception)unwrappedThrowable).statusCode() == 400 && "BadDigest".equals(s3Exception.awsErrorDetails().errorCode())) {
                        throw new RuntimeException((Throwable)new CorruptFileException((Throwable)s3Exception, uploadRequest.getKey()));
                    }
                    returnFuture.completeExceptionally((Throwable)throwable);
                } else {
                    try {
                        uploadRequest.getUploadFinalizer().accept((Object)true);
                    }
                    catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    returnFuture.complete(null);
                }
                return null;
            })).handle((resp, throwable) -> {
                if (throwable != null) {
                    this.deleteUploadedObject(s3AsyncClient, uploadRequest);
                    returnFuture.completeExceptionally((Throwable)throwable);
                }
                return null;
            });
        });
        CompletableFutureUtils.forwardExceptionTo(returnFuture, (CompletableFuture)putObjectFuture);
        CompletableFutureUtils.forwardResultTo((CompletableFuture)putObjectFuture, returnFuture);
    }

    private void releaseResourcesSafely(Semaphore semaphore, InputStream inputStream, String file) {
        if (semaphore != null) {
            semaphore.release();
        }
        if (inputStream != null) {
            try {
                inputStream.close();
            }
            catch (IOException e) {
                log.error(() -> new ParameterizedMessage("Failed to close stream while uploading single file {}.", (Object)file), (Throwable)e);
            }
        }
    }

    private void deleteUploadedObject(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest) {
        DeleteObjectRequest deleteObjectRequest = (DeleteObjectRequest)DeleteObjectRequest.builder().bucket(uploadRequest.getBucket()).key(uploadRequest.getKey()).expectedBucketOwner(uploadRequest.getExpectedBucketOwner()).build();
        SocketAccess.doPrivileged(() -> s3AsyncClient.deleteObject(deleteObjectRequest)).exceptionally(throwable -> {
            log.error(() -> new ParameterizedMessage("Failed to delete uploaded object of key {}", (Object)uploadRequest.getKey()), throwable);
            return null;
        });
    }
}

