【Azure 存储服务】多设备并发往 Azure Storage Blob 的 Container 存数据是否可以

Container,设备,Azure ,存储 · 浏览次数 : 55

小编点评

```csharp public static async Task DemonstratePessimisticConcurrencyBlob(BlobClient blobClient){ Console.WriteLine("Demonstrate pessimistic concurrency\"); BlobContainerClient containerClient = blobClient.GetParentBlobContainerClient(); BlobLeaseClient blobLeaseClient = blobClient.GetBlobLeaseClient(); try { // Create the container if it does not exist. await containerClient.CreateIfNotExistsAsync(); // Upload text to a blob. string blobContents1 = "First update. Overwrite blob if it exists.\"; byte[] byteArray = Encoding.ASCII.GetBytes(blobContents1); using (MemoryStream stream = new MemoryStream(byteArray)) { BlobContentInfo blobContentInfo = await blobClient.UploadAsync(stream, overwrite: true); } // Acquire a lease on the blob. BlobLease blobLease = await blobLeaseClient.AcquireAsync(TimeSpan.FromSeconds(15)); Console.WriteLine("Blob lease acquired. LeaseId = {0}", blobLease.LeaseId); // Set the request condition to include the lease ID. BlobUploadOptions blobUploadOptions = new BlobUploadOptions() { Conditions = new BlobRequestConditions { LeaseId = blobLease.LeaseId } }; // Write to the blob again, providing the lease ID on the request. // The lease ID was provided, so this call should succeed. string blobContents2 = "Second update. Lease ID provided on request.\"; byteArray = Encoding.ASCII.GetBytes(blobContents2); using (MemoryStream stream = new MemoryStream(byteArray)) { BlobContentInfo blobContentInfo = await blobClient.UploadAsync(stream, blobUploadOptions); } // This code simulates an update by another client. // The lease ID is not provided, so this call fails. string blobContents3 = "Third update. No lease ID provided.\"; byteArray = Encoding.ASCII.GetBytes(blobContents3); using (MemoryStream stream = new MemoryStream(byteArray)) { // This call should fail with error code 412 (Precondition Failed). BlobContentInfo blobContentInfo = await blobClient.UploadAsync(stream); } } catch (RequestFailedException e) { if (e.Status == (int)HttpStatusCode.PreconditionFailed) { Console.WriteLine( @"Precondition failure as expected. The lease ID was not provided.\" ); } else { Console.WriteLine(e.Message); throw; } } finally { await blobLeaseClient.ReleaseAsync(); } } ```

正文

问题描述

多设备并发往 Azure Storage Blob 的 Container 存数据是否可以?

 

问题解答

可以! Azure Storage 是支持的并发存储数据的,Blob 可以使用乐观并发或悲观并发模型的,具体实现可以参考文档:https://docs.microsoft.com/zh-cn/azure/storage/blobs/concurrency-manage?tabs=dotnet#pessimistic-concurrency-for-blobs 

 

当前,非常多的新应用程序通常允许多名用户同时查看和更新数据。尤其是在多名用户可以更新相同数据的情况下。

开发人员通常考虑下面三个主要数据并发策略:

  • 乐观并发:执行更新的应用程序将在其更新过程中确定数据是否自该应用程序上次读取此数据以来已发生更改。 例如,如果两名查看 wiki 页面的用户对该页面进行更新,则 wiki 平台必须确保第二次更新不会覆盖第一次更新。 此外还必须确保两名用户都了解其更新是否成功。 此策略最常用于 Web 应用程序中。

  • 悲观并发:要执行更新的应用程序会对对象上锁,以防其他用户在该锁释放前更新数据。 例如,在进行主/辅数据复制且只有主对象执行更新的情况下,该对象通常会长时间以独占的形式锁定数据,以确保其他任何对象都不能更新该数据。

  • 以最后写入者为准:一种方法,它允许更新操作继续进行,而不需要首先确定其他应用程序是否自数据被读取以来已更新该数据。 当数据分区时,通常会使用此方法,这样就不可能有多个用户同时访问相同的数据。 该策略可能还适用于正在处理短期数据流的情况。

Azure 存储支持所有三个策略,但是它在为乐观和悲观并发提供全面支持的能力方面与众不同。

Azure 存储旨在采用强大的一致性模型,确保在服务执行插入或更新操作后,后续读取操作会返回最新更新。

 

乐观并发和悲观并发实现的关键是在 UploadAsync 方法中传递的 BlobUploadOptions 对象。比如,乐观并发中,通过判断 IfMatch 中记录每一次文件修改后,返回的ETag值,来验证当前的修改是否是在最新文件的基础上进行修改,避免造成对其他修改者所修改的内容进行覆盖。

        // Set the If-Match condition to the original ETag.
        BlobUploadOptions blobUploadOptions = new BlobUploadOptions()
        {
            Conditions = new BlobRequestConditions()
            {
                IfMatch = originalETag
            }
        };

 悲观并发中,则是通过使用 LeaseId (租约ID)来控制当前的操作不允许其他用户操作! 


BlobLeaseClient blobLeaseClient = blobClient.GetBlobLeaseClient();

// Acquire a lease on the blob.
BlobLease blobLease = await blobLeaseClient.AcquireAsync(TimeSpan.FromSeconds(15));


// Set the request condition to include the lease ID. BlobUploadOptions blobUploadOptions = new BlobUploadOptions() { Conditions = new BlobRequestConditions() { LeaseId = blobLease.LeaseId } };

 

乐观并发代码示例:

Azure 存储会为每个已存储的对象分配一个标识符。 只要对对象执行写入操作,就会更新此标识符。

该标识符作为 HTTP GET 响应的一部分在 ETag 标头(通过 HTTP 协议定义)中返回到客户端。

执行更新的客户端可以将原始 ETag 连同条件标头一起发送,以确保只有在满足特定条件的情况下才会进行更新。

例如,如果指定了 If-Match 标头,Azure 存储会验证更新请求中指定的 ETag 的值与所更新对象的 ETag 的值是否相同。  

 

下面的代码示例演示如何在用于检查 blob 的 ETag 值的写入请求中构造 If-Match 条件。

Azure 存储会评估 blob 的当前 ETag 是否与请求中提供的 ETag 相同,只有在两个 ETag 值匹配时才执行写入操作。

如果其他进程已在此期间更新该 blob,则 Azure 存储会返回 HTTP 412(“不满足前提条件”)状态消息。

private static async Task DemonstrateOptimisticConcurrencyBlob(BlobClient blobClient)
{
    Console.WriteLine("Demonstrate optimistic concurrency");

    BlobContainerClient containerClient = blobClient.GetParentBlobContainerClient();

    try
    {
        // Create the container if it does not exist.
        await containerClient.CreateIfNotExistsAsync();

        // Upload text to a new block blob.
        string blobContents1 = "First update. Overwrite blob if it exists.";
        byte[] byteArray = Encoding.ASCII.GetBytes(blobContents1);

        ETag originalETag;
        
        using (MemoryStream stream = new MemoryStream(byteArray))
        {
            // 从 Azure 存储检索 blob。 响应包括用于标识对象的当前版本的 HTTP ETag 标头值。
            BlobContentInfo blobContentInfo = await blobClient.UploadAsync(stream, overwrite: true);
            originalETag = blobContentInfo.ETag;
            Console.WriteLine("Blob added. Original ETag = {0}", originalETag);
        }

        // This code simulates an update by another client.
        // No ETag was provided, so original blob is overwritten and ETag updated.
        string blobContents2 = "Second update overwrites first update.";
        byteArray = Encoding.ASCII.GetBytes(blobContents2);

        using (MemoryStream stream = new MemoryStream(byteArray))
        {
            BlobContentInfo blobContentInfo = await blobClient.UploadAsync(stream, overwrite: true);
            Console.WriteLine("Blob updated. Updated ETag = {0}", blobContentInfo.ETag);
        }

        // Now try to update the blob using the original ETag value.
        string blobContents3 = "Third update. If-Match condition set to original ETag.";
        byteArray = Encoding.ASCII.GetBytes(blobContents3);

        // Set the If-Match condition to the original ETag.
        
        // 在更新 blob 时,应将在步骤 1 中获得的 ETag 值包括在写入请求的 If-Match 条件标头中。 Azure 存储会将请求中的 ETag 值与 blob 当前的 ETag 值进行比较。
        BlobUploadOptions blobUploadOptions = new BlobUploadOptions()
        {
            Conditions = new BlobRequestConditions()
            {
                IfMatch = originalETag
            }
        };

        using (MemoryStream stream = new MemoryStream(byteArray))
        {
            // This call should fail with error code 412 (Precondition Failed).
            //如果 blob 当前的 ETag 值不同于请求中提供的 If-Match 条件标头中指定的 ETag 值,则 Azure 存储会返回 HTTP 状态代码412(“不满足前提条件”)。 此错误向客户端表明,另一进程在客户端首先检索 blob 后已更新该 blob。
            //如果 blob 的当前 ETag 值与请求的 If-Match 条件标头中的 ETag 的版本相同,则 Azure 存储会执行请求的操作,并更新该 blob 的当前 ETag 值。
            BlobContentInfo blobContentInfo = await blobClient.UploadAsync(stream, blobUploadOptions);
        }
    }
    catch (RequestFailedException e)
    {
        if (e.Status == (int)HttpStatusCode.PreconditionFailed)
        {
            Console.WriteLine(
                @"Precondition failure as expected. Blob's ETag does not match ETag provided.");
        }
        else
        {
            Console.WriteLine(e.Message);
            throw;
        }
    }
}

 

悲观并发代码示例:

若要锁定 Blob 以供独占使用,您可以对该 Blob 获得租约。

获取租约时,可以指定租期。 有限期租约的有效期可为 15 到 60 秒。

租约也可以是无限期的,这相当于一个排他锁。

可续订有限期租约来延长租约,也可在租约完成后将其释放。

Azure 存储在有限期租约到期时会自动释放这些租约。

 

下面的代码示例演示了如何获取 blob 的独占租约,通过提供租约 ID 来更新 blob 的内容,然后释放租约。
如果租约有效,但写入请求中未提供租约 ID,则写入操作会失败,并出现错误代码 412(“不满足前提条件”)。
public static async Task DemonstratePessimisticConcurrencyBlob(BlobClient blobClient)
{
    Console.WriteLine("Demonstrate pessimistic concurrency");

    BlobContainerClient containerClient = blobClient.GetParentBlobContainerClient();
    BlobLeaseClient blobLeaseClient = blobClient.GetBlobLeaseClient();

    try
    {
        // Create the container if it does not exist.
        await containerClient.CreateIfNotExistsAsync();

        // Upload text to a blob.
        string blobContents1 = "First update. Overwrite blob if it exists.";
        byte[] byteArray = Encoding.ASCII.GetBytes(blobContents1);
        using (MemoryStream stream = new MemoryStream(byteArray))
        {
            BlobContentInfo blobContentInfo = await blobClient.UploadAsync(stream, overwrite: true);
        }

        // Acquire a lease on the blob.
        BlobLease blobLease = await blobLeaseClient.AcquireAsync(TimeSpan.FromSeconds(15));
        Console.WriteLine("Blob lease acquired. LeaseId = {0}", blobLease.LeaseId);

        // Set the request condition to include the lease ID.
        BlobUploadOptions blobUploadOptions = new BlobUploadOptions()
        {
            Conditions = new BlobRequestConditions()
            {
                LeaseId = blobLease.LeaseId
            }
        };

        // Write to the blob again, providing the lease ID on the request.
        // The lease ID was provided, so this call should succeed.
        string blobContents2 = "Second update. Lease ID provided on request.";
        byteArray = Encoding.ASCII.GetBytes(blobContents2);

        using (MemoryStream stream = new MemoryStream(byteArray))
        {
            BlobContentInfo blobContentInfo = await blobClient.UploadAsync(stream, blobUploadOptions);
        }

        // This code simulates an update by another client.
        // The lease ID is not provided, so this call fails.
        string blobContents3 = "Third update. No lease ID provided.";
        byteArray = Encoding.ASCII.GetBytes(blobContents3);

        using (MemoryStream stream = new MemoryStream(byteArray))
        {
            // This call should fail with error code 412 (Precondition Failed).
            BlobContentInfo blobContentInfo = await blobClient.UploadAsync(stream);
        }
    }
    catch (RequestFailedException e)
    {
        if (e.Status == (int)HttpStatusCode.PreconditionFailed)
        {
            Console.WriteLine(
                @"Precondition failure as expected. The lease ID was not provided.");
        }
        else
        {
            Console.WriteLine(e.Message);
            throw;
        }
    }
    finally
    {
        await blobLeaseClient.ReleaseAsync();
    }
}

 

 

参考资料

在 Blob 存储中管理并发 : https://learn.microsoft.com/zh-cn/azure/storage/blobs/concurrency-manage?tabs=dotnet#pessimistic-concurrency-for-blobs

 
 

与【Azure 存储服务】多设备并发往 Azure Storage Blob 的 Container 存数据是否可以相似的内容:

【Azure 存储服务】多设备并发往 Azure Storage Blob 的 Container 存数据是否可以

问题描述 多设备并发往 Azure Storage Blob 的 Container 存数据是否可以? 问题解答 可以! Azure Storage 是支持的并发存储数据的,Blob 可以使用乐观并发或悲观并发模型的,具体实现可以参考文档:https://docs.microsoft.com/zh-

【Azure API Management】实现在API Management服务中使用MI(管理标识 Managed Identity)访问启用防火墙的Storage Account

问题描述 在Azure的同一数据中心,API Management访问启用了防火墙的Storage Account,并且把APIM的公网IP地址设置在白名单。但访问依旧是403 原因是: 存储帐户部署在同一区域中的服务使用专用的 Azure IP 地址进行通信。 因此,不能基于特定的 Azure 服

【Azure 事件中心】Event Hubs中存在非常多的错误数据,是否能提前删除这些数据呢?

问题描述 因为一些特殊原因,Event Hub 里面堆积了很多不需要的数据事件,正常要等事件中的过期时间到后才有Event Hub自动删除掉,但希望能够尽快马上删除,有没有什么手动的方法吗? 问题解答 Event Hub是一个数据事件处理服务,最主要的功能就是:接收和发送事件。它并不是一个数据存储服

【Azure 存储服务】存储在Azure Storage Table中的数据,如何按照条件进行删除呢?

问题描述 如何按条件删除 Storage Table 中的数据,如果Table中有大量的条记录需要删除,Java代码如何按条件删除 Table中的数据(Entity)? (通过Azure Storage Explorer工具是可以删除,但是由于数据量太大,人工操作耗时太久,所以需要使用Java代码完

【Azure 存储服务】Azure Blob Storage SDK 升级失败,遇见 Unsatisfied Dependency Exception 和 Unexpected Length Exception

问题描述 在升级Java Azure Blob Storage SDK的过程中,先后遇见了 UnsatisfiedDependencyException 和 UnexpectedLengthException. 错误一:Org.springframework.beans.factory Unsati

【Azure 存储服务】Azure Data Lake Storage (ADLS) Gen2 GRS Failover是否支持自动切换或者手动切换到灾备的终结点呢?

问题描述 在Azure的存储服务中,介绍灾备恢复和Storage Account故障转移的文档中,有一句话“Account failover is not supported for storage accounts with a hierarchical namespace enabled.” 而

【Azure 存储服务】如何查看Storage Account的删除记录,有没有接口可以下载近1天删除的Blob文件信息呢?

问题描述 如何查看Storage Account的删除记录,有没有接口可以下载近1天删除的Blob文件信息呢?因为有时候出现误操作删除了某些Blob文件,想通过查看删除日志来定位被删除的文件信息。 问题解答 如果没有启用Storage Account的软删除功能,则没有办法直接查看近期有删除的Blo

【Azure 存储服务】MP4视频放在Azure的Blob里面,用生成URL在浏览器中打开之后,视频可以正常播放却无法拖拽视频的进度

问题描述 把MP4视频放在Azure的Blob里面,用生成URL在浏览器中打开之后,视频可以正常播放却无法拖拽视频的进度,这是什么情况呢? 问题解答 因为MP4上传到Azure Blob后,根据公开的权限,可以直接通过Storage Blob URL +/ Blob Container + / Bl

【Azure 存储服务】Azure Storage Account Queue中因数据格式无法处理而在一个小时内不在可见的问题

问题描述 在从Storage Account 队列中获取数据(Queue),在门户中,明显看见有数据,但是通过消费端代码去获取的时候,就是无法获取到有效数据的情况。获取消息的代码如下: 问题解答 经过对 receiveMessages 方法定义的查询,第二个参数,第三个参数的两个时间表示的意思为 消

【Azure 存储服务】.NET7.0 示例代码之上传大文件到Azure Storage Blob (一)

问题描述 在使用Azure的存储服务时候,如果上传的文件大于了100MB, 1GB的情况下,如何上传呢? 问题解答 使用Azure存储服务时,如果要上传文件到Azure Blob,有很多种工具可以实现。如:Azure 门户, Azure Storage Explorer, 命令行工具 az copy