文中写一些 Swift 方法签名时,会带上 label,如
subscribe(_ subscriber:)
,正常作为 Selector 的写法时会忽略掉 label,只写作subscribe(_:)
,本文特意带上 label 以使含义更清晰。
在 App 运行过程中会发生各种各样的异步事件,如网络请求的返回,Notification 的发送等。在处理这些异步事件时,我们经常会使用异步回调、代理方法等。Combine 框架提供了一种声明式的 Swift API,可以将一个异步事件的处理逻辑表示成单独的一个处理链,链上的每个节点接收上一个节点的处理结果,执行自己的处理逻辑,然后传递给下一个节点。
Combine 框架采用 publisher-subscriber 模式:
Publisher
表示一种能够随时间产生一系列值的类型。Combine 还为这些 Publishers 提供了许多 operators 来处理从上游接收的值,然后再重新发送到下游。Subscriber
表示的订阅者类型,接收并处理发送给它的值。Subscription
表示的类型对象,Subscriber 通过该对象来向 Publisher 请求值,而 Publisher 也只有在接收到 Subscriber 的显式请求时才会分发值。subscribe(_ subscriber:)
方法,将自己作为参数传过去;subscribe(_ subscriber:)
会接着调用 publisher 的 receive(subscriber:)
方法。receive(subscriber:)
是 Publisher
协议的要求方法,所有遵循 Publisher 的类型都必须实现该方法,在这个方法里处理 subscriber 的订阅逻辑。但是使用方又不能直接调用该方法,而必须通过调用扩展方法 subscribe(_ subscriber:)
来发起订阅。receive(subscription:)
方法。协议 Subscription 约束了一个方法 request(_ demand:)
,subscriber 通过调用该方法来说明自己需要请求多少的值。只有 request 之后,publisher 才会向该 subscriber 发布值。receive(_ input:)
方法来向其发布值,并在结束时调用 subscriber 的 receive(completion:)
来进行通知。注意这里面说的调用并不一定指 publisher 对象持有 subscriber 对象,然后直接调用 subscriber 对象的上述方法,具体是否持有是具体实现细节,也有可能通过某些中间对象间接调用。Anyway,实现 Subscriber 协议的类型必须实现这两个方法(以及开头的 receive(subscription:)
方法),在这些方法中处理从 publisher 那里接收到的值。Combine 框架提供了许多内置的 publisher 类型供我们使用,如:
publisher(for:object:)
扩展;dataTaskPublisher(for:)
扩展;Subject 给我们提供了一种向流中插入值的方式,为我们在存量命令式编程的代码中引入 Combine 提供了一个强大的工具。Subject 本身即是一个 publisher,下游可以正常去 subscribe 它,然后使用方通过调用它的 send(_ value:)
方法来发布一个值。Combine 提供了两种 subject:
send(_ value:)
来更新当前值。当一个新的 subscriber 订阅时,会马上收到一次最新的当前值。send(_ value:)
时才会向下游发布值。该 property wrapper 修饰 Class 的某个属性,为其生成一个 publisher,使用方通过 $
加上属性名来访问该 publisher。当属性值变化时,该 publisher 会在属性的 willSet 里发布新值,因此需要留意属性值本身尚未被更新仍是旧值,传给 subscriber 的值是新值。
Combine 提供了两个内置的 subscribers:Subscribers.Sink 和 Subscribers.Assign,但一般不直接创建这两个的实例,而是通过 Publisher 的两种扩展方法 sink 和 assign 来获取类型抹除后的 AnyCancellable 对象。
Sink subscriber 创建的时候会立即调用 subscription 对象的 request(.unlimited)
,后面会详细介绍 Demand 的用法,这里需要留意的是一旦请求了 .unlimited 的 demand 之后,便无法再调整了,也就是说只要 publisher 不断地产生新的值,Sink 就会持续地接收到新值,直至被 cancel。
Publisher 有两个 sink 扩展方法:
sink(receiveCompletion:receiveValue:)
:两个闭包的含义和用法不必多解释;sink(receiveValue:)
:只有当 Publisher 的 Failure associated type 是 Never 时才可以使用该方法。Assign subscriber 会将接收到的值赋值给一个类对象的属性或者一个另一个 Published publisher 上,它对 publisher 的 demand 也是 .unlimited。
assign<Root>(to keyPath: ReferenceWritableKeyPath<Root, Self.Output>, on object: Root)
:
assign(to published: inout Published<Self.Output>.Publisher)
:
Combine 还为 publisher 添加了许多扩展方法,称为 operators,它们返回的也是一个 Publisher,因此可以进行链式调用。每个 operator 接收一个上游 publisher,处理转换上游发布的值,然后重新发布到下游。
具体的 operator 及其用法详见文档。
常用的 Subject 如 sink(receiveValue:)
会在订阅到一个 publisher 时立即发起一个 unlimited demand,对应的 publisher 如果此时有值则会马上发布,但这时使用方并不一定准备好接收并处理数据。另一种情况是如果一个 publisher 期望有多个 subscribers,但由于每个 subscriber 订阅的时机不一样,有可能当第一个 subscriber 订阅时,publisher 就已经把值给发布出去了,这样当第二个 subscriber 订阅时,只会收到一个 completion。
Combine 提供一个 ConnectablePublisher
协议来支持手动控制开始发布值的时机,遵循该协议的 Publisher 只有在显示调用 connect()
方法之后,才会开始值发布的过程,在这之前,即使满足 publisher 发布值的条件,也不会进行发布。
使用 Publisher 的 makeConnectable()
operator 来将一个已有的 publisher 包装成一个 Publishers.MakeConnectable
实例,该实例便是一个 ConnectablePublisher,之后使用方便可在合适的时机调用其 connect() 方法来开启值的发布。
Combine 中有些 publisher 已经实现了 ConnectablePublisher 协议,如 Publishers.Multicast
,Timer.TimerPublisher
等,有时在一些使用这些 publisher 的简单场景中,显式调用 connect()
反而显得繁琐,因此 ConnectablePublisher 又提供了一个 autoconnect()
operator,该操作符会在一个 subscriber 订阅它时立刻调用 connect 方法。
在 Combine 中,一个 Publisher 只有在被 Subscriber 订阅并且发起要求的时候才会产生值。Subscriber 有下面两种方式来发起要求:
request(_ demand:)
方法,Subscription 对象在发起订阅时由 receive(subscription:)
方法传入;receive(_ input:)
调用时,可以返回一个新的 Demand。Demand 表示 subscriber 需要多少值,Combine 提供的 API 里面,有 .none
, .unlimited
, 以及指定具体数目的 .max(Int)
。Demand 是可加的,如一个 subscriber 要求了 2 个值,然后又要求了 .max(3),则其订阅的 publisher 现在共有 5 个未满足的值,每当 publisher 发布一个值,其为满足的 demand 便随之减 1,这也是唯一使其为满足的 demand 数值减少的方式,因为 Demand 不支持负值。而一旦 subscriber 要求了 .unlimited 的 demand,则后续就无法继续再同 publisher 协商了。
内置的 Subscriber Sink 和 Assign 都是一开始就请求了 .unlimited 的 demand,如果需要精细化地控制 publisher 发送值的 rate,可以实现一个自定义的 Subscriber,如:
class MySubscriber: Subscriber {
typealias Input = Date
typealias Failure = Never
var subscription: Subscription?
func receive(subscription: Subscription) {
print("published received")
self.subscription = subscription
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
subscription.request(.max(3))
}
}
func receive(_ input: Date) -> Subscribers.Demand {
print("\(input) \(Date())")
return Subscribers.Demand.none
}
func receive(completion: Subscribers.Completion<Never>) {
print ("--done--")
}
}
自定义 Subscriber 的要点即是实现协议约束的三个方法,自定义的 subscriber 可以自己持有传入的 Subscription 对象,以实现精细化的控制。这种由 subscriber 来控制流速的行为称为 back pressure。
除了自定义 Subscriber,Combine 也提供了一些操作符给内置的 subscriber 使用以协助控制流速,这些操作符内部实现一些缓存相关的逻辑:
buffer:最大缓存一定数目的值,超出后丢弃或抛出错误;
debounce:设定一个 dueTime,假设时间为 t0 上游发布一个值,这时 debounce 不会立刻重新发布,而是创建一个重发布任务在 t0 + dueTime 之后执行(注意这里的任务是为了便于理解抽象出的概念,不代表具体实现真正地创建了一个任务,不过有可能确实是这样实现的);但如果在这期间,在时间 t1 时上游又发布了一个值,则之前的延时任务会被丢弃,会重新创建一个任务在 t1 + dueTime 之后才会重新发布,如果在这期间上游又发送了一个值,则以此类推。如:
let bounces:[(Int,TimeInterval)] = [
(0, 0),
(1, 0.25), // 0.25s interval since last index
(2, 1), // 0.75s interval since last index
(3, 1.25), // 0.25s interval since last index
(4, 1.5), // 0.25s interval since last index
(5, 2) // 0.5s interval since last index
]
let subject = PassthroughSubject<Int, Never>()
cancellable = subject
.debounce(for: .seconds(0.5), scheduler: RunLoop.main)
.sink { index in
print ("Received index \(index)")
}
for bounce in bounces {
DispatchQueue.main.asyncAfter(deadline: .now() + bounce.1) {
subject.send(bounce.0)
}
}
// Prints:
// Received index 1
// Received index 4
// Received index 5
// Here is the event flow shown from the perspective of time, showing value delivery through the `debounce()` operator:
// Time 0: Send index 0. (republish task0 at: 0.5)
// Time 0.25: Send index 1. (task0 is discarded, republish task1 at 0.25 + 0.5 = 0.75)
// Time 0.75: Debounce period ends, publish index 1. (execute task1)
// Time 1: Send index 2. (republish task2 at: 1 + 0.5 = 1.5)
// Time 1.25: Send index 3. (task2 is discarded, republish task3 at 1.25 + 0.5 = 1.75)
// Time 1.5: Send index 4. (task3 is discarded, republish task4 at 1.5 + 0.5 = 2.0)
// Time 2: Debounce period ends, publish index 4. Also, send index 5. (execute task4. republish task5 at: 2 + 0.5 = 2.5)
// Time 2.5: Debounce period ends, publish index 5. (execute task5)
throttle:设定一个 interval,每次达到时间时,会检查这一小段时间内有无值被发布,如果有的话,根据设定的 latest 参数决定将最新或最旧的值发布到下游。
collect:从上游接收到值时,先搜集起来,超过给定的数目或者超过给定的时间间隔之后,再把所有搜集到的值发给下游。
许多人经常搞不清 debounce 和 throttle 的区别,从上面的解释可以很清楚地看出二者的机制和差异,throttle 很稳定地定期发布一次(如有值可发布),而 debounce 如果上游频繁地发布值的话,可能要等好久才会发布一次,这也正是 debounce 的作用,比如在输入框输入文字的场景。
Apple 家的新东西都有一个特点:只能在比较新的操作系统版本上使用,比如 Combine 要求 iOS 13 以上才能使用,而且一些 API 更新可能会要求更新的 OS 版本。然而有位大佬 Sergej Jaskiewicz 开发了 Combine 的开源实现:OpenCombine,完全兼容 Combine 的 API,可以运行在老的 iOS 和 macOS 版本上,甚至支持 Windows、Linux 和 WASM。
我们来简单看一下内置的 Publisher 之一的 PassthroughSubject 的开源实现。
先从订阅流程看起,可以结合上面的图作为参照。首先是 Publisher 的扩展方法 subscribe(_ subscribe)
,其实就是简单地调用了一下具体 Publisher 类型的协议约束方法,传入参数 subscriber: receive(subscriber: subscriber)
。PassthroughSubject 的协议方法实现为:
public func receive<Downstream: Subscriber>(subscriber: Downstream)
where Output == Downstream.Input, Failure == Downstream.Failure
{
lock.lock()
if active {
let conduit = Conduit(parent: self, downstream: subscriber) // a.
downstreams.insert(conduit) // b.
lock.unlock()
subscriber.receive(subscription: conduit) // c.
} else {
let completion = self.completion!
lock.unlock()
subscriber.receive(subscription: Subscriptions.empty) // d.
subscriber.receive(completion: completion) // e.
}
}
先看 c 处,最终调用了 subscriber 的 receive(subscription:)
,传入的 subscription 对象在 a 处创建,在 b 处被 publisher 插入到自己内部的一个 downstreams 数组中,该对象传给 subscriber 之后也会被 subscriber 对象持有,用于向 publisher 要求 demand、执行 cancel。
我们上面多次介绍到 Subscription 类型,但 Combine 并没有提供具体的实现类型,因为它其实是某个具体 publisher 的实现的一部分,这里的 Conduit 便是 PassthroughSubject 的 Subscription 实现类。我们接着看 PassthroughSubject 的另一个协议约束方法:
// PassthroughSubject 的 send(_ input:) 实现
public func send(_ input: Output) {
lock.lock()
guard active else {
lock.unlock()
return
}
let downstreams = self.downstreams
lock.unlock()
for conduit in downstreams {
conduit.offer(input)
}
}
// Conduit 的 offer(_ output:) 实现
override func offer(_ output: Output) {
lock.lock()
guard demand > 0, let downstream = downstream else {
lock.unlock()
return
}
demand -= 1 // a.
lock.unlock()
downstreamLock.lock()
let newDemand = downstream.receive(output) // b.
downstreamLock.unlock()
guard newDemand > 0 else { return }
lock.lock()
demand += newDemand // c.
lock.unlock()
}
PassthroughSubject 的 send(_ input:)
会调用 Conduit 的 offer(_ output:)
,在 offer 方法中,a 处将 demand 减 1,b 处调用 subscriber 的 receive(_ input:) 方法,c 处再将返回的 newDemand 加到现有 demand 上面,和前文描述的逻辑完全一致。
其他方法实现、Subscriber 的实现、各种操作符的实现可以直接翻源码,了解各个协议之间的约束之后非常简洁易读。
本文详细探讨了Spring框架中的面向切面编程(AOP),特别是通过@EnableAspectJAutoProxy注解来启用和配置AOP的详细过程。