Skip to content

reactivego/rx

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

45 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

rx

import "github.com/reactivego/rx"

Go Reference

Package rx delivers Reactive Extensions, a powerful API for asynchronous programming in Go, built around observables and operators to handle data streams seamlessly.

Prerequisites

You’ll need Go 1.23 or later, as it includes support for generics and iterators.

Observables

In rx, an Observables represents a stream of data that can emit items over time, while an observer subscribes to it to receive and react to those emissions. This reactive approach enables asynchronous and concurrent operations without blocking execution. Instead of waiting for values to become available, an observer passively listens and responds whenever the Observable emits data, errors, or a completion signal.

This page introduces the reactive pattern, explaining what Observables and observers are and how subscriptions work. Other sections explore the powerful set of Observable operators that allow you to transform, combine, and control data streams efficiently.

An Observable:

  • is a stream of events.
  • assumes zero to many values over time.
  • pushes values
  • can take any amount of time to complete (or may never)
  • is cancellable
  • is lazy (it doesn't do anything until you subscribe).

Example

package main

import "github.com/reactivego/x"

func main() {
    x.From[any](1,"hi",2.3).Println()
}

Note the program creates a mixed type any observable from an int, string and a float64.

Output

1
hi
2.3

Example

package main

import "github.com/reactivego/rx"

func main() {
    rx.From(1,2,3).Println()
}

Note the program uses inferred type int for the observable.

Output

1
2
3

Observables in x are somewhat similar to Go channels but have much richer semantics:

Observables can be hot or cold. A hot observable will try to emit values even when nobody is subscribed. Values emitted during that period will be lost. The position of a mouse pointer or the current time are examples of hot observables.

A cold observable will only start emitting values after somebody subscribes. The contents of a file or a database are examples of cold observables.

An observable can complete normally or with an error, it uses subscriptions that can be canceled from the subscriber side. Where a normal variable is just a place where you read and write values from, an observable captures how the value of this variable changes over time.

Concurrency flows naturally from the fact that an observable is an ever changing stream of values. Every Observable conceptually has at its core a concurrently running process that pushes out values.

Operators

Operators form a language in which programs featuring Observables can be expressed. They work on one or more Observables to transform, filter and combine them into new Observables.

Index

  • All determines whether all items emitted by an Observable meet some criteria.
  • AsObservable when called on an Observable source will type assert the 'any' items of the source to 'bar' items.
  • AsyncSubject emits the last value (and only the last value) emitted by the Observable part, and only after that Observable part completes.
  • AuditTime waits until the source emits and then starts a timer.
  • AutoConnect makes a Multicaster behave like an ordinary Observable that automatically connects the multicaster to its source when the specified number of observers have subscribed to it.
  • Average calculates the average of numbers emitted by an Observable and emits this average.
  • BehaviorSubject returns a new BehaviorSubject.
  • Buffer buffers the source Observable values until closingNotifier emits.
  • BufferTime buffers the source Observable values for a specific time period and emits those as a slice periodically in time.
  • Catch recovers from an error notification by continuing the sequence without emitting the error but by switching to the catch ObservableInt to provide items.
  • CatchError catches errors on the Observable to be handled by returning a new Observable or throwing an error.
  • CombineLatest will subscribe to all Observables.
  • CombineLatestAll flattens a higher order observable.
  • CombineLatestMap maps every entry emitted by the Observable into an Observable, and then subscribe to it, until the source observable completes.
  • CombineLatestMapTo maps every entry emitted by the Observable into a single Observable, and then subscribe to it, until the source observable completes.
  • CombineLatestWith will subscribe to its Observable and all other Observables passed in.
  • Concat emits the emissions from two or more observables without interleaving them.
  • ConcatAll flattens a higher order observable by concattenating the observables it emits.
  • ConcatMap transforms the items emitted by an Observable by applying a function to each item and returning an Observable.
  • ConcatMapTo maps every entry emitted by the Observable into a single Observable.
  • ConcatWith emits the emissions from two or more observables without interleaving them.
  • Connect instructs a connectable Observable to begin emitting items to its subscribers.
  • Count counts the number of items emitted by the source ObservableInt and emits only this value.
  • Create provides a way of creating an Observable from scratch by calling observer methods programmatically.
  • CreateFutureRecursive provides a way of creating an Observable from scratch by calling observer methods programmatically.
  • CreateRecursive provides a way of creating an Observable from scratch by calling observer methods programmatically.
  • DebounceTime only emits the last item of a burst from an Observable if a particular timespan has passed without it emitting another item.
  • Defer does not create the Observable until the observer subscribes.
  • Delay shifts the emission from an Observable forward in time by a particular amount of time.
  • Distinct suppress duplicate items emitted by an Observable.
  • DistinctUntilChanged only emits when the current value is different from the last.
  • Do calls a function for each next value passing through the observable.
  • DoOnComplete calls a function when the stream completes.
  • DoOnError calls a function for any error on the stream.
  • ElementAt emit only item n emitted by an Observable.
  • Empty creates an Observable that emits no items but terminates normally.
  • Filter emits only those items from an observable that pass a predicate test.
  • Finally applies a function for any error or completion on the stream.
  • First emits only the first item, or the first item that meets a condition, from an Observable.
  • From creates an observable from multiple values passed in.
  • FromChan creates an Observable from a Go channel.
  • IgnoreCompletion only emits items and never completes, neither with Error nor with Complete.
  • IgnoreElements does not emit any items from an Observable but mirrors its termination notification.
  • Interval creates an ObservableInt that emits a sequence of integers spaced by a particular time interval.
  • Just creates an observable that emits a particular item.
  • Last emits only the last item emitted by an Observable.
  • Map transforms the items emitted by an Observable by applying a function to each item.
  • MapTo transforms the items emitted by an Observable.
  • Max determines, and emits, the maximum-valued item emitted by an Observable.
  • Merge combines multiple Observables into one by merging their emissions.
  • MergeAll flattens a higher order observable by merging the observables it emits.
  • MergeDelayError combines multiple Observables into one by merging their emissions.
  • MergeDelayErrorWith combines multiple Observables into one by merging their emissions.
  • MergeMap transforms the items emitted by an Observable by applying a function to each item an returning an Observable.
  • MergeMapTo maps every entry emitted by the Observable into a single Observable.
  • MergeWith combines multiple Observables into one by merging their emissions.
  • Min determines, and emits, the minimum-valued item emitted by an Observable.
  • Never creates an Observable that emits no items and does't terminate.
  • ObserveOn specifies a schedule function to use for delivering values to the observer.
  • ObserverObservable actually is an observer that is made observable.
  • Of emits a variable amount of values in a sequence and then emits a complete notification.
  • Only filters the value stream of an observable and lets only the values of a specific type pass.
  • Passthrough just passes through all output from the Observable.
  • Println subscribes to the Observable and prints every item to os.Stdout while it waits for completion or error.
  • Publish returns a Multicaster for a Subject to an underlying Observable and turns the subject into a connnectable observable.
  • PublishBehavior returns a Multicaster that shares a single subscription to the underlying Observable returning an initial value or the last value emitted by the underlying Observable.
  • PublishLast returns a Multicaster that shares a single subscription to the underlying Observable containing only the last value emitted before it completes.
  • PublishReplay returns a Multicaster for a ReplaySubject to an underlying Observable and turns the subject into a connectable observable.
  • Range creates an Observable that emits a range of sequential int values.
  • Reduce applies a reducer function to each item emitted by an Observable and the previous reducer result.
  • RefCount makes a Connectable behave like an ordinary Observable.
  • Repeat creates an observable that emits a sequence of items repeatedly.
  • ReplaySubject ensures that all observers see the same sequence of emitted items, even if they subscribe after.
  • Retry if a source Observable sends an error notification, resubscribe to it in the hopes that it will complete without error.
  • SampleTime emits the most recent item emitted by an Observable within periodic time intervals.
  • Scan applies a accumulator function to each item emitted by an Observable and the previous accumulator result.
  • Serialize forces an observable to make serialized calls and to be well-behaved.
  • Single enforces that the observable sends exactly one data item and then completes.
  • Skip suppresses the first n items emitted by an Observable.
  • SkipLast suppresses the last n items emitted by an Observable.
  • Start creates an Observable that emits the return value of a function.
  • StartWith returns an observable that, at the moment of subscription, will synchronously emit all values provided to this operator, then subscribe to the source and mirror all of its emissions to subscribers.
  • Subject is a combination of an observer and observable.
  • Subscribe operates upon the emissions and notifications from an Observable.
  • SubscribeOn specifies the scheduler an Observable should use when it is subscribed to.
  • Sum calculates the sum of numbers emitted by an Observable and emits this sum.
  • SwitchAll converts an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables.
  • SwitchMap transforms the items emitted by an Observable by applying a function to each item an returning an Observable.
  • Take emits only the first n items emitted by an Observable.
  • TakeLast emits only the last n items emitted by an Observable.
  • TakeUntil emits items emitted by an Observable until another Observable emits an item.
  • TakeWhile mirrors items emitted by an Observable until a specified condition becomes false.
  • ThrottleTime emits when the source emits and then starts a timer during which all emissions from the source are ignored.
  • Throw creates an observable that emits no items and terminates with an error.
  • Ticker creates an ObservableTime that emits a sequence of timestamps after an initialDelay has passed.
  • TimeInterval intercepts the items from the source Observable and emits in their place a struct that indicates the amount of time that elapsed between pairs of emissions.
  • Timeout mirrors the source Observable, but issue an error notification if a particular period of time elapses without any emitted items.
  • Timer creates an Observable that emits a sequence of integers (starting at zero) after an initialDelay has passed.
  • Timestamp attaches a timestamp to each item emitted by an observable indicating when it was emitted.
  • ToChan returns a channel that emits 'any' values.
  • ToSingle blocks until the Observable emits exactly one value or an error.
  • ToSlice collects all values from the Observable into an slice.
  • Wait subscribes to the Observable and waits for completion or error.
  • WithLatestFrom will subscribe to all Observables and wait for all of them to emit before emitting the first slice.
  • WithLatestFromAll flattens a higher order observable.