[Azure Storage Service] Using the AppendBlobClient Object to Append Content to a Blob


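Problem Description

The official Azure Blob samples all upload whole files to a blob; none of them appends to a blob that already exists. How can a file be appended to multiple times, passing only the new content on each write?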

 

Solution

Azure Storage has three blob types: Block Blob, Append Blob, and Page Blob. Only the Append Blob type supports the append operation. The blob type is fixed when the blob is created and cannot be changed afterwards.

 

A look through the Java Storage SDK shows that AppendBlobClient can do this. A blob client exposes it through getAppendBlobClient:

    /**
     * Creates a new {@link AppendBlobClient} associated with this blob.
     *
     * @return A {@link AppendBlobClient} associated with this blob.
     */
    public AppendBlobClient getAppendBlobClient() {
        return new SpecializedBlobClientBuilder()
            .blobClient(this)
            .buildAppendBlobClient();
    }

The AppendBlobClient class offers several methods for appending, including appendBlock and appendBlockWithResponse. Their source definitions are:

    /**
     * Commits a new block of data to the end of the existing append blob.
     * <p>
     * Note that the data passed must be replayable if retries are enabled (the default). In other words, the
     * {@code Flux} must produce the same data each time it is subscribed to.
     *
     * <p><strong>Code Samples</strong></p>
     *
     * {@codesnippet com.azure.storage.blob.specialized.AppendBlobClient.appendBlock#InputStream-long}
     *
     * @param data The data to write to the blob. The data must be markable. This is in order to support retries. If
     * the data is not markable, consider using {@link #getBlobOutputStream()} and writing to the returned OutputStream.
     * Alternatively, consider wrapping your data source in a {@link java.io.BufferedInputStream} to add mark support.
     * @param length The exact length of the data. It is important that this value match precisely the length of the
     * data emitted by the {@code Flux}.
     * @return The information of the append blob operation.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public AppendBlobItem appendBlock(InputStream data, long length) {
        return appendBlockWithResponse(data, length, null, null, null, Context.NONE).getValue();
    }

    /**
     * Commits a new block of data to the end of the existing append blob.
     * <p>
     * Note that the data passed must be replayable if retries are enabled (the default). In other words, the
     * {@code Flux} must produce the same data each time it is subscribed to.
     *
     * <p><strong>Code Samples</strong></p>
     *
     * {@codesnippet com.azure.storage.blob.specialized.AppendBlobClient.appendBlockWithResponse#InputStream-long-byte-AppendBlobRequestConditions-Duration-Context}
     *
     * @param data The data to write to the blob. The data must be markable. This is in order to support retries. If
     * the data is not markable, consider using {@link #getBlobOutputStream()} and writing to the returned OutputStream.
     * Alternatively, consider wrapping your data source in a {@link java.io.BufferedInputStream} to add mark support.
     * @param length The exact length of the data. It is important that this value match precisely the length of the
     * data emitted by the {@code Flux}.
     * @param contentMd5 An MD5 hash of the block content. This hash is used to verify the integrity of the block during
     * transport. When this header is specified, the storage service compares the hash of the content that has arrived
     * with this header value. Note that this MD5 hash is not stored with the blob. If the two hashes do not match, the
     * operation will fail.
     * @param appendBlobRequestConditions {@link AppendBlobRequestConditions}
     * @param timeout An optional timeout value beyond which a {@link RuntimeException} will be raised.
     * @param context Additional context that is passed through the Http pipeline during the service call.
     * @return A {@link Response} whose {@link Response#getValue() value} contains the append blob operation.
     * @throws UnexpectedLengthException when the length of data does not match the input {@code length}.
     * @throws NullPointerException if the input data is null.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Response<AppendBlobItem> appendBlockWithResponse(InputStream data, long length, byte[] contentMd5,
        AppendBlobRequestConditions appendBlobRequestConditions, Duration timeout, Context context) {
        Objects.requireNonNull(data, "'data' cannot be null.");
        Flux<ByteBuffer> fbb = Utility.convertStreamToByteBuffer(data, length, MAX_APPEND_BLOCK_BYTES, true);
        Mono<Response<AppendBlobItem>> response = appendBlobAsyncClient.appendBlockWithResponse(
            fbb.subscribeOn(Schedulers.elastic()), length, contentMd5, appendBlobRequestConditions, context);
        return StorageImplUtils.blockWithOptionalTimeout(response, timeout);
    }
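The Javadoc above requires the input stream to be markable so that the data can be replayed on a retry, and suggests wrapping the source in a java.io.BufferedInputStream when it is not. The sketch below uses only the JDK to illustrate what that requirement means; it does not show how the SDK itself performs retries. The class MarkableStreamDemo and its helpers nonMarkable, ensureMarkable and readTwice are hypothetical names introduced for illustration.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class MarkableStreamDemo {

    /** Returns a stream WITHOUT mark support (hypothetical stand-in for e.g. a socket stream). */
    static InputStream nonMarkable(byte[] bytes) {
        ByteArrayInputStream source = new ByteArrayInputStream(bytes);
        return new InputStream() {
            @Override
            public int read() {
                return source.read();
            }
        };
    }

    /** Adds mark support by wrapping the stream in a BufferedInputStream when needed. */
    static InputStream ensureMarkable(InputStream in) {
        return in.markSupported() ? in : new BufferedInputStream(in);
    }

    /** Reads length bytes, rewinds to the mark, and reads them again, as a retry would. */
    static String[] readTwice(InputStream in, int length) {
        InputStream markable = ensureMarkable(in);
        try {
            markable.mark(length);                        // remember the current position
            byte[] first = markable.readNBytes(length);   // requires Java 11+
            markable.reset();                             // rewind to the marked position
            byte[] second = markable.readNBytes(length);
            return new String[] {
                new String(first, StandardCharsets.UTF_8),
                new String(second, StandardCharsets.UTF_8)
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = "new log line\n".getBytes(StandardCharsets.UTF_8);
        InputStream raw = nonMarkable(payload);
        System.out.println("raw stream markable: " + raw.markSupported());
        String[] reads = readTwice(raw, payload.length);
        System.out.println("both reads identical: " + reads[0].equals(reads[1]));
    }
}
```

A ByteArrayInputStream, as used in Step 4 below, already supports mark and reset, so no wrapping is needed there.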

Implementation

Step 1: Add the Azure Storage Blob dependency to the Java project's pom.xml

    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-storage-blob</artifactId>
      <version>12.13.0</version>
    </dependency>

Step 2: Import the required Storage classes

import java.io.ByteArrayInputStream; 
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException; 
import java.time.LocalTime; 
import com.azure.core.http.rest.Response; 
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.AppendBlobItem;
import com.azure.storage.blob.models.AppendBlobRequestConditions; 
import com.azure.storage.blob.specialized.AppendBlobClient; 

Step 3: Create the AppendBlobClient object from a BlobServiceClient and the connection string

    String storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=*****;AccountKey=*******;EndpointSuffix=core.chinacloudapi.cn";

    String containerName = "appendblob";
    String fileName = "test.txt";
    // Create a BlobServiceClient object which will be used to create a container
    System.out.println("\nCreate a BlobServiceClient Object to Connect Storage Account");
    BlobServiceClient blobServiceClient = new BlobServiceClientBuilder()
        .connectionString(storageConnectionString)
        .buildClient();

    BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(containerName);
    if (!containerClient.exists())
        containerClient.create();

    // Get a reference to a blob
    AppendBlobClient appendBlobClient = containerClient.getBlobClient(fileName).getAppendBlobClient();

 

Step 4: Call appendBlockWithResponse to append content, and check the returned status code to confirm that the append succeeded

    // Create the append blob only if it does not exist yet.
    boolean overwrite = true;
    if (!appendBlobClient.exists())
        System.out.printf("Created AppendBlob at %s%n",
            appendBlobClient.create(overwrite).getLastModified());

    String data = "Test to append new content into exists blob! by blogs lu bian liang zhan deng @"
        + LocalTime.now().toString() + "\n";
    // Encode once so that the stream, the length and the MD5 all describe the same bytes.
    byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
    InputStream inputStream = new ByteArrayInputStream(bytes);
    byte[] md5 = MessageDigest.getInstance("MD5").digest(bytes);
    AppendBlobRequestConditions requestConditions = new AppendBlobRequestConditions();
    // Context context = new Context("key", "value");
    long length = bytes.length;
    Response<AppendBlobItem> rsp = appendBlobClient.appendBlockWithResponse(inputStream, length, md5,
        requestConditions, null, null);
    // 201 (Created) means the block was committed to the end of the blob.
    if (rsp.getStatusCode() == 201) {
        System.out.println("append content successful........");
    }
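Step 4 passes three values that must agree with each other: the stream content, the declared length, and the MD5 hash. A length taken from a differently encoded copy of the text can differ from the bytes in the stream, and the Javadoc above notes that the length must match the data exactly. The following JDK-only sketch derives all three from a single UTF-8 byte array. AppendPayload and its members are hypothetical names for illustration and are not part of the Azure SDK.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class AppendPayload {
    final byte[] bytes; // the exact bytes to append
    final byte[] md5;   // MD5 digest of those same bytes

    private AppendPayload(byte[] bytes, byte[] md5) {
        this.bytes = bytes;
        this.md5 = md5;
    }

    /** Encodes the text once as UTF-8 and hashes exactly those bytes. */
    static AppendPayload fromText(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        try {
            byte[] md5 = MessageDigest.getInstance("MD5").digest(bytes);
            return new AppendPayload(bytes, md5);
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the digests every Java platform must provide.
            throw new IllegalStateException(e);
        }
    }

    /** Length in bytes, which can differ from the character count of the text. */
    long length() {
        return bytes.length;
    }

    /** A fresh stream over the payload; ByteArrayInputStream supports mark/reset. */
    InputStream newStream() {
        return new ByteArrayInputStream(bytes);
    }

    public static void main(String[] args) {
        String text = "\u8ffd\u52a0 content\n"; // two CJK characters plus ASCII text
        AppendPayload payload = fromText(text);
        System.out.println("characters: " + text.length());
        System.out.println("bytes:      " + payload.length());
        System.out.println("md5 bytes:  " + payload.md5.length);
    }
}
```

With such a helper, the call in Step 4 would receive payload.newStream(), payload.length() and payload.md5 as its first three arguments.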

Run Results

 

If the target blob is not an Append Blob, however, the call fails with Status code 409: The blob type is invalid for this operation.

Exception in thread "main" com.azure.storage.blob.models.BlobStorageException: Status code 409, "<?xml version="1.0" encoding="utf-8"?><Error><Code>InvalidBlobType</Code><Message>The blob type is invalid for this operation. RequestId:501ee0b9-301e-0003-4f7b-829ca6000000 Time:2023-05-09T13:37:17.7509942Z</Message></Error>"
    at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:67)
    at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:484)
    at com.azure.core.http.rest.RestProxy.instantiateUnexpectedException(RestProxy.java:343)
    at com.azure.core.http.rest.RestProxy.lambda$ensureExpectedStatus$5(RestProxy.java:382)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
    at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:337)
    at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onNext(MonoCacheTime.java:354)
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2397)
    at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onSubscribe(MonoCacheTime.java:293)
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:192)
    at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:143)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:118)
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
    at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:184)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
    at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:128)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:259)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
    at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:416)
    at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:470)
    at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:1589)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
        at reactor.core.publisher.Mono.block(Mono.java:1703)
        at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:128)
        at com.azure.storage.blob.specialized.AppendBlobClient.appendBlockWithResponse(AppendBlobClient.java:259)
        at test.App.AppendBlobContent(App.java:68)
        at test.App.main(App.java:31)

 

References

appendBlockWithResponse : https://learn.microsoft.com/en-us/java/api/com.azure.storage.blob.specialized.appendblobclient?view=azure-java-stable#com-azure-storage-blob-specialized-appendblobclient-appendblockwithresponse(java-io-inputstream-long-byte()-com-azure-storage-blob-models-appendblobrequestconditions-java-time-duration-com-azure-core-util-context)

Introduction to Blob (object) storage : https://docs.azure.cn/zh-cn/storage/blobs/storage-blobs-introduction
