Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored and gcanti committed May 31, 2024
1 parent 127b5a0 commit 2e33195
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 26 deletions.
172 changes: 151 additions & 21 deletions packages/effect/src/Micro.ts
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,41 @@ export const raceFirst: {
raceAllFirst([self, that])
)

// ----------------------------------------------------------------------------
// zipping
// ----------------------------------------------------------------------------

/**
* @since 3.3.0
* @category zipping
*/
export const zip: {
<A2, E2, R2>(
that: Micro<A2, E2, R2>,
options?:
| { readonly concurrent?: boolean | undefined }
| undefined
): <A, E, R>(self: Micro<A, E, R>) => Micro<[A, A2], E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(
self: Micro<A, E, R>,
that: Micro<A2, E2, R2>,
options?:
| { readonly concurrent?: boolean | undefined }
| undefined
): Micro<[A, A2], E | E2, R | R2>
} = dual((args) => isMicro(args[1]), <A, E, R, A2, E2, R2>(
self: Micro<A, E, R>,
that: Micro<A2, E2, R2>,
options?:
| { readonly concurrent?: boolean | undefined }
| undefined
): Micro<[A, A2], E | E2, R | R2> => {
if (options?.concurrent) {
return all([self, that], { concurrency: "unbounded" })
}
return flatMap(self, (a) => map(that, (a2) => [a, a2]))
})

// ----------------------------------------------------------------------------
// filtering & conditionals
// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -970,6 +1005,36 @@ export const filterOrFail: {
orFailWith: (a: A) => E2
): Micro<B, E | E2, R> => flatMap(self, (a) => refinement(a) ? succeed(a as any) : fail(orFailWith(a))))

/**
* The moral equivalent of `if (p) exp`.
*
* @since 3.3.0
* @category filtering & conditionals
*/
export const when: {
(condition: LazyArg<boolean>): <A, E, R>(self: Micro<A, E, R>) => Micro<Option.Option<A>, E, R>
<A, E, R>(self: Micro<A, E, R>, condition: LazyArg<boolean>): Micro<Option.Option<A>, E, R>
} = dual(
2,
<A, E, R>(self: Micro<A, E, R>, condition: LazyArg<boolean>): Micro<Option.Option<A>, E, R> =>
suspend(() => condition() ? asSome(self) : succeed(Option.none()))
)

/**
* @since 3.3.0
* @category filtering & conditionals
*/
export const whenMicro: {
<E, R>(
condition: Micro<boolean, E, R>
): <A, E2, R2>(effect: Micro<A, E2, R2>) => Micro<Option.Option<A>, E | E2, R | R2>
<A, E2, R2, E, R>(self: Micro<A, E2, R2>, condition: Micro<boolean, E, R>): Micro<Option.Option<A>, E2 | E, R2 | R>
} = dual(
2,
<A, E2, R2, E, R>(self: Micro<A, E2, R2>, condition: Micro<boolean, E, R>): Micro<Option.Option<A>, E2 | E, R2 | R> =>
flatMap(condition, (pass) => pass ? asSome(self) : succeed(Option.none()))
)

// ----------------------------------------------------------------------------
// repetition
// ----------------------------------------------------------------------------
Expand All @@ -982,29 +1047,38 @@ export const repeatResult: {
<A, E>(options: {
while: Predicate<Result<A, E>>
times?: number | undefined
delay?: ((attempt: number) => Duration.DurationInput) | undefined
delay?: DelayFn | undefined
}): <R>(self: Micro<A, E, R>) => Micro<A, E, R>
<A, E, R>(self: Micro<A, E, R>, options: {
while: Predicate<Result<A, E>>
times?: number | undefined
delay?: ((attempt: number) => Duration.DurationInput) | undefined
delay?: DelayFn | undefined
}): Micro<A, E, R>
} = dual(2, <A, E, R>(self: Micro<A, E, R>, options: {
while: Predicate<Result<A, E>>
times?: number | undefined
delay?: ((attempt: number) => Duration.DurationInput) | undefined
delay?: DelayFn | undefined
}): Micro<A, E, R> =>
make(function(env, onResult) {
const startedAt = options.delay ? Date.now() : 0
let attempt = 0
const delay = options.delay ? ((attempt: number) => sleep(options.delay!(attempt))) : (_: number) => yieldNow
self[runSymbol](env, function loop(result) {
if (options.while !== undefined && !options.while(result)) {
return onResult(result)
} else if (options.times !== undefined && attempt >= options.times) {
return onResult(result)
}
attempt++
delay(attempt)[runSymbol](env, function(result) {
let delayEffect = yieldNow
if (options.delay !== undefined) {
const elapsed = Duration.millis(Date.now() - startedAt)
const duration = options.delay(attempt, elapsed)
if (Option.isNone(duration)) {
return onResult(result)
}
delayEffect = sleep(duration.value)
}
delayEffect[runSymbol](env, function(result) {
if (result._tag === "Left") {
return onResult(result as any)
}
Expand All @@ -1021,17 +1095,17 @@ export const repeat: {
<A, E>(options: {
while?: Predicate<A> | undefined
times?: number | undefined
delay?: ((attempt: number) => Duration.DurationInput) | undefined
delay?: DelayFn | undefined
}): <R>(self: Micro<A, E, R>) => Micro<A, E, R>
<A, E, R>(self: Micro<A, E, R>, options: {
while?: Predicate<A> | undefined
times?: number | undefined
delay?: ((attempt: number) => Duration.DurationInput) | undefined
delay?: DelayFn | undefined
}): Micro<A, E, R>
} = dual(2, <A, E, R>(self: Micro<A, E, R>, options: {
while?: Predicate<A> | undefined
times?: number | undefined
delay?: ((attempt: number) => Duration.DurationInput) | undefined
delay?: DelayFn | undefined
}): Micro<A, E, R> =>
repeatResult(self, {
...options,
Expand All @@ -1044,6 +1118,57 @@ export const repeat: {
*/
export const forever = <A, E, R>(self: Micro<A, E, R>): Micro<never, E, R> => repeat(self, {}) as any

// ----------------------------------------------------------------------------
// delays
// ----------------------------------------------------------------------------

/**
* @since 3.3.0
* @category delays
*/
export type DelayFn = (attempt: number, elapsed: Duration.Duration) => Option.Option<Duration.DurationInput>

/**
* @since 3.3.0
* @category delays
*/
export const delayExponential = (base: Duration.DurationInput, factor = 2): DelayFn => {
const baseMillis = Duration.toMillis(base)
return (attempt) => Option.some(attempt ** factor * baseMillis)
}

/**
* @since 3.3.0
* @category delays
*/
export const delaySpaced = (duration: Duration.DurationInput): DelayFn => (_) => Option.some(duration)

/**
* @since 3.3.0
* @category delays
*/
export const delayWithMax: {
(max: Duration.DurationInput): (self: DelayFn) => DelayFn
(self: DelayFn, max: Duration.DurationInput): DelayFn
} = dual(
2,
(self: DelayFn, max: Duration.DurationInput): DelayFn => (attempt, elapsed) =>
Option.map(self(attempt, elapsed), Duration.max(max))
)

/**
* @since 3.3.0
* @category delays
*/
export const delayWithMaxElapsed: {
(max: Duration.DurationInput): (self: DelayFn) => DelayFn
(self: DelayFn, max: Duration.DurationInput): DelayFn
} = dual(
2,
(self: DelayFn, max: Duration.DurationInput): DelayFn => (attempt, elapsed) =>
Duration.lessThan(elapsed, max) ? self(attempt, elapsed) : Option.none()
)

// ----------------------------------------------------------------------------
// error handling
// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -1245,17 +1370,17 @@ export const retry: {
<A, E>(options: {
while?: Predicate<E> | undefined
times?: number | undefined
delay?: ((attempt: number) => Duration.DurationInput) | undefined
delay?: DelayFn | undefined
}): <R>(self: Micro<A, E, R>) => Micro<A, E, R>
<A, E, R>(self: Micro<A, E, R>, options: {
while?: Predicate<E> | undefined
times?: number | undefined
delay?: ((attempt: number) => Duration.DurationInput) | undefined
delay?: DelayFn | undefined
}): Micro<A, E, R>
} = dual(2, <A, E, R>(self: Micro<A, E, R>, options: {
while?: Predicate<E> | undefined
times?: number | undefined
delay?: ((attempt: number) => Duration.DurationInput) | undefined
delay?: DelayFn | undefined
}): Micro<A, E, R> =>
repeatResult(self, {
...options,
Expand Down Expand Up @@ -1623,7 +1748,12 @@ export const onResult: {
uninterruptibleMask((restore) =>
make(function(env, onResult) {
restore(self)[runSymbol](env, function(result) {
f(result)[runSymbol](env, (_) => onResult(result))
f(result)[runSymbol](env, function(finalizerResult) {
if (finalizerResult._tag === "Left") {
return onResult(finalizerResult as any)
}
onResult(result)
})
})
})
)
Expand Down Expand Up @@ -1937,11 +2067,11 @@ export const all = <
* @category collecting & elements
*/
export const forEach: {
<A, B, E, R>(iterable: Iterable<A>, f: (a: NoInfer<A>) => Micro<B, E, R>, options?: {
<A, B, E, R>(iterable: Iterable<A>, f: (a: NoInfer<A>, index: number) => Micro<B, E, R>, options?: {
readonly concurrency?: Concurrency | undefined
readonly discard?: false | undefined
}): Micro<Array<B>, E, R>
<A, B, E, R>(iterable: Iterable<A>, f: (a: NoInfer<A>) => Micro<B, E, R>, options: {
<A, B, E, R>(iterable: Iterable<A>, f: (a: NoInfer<A>, index: number) => Micro<B, E, R>, options: {
readonly concurrency?: Concurrency | undefined
readonly discard: true
}): Micro<void, E, R>
Expand All @@ -1950,7 +2080,7 @@ export const forEach: {
B,
E,
R
>(iterable: Iterable<A>, f: (a: NoInfer<A>) => Micro<B, E, R>, options?: {
>(iterable: Iterable<A>, f: (a: NoInfer<A>, index: number) => Micro<B, E, R>, options?: {
readonly concurrency?: Concurrency | undefined
readonly discard?: boolean | undefined
}): Micro<any, E, R> =>
Expand All @@ -1976,7 +2106,7 @@ const forEachSequential = <
B,
E,
R
>(iterable: Iterable<A>, f: (a: NoInfer<A>) => Micro<B, E, R>, options?: {
>(iterable: Iterable<A>, f: (a: NoInfer<A>, index: number) => Micro<B, E, R>, options?: {
readonly discard?: boolean | undefined
}): Micro<any, E, R> =>
make(function(env, onResult) {
Expand All @@ -1991,7 +2121,7 @@ const forEachSequential = <
let complete = false
const current = index++
try {
f(items[current])[runSymbol](env, function(result) {
f(items[current], current)[runSymbol](env, function(result) {
complete = true
if (result._tag === "Left") {
index = length
Expand Down Expand Up @@ -2023,7 +2153,7 @@ const forEachConcurrent = <
B,
E,
R
>(iterable: Iterable<A>, f: (a: NoInfer<A>) => Micro<B, E, R>, options: {
>(iterable: Iterable<A>, f: (a: NoInfer<A>, index: number) => Micro<B, E, R>, options: {
readonly concurrency: number | "unbounded"
readonly discard?: boolean | undefined
}): Micro<any, E, R> =>
Expand Down Expand Up @@ -2053,9 +2183,7 @@ const forEachConcurrent = <
index++
inProgress++
try {
f(item)[runSymbol](envWithSignal, function(result) {
doneCount++
inProgress--
f(item, currentIndex)[runSymbol](envWithSignal, function(result) {
if (result._tag === "Left") {
if (failure === undefined) {
failure = result
Expand All @@ -2064,6 +2192,8 @@ const forEachConcurrent = <
} else if (out !== undefined) {
out[currentIndex] = result.right
}
doneCount++
inProgress--
if (doneCount === length) {
onAbort()
onResult(failure ?? Either.right(out))
Expand Down
10 changes: 5 additions & 5 deletions packages/effect/test/Micro.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { assert, describe, it } from "effect/test/utils/extend"

class ATag extends Context.Tag("ATag")<ATag, "A">() {}

describe("Micro", () => {
describe.sequential("Micro", () => {
it("runPromise", async () => {
const result = await Micro.runPromise(Micro.succeed(1))
assert.strictEqual(result, 1)
Expand Down Expand Up @@ -271,7 +271,7 @@ describe("Micro", () => {
}).pipe(Micro.runPromise))
})

it("raceAll", () =>
it.effect("raceAll", () =>
Micro.gen(function*() {
const interrupted: Array<number> = []
const result = yield* Micro.raceAll([100, 75, 50, 0, 25].map((ms) =>
Expand All @@ -286,7 +286,7 @@ describe("Micro", () => {
))
assert.strictEqual(result, 25)
assert.deepStrictEqual(interrupted, [100, 75, 50])
}).pipe(Micro.runPromise))
}))

it("raceAllFirst", () =>
Micro.gen(function*() {
Expand Down Expand Up @@ -341,12 +341,12 @@ describe("Micro", () => {
})

describe("repeat", () => {
it.live("is stack safe", () =>
it.effect("is stack safe", () =>
Micro.void.pipe(
Micro.repeat({ times: 10000 })
))

it.live("is interruptible", () =>
it.effect("is interruptible", () =>
Micro.void.pipe(
Micro.forever,
Micro.timeout(50)
Expand Down

0 comments on commit 2e33195

Please sign in to comment.