Bagaimana cara membuat multithreading dalam mengirim file besar dari FTP ke Azure file besar lebih cepat

Aug 19 2020

Saat ini, saya memiliki kode yang mendownload file dari FTP ke hard disk lokal. Itu kemudian mengunggah file dalam potongan ke Azure. Akhirnya, ini menghapus file dari lokal dan ftp. Kode ini sangat lambat. Hanya ingin tahu bagaimana memperbaikinya.

    private async Task UploadToBlobJobAsync(FtpConfiguration ftpConfiguration, BlobStorageConfiguration blobStorageConfiguration, string fileExtension)
    {
        try
        {
               ftpConfiguration.FileExtension = fileExtension;

                var filesToProcess = FileHelper.GetAllFileNames(ftpConfiguration).ToList();
                
                var batchSize = 4;
                List<Task> uploadBlobToStorageTasks = new List<Task>(batchSize);

                for (int i = 0; i < filesToProcess.Count(); i += batchSize)
                {
                    // calculated the remaining items to avoid an OutOfRangeException
                    batchSize = filesToProcess.Count() - i > batchSize ? batchSize : filesToProcess.Count() - i;

                    for (int j = i; j < i + batchSize; j++)
                    {
                        var fileName = filesToProcess[j];
                        var localFilePath = SaveFileToLocalAndGetLocation(ftpConfiguration, ftpConfiguration.FolderPath, fileName);

                        // Spin off a background task to process the file we just downloaded
                        uploadBlobToStorageTasks.Add(Task.Run(() =>
                        {
                            // Process the file
                            UploadFile(ftpConfiguration, blobStorageConfiguration, fileName, localFilePath).ConfigureAwait(false);
                        }));
                    }

                    Task.WaitAll(uploadBlobToStorageTasks.ToArray());
                    uploadBlobToStorageTasks.Clear();
                }
        }
        catch (Exception ex)
        {
        }
    }

    private async Task UploadFile(FtpConfiguration ftpConfiguration, BlobStorageConfiguration blobStorageConfiguration, string fileName, string localFilePath)
    {
        try
        {
            await UploadLargeFiles(GetBlobStorageConfiguration(blobStorageConfiguration), fileName, localFilePath).ConfigureAwait(false);
    FileHelper.DeleteFile(ftpConfiguration, fileName); // delete file from ftp
        }
        catch (Exception exception)
        {
        }
    }

   private async Task UploadLargeFiles(BlobStorageConfiguration blobStorageConfiguration, string fileName, string localFilePath)
    {
        try
        {
            var output = await UploadFileAsBlockBlob(localFilePath, blobStorageConfiguration).ConfigureAwait(false);

            // delete the file from local
            Logger.LogInformation($"Deleting {fileName} from the local folder. Path is {localFilePath}.");

            if (File.Exists(localFilePath))
            {
                File.Delete(localFilePath);
            }
        }
        catch (Exception ex)
        {
        }
    }

    private async Task UploadFileAsBlockBlob(string sourceFilePath, BlobStorageConfiguration blobStorageConfiguration)
    {
        string fileName = Path.GetFileName(sourceFilePath);
        try
        {
            var storageAccount = CloudStorageAccount.Parse(blobStorageConfiguration.ConnectionString);
            var blobClient = storageAccount.CreateCloudBlobClient();
            var cloudContainer = blobClient.GetContainerReference(blobStorageConfiguration.Container);
            await cloudContainer.CreateIfNotExistsAsync().ConfigureAwait(false);

            var directory = cloudContainer.GetDirectoryReference(blobStorageConfiguration.Path);
            var blob = directory.GetBlockBlobReference(fileName);

            var blocklist = new HashSet<string>();

            byte[] bytes = File.ReadAllBytes(sourceFilePath);

            const long pageSizeInBytes = 10485760 * 20; // 20mb at a time
            long prevLastByte = 0;
            long bytesRemain = bytes.Length;

            do
            {
                long bytesToCopy = Math.Min(bytesRemain, pageSizeInBytes);
                byte[] bytesToSend = new byte[bytesToCopy];

                Array.Copy(bytes, prevLastByte, bytesToSend, 0, bytesToCopy);

                prevLastByte += bytesToCopy;
                bytesRemain -= bytesToCopy;

                // create blockId
                string blockId = Guid.NewGuid().ToString();
                string base64BlockId = Convert.ToBase64String(Encoding.UTF8.GetBytes(blockId));

                await blob.PutBlockAsync(base64BlockId, new MemoryStream(bytesToSend, true), null).ConfigureAwait(false);

                blocklist.Add(base64BlockId);
            }
            while (bytesRemain > 0);

            // post blocklist
            await blob.PutBlockListAsync(blocklist).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
        }
    }

Jawaban

1 Blindy Aug 20 2020 at 00:25

Pertama, jangan menulis ke disk apa pun yang tidak Anda perlukan. Tidak sepenuhnya jelas apa tujuan Anda di sini, tetapi saya gagal memahami mengapa Anda memiliki kebutuhan seperti itu sejak awal.

Yang mengatakan, jika Anda mengambil fungsi kirim Anda dalam ruang hampa, yang dilakukannya sekarang adalah:

  1. Baca seluruh (seperti yang Anda katakan) file besar Anda di memori

  2. Untuk setiap potongan, Anda mengalokasikan array baru, menyalin potongan, meletakkan MemoryStreamdi atasnya dan kemudian mengirimkannya.

Itu bukanlah cara streaming dilakukan.

Sebagai gantinya, Anda harus membuka aliran file tanpa membaca apa pun, lalu mengulanginya untuk potongan sebanyak yang Anda butuhkan dan membaca setiap potongan secara individual dalam satu buffer yang dialokasikan sebelumnya (jangan terus mengalokasikan array byte baru), dapatkan representasi base64 jika Anda benar-benar membutuhkannya, dan mengirimkan potongannya, lalu terus mengulang. Pemulung sampah Anda akan berterima kasih.