Skip to content

Commit

Permalink
feat(RefSubject): get/unsafeGet(Exit)
Browse files Browse the repository at this point in the history
  • Loading branch information
TylorS committed Feb 19, 2024
1 parent 416f37f commit 162b156
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 18 deletions.
5 changes: 5 additions & 0 deletions .changeset/thirty-ants-complain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@typed/fx": minor
---

RefSubject get/unsafeGet(Exit)
132 changes: 125 additions & 7 deletions packages/fx/src/RefSubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import * as C from "@typed/context"
import type { Equivalence, FiberId, Runtime } from "effect"
import { Fiber } from "effect"
import { Fiber, MutableRef } from "effect"
import * as Boolean from "effect/Boolean"
import * as Cause from "effect/Cause"
import * as Effect from "effect/Effect"
Expand All @@ -18,7 +18,7 @@ import { sum } from "effect/Number"
import * as Option from "effect/Option"
import * as ReadonlyArray from "effect/ReadonlyArray"
import * as Scope from "effect/Scope"
import type { Fx } from "./Fx.js"
import { type Fx } from "./Fx.js"
import * as core from "./internal/core.js"
import * as DeferredRef from "./internal/DeferredRef.js"
import { getExitEquivalence, matchEffectPrimitive, withScope } from "./internal/helpers.js"
Expand All @@ -41,6 +41,11 @@ export interface Computed<out A, out E = never, out R = never>
extends Versioned.Versioned<R, E, A, E, R | Scope.Scope, A, E, R>
{
readonly [ComputedTypeId]: ComputedTypeId

/**
* @since 1.25.0
*/
readonly unsafeGet: () => Exit.Exit<A, E>
}

/**
Expand Down Expand Up @@ -70,6 +75,11 @@ export interface Filtered<out A, out E = never, out R = never>
* @since 1.20.0
*/
asComputed(): Computed<Option.Option<A>, E, R>

/**
* @since 1.25.0
*/
readonly unsafeGet: () => Exit.Exit<A, E | Cause.NoSuchElementException>
}

/**
Expand Down Expand Up @@ -101,6 +111,11 @@ export interface RefSubject<in out A, in out E = never, out R = never>
readonly runUpdates: <B, E2, R2>(
f: (ref: GetSetDelete<A, E, R>) => Effect.Effect<B, E2, R2>
) => Effect.Effect<B, E2, R | R2>

/**
* @since 1.25.0
*/
readonly unsafeGet: () => Exit.Exit<A, E>
}

/**
Expand Down Expand Up @@ -356,10 +371,10 @@ class RefSubjectImpl<A, E, R, R2> extends FxEffectBase<A, E, Exclude<R, R2> | Sc
readonly interrupt: Effect.Effect<void, never, Exclude<R, R2>>
readonly subscriberCount: Effect.Effect<number, never, Exclude<R, R2>>

private readonly getSetDelete: GetSetDelete<A, E, Exclude<R, R2>>
readonly getSetDelete: GetSetDelete<A, E, Exclude<R, R2>>

constructor(
private readonly core: RefSubjectCore<A, E, R, R2>
readonly core: RefSubjectCore<A, E, R, R2>
) {
super()

Expand All @@ -386,6 +401,9 @@ class RefSubjectImpl<A, E, R, R2> extends FxEffectBase<A, E, Exclude<R, R2> | Sc
return this.core.semaphore.withPermits(1)(run(this.getSetDelete))
}

unsafeGet: () => Exit.Exit<A, E> = () =>
Option.getOrThrowWith(this.core.deferredRef.current, () => new Cause.NoSuchElementException())

onSuccess(value: A): Effect.Effect<unknown, never, Exclude<R, R2>> {
return setCore(this.core, value)
}
Expand Down Expand Up @@ -1035,6 +1053,9 @@ class ComputedImpl<R0, E0, A, E, R, E2, R2, C, E3, R3> extends Versioned.Version
)
}

unsafeGet: () => Exit.Exit<C, E0 | E | E2 | E3> = () =>
Option.getOrThrowWith(this._currentValue, () => new Cause.NoSuchElementException())

static make<R0, E0, A, E, R, E2, R2, C, E3, R3>(
input: Versioned.Versioned<R0, E0, A, E, R, A, E2, R2>,
f: (a: A) => Effect.Effect<C, E3, R3>
Expand Down Expand Up @@ -1086,6 +1107,9 @@ class FilteredImpl<R0, E0, A, E, R, E2, R2, C, E3, R3> extends Versioned.Version
asComputed(): Computed<Option.Option<C>, E0 | E | E2 | E3, R0 | R2 | R3 | Exclude<R, Scope.Scope>> {
return ComputedImpl.make(this.input, this.f)
}

unsafeGet: () => Exit.Exit<C, Cause.NoSuchElementException | E0 | E | E2 | E3> = () =>
Option.getOrThrowWith(this._currentValue, () => new Cause.NoSuchElementException())
}

/**
Expand Down Expand Up @@ -1204,6 +1228,11 @@ class RefSubjectTransform<A, E, R, B> extends FxEffectBase<B, E, R | Scope.Scope
)
}

unsafeGet: () => Exit.Exit<B, E> = () => {
const exit = this.ref.unsafeGet()
return Exit.map(exit, this.from)
}

onFailure(cause: Cause.Cause<E>): Effect.Effect<unknown, never, R> {
return this.ref.onFailure(cause)
}
Expand All @@ -1227,7 +1256,7 @@ class RefSubjectTransformEffect<A, E, R, B, E2, R2, R3, E3>
readonly version: Effect.Effect<number, E, R>
readonly interrupt: Effect.Effect<void, never, R>
readonly subscriberCount: Effect.Effect<number, never, R>
readonly subject: Subject.Subject<B, E | E2 | E3>
readonly subject: Subject.HoldSubjectImpl<B, E | E2 | E3>

constructor(
readonly ref: RefSubject<A, E, R>,
Expand All @@ -1239,11 +1268,15 @@ class RefSubjectTransformEffect<A, E, R, B, E2, R2, R3, E3>
this.version = ref.version
this.interrupt = ref.interrupt
this.subscriberCount = ref.subscriberCount
this.subject = Subject.unsafeMake()
this.subject = new Subject.HoldSubjectImpl()
}

run<R4 = never>(sink: Sink.Sink<B, E | E2 | E3, R4>): Effect.Effect<unknown, never, R | R2 | R3 | Scope.Scope | R4> {
return core.merge(core.mapEffect(this.ref, this.from), this.subject).run(sink)
return core.skipRepeats(
core.merge(core.tapEffect(core.mapEffect(this.ref, this.from), this.subject.onSuccess), this.subject)
).run(
sink
)
}

runUpdates<R4, E4, C>(
Expand All @@ -1268,6 +1301,10 @@ class RefSubjectTransformEffect<A, E, R, B, E2, R2, R3, E3>
)
}

unsafeGet: () => Exit.Exit<B, E | E2 | E3> = () => {
return Option.getOrThrowWith(MutableRef.get(this.subject.lastValue), () => new Cause.NoSuchElementException())
}

onFailure(cause: Cause.Cause<E | E2 | E3>): Effect.Effect<unknown, never, R> {
return this.subject.onFailure(cause)
}
Expand Down Expand Up @@ -1494,6 +1531,21 @@ class RefSubjectTuple<
return run(this.getSetDelete)
}

unsafeGet: () => Exit.Exit<
{ readonly [K in keyof Refs]: Effect.Effect.Success<Refs[K]> },
Effect.Effect.Error<Refs[number]>
> = () => {
return Option.getOrThrowWith(
Exit.all(this.refs.map((r) => r.unsafeGet())) as Option.Option<
Exit.Exit<
{ readonly [K in keyof Refs]: Effect.Effect.Success<Refs[K]> },
Effect.Effect.Error<Refs[number]>
>
>,
() => new Cause.NoSuchElementException()
)
}

onFailure(
cause: Cause.Cause<Effect.Effect.Error<Refs[number]>>
): Effect.Effect<unknown, never, Effect.Effect.Context<Refs[number]>> {
Expand Down Expand Up @@ -1648,6 +1700,24 @@ class RefSubjectStruct<
return run(this.getSetDelete)
}

unsafeGet: () => Exit.Exit<
{ readonly [K in keyof Refs]: Effect.Effect.Success<Refs[K]> },
Effect.Effect.Error<Refs[keyof Refs]>
> = () => {
const entries = Object.entries(this.refs).map(([k, r]) => Exit.map(r.unsafeGet(), (a) => [k, a] as const))
const exit = Option.getOrThrowWith(
Exit.all(entries) as Option.Option<
Exit.Exit<
ReadonlyArray<readonly [string, Effect.Effect.Success<Refs[keyof Refs]>]>,
Effect.Effect.Error<Refs[keyof Refs]>
>
>,
() => new Cause.NoSuchElementException()
)

return Exit.map(exit, (entries) => Object.fromEntries(entries)) as any
}

onFailure(
cause: Cause.Cause<Effect.Effect.Error<Refs[keyof Refs]>>
): Effect.Effect<unknown, never, Effect.Effect.Context<Refs[keyof Refs]>> {
Expand Down Expand Up @@ -1785,6 +1855,10 @@ class RefSubjectTagged<I, E, A> extends FxEffectBase<
return this.tag.withEffect((ref) => ref.runUpdates(run))
}

unsafeGet: () => Exit.Exit<A, E> = () => {
throw new Error(`Unable to unsafely get a tagged RefSubject because it requires the Effect context by defnition.`)
}

onFailure(cause: Cause.Cause<E>): Effect.Effect<unknown, never, I> {
return this.tag.withEffect((ref) => ref.onFailure(cause))
}
Expand Down Expand Up @@ -1855,6 +1929,10 @@ class RefSubjectFromTag<I, S, A, E, R> extends FxEffectBase<
return Effect.flatMap(this._get, (ref) => ref.runUpdates(run))
}

unsafeGet: () => Exit.Exit<A, E> = () => {
throw new Error(`Unable to unsafely get a tagged RefSubject because it requires the Effect context by defnition.`)
}

onFailure(cause: Cause.Cause<E>): Effect.Effect<unknown, never, I | R> {
return Effect.flatMap(this._get, (ref) => ref.onFailure(cause))
}
Expand Down Expand Up @@ -1949,6 +2027,10 @@ class ComputedFromTag<I, S, A, E, R> extends FxEffectBase<
toEffect(): Effect.Effect<A, E, I | R> {
return Effect.flatten(this._get)
}

unsafeGet: () => Exit.Exit<A, E> = () => {
throw new Error(`Unable to unsafely get a tagged Computed because it requires the Effect context by defnition.`)
}
}

/**
Expand Down Expand Up @@ -1996,6 +2078,10 @@ class FilteredFromTag<I, S, A, E, R> extends FxEffectBase<
asComputed(): Computed<Option.Option<A>, E, R | I> {
return new ComputedFromTag(this.tag, (s) => this.f(s).asComputed())
}

unsafeGet: () => Exit.Exit<A, E> = () => {
throw new Error(`Unable to unsafely get a tagged Filtered because it requires the Effect context by defnition.`)
}
}

/**
Expand Down Expand Up @@ -2198,6 +2284,8 @@ class RefSubjectSlice<A, E, R> extends FxEffectBase<A, E, R | Scope.Scope, A, E,
return this.ref.runUpdates(run)
}

unsafeGet: () => Exit.Exit<A, E> = () => this.ref.unsafeGet()

onFailure(cause: Cause.Cause<E>): Effect.Effect<unknown, never, R> {
return this.ref.onFailure(cause)
}
Expand All @@ -2206,3 +2294,33 @@ class RefSubjectSlice<A, E, R> extends FxEffectBase<A, E, R | Scope.Scope, A, E,
return this.ref.onSuccess(value)
}
}

/**
* Get the current value of the RefSubject. If it has not been set yet, a Fiber will be used to wait for the value to be set.
*
* @since 1.25.0
*/
export const get: {
<A, E = never, R = never>(ref: RefSubject<A, E, R>): Effect.Effect<A, E, R>
<A, E = never, R = never>(ref: Computed<A, E, R>): Effect.Effect<A, E, R>
<A, E = never, R = never>(ref: Filtered<A, E, R>): Effect.Effect<A, E | Cause.NoSuchElementException, R>
} = <A, E, R>(
ref: RefSubject<A, E, R> | Computed<A, E, R> | Filtered<A, E, R>
): Effect.Effect<A, E | Cause.NoSuchElementException, R> => ref

/**
* Synchronously get the current Exit value of the RefSubject. If it has not been set yet, a Cause.NoSuchElementException will be thrown.
*
* Note: This is unimplemented for RefSubject.tagged and RefSubject.fromTag because they require the Effect context by definition.
* It will throw immediately.
*
* @since 1.25.0
*/
export const unsafeGetExit = <A, E = never, R = never>(ref: RefSubject<A, E, R>): Exit.Exit<A, E> => ref.unsafeGet()

/**
* Synchronously get the current value of the RefSubject.
*
* @since 1.25.0
*/
export const unsafeGet = <A, E = never, R = never>(ref: RefSubject<A, E, R>): A => Effect.runSync(unsafeGetExit(ref))
14 changes: 11 additions & 3 deletions packages/fx/src/Subject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,21 +159,29 @@ export class SubjectImpl<A, E> extends FxBase<A, E, Scope.Scope> implements Subj
* @internal
*/
export class HoldSubjectImpl<A, E> extends SubjectImpl<A, E> implements Subject<A, E> {
private lastValue: MutableRef.MutableRef<Option.Option<A>> = MutableRef.make(Option.none())
readonly lastValue: MutableRef.MutableRef<Option.Option<Exit.Exit<A, E>>> = MutableRef.make(Option.none())

// Emit an event to all sinks
onSuccess = (a: A) =>
Effect.suspend(() => {
MutableRef.set(this.lastValue, Option.some(a))
MutableRef.set(this.lastValue, Option.some(Exit.succeed(a)))

return this.onEvent(a)
})

onFailure = (cause: Cause.Cause<E>): Effect.Effect<void, never, never> => {
return Effect.suspend(() => {
MutableRef.set(this.lastValue, Option.some(Exit.failCause(cause)))

return this.onCause(cause)
})
}

run<R2>(sink: Sink<A, E, R2>): Effect.Effect<unknown, never, R2 | Scope.Scope> {
return this.addSink(sink, (scope) =>
Option.match(MutableRef.get(this.lastValue), {
onNone: () => awaitScopeClose(scope),
onSome: (a) => Effect.zipRight(sink.onSuccess(a), awaitScopeClose(scope))
onSome: (exit) => Effect.zipRight(Exit.match(exit, sink), awaitScopeClose(scope))
}))
}

Expand Down
24 changes: 16 additions & 8 deletions packages/fx/src/Versioned.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import type * as Context from "@typed/context"
import type { Layer, Runtime, Scope } from "effect"
import { Exit } from "effect"
import * as Effect from "effect/Effect"
import { dual, flow } from "effect/Function"
import { sum } from "effect/Number"
Expand Down Expand Up @@ -112,9 +113,9 @@ export class VersionedTransform<R0, E0, A, E, R, B, E2, R2, C, E3, R3, D, E4, R4
extends FxEffectBase<C, E3, R3, D, E0 | E4, R0 | R4>
implements Versioned<never, never, C, E3, R3, D, E0 | E4, R0 | R4>
{
protected _version = -1
protected _currentValue: Option.Option<D> = Option.none()
protected _fx: Fx<C, E3, R3>
public _version = -1
public _currentValue: Option.Option<Exit.Exit<D, E0 | E4>> = Option.none()
public _fx: Fx<C, E3, R3>

constructor(
readonly input: Versioned<R0, E0, A, E, R, B, E2, R2>,
Expand All @@ -135,18 +136,25 @@ export class VersionedTransform<R0, E0, A, E, R, B, E2, R2, C, E3, R3, D, E4, R4
toEffect(): Effect.Effect<D, E0 | E4, R0 | R4> {
const transformed = this._transformEffect(this.input as any as Effect.Effect<B, E2, R2>)
const update = (v: number) =>
Effect.tap(
transformed,
(value) =>
Effect.tapErrorCause(
Effect.tap(
transformed,
(value) =>
Effect.sync(() => {
this._currentValue = Option.some(Exit.succeed(value))
this._version = v
})
),
(cause) =>
Effect.sync(() => {
this._currentValue = Option.some(value)
this._currentValue = Option.some(Exit.failCause(cause))
this._version = v
})
)

return new MulticastEffect(Effect.flatMap(this.input.version, (version) => {
if (version === this._version && Option.isSome(this._currentValue)) {
return Effect.succeed(this._currentValue.value)
return this._currentValue.value
}

return update(version)
Expand Down
Loading

0 comments on commit 162b156

Please sign in to comment.