2025-02-08 10:44:05
,某些文章具有时效性,若有错误或已失效,请在下方留言。AsyncStream
and AsyncThrowingStream
can be thought of a bit like continuations that can send back multiple values, but also like an AsyncSequence
that is able to buffer only a certain number of values according to a policy.AsyncStream
和 AsyncThrowingStream
可以看作有点像可以发回多个值的延续,但也类似于能够根据策略仅缓冲一定数量的值的 AsyncSequence
。
Let’s start with a simple stream that sends back nine integers:
让我们从一个发回 9 个整数的简单流开始:
let stream = AsyncStream { continuation in
for i in 1...9 {
continuation.yield(i)
}
continuation.finish()
}
We could then read and print the values from that stream using a for await
loop, like this:
然后,我们可以使用 for await
循环从该流中读取和打印值,如下所示:
let stream = AsyncStream { continuation in
for i in 1...9 {
continuation.yield(i)
}
continuation.finish()
}
for await item in stream {
print(item)
}
Tip: Any values yielded after calling finish()
on your continuation will be ignored.
提示: 在对 continuation 调用 finish()
后产生的任何值都将被忽略。
You can see immediately that AsyncStream
looks like using withCheckedContinuation()
: we give it a closure to run to generate its values, and we’re passed a continuation inside there that allows us to send values back to the call site.
你可以立即看到 AsyncStream
看起来像使用 withCheckedContinuation()
:我们给它一个闭包来运行以生成它的值,并在其中传递一个延续,允许我们将值发送回调用站点。
This time, though, we can call yield()
as many times as we want, whereas calling continuation.resume(returning:)
could be called only once.
不过,这一次,我们可以根据需要多次调用 yield(),
而 call continuation.resume(returning:)
只能调用一次。
Because AsyncStream
lets us send back multiple values, the only way for Swift to know the stream has ended is by us calling finish()
on the continuation. Without that, the stream would never end, and our for await
loop would never end – the program would just run endlessly, waiting for more data from the stream.
因为 AsyncStream
允许我们发回多个值,所以 Swift 知道流已经结束的唯一方法是我们在 continuation 上调用 finish()。
没有它,流将永远不会结束,我们的 for await
循环将永远不会结束 – 程序将无休止地运行,等待来自流的更多数据。
Tip: I would recommend trying this yourself – just comment out continuation.finish()
and run the program again, so you can start to get a better feel for how AsyncStream
works.
提示: 我建议自己尝试一下 – 只需注释掉 continuation.finish()
并再次运行该程序,这样您就可以开始更好地了解 AsyncStream
的工作原理。
So far, this looks just like a continuation that can send back multiple values. But here’s where AsyncStream
gets clever:
到目前为止,这看起来就像一个可以发回多个值的 continuation。但这就是 AsyncStream
的聪明之处:
- You can asynchronously read values from the stream.
您可以从流中异步读取值。 - You can concurrently read values from the stream.
您可以同时 从流中读取值。 - You can also asynchronously add values into the stream.
您还可以异步地将值添加到 流中。
Each of those are extremely powerful, so let’s tackle them one by one.
这些都非常强大,所以让我们一一解决它们。
First, the ability to asynchronously read values from the stream means we can read values whenever we want and as often as we want.
首先,从流中异步读取值的能力意味着我们可以随时随地读取值。
For example, we could do some work, then read some values from our stream, do some other work, read some more values, do still other work, and so on, like this:
例如,我们可以做一些工作,然后从我们的流中读取一些值,做一些其他工作,读取更多值,做其他工作,等等,如下所示:
let stream = AsyncStream { continuation in
for i in 1...9 {
continuation.yield(i)
}
continuation.finish()
}
for _ in 1...3 {
print("The next three are:")
for await item in stream.prefix(3) {
print(item)
}
}
Warning: Calling prefix()
on an AsyncStream
returns a dedicated type called AsyncPrefixSequence
, which will read N values from the stream each time you loop over it rather than acting as a slice.
警告: 在 AsyncStream
上调用 prefix()
将返回一个名为 AsyncPrefixSequence
的专用类型,每次循环访问流时,该类型都会从流中读取 N 个值,而不是充当切片。
So, this code will print the same output as accessing stream.prefix(3)
directly each time:
因此,这段代码将打印与每次直接访问 stream.prefix(3)
相同的输出:
let stream = AsyncStream { continuation in
for i in 1...9 {
continuation.yield(i)
}
continuation.finish()
}
let firstThree = stream.prefix(3)
for _ in 1...3 {
print("The next three are:")
for await item in firstThree {
print(item)
}
}
The second power feature of AsyncStream
is how smoothly it lets us read values concurrently. So, we can read values from our stream in three separate tasks, and each will get some of the values from the stream:AsyncStream
的第二个强大功能是它允许我们并发读取值的流畅性。因此,我们可以在三个单独的任务中从流中读取值,每个任务都将从流中获取一些值:
let stream = AsyncStream { continuation in
for i in 1...9 {
continuation.yield(i)
}
continuation.finish()
}
Task {
for await item in stream {
print("1. \(item)")
}
}
Task {
for await item in stream {
print("2. \(item)")
}
}
Task {
for await item in stream {
print("3. \(item)")
}
}
try? await Task.sleep(for: .seconds(1))
When that runs, you’ll probably see each task print out three values, interleaved with output from other tasks.
当它运行时,您可能会看到每个任务打印出三个值,与其他任务的输出交错。
Important: Accessing an AsyncStream
concurrently means each task will receive only some of the values from the stream. If you need every task to receive every value, you should look at a Combine publisher instead.
重要: 并发访问 AsyncStream
意味着每个任务将仅接收来自流的部分值。如果您需要每个任务都接收每个值,则应查看 Combine 发布者。
Finally, the ability to add values into the stream means we can place values into our stream whenever we want, take a break for a while, add some more, and so on.
最后,向流中添加值的能力意味着我们可以随时将值放入流中,休息片刻,添加更多值,依此类推。
In our current code, all nine integers get placed into the stream before anything is read – we don’t know when or where they will be read, or even if they will be read at all. But we could equally well add numbers over time, like this:
在我们当前的代码中,所有 9 个整数在读取任何内容之前都会被放入流中 – 我们不知道何时何地会读取它们,甚至不知道它们是否会被读取。但是我们同样可以随着时间的推移添加数字,如下所示:
let stream = AsyncStream { continuation in
for i in 1...3 {
Task {
try await Task.sleep(for: .seconds(i))
continuation.yield(i)
if i == 3 {
continuation.finish()
}
}
}
}
for await item in stream {
print(item)
}
All three of those AsyncStream
features are useful, but it has one other power that takes it up into a league of its own: we can set a buffering policy that determines how values get added when existing values have yet to be read.
所有这三个 AsyncStream
功能都很有用,但它还有另一个功能可以将其提升到自己的联盟中:我们可以设置一个缓冲策略,以确定在尚未读取现有值时如何添加值。
There are three buffering policies to choose from:
有三种缓冲策略可供选择:
- The default buffering policy is
.unbounded
, which means you can add as many values to your stream as you want, and they’ll all wait to be read.
默认缓冲策略为.unbounded
,这意味着您可以根据需要向流中添加任意数量的值,并且它们都将等待读取。 - The
.bufferingNewest()
policy means the stream should retain only some number of values, discarding older values once a buffer size has been reached..bufferingNewest()
策略意味着流应仅保留一定数量的值,一旦达到缓冲区大小,就会丢弃较旧的值。 - The
.bufferingOldest()
policy means the stream will read values in until the buffer is full, and will ignore any further new values until the older values have been read..bufferingOldest()
策略意味着流将读取值,直到缓冲区已满,并将忽略任何其他新值,直到读取旧值。
For example, we could use .bufferingNewest(5)
to allow our stream to receive five values as normal, but if it receives a sixth one before the first has been read it will discard the first value.
例如,我们可以使用 .bufferingNewest(5)
来允许我们的流照常接收 5 个值,但是如果它在读取第一个值之前收到第六个值,它将丢弃第一个值。
You can see this in action here:
您可以在此处看到它的实际效果:
let stream = AsyncStream(bufferingPolicy: .bufferingNewest(5)) { continuation in
for i in 1...9 {
continuation.yield(i)
}
continuation.finish()
}
for await item in stream {
print(item)
}
It’s worth spending a moment to really think what’s happening there:
值得花点时间真正思考那里发生了什么:
- Swift adds 1, 2, 3, 4, and 5 to the stream as normal
Swift 会照常将 1、2、3、4 和 5 添加到流中 - When 6 comes in the buffer is full. So, it discards 1 (the oldest value) to make room, and adds 6 at the end.
当 6 进来时,缓冲区已满。因此,它丢弃 1(最早的值)以腾出空间,并在末尾添加 6。 - This then happens again for 7, this time discarding the next oldest value, which is 2.
然后,对于 7 再次发生这种情况,这次丢弃下一个最早的值,即 2。 - 8 then comes in, so 3 is discarded.
然后 8 进来,所以 3 被丢弃。 - Finally, 9 is added to the stream, so 4 is discarded.
最后,将 9 添加到流中,因此丢弃 4。 - All this happens before our
for await
loop runs, which means by the time it does run the stream contains 5, 6, 7, 8, 9, which gets printed.
所有这些都发生在我们的for await
循环运行之前,这意味着当它运行时,流包含 5、6、7、8、9,它们被打印出来。
If you use a buffer size of 1, it means one of two things:
如果使用缓冲区大小 1,则表示以下两种情况之一:
- Using
bufferingOldest(1)
means “don’t add any new values to the stream until the first one was read.”
使用bufferingOldest(1)
意味着“在读取第一个值之前,不要向流中添加任何新值”。 - Using
bufferingNewest(1)
means “always throw away the existing value when a new one comes in.”
使用bufferingNewest(1)
意味着“当有新值进来时,总是丢弃现有值”。
Using bufferingNewest(1)
allows your stream to store the latest value ready for reading as soon as anyone asks for it. They’ll also then get all future values for as long as they continue to read from the stream.
使用 bufferingNewest(1)
可以让你的流存储最新的值,以便在有人要求时立即读取。然后,只要它们继续从流中读取数据,它们还将获取所有 future 值。
If you use a buffer size of zero, the value must be read straight away otherwise it will be discarded. To be clear: if nothing is actively reading from the stream, any values added with a zero buffer will be discarded.
如果使用的缓冲区大小为零,则必须 立即读取该值,否则将被丢弃。需要明确的是:如果没有任何内容正在从流中主动读取,则使用零缓冲区添加的任何值都将被丢弃。
You can see the problem in the following code:
您可以在以下代码中看到问题:
let stream = AsyncStream(bufferingPolicy: .bufferingOldest(0)) { continuation in
continuation.yield("Hello, AsyncStream!")
continuation.finish()
}
for await item in stream {
print(item)
}
That posts creates a stream and posts a value before anything attempts to read from the stream, so it gets discarded immediately.
该 posts 会创建一个流,并在任何尝试从流中读取之前发布一个值,因此它会立即被丢弃。
In comparison, this next code only yields its value after one second has elapsed:
相比之下,下一个代码仅在一秒后产生其值:
let stream = AsyncStream(bufferingPolicy: .bufferingOldest(0)) { continuation in
Task {
try await Task.sleep(for: .seconds(1))
continuation.yield("Hello, AsyncStream!")
continuation.finish()
}
}
for await item in stream {
print(item)
}
So, by the time a value is yielded, our for await
loop is actively waiting for it.
因此,当一个值被产生时,我们的 for await
循环正在主动等待它。
Zero-length buffers are helpful for situations when a value makes sense only when consumed right now. For example, if you wanted to watch the filesystem for changes, you wouldn’t want to be notified of all changes that happened before you started watching, only those that happen after.
零长度缓冲区在值仅在当前使用时才有意义的情况下非常有用。例如,如果您想监视文件系统的更改,则不希望在开始监视之前收到发生的所有更改的通知,而只收到 之后 发生的更改的通知。
Finally, AsyncThrowingStream
is useful for places when the work you will do might finish early by throwing an error. This will end the stream immediately – all previous values will be sent back as normal, but no future values will.
最后,AsyncThrowingStream
对于您将要执行的工作可能会因引发错误而提前完成的地方非常有用。这将立即结束流 – 所有以前的值都将照常发送回去,但未来的值不会。
For example, we could make our number stream throw an error when it’s asked to return anything that’s a multiple of 3:
例如,当 number stream 被要求返回 3 的倍数时,我们可以让它抛出一个错误:
enum MultipleError: Error {
case no3
}
let stream = AsyncThrowingStream { continuation in
for i in 1...40 {
continuation.yield(i)
if i.isMultiple(of: 3) {
continuation.finish(throwing: MultipleError.no3)
}
}
continuation.finish()
}
As you can see, throwing an error is done by calling continuation.finish(throwing:)
– it’s really clear that this finishes the stream. In this code we could actually add a return
keyword after throwing the error, but in your own code you might want to add some clean up work.
如你所见,抛出错误是通过调用 continuation.finish(throwing:)
—— 很明显,这完成了流。在此代码中,我们实际上可以在引发错误后添加 return
关键字,但在您自己的代码中,您可能希望添加一些清理工作。
We can now loop over the values from that stream, handling errors somehow:
我们现在可以遍历该流中的值,以某种方式处理错误:
enum MultipleError: Error {
case no3
}
let stream = AsyncThrowingStream { continuation in
for i in 1...40 {
continuation.yield(i)
if i.isMultiple(of: 3) {
continuation.finish(throwing: MultipleError.no3)
}
}
continuation.finish()
}
do {
for try await values in stream {
print(values)
}
} catch {
print("Error received: \(error)")
}
When that code runs, it will print 1, 2, and 3, then the error is thrown. 4 and 5 won’t be printed, because the stream ends before they are yielded.
当该代码运行时,它将打印 1、2 和 3,然后引发错误。4 和 5 不会被打印,因为流在它们被生成之前结束。印,因为流在它们被生成之前结束。