Down the vein of [tc39/proposal-unordered-async-iterator-helpers](https://github.com/tc39/proposal-unordered-async-iterator-helpers), I feel an unordered version of async generators could be really helpful, and also (mostly) solve the streaming problem.
Maybe something like this:
`async unordered function *foo() { ... }`: Declare an unordered async generator (example below).
- The return value extends `UnorderedAsyncIterator` from [tc39/proposal-unordered-async-iterator-helpers](https://github.com/tc39/proposal-unordered-async-iterator-helpers).
- `yield`s resolve the oldest pending request first and resume with its input value, regardless of order. If no listeners are available, they suspend, with the oldest pending `yield` resuming first. (It's essentially FIFO both ways.)
- `yield* source` is roughly equivalent to `for await next (const value of source) yield value`.
- This suspension indirectly forces backpressure, to prevent overloading.
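For example, here's a rough sketch of how such a generator might look (it also leans on the `for await [M; N]` form described next; `fetchAll` and the URLs are purely illustrative):

```js
// Sketch only, combining the proposed `async unordered function *` with the
// `for await [M; N]` loop form described below.
async unordered function *fetchAll(urls) {
  // `[; 4]` allows up to 5 concurrent body instances (so up to 5 fetches in
  // flight), but since the body yields, new fetches only start when a
  // consumer is actually asking for more values.
  for await [; 4] (const url of urls) {
    yield [url, await (await fetch(url)).json()]
  }
}

// Results arrive in completion order, not in the order of `urls`.
for await (const [url, data] of fetchAll(["/a.json", "/b.json", "/c.json"])) {
  console.log(url, data)
}
```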
`for await [M; N] (value of source) body`: Read an async generator concurrently (example below).
- The `[M; N]` means to maintain up to N + 1 concurrent instances of `body` and up to M + 1 concurrent `.next()` calls at any given point. It can be considered optional, with `[0; 0]` as the default, since `[0; 0]` provides the current semantics. Omit N to make it infinite, and omit M to make it 1. (Both are normally processed like array/string indices, and N can also be infinite.) `[;]` represents an infinite loop with a fetch parallelism of 1.
- If the body contains an inner `yield` or `yield*`, this functions a bit differently. Instead of automatically scaling all the way to the max, each `.next()` invocation picks a suspended loop with concurrency room (least recently cycled first) and fetches a new value there. This avoids infinite scaling loops and keeps it lazy.
- The loop ends when all values have been received and all `body` instances complete.
- If the loop throws or is broken from, the source stops being polled, all in-progress body invocations are awaited, and all pending body invocations and in-progress fetches are dropped.
- If one iteration throws, an `AggregateError` is thrown with all pending errors collected.
- The source can be an ordered or unordered async iterator, or even a sync iterator.
- Ordered async generators order their yields likewise, but they always buffer them. It's highly recommended to always specify N or use `next` if you want to allow parallelism.
- The N and M can easily be extended to support generic governors: [tc39/proposal-concurrency-control](https://github.com/tc39/proposal-concurrency-control) (interfaces and data structures for concurrency control and integration into async iterator helpers).
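As a concrete (hypothetical) sketch of the non-yielding case, here's a bounded-concurrency consumer; `drain`, `processJob`, and `jobs` are placeholder names:

```js
// Sketch only: consume an async iterator of jobs with at most 8 body
// instances and at most 2 `.next()` fetches in flight at any time.
async function drain(jobs) {
  let completed = 0
  for await [1; 7] (const job of jobs) {
    // Each iteration runs as its own concurrent body instance.
    await processJob(job) // `processJob` stands in for the per-item work
    completed++
  }
  // The loop only finishes once the source is exhausted and every body
  // instance has completed.
  return completed
}
```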
`for await next ...`: Fork on `.next()` instead of by concurrency (example below). Only valid in async generators.
- Avoids most async iterator helper needs.
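A sketch of what that enables, mirroring the `filter` polyfill further down (`map` here is just an illustration, not something the proposal defines):

```js
// Sketch only: each `.next()` call on the resulting iterator forks exactly one
// iteration of the loop body, so concurrency is driven by the consumer.
UnorderedAsyncIterator.prototype.map = async unordered function *(f) {
  for await next (const item of this) {
    yield await f(item)
  }
}
```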
`await* { ... }`: Spawn concurrent `async { ... }` tasks you can concurrently `yield` inside of, and await their completion (example below).
- If one throws, an `AggregateError` is thrown with all errors collected.
- Break or throw from the outer `await*` statement to force proceeding without waiting for other tasks to resolve.
- This is a statement. I'm specifically not proposing this to be a `do` expression.
- Technically, you could use `for await [;] (const i of [1, ..., N]) switch (i) { ... }` to do a rough equivalent, but this is way clearer and more performant (as engines can just shove it all into generator state).
- Alternatively, instead of this, you could allow unordered `yield`ing in [tc39/proposal-async-do-expressions](https://github.com/tc39/proposal-async-do-expressions) and combine it with [tc39/proposal-await.ops](https://github.com/tc39/proposal-await.ops) to give both semantics beyond mere syntax sugar. But this IMHO is better since you can break and throw your way out directly.
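To make that concrete, here's a hedged sketch of an unordered `merge` built on `await*` (the helper name and signature are mine, not part of the proposal):

```js
// Sketch only: the two `async { ... }` tasks run concurrently, both yield into
// the same unordered generator, and `await*` completes once both finish.
async unordered function *merge(a, b) {
  await* {
    async { for await (const value of a) yield value }
    async { for await (const value of b) yield value }
  }
}
```

If either source throws, that surfaces as the `AggregateError` described above; breaking out instead proceeds without waiting for the other task.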
As for how this could play out in practice, here are some (approximate) polyfills:
```js
Promise.all = async promises => {
  const values = []
  let index = 0
  // `[;]` runs every iteration body concurrently, fetching one promise at a time.
  for await [;] (const p of promises) {
    const i = index++
    values[i] = await p
  }
  return values
}

Promise.allSettled = async promises => {
  const values = []
  let index = 0
  for await [;] (const p of promises) {
    const i = index++
    try {
      values[i] = {status: "fulfilled", value: await p}
    } catch (e) {
      values[i] = {status: "rejected", reason: e}
    }
  }
  return values
}

Promise.race = async promises => {
  for await [;] (const p of promises) {
    // The first iteration to finish returns, exiting the loop.
    return await p
  }
  return undefined
}

UnorderedAsyncIterator.prototype.filter = async unordered function *(f) {
  // Each consumer `.next()` forks one iteration of the loop.
  for await next (const item of this) {
    if (await f(item)) yield item
  }
}

UnorderedAsyncIterator.prototype.buffered = async unordered function *(limit) {
  // Keep up to `limit + 1` body instances (buffered values) in flight.
  for await [; limit] (const item of this) {
    yield item
  }
}

AsyncIterator.prototype.unordered = async unordered function *(limit) {
  // Keep up to `limit + 1` concurrent `.next()` calls on the ordered source.
  for await [limit;] (const item of this) {
    yield item
  }
}
```
In a sense, I'm rebooting [dead-claudia/non-linear-proposal](https://github.com/dead-claudia/non-linear-proposal) (a now-abandoned non-linear control flow proposal unifying Promise and Observable operations, inspired by non-Von Neumann models), but with a much smaller syntax and somewhat simpler semantics.