** This library is a work in progress **
This is a port of Project Reactor into Go. It provides reactive data streams with asynchronous pull backpressure and operator fusion. Its aim is to be as close to the proposed Java API as possible, altering it slightly where needed for it to make sense in Go.
For more information see generated godocs about:
More thorough documentation to come after most/all operators are implemented.
The explanations contained here assume the knowledge of the Observer Pattern.
- Publisher is an observable that supports asynchronous backpressure in form of its
Request(int64)
method. It will only ever emit the requested amount of emissions (does not apply to the closing emissions complete and error). It emission footprint is ( [Next](0-N) Complete|Error ) - Flux is a Publisher specific to this library that has access to operators described here. It can be switched to unbounded mode, basically transforming it to an observable by calling
Request(math.MaxInt64)
on its Subscripiton. - Mono is a Publisher specific to this library whose emission footprint is ( [Next](0-1) Complete|Error ). It also has operators described here. It can be switched to unbounded mode, basically transforming it to an observable by calling
RequestUnbounded()
which is really just a proxy toRequest(math.MaxInt64)
on its Subscripiton. - Subscription is the result of subscribing a Subscriber to a Publisher. Subscription provides means to control the pull-backpressure, via the
Request(int64)
method, that will intstruct the Publisher to emit specified amount of items. It also serves as a mean of emission cancellation via itsCancel()
method that will cause the Publisher to stop emitting and shut down. - Subscriber is an observer that will receive a Subscription object upon subscription. This is achieved via the Subscriber
OnSubscribe(Subscription)
method - Observer in the scope of this library means a Publisher that does not control its subscription.
- Scheduler and its only method
Schedule(func(Canceller)) Cancellable
is where everything is executed. It allows us to execute different things on different threads. The returned Cancelable can beCancel()
ed. When cancelled the Canceller's methodIsCanceled()
will return true. This is a small hindrance and is done like this because Go goroutines can be only cancelled from inside.
Observer itself does not support backpressure so we have two options, apply an operator that will request emissions from the Publisher automatically or we can control this behavior via the returned Subscription object. In this example we do the latter.
subscription := flux.FromSlice(
[]cesium.T{1, 2, 3},
).Map(func(t cesium.T) cesium.T {
return t.(int) + 1
}).Filter(func(t cesium.T) bool {
return t.(int) < 4
}).Subscribe(PrintObserver())
subscription.Request(1)
// Printsubscriber will asynchronously print 2
subscription.Request(1)
// Printsubscriber will asynchronously print 3
subscription.Request(1)
// Printsubscriber will receive Complete signal
// This subscriber provides a Request(int64) method so we can control it
s := ControlledSubscriber()
flux.FromSlice(
[]cesium.T{1, 2, 3},
).Map(func(t cesium.T) cesium.T {
return t.(int) + 1
}).Filter(func(t cesium.T) bool {
return t.(int) < 3
}).Subscribe(s) // s will receive Subscription
s.Request(2) // s will receive 2 emissions
subscription := flux.FromSlice(
[]cesium.T{1, 2, 3},
).Map(func(t cesium.T) cesium.T {
return t.(int) + 1
}).Filter(func(t cesium.T) bool {
return t.(int) < 4
}).Subscribe(PrintObserver())
subscription.RequestUnbounded() // Publisher will not wait for Request() to emit
Operators listed according to Reactor docs
- Just
- Mono.JustOrEmpty
- Mono.FromSupplier
- FromSlice
- FromChannel
- Mono.FromCallable
- Empty
- Never
- Error
- Defer
- Using
- Flux.Generate
- Create
- Interval
- Map(func(T) T)
- Cast
- FlatMap
- Handle(func(T, SynchronousSink))
- Flux.FlatMapSequential
- Mono.FlatMapMany
- Flux.ToSlice
- Maybe ToList (LinkedList would be better to handle large datasets)
- Flux.ToSortedSlice
- Flux.ToMap
- Flux.ToChannel
- Flux.Count()
- Flux.Reduce(func(T, T) T)
- Flux.Scan(func(T, T) T)
- Flux.All(func(T) bool)
- Flux.Any(func(T) bool)
- Flux.HasElements()
- Flux.HasElement(T) Flux
- Flux.Concat(Publisher) Flux
- ConcatWith(Publisher) Flux
- Flux.ConcatDelayError
- Flux.MergeSequential
- Flux.Merge
- MergeWith
- Zip
- ZipWith
- Mono.And
- Mono.When
- Flux.CombineLatest
- First (implement before Or)
- Or
- SwitchMap
- SwitchOnNext
- Repeat
- SwitchIfEmpty
- IgnoreElements
- Then
- ThenEmpty
- ThenMany
- Mono.DelayUntilOther
- Mono.DelayUntil
- Expand
- ExpandDeep
- DoOnNext(func(T))
- Flux.DoOnComplete
- Mono.DoOnSuccess
- DoOnError(func(error))
- DoOnCancel(func())
- DoOnSubscribe(func(Subscription))
- DoOnRequest
- DoOnTerminate
- DoAfterTerminate
- DoFinally(func())
- Log(log.Logger)
- DoOnEach
- Materialize
- Dematerialize
- Filter
- FilterWhen
- OfType
- Flux.Distinct
- Flux.DistinctUntilChanged
- Flux.Take
- Flux.TakeInPeriod
- Flux.Next
- Flux.LimitRequest
- Flux.TakeUntil
- Flux.TakeUntilOther
- Flux.TakeWhile
- Flux.ElementAt
- Flux.TakeLast
- Flux.Last
- Flux.LastOrDefault
- Flux.LastOrDefault
- Flux.Skip
- Flux.SkipPeriod
- Flux.SkipLast
- Flux.SkipUntil
- Flux.SkipUntilOther
- Flux.SkipWhile
- Flux.Sample
- Flux.SampleFirst
- Flux.SampleUsingOther
- Flux.SampleTimeout
- Flux.SingleOrDefault
- Flux.SingleOrEmpty
- Timeout
- OnErrorReturn
- OnErrorResume
- OnErrorMap
- Retry
- RetryWhen
- Flux.OnBackpressureError
- Flux.OnBackpressureBuffer
- Flux.OnBackpressureDrop
- Flux.OnBackpressureLatest
- Elapsed
- Timestamp
- Timeout
- Interval
- Mono.Delay
- Mono.DelayElement
- Flux.DelayElements
- DelaySubscription
- Flux.Window
- Flux.WindowPeriod
- Flux.WindowTimeout
- Flux.WindowUntil
- Flux.WindowWhile
- Flux.WindowUsingOther
- Flux.WindowWhen
- Flux.Buffer
- Flux.BufferPeriod
- Flux.BufferTimeout
- Flux.BufferUntil
- Flux.BufferWhile
- Flux.BufferWhen
- Flux.BufferUsingOther
- Flux.GroupBy
- Flux.BlockFirst
- Flux.BlockFirstTimeout
- Flux.BlockLast
- Flux.BlockLastTimeout
- Mono.Block
- Mono.BlockTimeout
- Add schedule periodic and schedule after to schedulers and add ability to insert virtual clock ( this will be useful in tests)
- How to split up tests for normal and scalar flux/mono?
- Fix locking for flatMaps
- Move most docs to godoc, except some examples and "how to choose an operator"
- NoneSignal() ?
- Performance benchmarks