FTPからAzureの大きなファイルに大きなファイルを送信する際にマルチスレッドを高速化するにはどうすればよいですか?

Aug 19 2020

現在、FTPからローカルハードディスクにファイルをダウンロードするコードがあります。次に、ファイルをチャンクでAzureにアップロードします。最後に、ローカルおよびftpからファイルを削除します。ただし、このコードは非常に低速です。それを改善する方法を知りたかっただけです。

    private async Task UploadToBlobJobAsync(FtpConfiguration ftpConfiguration, BlobStorageConfiguration blobStorageConfiguration, string fileExtension)
    {
        try
        {
               ftpConfiguration.FileExtension = fileExtension;

                var filesToProcess = FileHelper.GetAllFileNames(ftpConfiguration).ToList();
                
                var batchSize = 4;
                List<Task> uploadBlobToStorageTasks = new List<Task>(batchSize);

                for (int i = 0; i < filesToProcess.Count(); i += batchSize)
                {
                    // calculated the remaining items to avoid an OutOfRangeException
                    batchSize = filesToProcess.Count() - i > batchSize ? batchSize : filesToProcess.Count() - i;

                    for (int j = i; j < i + batchSize; j++)
                    {
                        var fileName = filesToProcess[j];
                        var localFilePath = SaveFileToLocalAndGetLocation(ftpConfiguration, ftpConfiguration.FolderPath, fileName);

                        // Spin off a background task to process the file we just downloaded
                        uploadBlobToStorageTasks.Add(Task.Run(() =>
                        {
                            // Process the file
                            UploadFile(ftpConfiguration, blobStorageConfiguration, fileName, localFilePath).ConfigureAwait(false);
                        }));
                    }

                    Task.WaitAll(uploadBlobToStorageTasks.ToArray());
                    uploadBlobToStorageTasks.Clear();
                }
        }
        catch (Exception ex)
        {
        }
    }

    private async Task UploadFile(FtpConfiguration ftpConfiguration, BlobStorageConfiguration blobStorageConfiguration, string fileName, string localFilePath)
    {
        try
        {
            await UploadLargeFiles(GetBlobStorageConfiguration(blobStorageConfiguration), fileName, localFilePath).ConfigureAwait(false);
    FileHelper.DeleteFile(ftpConfiguration, fileName); // delete file from ftp
        }
        catch (Exception exception)
        {
        }
    }

   private async Task UploadLargeFiles(BlobStorageConfiguration blobStorageConfiguration, string fileName, string localFilePath)
    {
        try
        {
            var output = await UploadFileAsBlockBlob(localFilePath, blobStorageConfiguration).ConfigureAwait(false);

            // delete the file from local
            Logger.LogInformation($"Deleting {fileName} from the local folder. Path is {localFilePath}.");

            if (File.Exists(localFilePath))
            {
                File.Delete(localFilePath);
            }
        }
        catch (Exception ex)
        {
        }
    }

    private async Task UploadFileAsBlockBlob(string sourceFilePath, BlobStorageConfiguration blobStorageConfiguration)
    {
        string fileName = Path.GetFileName(sourceFilePath);
        try
        {
            var storageAccount = CloudStorageAccount.Parse(blobStorageConfiguration.ConnectionString);
            var blobClient = storageAccount.CreateCloudBlobClient();
            var cloudContainer = blobClient.GetContainerReference(blobStorageConfiguration.Container);
            await cloudContainer.CreateIfNotExistsAsync().ConfigureAwait(false);

            var directory = cloudContainer.GetDirectoryReference(blobStorageConfiguration.Path);
            var blob = directory.GetBlockBlobReference(fileName);

            var blocklist = new HashSet<string>();

            byte[] bytes = File.ReadAllBytes(sourceFilePath);

            const long pageSizeInBytes = 10485760 * 20; // 20mb at a time
            long prevLastByte = 0;
            long bytesRemain = bytes.Length;

            do
            {
                long bytesToCopy = Math.Min(bytesRemain, pageSizeInBytes);
                byte[] bytesToSend = new byte[bytesToCopy];

                Array.Copy(bytes, prevLastByte, bytesToSend, 0, bytesToCopy);

                prevLastByte += bytesToCopy;
                bytesRemain -= bytesToCopy;

                // create blockId
                string blockId = Guid.NewGuid().ToString();
                string base64BlockId = Convert.ToBase64String(Encoding.UTF8.GetBytes(blockId));

                await blob.PutBlockAsync(base64BlockId, new MemoryStream(bytesToSend, true), null).ConfigureAwait(false);

                blocklist.Add(base64BlockId);
            }
            while (bytesRemain > 0);

            // post blocklist
            await blob.PutBlockListAsync(blocklist).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
        }
    }

回答

1 Blindy Aug 20 2020 at 00:25

まず、必要のないものはディスクに書き込まないでください。ここであなたの目標が何であるかは完全には明らかではありませんが、そもそもなぜあなたがそのような必要性を持っているのか私にはわかりません。

とは言うものの、送信機能を真空状態にすると、現在の動作は次のようになります。

  1. (あなたが言うように)メモリ内の大きなファイル全体を読む

  2. チャンクごとに、まったく新しい配列を割り当て、チャンクをコピーし、その上に配置しMemoryStreamてから送信します。

それはストリーミングが行われる方法ではありません。

代わりに、何も読み取らずにファイルストリームを開き、必要な数のチャンクをループして、事前に割り当てられた1つのバッファーで各チャンクを個別に読み取り(新しいバイト配列を割り当て続けないでください)、base64表現を取得する必要があります。本当に必要で、チャンクを送信してから、ループを続けます。あなたのガベージコレクターはあなたに感謝します。