Skip to content

Commit

Permalink
WIP: eliminate Effect where possible from Template rendering
Browse files Browse the repository at this point in the history
  • Loading branch information
TylorS committed Jan 9, 2024
1 parent adfcf3f commit 58d0de9
Show file tree
Hide file tree
Showing 31 changed files with 1,278 additions and 446 deletions.
11 changes: 10 additions & 1 deletion examples/todomvc/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,20 @@ import { Effect, Layer } from "effect"
import { Live } from "./infrastructure"
import { TodoApp } from "./presentation"

// We must eliminate the time it takes between diffing and patching the DOM
// There is a lot of time spent spinning up templates

const environment = Live.pipe(
Layer.provideMerge(Storage.layer(localStorage)),
Layer.provideMerge(Router.browser),
Layer.provideMerge(Navigation.fromWindow),
Layer.provideMerge(RenderContext.dom(window))
)

TodoApp.pipe(renderLayer, Layer.provide(environment), Layer.launch, Effect.scoped, Effect.runFork)
TodoApp.pipe(
renderLayer,
Layer.provide(environment),
Layer.launch,
Effect.scoped,
Effect.runFork
)
2 changes: 1 addition & 1 deletion examples/todomvc/stats.html

Large diffs are not rendered by default.

19 changes: 3 additions & 16 deletions packages/dom/src/internal/_helpers.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import * as Effect from "effect/Effect"
import * as ExecutionStrategy from "effect/ExecutionStrategy"
import * as Fiber from "effect/Fiber"
import type * as Fiber from "effect/Fiber"
import * as Runtime from "effect/Runtime"
import * as Scope from "effect/Scope"
import type * as Scope from "effect/Scope"
import type * as TQS from "typed-query-selector/parser"

export type ParseSelector<T extends string, Fallback> = [T] extends [typeof ROOT_CSS_SELECTOR] ? Fallback
Expand Down Expand Up @@ -34,20 +33,8 @@ export function createScopedRuntime<R>(): Effect.Effect<
const runtime = yield* _(Effect.runtime<R | Scope.Scope>())
const scope = yield* _(Effect.scope)
const runFork = Runtime.runFork(runtime)

const run = <E, A>(effect: Effect.Effect<R | Scope.Scope, E, A>): Fiber.Fiber<E, A> => {
const fiber: Fiber.Fiber<E, A> = Scope.fork(scope, ExecutionStrategy.sequential).pipe(
Effect.flatMap((childScope) =>
Effect.zipRight(
Scope.addFinalizer(
childScope,
Effect.suspend(() => fiber ? Fiber.interrupt(fiber) : Effect.unit)
),
Effect.onExit(effect, (exit) => Scope.close(childScope, exit))
)
),
runFork
)
const fiber: Fiber.Fiber<E, A> = runFork(Effect.fromFiberEffect(Effect.forkIn(effect, scope)))

return fiber
}
Expand Down
4 changes: 2 additions & 2 deletions packages/fx/src/AsyncData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export const matchAsyncData: {
}
) => Fx.Fx<R4, E4, D>
readonly Success: (
value: RefSubject.Computed<never, never, A>,
value: RefSubject.RefSubject<never, never, A>,
options: {
readonly timestamp: RefSubject.Computed<never, never, number>
readonly progress: RefSubject.Filtered<never, never, Progress>
Expand All @@ -123,7 +123,7 @@ export const matchAsyncData: {
}
),
Success: (success) =>
matchers.Success(RefSubject.map(success, (s) => s.value), {
matchers.Success(RefSubject.transform(success, (s) => s.value, (value) => AsyncData.success(value)), {
timestamp: RefSubject.map(success, (s) => s.timestamp),
progress: RefSubject.filterMap(success, (f) => Option.flatMap(f.refreshing, (l) => l.progress))
})
Expand Down
7 changes: 2 additions & 5 deletions packages/fx/src/Emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import * as ExecutionStrategy from "effect/ExecutionStrategy"
import type * as Exit from "effect/Exit"
import * as Fiber from "effect/Fiber"
import * as Runtime from "effect/Runtime"
import * as Scope from "effect/Scope"
import type * as Scope from "effect/Scope"
import { withScope } from "./internal/helpers.js"
import * as Sink from "./Sink.js"

Expand All @@ -30,10 +30,7 @@ export function withEmitter<R, E, A, R2, B>(
const run = (effect: Effect.Effect<R, never, unknown>) =>
runPromiseExit(
Effect.flatMap(
Effect.flatMap(
Scope.fork(scope, ExecutionStrategy.sequential),
(childScope) => Effect.forkIn(effect, childScope)
),
Effect.forkIn(effect, scope),
Fiber.join
)
)
Expand Down
2 changes: 1 addition & 1 deletion packages/fx/src/Fx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1626,7 +1626,7 @@ export function drainLayer<FXS extends ReadonlyArray<Fx<any, never, any>>>(...fx
never,
never
> {
return Layer.scopedDiscard(Effect.forkScoped(core.drain(core.mergeAll(fxs))))
return Layer.scopedDiscard(Effect.forkWithErrorHandler(core.drain(core.mergeAll(fxs)), Effect.logError))
}

/**
Expand Down
17 changes: 12 additions & 5 deletions packages/fx/src/Idle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class IdleSchedulerImpl implements IdleScheduler {
this.#tasks.scheduleTask(task, priority)

// If we're not running yet, schedule the next run
if (!this.#running) {
if (this.#running === false) {
this.#running = true
this.scheduleNextRun()
}
Expand All @@ -65,23 +65,30 @@ class IdleSchedulerImpl implements IdleScheduler {
}

private scheduleNextRun() {
this.#id = requestIdleCallback((deadline) => this.runTasks(deadline), this.options)
this.#id = requestIdleCallback((deadline) => this.runTasks(deadline), { timeout: 1000, ...this.options })
}

private runTasks(deadline: IdleDeadline) {
const buckets = this.#tasks.buckets
const buckets = this.#tasks.buckets.slice(0)
this.#tasks.buckets = []
const length = buckets.length

let i = 0
for (; shouldContinue(deadline) && i < length; ++i) {
buckets[i][1].forEach((f) => f())
const tasks = buckets[i][1].slice()
tasks.forEach((f) => f())
}

// Remove all the buckets we were able to complete
buckets.splice(0, i)

// If there are more tasks to run, schedule the next run
// If there are leftover tasks, requeue them
if (buckets.length > 0) {
buckets.forEach(([priority, tasks]) => tasks.forEach((f) => this.#tasks.scheduleTask(f, priority)))
}

// If there are more tasks to run, schedule the next run
if (this.#tasks.buckets.length > 0) {
this.scheduleNextRun()
} else {
// Otherwise we're done for now
Expand Down
142 changes: 136 additions & 6 deletions packages/fx/src/RefSubject.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as C from "@typed/context"
import type { Equivalence, Fiber, Runtime } from "effect"
import type { Equivalence, Fiber, FiberId, Runtime } from "effect"
import * as Boolean from "effect/Boolean"
import * as Cause from "effect/Cause"
import * as Effect from "effect/Effect"
Expand All @@ -15,7 +15,7 @@ import * as Scope from "effect/Scope"
import type { Fx } from "./Fx.js"
import * as core from "./internal/core.js"
import * as DeferredRef from "./internal/DeferredRef.js"
import { getExitEquivalence } from "./internal/helpers.js"
import { getExitEquivalence, makeForkInScope, withScope } from "./internal/helpers.js"
import { FxEffectBase } from "./internal/protos.js"
import { runtimeToLayer } from "./internal/provide.js"
import * as share from "./internal/share.js"
Expand Down Expand Up @@ -185,7 +185,49 @@ export function of<A, E = never>(
a: A,
options?: RefSubjectOptions<A>
): Effect.Effect<Scope.Scope, never, RefSubject<never, E, A>> {
return make<never, E, A>(Effect.succeed(a), options)
return Effect.acquireRelease(
withScopeAndFiberId(
(scope, id) =>
unsafeMake<E, A>({
id,
initial: Effect.succeed(a),
initialValue: a,
options,
scope
}),
options?.executionStrategy ?? ExecutionStrategy.sequential
),
(ref) => ref.interrupt
)
}

const withScopeAndFiberId = <R, E, A>(
f: (scope: Scope.CloseableScope, id: FiberId.FiberId) => Effect.Effect<R, E, A>,
strategy: ExecutionStrategy.ExecutionStrategy
) => Effect.fiberIdWith((id) => withScope((scope) => f(scope, id), strategy))

const emptyContext = C.empty()

export function unsafeMake<E, A>(
params: {
readonly id: FiberId.FiberId
readonly initial: Effect.Effect<never, E, A>
readonly options?: RefSubjectOptions<A> | undefined
readonly scope: Scope.CloseableScope
readonly initialValue?: A
}
): Effect.Effect<never, never, RefSubject<never, E, A>> {
const { id, initial, options, scope } = params
const core = unsafeMakeCore(initial, id, emptyContext, scope, options)

// Sometimes we might be instantiating directly from a known value
// Here we seed the value and ensure the subject has it as well for re-broadcasting
if ("initialValue" in params) {
core.deferredRef.done(Exit.succeed(params.initialValue))
return Effect.map(core.subject.onSuccess(params.initialValue), () => new RefSubjectImpl(core))
}

return Effect.succeed(new RefSubjectImpl(core))
}

class RefSubjectImpl<R, E, A, R2> extends FxEffectBase<Exclude<R, R2> | Scope.Scope, E, A, Exclude<R, R2>, E, A>
Expand Down Expand Up @@ -434,6 +476,23 @@ function makeCore<R, E, A>(
)
}

function unsafeMakeCore<R, E, A>(
initial: Effect.Effect<R, E, A>,
id: FiberId.FiberId,
ctx: C.Context<R>,
scope: Scope.CloseableScope,
options?: RefSubjectOptions<A>
) {
return new RefSubjectCore(
initial,
Subject.unsafeMake<E, A>(Math.max(1, options?.replay ?? 1)),
ctx,
scope,
DeferredRef.unsafeMake(id, getExitEquivalence(options?.eq ?? Equal.equals)),
Effect.unsafeMakeSemaphore(1)
)
}

function getOrInitializeCore<R, E, A, R2>(
core: RefSubjectCore<R, E, A, R2>,
lockInitialize: boolean
Expand All @@ -460,11 +519,12 @@ function initializeCore<R, E, A, R2>(
})
)

const fork = makeForkInScope(core.scope)

return Effect.zipRight(
Effect.tap(
Effect.forkIn(
lock && core.semaphore ? core.semaphore.withPermits(1)(initialize) : initialize,
core.scope
fork(
lock && core.semaphore ? core.semaphore.withPermits(1)(initialize) : initialize
),
(fiber) => Effect.sync(() => core._fiber = fiber)
),
Expand Down Expand Up @@ -1799,3 +1859,73 @@ const sub = (x: number): number => x - 1
export const decrement: <R, E>(ref: RefSubject<R, E, number>) => Effect.Effect<R, E, number> = <R, E>(
ref: RefSubject<R, E, number>
) => update(ref, sub)

export const slice: {
(drop: number, take: number): <R, E, A>(ref: RefSubject<R, E, A>) => RefSubject<R, E, A>
<R, E, A>(ref: RefSubject<R, E, A>, drop: number, take: number): RefSubject<R, E, A>
} = dual(
3,
function slice<R, E, A>(ref: RefSubject<R, E, A>, drop: number, take: number): RefSubject<R, E, A> {
return new RefSubjectSlice(ref, drop, take)
}
)

export const drop: {
(drop: number): <R, E, A>(ref: RefSubject<R, E, A>) => RefSubject<R, E, A>
<R, E, A>(ref: RefSubject<R, E, A>, drop: number): RefSubject<R, E, A>
} = dual(2, function drop<R, E, A>(ref: RefSubject<R, E, A>, drop: number): RefSubject<R, E, A> {
return slice(ref, drop, Infinity)
})

export const take: {
(take: number): <R, E, A>(ref: RefSubject<R, E, A>) => RefSubject<R, E, A>
<R, E, A>(ref: RefSubject<R, E, A>, take: number): RefSubject<R, E, A>
} = dual(2, function take<R, E, A>(ref: RefSubject<R, E, A>, take: number): RefSubject<R, E, A> {
return slice(ref, 0, take)
})

class RefSubjectSlice<R, E, A> extends FxEffectBase<R | Scope.Scope, E, A, R, E, A> implements RefSubject<R, E, A> {
readonly [ComputedTypeId]: ComputedTypeId = ComputedTypeId
readonly [RefSubjectTypeId]: RefSubjectTypeId = RefSubjectTypeId

readonly version: Effect.Effect<R, E, number>
readonly interrupt: Effect.Effect<R, never, void>
readonly subscriberCount: Effect.Effect<R, never, number>
private _fx: Fx<Scope.Scope | R, E, A>

constructor(
readonly ref: RefSubject<R, E, A>,
readonly drop: number,
readonly take: number
) {
super()

this.version = ref.version
this.interrupt = ref.interrupt
this.subscriberCount = ref.subscriberCount
this._fx = share.hold(core.slice(ref, drop, take))
this._effect = ref
}

run<R2>(sink: Sink.Sink<R2, E, A>): Effect.Effect<R | R2 | Scope.Scope, never, unknown> {
return this._fx.run(sink)
}

toEffect(): Effect.Effect<R, E, A> {
return this.ref
}

runUpdates<R2, E2, C>(
run: (ref: GetSetDelete<R, E, A>) => Effect.Effect<R2, E2, C>
): Effect.Effect<R | R2, E2, C> {
return this.ref.runUpdates(run)
}

onFailure(cause: Cause.Cause<E>): Effect.Effect<R, never, unknown> {
return this.ref.onFailure(cause)
}

onSuccess(value: A): Effect.Effect<R, never, unknown> {
return this.ref.onSuccess(value)
}
}
4 changes: 2 additions & 2 deletions packages/fx/src/Subject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,13 @@ export class SubjectImpl<E, A> extends FxBase<Scope.Scope, E, A> implements Subj
protected onEvent(a: A) {
if (this.sinks.size === 0) return Effect.unit
else {
return Effect.forEach(Array.from(this.sinks), ([sink, ctx]) => Effect.provide(sink.onSuccess(a), ctx), DISCARD)
return Effect.forEach(this.sinks, ([sink, ctx]) => Effect.provide(sink.onSuccess(a), ctx), DISCARD)
}
}

protected onCause(cause: Cause.Cause<E>) {
return Effect.forEach(
Array.from(this.sinks),
this.sinks,
([sink, ctx]) => Effect.provide(sink.onFailure(cause), ctx),
DISCARD
)
Expand Down
4 changes: 4 additions & 0 deletions packages/fx/src/internal/DeferredRef.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,7 @@ export class DeferredRef<E, A> extends EffectBase<never, E, A> {
export function make<E, A>(eq: Equivalence.Equivalence<Exit.Exit<E, A>>) {
return Effect.map(Effect.fiberId, (id) => new DeferredRef(id, eq))
}

export function unsafeMake<E, A>(id: FiberId.FiberId, eq: Equivalence.Equivalence<Exit.Exit<E, A>>) {
return new DeferredRef(id, eq)
}
2 changes: 1 addition & 1 deletion packages/fx/src/internal/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ export function withScopedFork<R, E, A>(
return withScope((scope) => f(makeForkInScope(scope), scope), executionStrategy)
}

function makeForkInScope(scope: Scope.Scope) {
export function makeForkInScope(scope: Scope.Scope) {
return <R, E, A>(effect: Effect.Effect<R, E, A>) =>
matchEffectPrimitive<R, E, A, Effect.Effect<Exclude<R, Scope.Scope>, never, Fiber.Fiber<E, A>>>(effect, {
Success: (a) => Effect.succeed(Fiber.succeed(a)),
Expand Down
Loading

0 comments on commit 58d0de9

Please sign in to comment.