Skip to content

Commit

Permalink
better defined behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
loreanvictor committed Jun 7, 2024
1 parent ede750d commit 08e67b9
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 154 deletions.
118 changes: 57 additions & 61 deletions src/computed.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
import { Source } from './source'
import { Signal, SignalOptions } from './signal'
import { deferred, Deferred } from './util/deferred'
import { isStoppable } from './util/stoppable'
import { AbortableOptions, checkAbort } from './util/abortable'


export const SKIP = Symbol()
export const STOP = Symbol()

export interface ExprOptions extends AbortableOptions {
initial?: boolean
}

export type ExprOptions = AbortableOptions
export type ExprResultSync<T> = undefined | T | typeof SKIP | typeof STOP
export type ExprResult<T> = ExprResultSync<T> | Promise<ExprResultSync<T>>
export type ExprFn<T> = (track: Track, options: ExprOptions) => ExprResult<T>
Expand All @@ -33,15 +29,13 @@ export function wrap<T>(src: Trackable<T>): Source<T> | Computed<T> {

export class Computed<T> extends Signal<T | undefined> {
private _changedDependencies?: Source<unknown>[]
private _initialRun?: Deferred<void>
private _initialError?: unknown
private _activeRuns = 0
private _ranOnce = false
private _lastRunError?: any
private _lastRun?: Promise<void>
private _activeDependencies = new WeakMap<Source<unknown>, boolean>()

constructor(private expr: ExprFn<T>, options?: ComputedOptions<T>) {
super({ initial: undefined, ...options })

this.run({ initial: true })
}

private track<R>(src: Source<R>, tracked: Source<unknown>[], signal?: AbortSignal): R {
Expand All @@ -60,79 +54,81 @@ export class Computed<T> extends Signal<T | undefined> {
}

override async get(options?: AbortableOptions) {
if (this._changedDependencies) {
const promises: Promise<unknown>[] = []
for (const dep of this._changedDependencies) {
const promise = dep.get()
!dep.valid && promises.push(promise)
}
if (!this.valid) {
if (
this._lastRun &&
(!this._changedDependencies || this._changedDependencies.length === 0)
) {
await this._lastRun
} else {
const promises: Promise<unknown>[] = []
const deps = (this._changedDependencies ?? []).slice()
this._changedDependencies = undefined

this._changedDependencies = undefined
promises.length > 0 && await Promise.all(promises)
for (const dep of deps) {
const promise = dep.get()
!dep.valid && promises.push(promise)
}

const p = this.run(options)
p instanceof Promise && await p
} else if (!this.valid) {
await (this._initialRun ??= deferred()).promise
} else if (this._initialError) {
throw this._initialError
checkAbort(options?.signal)
promises.length > 0 && await Promise.all(promises)

try {
const p = this.run(options)
this._lastRunError = undefined
p instanceof Promise ?
await this.awaitRun(p) :
this._lastRun = undefined
} catch (error) {
this._lastRunError = error
throw error
}
}
}

return this._last
}

public async reevaluate(options?: AbortableOptions) {
const p = this.run(options)
p instanceof Promise && await p
p instanceof Promise ?
await this.awaitRun(p)
: this._lastRun = undefined

return this._last
}

override get valid() {
return this._activeRuns === 0 && (
return this._ranOnce && !this._lastRun && !this._lastRunError && (
!this._changedDependencies || this._changedDependencies.length === 0
)
}

protected override invalidate() { this._activeRuns++ }
protected override validate() { this._activeRuns-- }
protected async awaitRun(promise: Promise<void>) {
this._lastRun = promise
await promise
this._lastRun === promise && (this._lastRun = undefined)
}

protected run(options?: ExprOptions): void | Promise<void> {
checkAbort(options?.signal)

const tracked: Source<unknown>[] = []
try {
checkAbort(options?.signal)
const result = this.expr(
(<R>(t: Trackable<R>) => this.track(wrap(t), tracked, options?.signal)) as Track,
options ?? {}
)
tracked.length === 0 && this.stop()

if (result instanceof Promise) {
this.invalidate()

return result.finally(() => {
this.validate()
}).then((value) => {
checkAbort(options?.signal)
this.emit(value)
this._initialRun?.resolve()
}).catch(err => {
if (options?.initial) {
this._initialRun?.reject(err)
} else {
throw err
}
})
} else {
const result = this.expr(
(<R>(t: Trackable<R>) => this.track(wrap(t), tracked, options?.signal)) as Track,
options ?? {}
)
this._ranOnce = true
tracked.length === 0 && this.stop()

if (result instanceof Promise) {
return result.then((value) => {
checkAbort(options?.signal)
this.emit(result)
}
} catch (err) {
if (options?.initial) {
this._initialError = err
} else {
throw err
}
this.emit(value)
})
} else {
checkAbort(options?.signal)
this.emit(result)
}
}

Expand Down
68 changes: 36 additions & 32 deletions src/observe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,45 +15,17 @@ export interface ObservationOptions {

export class Observation<T> extends Cleanable implements Source<T> {
private abort?: AbortController
private schedule: Scheduler
private subscriptions: Disposable
private _last?: Deferred<T>

constructor(readonly src: Source<T>, options?: ObservationOptions) {
super()

const schedule = options?.schedule ?? asap()
this.subscriptions = src.listen(
() => schedule(
() => {
if (!this._last?.closed) {
this._last = deferred<T>()
}
this.schedule = options?.schedule ?? asap()
this.subscriptions = src.listen(() => this.fetch())

this.abort?.abort()
const ctrl = new AbortController()
this.abort = ctrl

const promise = src.get({ signal: ctrl.signal })

if (!src.valid) {
promise.then(val => {
if (!ctrl.signal.aborted) {
this.abort === ctrl && (this.abort = undefined)
this._last!.resolve(val)
}
}).catch(reason => {
if (!ctrl.signal.aborted) {
this.abort === ctrl && (this.abort = undefined)
this._last!.reject(reason)
}
})
} else {
this.abort === ctrl && (this.abort = undefined)
this._last.resolve(src.last)
}
}
)
)
!src.valid && this.fetch()
}

get(options?: AbortableOptions) {
Expand Down Expand Up @@ -81,6 +53,38 @@ export class Observation<T> extends Cleanable implements Source<T> {
get last() {
return this.src.last
}

protected fetch() {
this.schedule(
() => {
if (!this._last || this._last.closed) {
this._last = deferred<T>()
}

this.abort?.abort()
const ctrl = new AbortController()
this.abort = ctrl

const promise = this.src.get({ signal: ctrl.signal }).catch(reason => {
if (!ctrl.signal.aborted) {
this._last!.reject(reason)
}
})

if (!this.src.valid) {
promise.then(val => {
if (!ctrl.signal.aborted) {
this._last!.resolve(val!)
}
})
} else {
if (!ctrl.signal.aborted) {
this._last.resolve(this.src.last)
}
}
}
)
}
}


Expand Down
Loading

0 comments on commit 08e67b9

Please sign in to comment.