使用Blazor WASM实现可取消的多文件带校验并发分片上传

使用,blazor,wasm,实现,取消,文件,校验,并发,分片,上传 · 浏览次数 : 39

小编点评

**文件分片上传示例** **代码描述** 该示例使用Blazor和C#实现并发分片上传,文件分片大小由方法参数控制。 **主要代码** ```csharp public enum FileUploadResult { Fail, Canceled, Success } public async Task SliceFileAsync(IBrowserFile file, int sliceSize, long fileOffset) { // 切分文件并返回片段数据 return await file.ReadAsync(sliceSize, cancellation); } ``` **使用方法** 1. 使用Blazor的`File`组件选择要分片的文件。 2. 在方法中设置分片大小和文件偏移。 3. 使用`SliceFileAsync`方法异步获取每个片段的上传结果。 4. 将所有片段的上传结果组合成一个完成上传请求并发送通知服务端。 **示例** ```html
选择要分片的文件:
分片大小: 文件偏移:
已完成的分片:
``` **输出** ``` 选择要分片的文件: 分片大小: 文件偏移: 开始分片 已完成的分片:
``` **注释** *示例使用Blazor的`File`组件选择要分片的文件。 *示例使用`SliceFileAsync`方法异步获取每个片段的上传结果。 *示例将所有片段的上传结果组合成一个完成上传请求并发送通知服务端。 *示例输出已完成的分片文件的信息。

正文

前言

上传大文件时,原始HTTP文件上传功能可能会影响使用体验,此时使用分片上传功能可以有效避免原始上传的弊端。由于分片上传不是HTTP标准的一部分,所以只能自行开发相互配合的服务端和客户端。文件分片上传在许多情况时都拥有很多好处,除非已知需要上传的文件一定非常小。分片上传可以对上传的文件进行快速分片校验,避免大文件上传时长时间等待校验,当然完整校验可以在秒传时使用,有这种需求的情况就只能老实等待校验了。

Blazr WASM提供了在 .NET环境中使用浏览器功能的能力,充分利用C#和 .NET能够大幅简化分片上传功能的开发。本次示例使用HTTP标准上传作为分片上传的底层基础,并提供分片校验功能保障上传数据的完整性。

新书宣传

有关新书的更多介绍欢迎查看《C#与.NET6 开发从入门到实践》上市,作者亲自来打广告了!
image

正文

本示例的Blazor代码位于默认ASP.NET Core托管的Blazor WASM应用模板的Index页面。

在Shared项目添加公共数据模型

/// <summary>
/// 文件分片上传输入模型
/// </summary>
public class FileChunkUploadInput
{
    /// <summary>
    /// 上传任务代码
    /// </summary>
    public string? UploadTaskCode { get; set; }

    /// <summary>
    /// 上传请求类型
    /// </summary>
    public string UploadType { get; set; } = null!;

    /// <summary>
    /// 文件名
    /// </summary>
    public string FileName { get; set; } = null!;

    /// <summary>
    /// 文件大小
    /// </summary>
    public long? FileSize { get; set; }

    /// <summary>
    /// 支持的Hash算法,优选算法请靠前
    /// </summary>
    public List<string>? AllowedHashAlgorithm { get; set; }

    /// <summary>
    /// 使用的Hash算法
    /// </summary>
    public string? HashAlgorithm { get; set; }

    /// <summary>
    /// Hash值
    /// </summary>
    public string? HashValue { get; set; }

    /// <summary>
    /// 文件分片数量
    /// </summary>
    public int FileChunkCount { get; set; }

    /// <summary>
    /// 文件片段大小
    /// </summary>
    public int? FileChunkSize { get; set; }

    /// <summary>
    /// 文件片段偏移量(相对于整个文件)
    /// </summary>
    public long? FileChunkOffset { get; set; }

    /// <summary>
    /// 文件片段索引
    /// </summary>
    public int? FileChunkIndex { get; set; }

    /// <summary>
    /// 取消上传的原因
    /// </summary>
    public string? CancelReason { get; set; }
}

/// <summary>
/// 文件分片上传开始结果
/// </summary>
public class FileChunkUploadStartReault
{
    /// <summary>
    /// 上传任务代码
    /// </summary>
    public string UploadTaskCode { get; set; } = null!;

    /// <summary>
    /// 选中的Hash算法
    /// </summary>
    public string SelectedHashAlgorithm { get; set; } = null!;
}

/// <summary>
/// Hash助手
/// </summary>
public static class HashHelper
{
    /// <summary>
    /// 把Hash的字节数组转换为16进制字符串表示
    /// </summary>
    /// <param name="bytes">原始Hash值</param>
    /// <returns>Hash值的16进制文本表示(大写)</returns>
    public static string ToHexString(this byte[] bytes)
    {
        StringBuilder sb = new(bytes.Length * 2);
        foreach (var @byte in bytes)
        {
            sb.Append(@byte.ToString("X2"));
        }
        return sb.ToString();
    }
}

服务端控制器

[ApiController]
[Route("[controller]")]
public class UploadController : ControllerBase
{
    /// <summary>
    /// 支持的Hash算法,优选算法请靠前
    /// </summary>
    private static string[] supportedHashAlgorithm = new[] { "MD5", "SHA1", "SHA256" };

    /// <summary>
    /// 文件写入锁的线程安全字典,每个上传任务对应一把锁
    /// </summary>
    private static readonly ConcurrentDictionary<string, AsyncLock> fileWriteLockerDict = new();

    private readonly ILogger<UploadController> _logger;
    private readonly IWebHostEnvironment _env;

    public UploadController(ILogger<UploadController> logger, IWebHostEnvironment env)
    {
        _logger = logger;
        _env = env;
    }

    /// <summary>
    /// 分片上传动作
    /// </summary>
    /// <param name="input">上传表单</param>
    /// <param name="fileChunkData">文件片段数据</param>
    /// <param name="requestAborted">请求取消令牌</param>
    /// <returns>片段上传结果</returns>
    [HttpPost, RequestSizeLimit(1024 * 1024 * 11)]
    [ProducesResponseType(StatusCodes.Status200OK)]
    [ProducesResponseType(StatusCodes.Status400BadRequest)]
    [ProducesDefaultResponseType]
    public async Task<IActionResult> Upload(
        [FromForm]FileChunkUploadInput input,
        [FromForm]IFormFile? fileChunkData,
        CancellationToken requestAborted)
    {
        switch (input.UploadType)
        {
            // 请求开始一个新的上传任务,协商上传参数
            case "startUpload":
                {
                    //var trustedFileNameForDisplay =
                    //    WebUtility.HtmlEncode(fileChunkData?.FileName ?? input.FileName);

                    // 选择双方都支持的优选Hash算法
                    var selectedHashAlgorithm = supportedHashAlgorithm
                        .Intersect(input.AllowedHashAlgorithm ?? Enumerable.Empty<string>())
                        .FirstOrDefault();

                    // 验证必要的表单数据
                    if (selectedHashAlgorithm is null or "")
                    {
                        ModelState.AddModelError<FileChunkUploadInput>(x => x.AllowedHashAlgorithm, "can not select hash algorithm");
                    }

                    if (input.FileSize is null)
                    {
                        ModelState.AddModelError<FileChunkUploadInput>(x => x.FileSize, "must have value for start、upload and complete");
                    }

                    if (ModelState.ErrorCount > 0)
                    {
                        return ValidationProblem(ModelState);
                    }

                    // 使用随机文件名提高安全性,并把文件名作为任务代码使用
                    var trustedFileNameForFileStorage = Path.GetRandomFileName();

                    var savePath = Path.Combine(
                        _env.ContentRootPath,
                        _env.EnvironmentName,
                        "unsafe_uploads",
                        trustedFileNameForFileStorage);

                    var savePathWithFile = Path.Combine(
                        savePath,
                        $"{input.FileName}.tmp");

                    if (!Directory.Exists(savePath))
                    {
                        Directory.CreateDirectory(savePath);
                    }

                    // 根据表单创建对应大小的文件
                    await using (var fs = new FileStream(savePathWithFile, FileMode.Create))
                    {
                        fs.SetLength(input.FileSize!.Value);
                        await fs.FlushAsync();
                    }

                    // 设置锁
                    fileWriteLockerDict.TryAdd(trustedFileNameForFileStorage, new());

                    // 返回协商结果
                    return Ok(new FileChunkUploadStartReault
                    {
                        UploadTaskCode = trustedFileNameForFileStorage,
                        SelectedHashAlgorithm = selectedHashAlgorithm!
                    });
                }

            // 上传文件片段
            case "uploadChunk":
                // 验证表单
                if (!fileWriteLockerDict.TryGetValue(input.UploadTaskCode!, out var _))
                {
                    ModelState.AddModelError<FileChunkUploadInput>(x => x.UploadTaskCode, $"file upload task with code {input.UploadTaskCode} is not exists");
                    return ValidationProblem(ModelState);
                }

                // 使用内存池缓冲数据,注意使用using释放内存
                using (var pooledMemory = MemoryPool<byte>.Shared.Rent((int)fileChunkData!.Length))
                {
                    // 使用切片语法获取精准大小的内存缓冲区装载上传的数据
                    var buffer = pooledMemory.Memory[..(int)fileChunkData!.Length];
                    var readBytes = await fileChunkData.OpenReadStream().ReadAsync(buffer, requestAborted);
                    var readBuffer = buffer[..readBytes];

                    Debug.Assert(readBytes == fileChunkData!.Length);

                    // 校验Hash
                    var hash = input.HashAlgorithm switch
                    {
                        "SHA1" => SHA1.HashData(readBuffer.Span),
                        "SHA256" => SHA256.HashData(readBuffer.Span),
                        "MD5" => MD5.HashData(readBuffer.Span),
                        _ => Array.Empty<byte>()
                    };

                    if (hash.ToHexString() != input.HashValue)
                    {
                        ModelState.AddModelError<FileChunkUploadInput>(x => x.HashValue, "hash does not match");
                        return ValidationProblem(ModelState);
                    }

                    var savePath = Path.Combine(
                        _env.ContentRootPath,
                        _env.EnvironmentName,
                        "unsafe_uploads",
                        input.UploadTaskCode!);

                    var savePathWithFile = Path.Combine(
                        savePath,
                        $"{input.FileName}.tmp");

                    // 使用锁写入数据,文件流不支持写共享,必须串行化
                    if(fileWriteLockerDict.TryGetValue(input.UploadTaskCode!, out var locker))
                    {
                        using (await locker.LockAsync())
                        {
                            await using (var fs = new FileStream(savePathWithFile, FileMode.Open, FileAccess.Write))
                            {
                                // 定位文件流
                                fs.Seek(input.FileChunkOffset!.Value, SeekOrigin.Begin);

                                await fs.WriteAsync(readBuffer, requestAborted);
                                await fs.FlushAsync();
                            }
                        }
                    }
                }

                return Ok();

            // 取消上传
            case "cancelUpload":
                // 验证表单
                if (!fileWriteLockerDict.TryGetValue(input.UploadTaskCode!, out var _))
                {
                    ModelState.AddModelError<FileChunkUploadInput>(x => x.UploadTaskCode, $"file upload task with code {input.UploadTaskCode} is not exists");
                    return ValidationProblem(ModelState);
                }

                {
                    var deletePath = Path.Combine(
                        _env.ContentRootPath,
                        _env.EnvironmentName,
                        "unsafe_uploads",
                        input.UploadTaskCode!);

                    // 删除文件,清除锁
                    if (fileWriteLockerDict.TryGetValue(input.UploadTaskCode!, out var locker))
                    {
                        using (await locker.LockAsync())
                        {
                            if (Directory.Exists(deletePath))
                            {
                                var dir = new DirectoryInfo(deletePath);
                                dir.Delete(true);
                            }

                            fileWriteLockerDict.TryRemove(input.UploadTaskCode!, out _);
                        }
                    }
                }

                return Ok();

            // 完成上传
            case "completeUpload":
                // 验证表单
                if (!fileWriteLockerDict.TryGetValue(input.UploadTaskCode!, out var _))
                {
                    ModelState.AddModelError<FileChunkUploadInput>(x => x.UploadTaskCode, $"file upload task with code {input.UploadTaskCode} is not exists");
                    return ValidationProblem(ModelState);
                }

                {
                    var savePath = Path.Combine(
                        _env.ContentRootPath,
                        _env.EnvironmentName,
                        "unsafe_uploads",
                        input.UploadTaskCode!);

                    // 去除文件的临时扩展名,清除锁
                    var savePathWithFile = Path.Combine(savePath, $"{input.FileName}.tmp");

                    var fi = new FileInfo(savePathWithFile);
                    fi.MoveTo(Path.Combine(savePath, input.FileName));

                    fileWriteLockerDict.TryRemove(input.UploadTaskCode!, out _);
                }

                return Ok();
            default:
                return BadRequest();
        }
    }
}

服务端使用三段式上传模式,开始上传,上传数据,完成(取消)上传。开始上传负责协商Hash算法和分配任务代码;上传数据负责具体的传输,并通过表单提供附加信息方便服务端操作。完成上传负责善后和资源清理。其中文件写入的异步锁使用Nito.AsyncEx代替不支持在异步中使用的lock语句。

页面代码(Index.razor),在结尾追加

<p>支持随时取消的多文件并行分片上传,示例同时上传2个文件,每个文件同时上传2个分片,合计同时上传4个分片</p>
<InputFile OnChange="UploadFile" multiple></InputFile>
<button @onclick="async (MouseEventArgs e) => uploadCancelSource?.Cancel()">取消上传</button>

@code{
    [Inject] private HttpClient _http { get; init; } = null!;
    [Inject] private ILogger<Index> _logger { get; init; } = null!;

    private CancellationTokenSource? uploadCancelSource;

    /// <summary>
    /// 上传文件
    /// </summary>
    /// <param name="args">上传文件的事件参数</param>
    /// <returns></returns>
    private async Task UploadFile(InputFileChangeEventArgs args)
    {
        // 设置文件并发选项
        var parallelCts = new CancellationTokenSource();
        uploadCancelSource = parallelCts;
        var parallelOption = new ParallelOptions
        {
            MaxDegreeOfParallelism = 2,
            CancellationToken = parallelCts.Token
        };

        // 并发上传所有文件
        await Parallel.ForEachAsync(
            args.GetMultipleFiles(int.MaxValue),
            parallelOption,
            async (file, cancellation) =>
            {
                // 这里的取消令牌是并发方法创建的,和并发选项里的令牌不是一个
                if (cancellation.IsCancellationRequested)
                {
                    parallelCts.Cancel();
                    return;
                }

                // 使用链接令牌确保外部取消能传递到内部
                var chunkUploadResult = await UploadChunkedFile(
                    file,
                    CancellationTokenSource.CreateLinkedTokenSource(
                        parallelCts.Token,
                        cancellation
                    ).Token
                );

                // 如果上传不成功则取消后续上传
                if (chunkUploadResult != FileUploadResult.Success)
                {
                    parallelCts.Cancel();
                    return;
                }
            }
        );
    }

    /// <summary>
    /// 分片上传文件
    /// </summary>
    /// <param name="file">要上传的文件</param>
    /// <param name="cancellation">取消令牌</param>
    /// <returns>上传结果</returns>
    private async Task<FileUploadResult> UploadChunkedFile(IBrowserFile file, CancellationToken cancellation = default)
    {
        if (cancellation.IsCancellationRequested) return FileUploadResult.Canceled;

        _logger.LogInformation("开始上传文件:{0}", file.Name);

        // 计算分片大小,文件小于10MB分片1MB,大于100MB分片10MB,在其间则使用不超过10片时的所需大小
        var coefficient = file.Size switch
        {
            <= 1024 * 1024 * 10 => 1,
            > 1024 * 1024 * 10 and <= 1024 * 1024 *100 => (int)Math.Ceiling(file.Size / (1024.0 * 1024) / 10),
            _ => 10
        };

        // 初始化分片参数,准备字符串格式的数据供表单使用
        var bufferSize = 1024 * 1024 * coefficient; // MB
        var stringBufferSize = bufferSize.ToString();
        var chunkCount = (int)Math.Ceiling(file.Size / (double)bufferSize);
        var stringChunkCount = chunkCount.ToString();
        var stringFileSize = file.Size.ToString();

        // 发起分片上传,协商Hash算法,获取任务代码
        var uploadStartContent = new List<KeyValuePair<string, string>>
        {
            new("uploadType", "startUpload"),
            new("fileName", file.Name),
            new("fileSize", stringFileSize),
            new("allowedHashAlgorithm", "SHA1"),
            new("allowedHashAlgorithm", "SHA256"),
            new("fileChunkCount", stringChunkCount),
            new("fileChunkSize", stringBufferSize),
        };

        var uploadStartForm = new FormUrlEncodedContent(uploadStartContent);

        HttpResponseMessage? uploadStartResponse = null;
        try
        {
            uploadStartResponse = await _http.PostAsync("/upload", uploadStartForm, cancellation);
        }
        catch(TaskCanceledException e)
        {
            _logger.LogWarning(e, "外部取消上传,已停止文件:{0} 的上传", file.Name);
            return FileUploadResult.Canceled;
        }
        catch(Exception e)
        {
            _logger.LogError(e, "文件:{0} 的上传参数协商失败", file.Name);
            return FileUploadResult.Fail;
        }

        // 如果服务器响应失败,结束上传
        if (uploadStartResponse?.IsSuccessStatusCode is null or false)
        {
            _logger.LogError("文件:{0} 的上传参数协商失败", file.Name);
            return FileUploadResult.Fail;
        }

        // 解析协商的参数
        var uploadStartReault = await uploadStartResponse.Content.ReadFromJsonAsync<FileChunkUploadStartReault>();
        var uploadTaskCode = uploadStartReault!.UploadTaskCode;
        var selectedHashAlgorithm = uploadStartReault!.SelectedHashAlgorithm;

        _logger.LogInformation("文件:{0} 的上传参数协商成功", file.Name);

        // 设置分片并发选项
        var parallelOption = new ParallelOptions
        {
            MaxDegreeOfParallelism = 2,
        };

        var fileUploadCancelSource = new CancellationTokenSource();
        var sliceEnumeratorCancelSource = CancellationTokenSource
            .CreateLinkedTokenSource(
                cancellation,
                fileUploadCancelSource.Token
            );
        // 各个分片的上传结果
        var sliceUploadResults = new FileUploadResult?[chunkCount];
        // 并发上传各个分片,并发循环本身不能用并发选项的取消令牌取消,可能会导致内存泄漏,应该通过切片循环的取消使并发循环因没有可用元素自然结束
        await Parallel.ForEachAsync(
            SliceFileAsync(
                file,
                bufferSize,
                sliceEnumeratorCancelSource.Token
            ),
            parallelOption,
            async (fileSlice, sliceUploadCancel) =>
            {
                // 解构参数
                var (memory, sliceIndex, readBytes, fileOffset) = fileSlice;

                // 使用using确保结束后把租用的内存归还给内存池
                using (memory)
                {
                    var stringSliceIndex = sliceIndex.ToString();

                    // 主动取消上传,发送取消请求,通知服务端清理资源
                    if (sliceUploadCancel.IsCancellationRequested)
                    {
                        _logger.LogWarning("外部取消上传,已停止文件:{0} 的上传", file.Name);

                        fileUploadCancelSource.Cancel();
                        sliceUploadResults[sliceIndex] = FileUploadResult.Canceled;

                        var uploadCancelContent = new Dictionary<string, string>()
                        {
                            {"uploadType", "cancelUpload"},
                            {"uploadTaskCode", uploadTaskCode!},
                            {"fileName", file.Name},
                            {"hashAlgorithm", selectedHashAlgorithm},
                            {"fileChunkCount", stringChunkCount},
                            {"fileChunkIndex", stringSliceIndex},
                            {"cancelReason", "调用方要求取消上传。"},
                        };
                        var uploadCancelForm = new FormUrlEncodedContent(uploadCancelContent);
                        var uploadCancelResponse = await _http.PostAsync("/upload", uploadCancelForm);

                        return;
                    }

                    // 当前上传分片索引应当小于预计的分片数
                    Debug.Assert(sliceIndex < chunkCount);

                    // 获取准确大小的缓冲区,从内存池租用时得到的容量可能大于申请的大小,使用C#的新集合切片语法
                    var readBuffer = memory.Memory[..readBytes];

                    var sw = Stopwatch.StartNew();
                    // 根据协商的算法计算Hash,wasm环境不支持MD5和全部非对称加密算法
                    var hash = selectedHashAlgorithm switch
                    {
                        "SHA1" => SHA1.HashData(readBuffer.Span),
                        "SHA256" => SHA256.HashData(readBuffer.Span),
                        _ => Array.Empty<byte>()
                    };
                    sw.Stop();

                    _logger.LogInformation("文件:{0} 的片段 {1}({2} Bytes) 计算Hash用时 {3}", file.Name, sliceIndex, readBytes, sw.Elapsed);

                    var stringReadBytes = readBytes.ToString();
                    var stringFileOffset = fileOffset.ToString();

                    // 上传当前分片
                    MultipartFormDataContent uploadFileForm = new();
                    uploadFileForm.Add(new StringContent(uploadTaskCode!), "uploadTaskCode");
                    uploadFileForm.Add(new StringContent("uploadChunk"), "uploadType");
                    uploadFileForm.Add(new StringContent(file.Name), "fileName");
                    uploadFileForm.Add(new StringContent(stringFileSize), "fileSize");
                    uploadFileForm.Add(new StringContent(selectedHashAlgorithm!), "hashAlgorithm");
                    uploadFileForm.Add(new StringContent(hash.ToHexString()), "hashValue");
                    uploadFileForm.Add(new StringContent(stringChunkCount), "fileChunkCount");
                    uploadFileForm.Add(new StringContent(stringReadBytes), "fileChunkSize");
                    uploadFileForm.Add(new StringContent(stringFileOffset), "fileChunkOffset");
                    uploadFileForm.Add(new StringContent(stringSliceIndex), "fileChunkIndex");

                    // 如果是未知的文件类型,设置为普通二进制流的MIME类型
                    var fileChunk = new ReadOnlyMemoryContent(readBuffer);
                    fileChunk.Headers.ContentType = new MediaTypeHeaderValue(string.IsNullOrEmpty(file.ContentType) ? "application/octet-stream" : file.ContentType);
                    uploadFileForm.Add(fileChunk, "fileChunkData", file.Name);

                    HttpResponseMessage? uploadResponse = null;
                    try
                    {
                        var uploadTaskCancel = CancellationTokenSource
                            .CreateLinkedTokenSource(
                                sliceUploadCancel,
                                sliceEnumeratorCancelSource.Token
                            );

                        _logger.LogInformation("文件:{0} 的片段 {1}({2} Bytes) 开始上传", file.Name, sliceIndex, readBytes);

                        sw.Restart();
                        uploadResponse = await _http.PostAsync("/upload", uploadFileForm, uploadTaskCancel.Token);
                    }
                    catch (TaskCanceledException e)
                    {
                        _logger.LogWarning(e, "外部取消上传,已停止文件:{0} 的上传", file.Name);

                        fileUploadCancelSource.Cancel();
                        sliceUploadResults[sliceIndex] = FileUploadResult.Canceled;

                        var uploadCancelContent = new Dictionary<string, string>()
                        {
                            {"uploadType", "cancelUpload"},
                            {"uploadTaskCode", uploadTaskCode!},
                            {"fileName", file.Name},
                            {"hashAlgorithm", selectedHashAlgorithm},
                            {"fileChunkCount", stringChunkCount},
                            {"fileChunkIndex", stringSliceIndex},
                            {"cancelReason", "调用方要求取消上传。"},
                        };
                        var uploadCancelForm = new FormUrlEncodedContent(uploadCancelContent);
                        var uploadCancelResponse = await _http.PostAsync("/upload", uploadCancelForm);

                        return;
                    }
                    catch (Exception e)
                    {
                        _logger.LogError(e, "上传发生错误,已停止文件:{0} 的上传", file.Name);

                        fileUploadCancelSource.Cancel();
                        sliceUploadResults[sliceIndex] = FileUploadResult.Fail;

                        var uploadCancelContent = new Dictionary<string, string>()
                        {
                            {"uploadType", "cancelUpload"},
                            {"uploadTaskCode", uploadTaskCode!},
                            {"fileName", file.Name},
                            {"hashAlgorithm", selectedHashAlgorithm},
                            {"fileChunkCount", stringChunkCount},
                            {"fileChunkSize", stringReadBytes},
                            {"fileChunkOffset", stringFileOffset},
                            {"fileChunkIndex", stringSliceIndex},
                            {"cancelReason", "上传过程中发生错误。"},
                        };
                        var uploadCancelForm = new FormUrlEncodedContent(uploadCancelContent);
                        var uploadCancelResponse = await _http.PostAsync("/upload", uploadCancelForm);

                        return;
                    }
                    finally
                    {
                        sw.Stop();
                    }

                    // 上传发生错误,发送取消请求,通知服务端清理资源
                    if (uploadResponse?.IsSuccessStatusCode is null or false)
                    {
                        _logger.LogError("上传发生错误,已停止文件:{0} 的上传", file.Name);

                        fileUploadCancelSource.Cancel();
                        sliceUploadResults[sliceIndex] = FileUploadResult.Fail;

                        var uploadCancelContent = new Dictionary<string, string>()
                        {
                            {"uploadType", "cancelUpload"},
                            {"uploadTaskCode", uploadTaskCode!},
                            {"fileName", file.Name},
                            {"hashAlgorithm", selectedHashAlgorithm},
                            {"fileChunkCount", stringChunkCount},
                            {"fileChunkSize", stringReadBytes},
                            {"fileChunkOffset", stringFileOffset},
                            {"fileChunkIndex", stringSliceIndex},
                            {"cancelReason", "上传过程中发生错误。"},
                        };
                        var uploadCancelForm = new FormUrlEncodedContent(uploadCancelContent);
                        var uploadCancelResponse = await _http.PostAsync("/upload", uploadCancelForm);

                        return;
                    }

                    _logger.LogInformation("文件:{0} 的片段 {1}({2} Bytes) 上传成功,用时 {3}", file.Name, sliceIndex, readBytes, sw.Elapsed);

                    sliceUploadResults[sliceIndex] = FileUploadResult.Success;
                }
            }
        );

        // 如果所有分片都上传成功,则发送完成请求完成上传
        if (sliceUploadResults.All(success => success is FileUploadResult.Success))
        {
            var uploadCompleteContent = new Dictionary<string, string>()
            {
                {"uploadType", "completeUpload"},
                {"uploadTaskCode", uploadTaskCode!},
                {"fileName", file.Name},
                {"fileSize", stringFileSize},
                {"hashAlgorithm", selectedHashAlgorithm},
                {"fileChunkCount", stringChunkCount},
                {"fileChunkSize", stringBufferSize},
            };
            var uploadCompleteForm = new FormUrlEncodedContent(uploadCompleteContent);
            var uploadCompleteResponse = await _http.PostAsync("/upload", uploadCompleteForm);

            if (uploadCompleteResponse.IsSuccessStatusCode)
            {
                _logger.LogInformation("文件:{0} 上传成功,共 {1} 个片段", file.Name, chunkCount);
                return FileUploadResult.Success;
            }
            else
            {
                _logger.LogError("上传发生错误,已停止文件:{0} 的上传", file.Name);

                var uploadCancelContent = new Dictionary<string, string>()
                {
                    {"uploadType", "cancelUpload"},
                    {"uploadTaskCode", uploadTaskCode!},
                    {"fileName", file.Name},
                    {"hashAlgorithm", selectedHashAlgorithm},
                    {"fileChunkCount", stringChunkCount},
                    {"cancelReason", "上传过程中发生错误。"},
                };
                var uploadCancelForm = new FormUrlEncodedContent(uploadCancelContent);
                var uploadCancelResponse = await _http.PostAsync("/upload", uploadCancelForm);

                return FileUploadResult.Fail;
            }
        }
        else if (sliceUploadResults.Any(success => success is FileUploadResult.Fail))
        {
            return FileUploadResult.Fail;
        }
        else
        {
            return FileUploadResult.Canceled;
        }
    }

    /// <summary>
    /// 异步切分要上传的文件
    /// <br/>如果想中途结束切分,不要在调用此方法的foreach块中使用break,请使用取消令牌,否则会出现内存泄漏
    /// </summary>
    /// <param name="file">要分片的文件</param>
    /// <param name="sliceSize">分片大小</param>
    /// <param name="cancellation">取消令牌</param>
    /// <returns>已切分的文件片段数据,用完切记释放其中的内存缓冲</returns>
    private static async IAsyncEnumerable<(IMemoryOwner<byte> memory, int sliceIndex, int readBytes, long fileOffset)> SliceFileAsync(
        IBrowserFile file,
        int sliceSize,
        [EnumeratorCancellation] CancellationToken cancellation = default)
    {
        if (cancellation.IsCancellationRequested) yield break;

        int fileSliceIndex;
        long fileOffset;
        IMemoryOwner<byte> memory;
        await using var fileStream = file.OpenReadStream(long.MaxValue);

        for (fileSliceIndex = 0, fileOffset = 0, memory = MemoryPool<byte>.Shared.Rent(sliceSize);
            (await fileStream.ReadAsync(memory.Memory[..sliceSize], cancellation)) is int readBytes and > 0;
            fileSliceIndex++, fileOffset += readBytes, memory = MemoryPool<byte>.Shared.Rent(sliceSize)
        )
        {
            if(cancellation.IsCancellationRequested)
            {
                // 如果取消切分,缓冲不会返回到外部,只能在内部释放
                memory.Dispose();
                yield break;
            }
            yield return (memory, fileSliceIndex, readBytes, fileOffset);
        }
        // 切分结束后会多出一个没用的缓冲,只能在内部释放
        memory.Dispose();
    }

    /// <summary>
    /// 上传结果
    /// </summary>
    public enum FileUploadResult
    {
        /// <summary>
        /// 失败
        /// </summary>
        Fail = -2,

        /// <summary>
        /// 取消
        /// </summary>
        Canceled = -1,

        /// <summary>
        /// 没有结果,未知结果
        /// </summary>
        None = 0,

        /// <summary>
        /// 成功
        /// </summary>
        Success = 1
    }
}

示例使用Parallel.ForEachAsync方法并行启动多个文件和每个文件的多个片段的上传,并发量由方法的参数控制。UploadChunkedFile方法负责单个文件的上传,其中的IBrowserFile类型是.NET 6新增的文件选择框选中项的包装,可以使用其中的OpenReadStream方法流式读取文件数据,确保大文件上传不会在内存中缓冲所有数据导致内存占用问题。

UploadChunkedFile方法内部使用自适应分片大小算法,规则为片段最小1MB,最大10MB,尽可能平均分为10份。得出片段大小后向服务端请求开始上传文件,服务端成功返回后开始文件切分、校验和上传。

SliceFileAsync负责切分文件并流式返回每个片段,切分方法是惰性的,所以不用担心占用大量内存,但是这个方法只能使用取消令牌中断切分,如果在调用该方法的await foreach块中使用break中断会产生内存泄漏。切分完成后会返回包含片段数据的内存缓冲和其他附加信息。OpenReadStream需要使用参数控制允许读取的最大字节数(默认512KB),因为这里是分片上传,直接设置为long.MaxValue即可。for循环头使用逗号表达式定义多个循环操作,使循环体的代码清晰简洁。

UploadChunkedFile方法使用Parallel.ForEachAsync并行启动多个片段的校验和上传,WASM中不支持MD5和所有非对称加密算法,需要注意。完成文件的并行上传或发生错误后会检查所有片段的上传情况,如果所有片段都上传成功,就发送完成上传请求通知服务端收尾善后,否则删除临时文件。

结语

这应该是一个比较清晰易懂的分片上传示例。示例使用Blazor 和C#以非常流畅的异步代码实现了并发分片上传。但是本示例依然有许多可优化的点,例如实现断点续传,服务端如果没有收到结束请求时的兜底处理等,这些就留给朋友们思考了。

又是很久没有写文章了,一直没有找到什么好选题,难得找到一个,经过将近1周的研究开发终于搞定了。

QQ群

读者交流QQ群:540719365
image

欢迎读者和广大朋友一起交流,如发现本书错误也欢迎通过博客园、QQ群等方式告知我。

本文地址:https://www.cnblogs.com/coredx/p/17746162.html

与使用Blazor WASM实现可取消的多文件带校验并发分片上传相似的内容:

使用Blazor WASM实现可取消的多文件带校验并发分片上传

前言 上传大文件时,原始HTTP文件上传功能可能会影响使用体验,此时使用分片上传功能可以有效避免原始上传的弊端。由于分片上传不是HTTP标准的一部分,所以只能自行开发相互配合的服务端和客户端。文件分片上传在许多情况时都拥有很多好处,除非已知需要上传的文件一定非常小。分片上传可以对上传的文件进行快速分

在 .NET 7上使用 WASM 和 WASI

WebAssembly(WASM)和WebAssembly System Interface(WASI)为开发人员开辟了新的世界。.NET 开发人员在 Blazor WebAssembly 发布时熟悉了 WASM。Blazor WebAssembly 在浏览器中基于 WebAssembly 的 .N

使用Blazor WebAssembly整合PocketBase的基础项目模板

使用Blazor WebAssembly整合PocketBase的基础项目模板 在这篇博客文章中,我们将探讨如何创建一个集成PocketBase的基础Blazor WebAssembly项目。我们将涵盖用户身份验证、注册和密码找回功能。我们的项目使用PocketBaseClient,这是一个动态生成

原来.NET写的Linux桌面这么好看?

如何使用Blazor在Linux平台下运行Desktop程序 本文将讲解如何使用Blazor运行跨平台应用,应用到的技术有以下几点 Blazor Masa Blazor Photino.Blazor Ubuntu 用于验证跨平台性,并且是否提高开发效率,Blazor和Photino一块使用的技术称为

WPF使用Blazor的快速案例

下面我们将讲解在WPF中使用Blazor,并且使用Blazor做一些文件编辑操作,下面是需要用到的东西 - WPF - Blazor - Masa Blazor - Monaco ## 安装Masa Blazor模板 使用`CMD`指令安装模板 ```shell dotnet new install

如何使用 Blazor 框架在前端浏览器中导入和导出 Excel

摘要:本文由葡萄城技术团队于博客园原创并首发。转载请注明出处:葡萄城官网,葡萄城为开发者提供专业的开发工具、解决方案和服务,赋能开发者。 前言 Blazor 是一个相对较新的框架,用于构建具有 .NET 强大功能的交互式客户端 Web UI。一个常见的用例是将现有的 Excel 文件导入 Blazo

.NET 8新预览版本使用 Blazor 组件进行服务器端呈现

简介 此预览版添加了对使用 Blazor 组件进行服务器端呈现的初始支持。这是 Blazor 统一工作的开始,旨在使 Blazor 组件能够满足客户端和服务器端的所有 Web UI 需求。这是该功能的早期预览版,因此仍然受到一定限制,但我们的目标是无论选择如何构建应用,都能使用可重用的 Blazor

项目完成小结:使用Blazor和gRPC开发大模型客户端

## 前言 先介绍下这个项目。 最近我一直在探索大语言模型,根据不同场景训练了好几个模型,为了让用户测试使用,需要开发前端。 这时候,用 Gradio 搭建的前端是不太够的,虽说 GitHub 上也有一堆开源的 ChatGPT 前端,但我看了一圈,并没有找到便于二次开发定制的,再一想,这么简单的功能

快速使用ChatGpt Web Server

快速使用ChatGpt Web Server ChatGpt Web Server是使用Blazor Server模式部署的一个服务,所有的逻辑和代码执行都会在服务器执行,然后通过SignalR传输到前端渲染。通过这样模式,我们部署ChatGpt Web Server只需要新加坡服务器,不需要在搭建

Blazor开发小游戏?趁热打铁上!!!

大家好,我是沙漠尽头的狼。 网站使用Blazor重构上线一天了,用Blazor开发是真便捷,空闲时间查查gpt和github,又上线一个 [正则表达式在线验证工具](https://dotnet9.com/tools/regextester) 和几个在线小游戏,比如 [井字棋游戏](https://