Observables

I wasn't able to find discussion of Observable proposal here, so as mentioned by @ljharb, posting it.

Minimal Observable interface:

type Callback<T extends Array<any> = [], U = void> = (...args: T) => U

class Observable<T> {
  constructor(
    setup: Callback<[
      Callback<[T]>,   // Value
      Callback<[any]>, // Error
      Callback,        // Complete
    ], void | Callback<[], any>>
  )

  subscribe(
    value: Callback<[T], any>,
    error?: Callback<[unknown], any>,
    complete?: Callback<[], any>
  ): Subscription
}

class Subscription {
  unsubscribe(): void
}

Usage

const values = new Observable((next, _error, complete) => {
  next('foo')
  setTimeout(complete, 1000)
})

const subscription = values.subscribe(
  console.log, // "foo"
  console.error,
  console.warn, // Will run with empty message
)

That primitive seems to be minimal layer needed for interop between existing libs. Too many reactive implementations exist nowadays, and there's friction switching between them due to lack of standard.

Proposal: Retain core API and leave operators to user-land libraries ยท Issue #210 ยท tc39/proposal-observable ยท GitHub

What work would it require to move forward with the proposal?

1 Like

If you want to propose an inter-op layer, it should be symbol-based. That way libraries can innovate their .subscribe() method signature without forfeiting interoperability.

Absolutely, for interop it defines Symbol.observable:

class Observable<T> {
    ...
    // Returns itself
    [Symbol.observable]() : Observable;
    ...
}
1 Like

The proposal in question has had the symbol the whole time for that exact reason among others. :wink:

This is great because it allows user-land libraries to expand observables, eg:

This (perhaps poor) example shows how you would implement a Subject through extending Observable

You could implement the RXJS idea of "pipes" through a wrapper function

const source = new Observable(o => setInterval(() => o.next('10'), 1000))

const pipedSource = pipe(source, [
  map(value => parseInt(value, 10))
])

pipedSource.subscribe(console.log)

example implementation