Skip to content

Commit

Permalink
liftOperator
Browse files Browse the repository at this point in the history
  • Loading branch information
anthonyjoeseph committed Jan 18, 2021
1 parent d3c804e commit 1de9204
Show file tree
Hide file tree
Showing 12 changed files with 254 additions and 5 deletions.
16 changes: 16 additions & 0 deletions docs/modules/ObservableEither.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Added in v0.6.8
- [apSecond](#apsecond)
- [chainFirst](#chainfirst)
- [flatten](#flatten)
- [liftOperator](#liftoperator)
- [orElse](#orelse)
- [swap](#swap)
- [constructors](#constructors)
Expand Down Expand Up @@ -255,6 +256,21 @@ export declare const flatten: <E, A>(mma: ObservableEither<E, ObservableEither<E
Added in v0.6.0
## liftOperator
Lifts an OperatorFunction into an ObservableEither context
Allows e.g. filter to be used on on ObservableEither
**Signature**
```ts
export declare function liftOperator<E, A, B>(
f: OperatorFunction<A, B>
): (obs: ObservableEither<E, A>) => ObservableEither<E, B>
```

Added in v0.6.12

## orElse

**Signature**
Expand Down
16 changes: 16 additions & 0 deletions docs/modules/ObservableThese.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Added in v0.6.12
- [Functor](#functor)
- [map](#map)
- [combinators](#combinators)
- [liftOperator](#liftoperator)
- [swap](#swap)
- [constructors](#constructors)
- [both](#both)
Expand Down Expand Up @@ -103,6 +104,21 @@ Added in v0.6.12
# combinators
## liftOperator
Lifts an OperatorFunction into an ObservableThese context
Allows e.g. filter to be used on on ObservableThese
**Signature**
```ts
export declare function liftOperator<E, A, B>(
f: OperatorFunction<A, B>
): (obs: ObservableThese<E, A>) => ObservableThese<E, B>
```

Added in v0.6.12

## swap

**Signature**
Expand Down
16 changes: 16 additions & 0 deletions docs/modules/ReaderObservableEither.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Added in v0.6.10
- [apSecond](#apsecond)
- [chainFirst](#chainfirst)
- [flatten](#flatten)
- [liftOperator](#liftoperator)
- [local](#local)
- [constructors](#constructors)
- [ask](#ask)
Expand Down Expand Up @@ -246,6 +247,21 @@ export declare const flatten: <R, E, A>(
Added in v0.6.10
## liftOperator
Lifts an OperatorFunction into a ReaderObservableEither context
Allows e.g. filter to be used on on ReaderObservableEither
**Signature**
```ts
export declare function liftOperator<R, E, A, B>(
f: OperatorFunction<A, B>
): (obs: ReaderObservableEither<R, E, A>) => ReaderObservableEither<R, E, B>
```

Added in v0.6.12

## local

**Signature**
Expand Down
16 changes: 16 additions & 0 deletions docs/modules/StateReaderObservableEither.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Added in v0.6.10
- [apSecond](#apsecond)
- [chainFirst](#chainfirst)
- [flatten](#flatten)
- [liftOperator](#liftoperator)
- [constructors](#constructors)
- [fromIO](#fromio)
- [fromObservable](#fromobservable)
Expand Down Expand Up @@ -233,6 +234,21 @@ export declare const flatten: <S, R, E, A>(
Added in v0.6.10
## liftOperator
Lifts an OperatorFunction into a StateReaderObservableEither context
Allows e.g. filter to be used on on StateReaderObservableEither
**Signature**
```ts
export declare function liftOperator<S, R, E, A, B>(
f: OperatorFunction<[A, S], [B, S]>
): (obs: StateReaderObservableEither<S, R, E, A>) => StateReaderObservableEither<S, R, E, B>
```

Added in v0.6.12

# constructors

## fromIO
Expand Down
25 changes: 23 additions & 2 deletions src/ObservableEither.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import { MonadThrow2 } from 'fp-ts/lib/MonadThrow'
import { Option } from 'fp-ts/lib/Option'
import { pipe } from 'fp-ts/lib/pipeable'
import * as TE from 'fp-ts/lib/TaskEither'
import { Observable } from 'rxjs'
import { catchError } from 'rxjs/operators'
import { defer, merge, Observable, OperatorFunction, Subject } from 'rxjs'
import { finalize, catchError } from 'rxjs/operators'
import { MonadObservable2 } from './MonadObservable'
import * as R from './Observable'

Expand Down Expand Up @@ -166,6 +166,27 @@ export const swap: <E, A>(ma: ObservableEither<E, A>) => ObservableEither<A, E>
/*#__PURE__*/
R.map(E.swap)

/**
* Lifts an OperatorFunction into an ObservableEither context
* Allows e.g. filter to be used on on ObservableEither
*
* @category combinators
* @since 0.6.12
*/
export function liftOperator<E, A, B>(
f: OperatorFunction<A, B>
): (obs: ObservableEither<E, A>) => ObservableEither<E, B> {
return obs => {
const subj = new Subject<E.Either<E, A>>()
return merge(
pipe(subj, R.separate, ({ left, right }) => merge(pipe(left, R.map(E.left)), pipe(right, f, R.map(E.right)))),
defer(() => {
obs.pipe(finalize(() => subj.complete())).subscribe(subj)
})
)
}
}

// -------------------------------------------------------------------------------------
// type class members
// -------------------------------------------------------------------------------------
Expand Down
52 changes: 51 additions & 1 deletion src/ObservableThese.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import { pipe } from 'fp-ts/lib/pipeable'
import { Semigroup } from 'fp-ts/lib/Semigroup'
import * as TT from 'fp-ts/lib/TaskThese'
import * as TH from 'fp-ts/lib/These'
import { Observable } from 'rxjs'
import { defer, merge, Observable, OperatorFunction, Subject } from 'rxjs'
import { finalize, withLatestFrom } from 'rxjs/operators'
import * as R from './Observable'

// -------------------------------------------------------------------------------------
Expand Down Expand Up @@ -142,6 +143,55 @@ export const swap: <E, A>(ma: ObservableThese<E, A>) => ObservableThese<A, E> =
/*#__PURE__*/
R.map(TH.swap)

/**
* Lifts an OperatorFunction into an ObservableThese context
* Allows e.g. filter to be used on on ObservableThese
*
* @category combinators
* @since 0.6.12
*/
export function liftOperator<E, A, B>(
f: OperatorFunction<A, B>
): (obs: ObservableThese<E, A>) => ObservableThese<E, B> {
return obs => {
const subj = new Subject<TH.These<E, A>>()
return merge(
pipe(
merge(
pipe(subj, R.filter(TH.isLeft)),
pipe(
subj,
R.filter(TH.isRight),
R.map(({ right }) => right),
f,
R.map(TH.right)
),
pipe(
subj,
R.filter(TH.isBoth),
a =>
pipe(
a,
R.map(({ right }) => right),
f,
withLatestFrom(
pipe(
a,
R.map(({ left }) => left)
)
)
),
R.map(([b, e]) => TH.both(e, b))
)
)
),
defer(() => {
obs.pipe(finalize(() => subj.complete())).subscribe(subj)
})
)
}
}

// -------------------------------------------------------------------------------------
// type class members
// -------------------------------------------------------------------------------------
Expand Down
14 changes: 14 additions & 0 deletions src/ReaderObservableEither.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { MonadThrow3 } from 'fp-ts/lib/MonadThrow'
import { Option } from 'fp-ts/lib/Option'
import { pipe } from 'fp-ts/lib/pipeable'
import * as R from 'fp-ts/lib/Reader'
import { OperatorFunction } from 'rxjs'
import { MonadObservable3 } from './MonadObservable'
import * as OE from './ObservableEither'

Expand Down Expand Up @@ -103,6 +104,19 @@ export const local: <R2, R1>(
f: (d: R2) => R1
) => <E, A>(ma: ReaderObservableEither<R1, E, A>) => ReaderObservableEither<R2, E, A> = R.local

/**
* Lifts an OperatorFunction into a ReaderObservableEither context
* Allows e.g. filter to be used on on ReaderObservableEither
*
* @category combinators
* @since 0.6.12
*/
export function liftOperator<R, E, A, B>(
f: OperatorFunction<A, B>
): (obs: ReaderObservableEither<R, E, A>) => ReaderObservableEither<R, E, B> {
return obs => r => OE.liftOperator<E, A, B>(f)(obs(r))
}

// -------------------------------------------------------------------------------------
// type class members
// -------------------------------------------------------------------------------------
Expand Down
15 changes: 15 additions & 0 deletions src/StateReaderObservableEither.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import { MonadTask4 } from 'fp-ts/lib/MonadTask'
import { MonadThrow4 } from 'fp-ts/lib/MonadThrow'
import { Option } from 'fp-ts/lib/Option'
import { pipe } from 'fp-ts/lib/pipeable'
import { OperatorFunction } from 'rxjs'
import { MonadObservable4 } from './MonadObservable'
import * as OB from './Observable'
import * as OE from './ObservableEither'
import * as ROE from './ReaderObservableEither'

// -------------------------------------------------------------------------------------
Expand Down Expand Up @@ -193,6 +195,19 @@ export const apSecond = <S, R, E, B>(
ap(fb)
)

/**
* Lifts an OperatorFunction into a StateReaderObservableEither context
* Allows e.g. filter to be used on on StateReaderObservableEither
*
* @category combinators
* @since 0.6.12
*/
export function liftOperator<S, R, E, A, B>(
f: OperatorFunction<[A, S], [B, S]>
): (obs: StateReaderObservableEither<S, R, E, A>) => StateReaderObservableEither<S, R, E, B> {
return obs => s => r => OE.liftOperator<E, [A, S], [B, S]>(f)(obs(s)(r))
}

/**
* @category Bifunctor
* @since 0.6.10
Expand Down
17 changes: 16 additions & 1 deletion test/ObservableEither.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import { pipe } from 'fp-ts/lib/pipeable'
import { bufferTime } from 'rxjs/operators'
import * as O from 'fp-ts/lib/Option'
import * as _ from '../src/ObservableEither'
import { of as rxOf, Observable, throwError as rxThrowError } from 'rxjs'
import { of as rxOf, Observable, from, throwError as rxThrowError } from 'rxjs'
import { filter } from '../src/Observable'

describe('ObservableEither', () => {
it('rightIO', async () => {
Expand Down Expand Up @@ -125,6 +126,20 @@ describe('ObservableEither', () => {
assert.deepStrictEqual(e, [E.left(1)])
})

it('liftOperator (left)', async () => {
const e = await pipe(from(['error1', 'error2']), _.leftObservable, _.liftOperator(filter(x => x % 2 === 0)))
.pipe(bufferTime(10))
.toPromise()
assert.deepStrictEqual(e, [E.left('error1'), E.left('error2')])
})

it('liftOperator (right)', async () => {
const e = await pipe(from([1, 2, 3, 4]), _.rightObservable, _.liftOperator(filter(x => x % 2 === 0)))
.pipe(bufferTime(10))
.toPromise()
assert.deepStrictEqual(e, [E.right(2), E.right(4)])
})

describe('Monad', () => {
it('of', async () => {
const fea = _.of(1)
Expand Down
27 changes: 26 additions & 1 deletion test/ObservableThese.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import { pipe } from 'fp-ts/lib/pipeable'
import { bufferTime } from 'rxjs/operators'
import { monoidString } from 'fp-ts/lib/Monoid'
import * as _ from '../src/ObservableThese'
import { of as rxOf, Observable } from 'rxjs'
import { of as rxOf, Observable, from } from 'rxjs'
import { filter } from '../src/Observable'

describe('ObservableThese', () => {
it('rightIO', async () => {
Expand Down Expand Up @@ -93,6 +94,30 @@ describe('ObservableThese', () => {
assert.deepStrictEqual(e, [TH.both(2, 1)])
})

it('liftOperator (left)', async () => {
const e = await pipe(from(['error1', 'error2']), _.leftObservable, _.liftOperator(filter(x => x % 2 === 0)))
.pipe(bufferTime(10))
.toPromise()
assert.deepStrictEqual(e, [TH.left('error1'), TH.left('error2')])
})

it('liftOperator (right)', async () => {
const e = await pipe(from([1, 2, 3, 4]), _.rightObservable, _.liftOperator(filter(x => x % 2 === 0)))
.pipe(bufferTime(10))
.toPromise()
assert.deepStrictEqual(e, [TH.right(2), TH.right(4)])
})

it('liftOperator (both)', async () => {
const e = await pipe(
from([TH.both('error1', 1), TH.both('error2', 2), TH.both('error3', 3), TH.both('error4', 4)]),
_.liftOperator(filter(x => x % 2 === 0))
)
.pipe(bufferTime(10))
.toPromise()
assert.deepStrictEqual(e, [TH.both('error2', 2), TH.both('error4', 4)])
})

it('map', async () => {
const f = (n: number): number => n * 2
assert.deepStrictEqual(await pipe(_.right(1), _.map(f)).toPromise(), TH.right(2))
Expand Down
25 changes: 25 additions & 0 deletions test/ReaderObservableEither.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import * as O from 'fp-ts/lib/Option'
import * as E from 'fp-ts/lib/Either'
import * as R from 'fp-ts/lib/Reader'
import * as T from 'fp-ts/lib/Task'
import { from } from 'rxjs'

// test helper to dry up LOC.
export const buffer = flow(R.map(bufferTime(10)), R.map(OB.toTask))
Expand Down Expand Up @@ -100,6 +101,30 @@ describe('ReaderObservable', () => {
assert.deepStrictEqual(e, [E.right(4)])
})

it('liftOperator (left)', async () => {
const robe = pipe(
from(['error1', 'error2']),
OBE.leftObservable,
_.fromObservableEither,
_.liftOperator(OB.filter(x => x % 2 === 0)),
buffer
)
const e = await robe({})()
assert.deepStrictEqual(e, [E.left('error1'), E.left('error2')])
})

it('liftOperator (right)', async () => {
const robe = pipe(
from([1, 2, 3, 4]),
OBE.rightObservable,
_.fromObservableEither,
_.liftOperator(OB.filter(x => x % 2 === 0)),
buffer
)
const e = await robe({})()
assert.deepStrictEqual(e, [E.right(2), E.right(4)])
})

it('fromTask', async () => {
const robe = pipe(_.fromTask(T.of(1)), buffer)
const x = await robe({})()
Expand Down
Loading

0 comments on commit 1de9204

Please sign in to comment.