From 162b156a69d993ca5f73b89b51e813f6bf0340c8 Mon Sep 17 00:00:00 2001 From: Tylor Steinberger Date: Sun, 18 Feb 2024 19:20:32 -0500 Subject: [PATCH] feat(RefSubject): get/unsafeGet(Exit) --- .changeset/thirty-ants-complain.md | 5 ++ packages/fx/src/RefSubject.ts | 132 +++++++++++++++++++++++++++-- packages/fx/src/Subject.ts | 14 ++- packages/fx/src/Versioned.ts | 24 ++++-- packages/fx/test/core.ts | 26 ++++++ 5 files changed, 183 insertions(+), 18 deletions(-) create mode 100644 .changeset/thirty-ants-complain.md diff --git a/.changeset/thirty-ants-complain.md b/.changeset/thirty-ants-complain.md new file mode 100644 index 000000000..e440b2e35 --- /dev/null +++ b/.changeset/thirty-ants-complain.md @@ -0,0 +1,5 @@ +--- +"@typed/fx": minor +--- + +RefSubject get/unsafeGet(Exit) diff --git a/packages/fx/src/RefSubject.ts b/packages/fx/src/RefSubject.ts index c713add64..e53f4bfe0 100644 --- a/packages/fx/src/RefSubject.ts +++ b/packages/fx/src/RefSubject.ts @@ -5,7 +5,7 @@ import * as C from "@typed/context" import type { Equivalence, FiberId, Runtime } from "effect" -import { Fiber } from "effect" +import { Fiber, MutableRef } from "effect" import * as Boolean from "effect/Boolean" import * as Cause from "effect/Cause" import * as Effect from "effect/Effect" @@ -18,7 +18,7 @@ import { sum } from "effect/Number" import * as Option from "effect/Option" import * as ReadonlyArray from "effect/ReadonlyArray" import * as Scope from "effect/Scope" -import type { Fx } from "./Fx.js" +import { type Fx } from "./Fx.js" import * as core from "./internal/core.js" import * as DeferredRef from "./internal/DeferredRef.js" import { getExitEquivalence, matchEffectPrimitive, withScope } from "./internal/helpers.js" @@ -41,6 +41,11 @@ export interface Computed extends Versioned.Versioned { readonly [ComputedTypeId]: ComputedTypeId + + /** + * @since 1.25.0 + */ + readonly unsafeGet: () => Exit.Exit } /** @@ -70,6 +75,11 @@ export interface Filtered * @since 1.20.0 */ asComputed(): Computed, E, R> + + /** + * @since 1.25.0 + */ + readonly unsafeGet: () => Exit.Exit } /** @@ -101,6 +111,11 @@ export interface RefSubject readonly runUpdates: ( f: (ref: GetSetDelete) => Effect.Effect ) => Effect.Effect + + /** + * @since 1.25.0 + */ + readonly unsafeGet: () => Exit.Exit } /** @@ -356,10 +371,10 @@ class RefSubjectImpl extends FxEffectBase | Sc readonly interrupt: Effect.Effect> readonly subscriberCount: Effect.Effect> - private readonly getSetDelete: GetSetDelete> + readonly getSetDelete: GetSetDelete> constructor( - private readonly core: RefSubjectCore + readonly core: RefSubjectCore ) { super() @@ -386,6 +401,9 @@ class RefSubjectImpl extends FxEffectBase | Sc return this.core.semaphore.withPermits(1)(run(this.getSetDelete)) } + unsafeGet: () => Exit.Exit = () => + Option.getOrThrowWith(this.core.deferredRef.current, () => new Cause.NoSuchElementException()) + onSuccess(value: A): Effect.Effect> { return setCore(this.core, value) } @@ -1035,6 +1053,9 @@ class ComputedImpl extends Versioned.Version ) } + unsafeGet: () => Exit.Exit = () => + Option.getOrThrowWith(this._currentValue, () => new Cause.NoSuchElementException()) + static make( input: Versioned.Versioned, f: (a: A) => Effect.Effect @@ -1086,6 +1107,9 @@ class FilteredImpl extends Versioned.Version asComputed(): Computed, E0 | E | E2 | E3, R0 | R2 | R3 | Exclude> { return ComputedImpl.make(this.input, this.f) } + + unsafeGet: () => Exit.Exit = () => + Option.getOrThrowWith(this._currentValue, () => new Cause.NoSuchElementException()) } /** @@ -1204,6 +1228,11 @@ class RefSubjectTransform extends FxEffectBase Exit.Exit = () => { + const exit = this.ref.unsafeGet() + return Exit.map(exit, this.from) + } + onFailure(cause: Cause.Cause): Effect.Effect { return this.ref.onFailure(cause) } @@ -1227,7 +1256,7 @@ class RefSubjectTransformEffect readonly version: Effect.Effect readonly interrupt: Effect.Effect readonly subscriberCount: Effect.Effect - readonly subject: Subject.Subject + readonly subject: Subject.HoldSubjectImpl constructor( readonly ref: RefSubject, @@ -1239,11 +1268,15 @@ class RefSubjectTransformEffect this.version = ref.version this.interrupt = ref.interrupt this.subscriberCount = ref.subscriberCount - this.subject = Subject.unsafeMake() + this.subject = new Subject.HoldSubjectImpl() } run(sink: Sink.Sink): Effect.Effect { - return core.merge(core.mapEffect(this.ref, this.from), this.subject).run(sink) + return core.skipRepeats( + core.merge(core.tapEffect(core.mapEffect(this.ref, this.from), this.subject.onSuccess), this.subject) + ).run( + sink + ) } runUpdates( @@ -1268,6 +1301,10 @@ class RefSubjectTransformEffect ) } + unsafeGet: () => Exit.Exit = () => { + return Option.getOrThrowWith(MutableRef.get(this.subject.lastValue), () => new Cause.NoSuchElementException()) + } + onFailure(cause: Cause.Cause): Effect.Effect { return this.subject.onFailure(cause) } @@ -1494,6 +1531,21 @@ class RefSubjectTuple< return run(this.getSetDelete) } + unsafeGet: () => Exit.Exit< + { readonly [K in keyof Refs]: Effect.Effect.Success }, + Effect.Effect.Error + > = () => { + return Option.getOrThrowWith( + Exit.all(this.refs.map((r) => r.unsafeGet())) as Option.Option< + Exit.Exit< + { readonly [K in keyof Refs]: Effect.Effect.Success }, + Effect.Effect.Error + > + >, + () => new Cause.NoSuchElementException() + ) + } + onFailure( cause: Cause.Cause> ): Effect.Effect> { @@ -1648,6 +1700,24 @@ class RefSubjectStruct< return run(this.getSetDelete) } + unsafeGet: () => Exit.Exit< + { readonly [K in keyof Refs]: Effect.Effect.Success }, + Effect.Effect.Error + > = () => { + const entries = Object.entries(this.refs).map(([k, r]) => Exit.map(r.unsafeGet(), (a) => [k, a] as const)) + const exit = Option.getOrThrowWith( + Exit.all(entries) as Option.Option< + Exit.Exit< + ReadonlyArray]>, + Effect.Effect.Error + > + >, + () => new Cause.NoSuchElementException() + ) + + return Exit.map(exit, (entries) => Object.fromEntries(entries)) as any + } + onFailure( cause: Cause.Cause> ): Effect.Effect> { @@ -1785,6 +1855,10 @@ class RefSubjectTagged extends FxEffectBase< return this.tag.withEffect((ref) => ref.runUpdates(run)) } + unsafeGet: () => Exit.Exit = () => { + throw new Error(`Unable to unsafely get a tagged RefSubject because it requires the Effect context by defnition.`) + } + onFailure(cause: Cause.Cause): Effect.Effect { return this.tag.withEffect((ref) => ref.onFailure(cause)) } @@ -1855,6 +1929,10 @@ class RefSubjectFromTag extends FxEffectBase< return Effect.flatMap(this._get, (ref) => ref.runUpdates(run)) } + unsafeGet: () => Exit.Exit = () => { + throw new Error(`Unable to unsafely get a tagged RefSubject because it requires the Effect context by defnition.`) + } + onFailure(cause: Cause.Cause): Effect.Effect { return Effect.flatMap(this._get, (ref) => ref.onFailure(cause)) } @@ -1949,6 +2027,10 @@ class ComputedFromTag extends FxEffectBase< toEffect(): Effect.Effect { return Effect.flatten(this._get) } + + unsafeGet: () => Exit.Exit = () => { + throw new Error(`Unable to unsafely get a tagged Computed because it requires the Effect context by defnition.`) + } } /** @@ -1996,6 +2078,10 @@ class FilteredFromTag extends FxEffectBase< asComputed(): Computed, E, R | I> { return new ComputedFromTag(this.tag, (s) => this.f(s).asComputed()) } + + unsafeGet: () => Exit.Exit = () => { + throw new Error(`Unable to unsafely get a tagged Filtered because it requires the Effect context by defnition.`) + } } /** @@ -2198,6 +2284,8 @@ class RefSubjectSlice extends FxEffectBase Exit.Exit = () => this.ref.unsafeGet() + onFailure(cause: Cause.Cause): Effect.Effect { return this.ref.onFailure(cause) } @@ -2206,3 +2294,33 @@ class RefSubjectSlice extends FxEffectBase(ref: RefSubject): Effect.Effect + (ref: Computed): Effect.Effect + (ref: Filtered): Effect.Effect +} = ( + ref: RefSubject | Computed | Filtered +): Effect.Effect => ref + +/** + * Synchronously get the current Exit value of the RefSubject. If it has not been set yet, a Cause.NoSuchElementException will be thrown. + * + * Note: This is unimplemented for RefSubject.tagged and RefSubject.fromTag because they require the Effect context by definition. + * It will throw immediately. + * + * @since 1.25.0 + */ +export const unsafeGetExit = (ref: RefSubject): Exit.Exit => ref.unsafeGet() + +/** + * Synchronously get the current value of the RefSubject. + * + * @since 1.25.0 + */ +export const unsafeGet = (ref: RefSubject): A => Effect.runSync(unsafeGetExit(ref)) diff --git a/packages/fx/src/Subject.ts b/packages/fx/src/Subject.ts index b506afd28..4bec2e122 100644 --- a/packages/fx/src/Subject.ts +++ b/packages/fx/src/Subject.ts @@ -159,21 +159,29 @@ export class SubjectImpl extends FxBase implements Subj * @internal */ export class HoldSubjectImpl extends SubjectImpl implements Subject { - private lastValue: MutableRef.MutableRef> = MutableRef.make(Option.none()) + readonly lastValue: MutableRef.MutableRef>> = MutableRef.make(Option.none()) // Emit an event to all sinks onSuccess = (a: A) => Effect.suspend(() => { - MutableRef.set(this.lastValue, Option.some(a)) + MutableRef.set(this.lastValue, Option.some(Exit.succeed(a))) return this.onEvent(a) }) + onFailure = (cause: Cause.Cause): Effect.Effect => { + return Effect.suspend(() => { + MutableRef.set(this.lastValue, Option.some(Exit.failCause(cause))) + + return this.onCause(cause) + }) + } + run(sink: Sink): Effect.Effect { return this.addSink(sink, (scope) => Option.match(MutableRef.get(this.lastValue), { onNone: () => awaitScopeClose(scope), - onSome: (a) => Effect.zipRight(sink.onSuccess(a), awaitScopeClose(scope)) + onSome: (exit) => Effect.zipRight(Exit.match(exit, sink), awaitScopeClose(scope)) })) } diff --git a/packages/fx/src/Versioned.ts b/packages/fx/src/Versioned.ts index 20610f8e7..671cf954c 100644 --- a/packages/fx/src/Versioned.ts +++ b/packages/fx/src/Versioned.ts @@ -7,6 +7,7 @@ import type * as Context from "@typed/context" import type { Layer, Runtime, Scope } from "effect" +import { Exit } from "effect" import * as Effect from "effect/Effect" import { dual, flow } from "effect/Function" import { sum } from "effect/Number" @@ -112,9 +113,9 @@ export class VersionedTransform implements Versioned { - protected _version = -1 - protected _currentValue: Option.Option = Option.none() - protected _fx: Fx + public _version = -1 + public _currentValue: Option.Option> = Option.none() + public _fx: Fx constructor( readonly input: Versioned, @@ -135,18 +136,25 @@ export class VersionedTransform { const transformed = this._transformEffect(this.input as any as Effect.Effect) const update = (v: number) => - Effect.tap( - transformed, - (value) => + Effect.tapErrorCause( + Effect.tap( + transformed, + (value) => + Effect.sync(() => { + this._currentValue = Option.some(Exit.succeed(value)) + this._version = v + }) + ), + (cause) => Effect.sync(() => { - this._currentValue = Option.some(value) + this._currentValue = Option.some(Exit.failCause(cause)) this._version = v }) ) return new MulticastEffect(Effect.flatMap(this.input.version, (version) => { if (version === this._version && Option.isSome(this._currentValue)) { - return Effect.succeed(this._currentValue.value) + return this._currentValue.value } return update(version) diff --git a/packages/fx/test/core.ts b/packages/fx/test/core.ts index c7cfa9e2f..00aecf622 100644 --- a/packages/fx/test/core.ts +++ b/packages/fx/test/core.ts @@ -400,6 +400,32 @@ describe.concurrent(__filename, () => { await Effect.runPromise(Effect.scoped(test)) }) }) + + describe.concurrent("unsafe", () => { + it.concurrent("unsafeGetExit", async () => { + const test = Effect.gen(function*(_) { + // of() actually has a starting value by default + const ref = yield* _(RefSubject.of(0)) + + const exit = RefSubject.unsafeGetExit(ref) + expect(exit).toEqual(Exit.succeed(0)) + }).pipe(Effect.scoped) + + await Effect.runPromise(test) + }) + + it.concurrent("unsafeGet", async () => { + const test = Effect.gen(function*(_) { + const ref = yield* _(RefSubject.make(Effect.succeed(0))) + // Effect/Fx-backed RefSubjects require being initialized + yield* _(ref) + + expect(RefSubject.unsafeGet(ref)).toEqual(0) + }).pipe(Effect.scoped) + + await Effect.runPromise(test) + }) + }) }) describe.concurrent("Subject", () => {