Version 1.0.6 – Feburary 11th 2015 (Maven Central)
This release adds an experimental operator and fixes several bugs.
Note that this API may still change or be removed altogether since it is marked as
@Beta
.
A flatMap
overload was added that allows limiting concurrency, or the number of Observable
s being merged .
This now means that these are the same, one using merge
directly, the other using flatMap
and passing in 10
as the maxConcurrent
:
Observable<Observable<Integer>> asyncWork = range(1, 1000000)
.doOnNext(i -> System.out.println("Emitted Value: " + i))
.map(item -> {
return just(item)
.doOnNext(MergeMaxConcurrent::sleep)
.subscribeOn(Schedulers.io());
});
merge(asyncWork, 10).toBlocking().forEach(v -> System.out.println("Received: " + v));
range(1, 1000000)
.doOnNext(i -> System.out.println("Emitted Value: " + i))
.flatMap(item -> {
return just(item)
.doOnNext(MergeMaxConcurrent::sleep)
.subscribeOn(Schedulers.io());
}, 10)
.toBlocking().forEach(v -> System.out.println("Received: " + v));
- [Pull 2627] (ReactiveX#2627) FlatMap overloads with maximum concurrency parameter
- [Pull 2648] (ReactiveX#2648) TakeWhile: don't unsubscribe downstream.
- [Pull 2580] (ReactiveX#2580) Allow configuring the maximum number of computation scheduler threads
- [Pull 2601] (ReactiveX#2601) Added common Exceptions.throwIfAny to throw a collection of exceptions
- [Pull 2644] (ReactiveX#2644) Missing Unsafe class yields NoClassDefFoundError
- [Pull 2642] (ReactiveX#2642) Fix a potential memory leak in schedulePeriodically
- [Pull 2630] (ReactiveX#2630) Cast back Observer to Subscriber if passed to subscribe(Observer)
- [Pull 2622] (ReactiveX#2622) Changed Observable.empty() into a stateless constant observable.
- [Pull 2607] (ReactiveX#2607) OnSubscribeRefCount - improve comments
Version 1.0.5 – Feburary 3rd 2015 (Maven Central)
This release includes many bug fixes along with a few new operators and enhancements.
This release adds a few experimental operators.
Note that these APIs may still change or be removed altogether since they are marked as @Experimental
.
This operator allows conditionally unsubscribing an Observable
but inclusively emitting the final onNext
. This differs from takeWhile
which excludes the final onNext
.
// takeUntil(predicate) example
Observable.just(1, 2, 3, 4, 5, 6, 7)
.doOnEach(System.out::println)
.takeUntil(i -> i == 3)
.forEach(System.out::println);
// takeWhile(predicate) example
Observable.just(1, 2, 3, 4, 5, 6, 7)
.doOnEach(System.out::println)
.takeWhile(i -> i <= 3)
.forEach(System.out::println);
This outputs:
// takeUntil(predicate)
[rx.Notification@30e84925 OnNext 1]
1
[rx.Notification@30e84926 OnNext 2]
2
[rx.Notification@30e84927 OnNext 3]
3
// takeWhile(predicate)
[rx.Notification@30e84925 OnNext 1]
1
[rx.Notification@30e84926 OnNext 2]
2
[rx.Notification@30e84927 OnNext 3]
3
[rx.Notification@30e84928 OnNext 4]
Note how takeWhile
produces 4 values and takeUntil
produces 3.
The new switchIfEmpty
operator is a companion to defaultIfEmpty
that switches to a different Observable
if the primary Observable
is empty.
Observable.empty()
.switchIfEmpty(Observable.just(1, 2, 3))
.forEach(System.out::println);
This release adds backpressure to merge(maxConcurrent)
so that horizontal buffer bloat can also be controll with the maxConcurrent
parameter.
This allows parallel execution such as the following to work with backpressure:
public class MergeMaxConcurrent {
public static void main(String[] args) {
// define 1,000,000 async tasks
Observable<Observable<Integer>> asyncWork = range(1, 1000000)
.doOnNext(i -> System.out.println("Value: " + i))
.doOnRequest(r -> System.out.println("request1 " + r))
.map(item -> {
return just(item)
// simulate slow IO or computation
.doOnNext(MergeMaxConcurrent::sleep)
.subscribeOn(Schedulers.io());
})
.doOnRequest(r -> System.out.println("request2 " + r));
// allow 10 outstanding tasks at a time
merge(asyncWork, 10).toBlocking().forEach(System.out::println);
}
public static void sleep(int value) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
In prior versions all 1,000,000 tasks are immediately emitted and queued for execution. As of this release it correctly allows 10 at a time to be emitted.
- [Pull 2493] (ReactiveX#2493) Experimental Operator TakeUntil with predicate
- [Pull 2585] (ReactiveX#2585) Experimental Operator: switchIfEmpty
- [Pull 2470] (ReactiveX#2470) Experimental Subject state information methods & bounded ReplaySubject termination
- [Pull 2540] (ReactiveX#2540) Merge with max concurrency now supports backpressure.
- [Pull 2332] (ReactiveX#2332) Operator retry test fix attempt
- [Pull 2244] (ReactiveX#2244) OperatorTakeLast add check for isUnsubscribed to fast path
- [Pull 2469] (ReactiveX#2469) Remove the execute permission from source files
- [Pull 2455] (ReactiveX#2455) Fix for #2191 - OperatorMulticast fails to unsubscribe from source
- [Pull 2474] (ReactiveX#2474) MergeTest.testConcurrency timeout to let other tests run
- [Pull 2335] (ReactiveX#2335) A set of stateless operators that don't need to be instantiated
- [Pull 2447] (ReactiveX#2447) Fail early if a null subscription is added to a CompositeSubscription.
- [Pull 2475] (ReactiveX#2475) SynchronousQueue.clone fix
- [Pull 2477] (ReactiveX#2477) Backpressure tests fix0121
- [Pull 2476] (ReactiveX#2476) Fixed off-by-one error and value-drop in the window operator.
- [Pull 2478] (ReactiveX#2478) RefCountAsync: adjusted time values as 1 ms is unreliable
- [Pull 2238] (ReactiveX#2238) Fix the bug that cache doesn't unsubscribe the source Observable when the source is terminated
- [Pull 1840] (ReactiveX#1840) Unsubscribe when thread is interrupted
- [Pull 2471] (ReactiveX#2471) Fixes NPEs reported in ReactiveX#1702 by synchronizing queue.
- [Pull 2482] (ReactiveX#2482) Merge: fixed hangs & missed scalar emissions
- [Pull 2547] (ReactiveX#2547) Warnings cleanup
- [Pull 2465] (ReactiveX#2465) ScheduledExecutorService: call purge periodically on JDK 6 to avoid cancelled task-retention
- [Pull 2591] (ReactiveX#2591) Changed the naming of the NewThreadWorker's system parameters
- [Pull 2543] (ReactiveX#2543) OperatorMerge handle request overflow
- [Pull 2548] (ReactiveX#2548) Subscriber.request should throw exception if negative request made
- [Pull 2550] (ReactiveX#2550) Subscriber.onStart requests should be additive (and check for overflow)
- [Pull 2553] (ReactiveX#2553) RxRingBuffer with synchronization
- [Pull 2565] (ReactiveX#2565) Obstruction detection in tests.
- [Pull 2563] (ReactiveX#2563) Retry backpressure test: split error conditions into separate test lines.
- [Pull 2572] (ReactiveX#2572) Give more time to certain concurrency tests.
- [Pull 2559] (ReactiveX#2559) OnSubscribeFromIterable - add request overflow check
- [Pull 2574] (ReactiveX#2574) SizeEviction test needs to return false
- [Pull 2561] (ReactiveX#2561) Updating queue code from JCTools
- [Pull 2566] (ReactiveX#2566) CombineLatest: fixed concurrent requestUpTo yielding -1 requests
- [Pull 2552] (ReactiveX#2552) Publish: fixed incorrect subscriber requested accounting
- [Pull 2583] (ReactiveX#2583) Added perf tests for various container-like subscriptions
- [Pull 1955] (ReactiveX#1955) OnBackpressureXXX: support for common drain manager & fix for former concurrency bugs
- [Pull 2590] (ReactiveX#2590) Zip: fixed unbounded downstream requesting above Long.MAX_VALUE
- [Pull 2589] (ReactiveX#2589) Repeat/retry: fixed unbounded downstream requesting above Long.MAX_VALUE
- [Pull 2567] (ReactiveX#2567) RefCount: disconnect all if upstream terminates
- [Pull 2593] (ReactiveX#2593) Zip: emit onCompleted without waiting for request + avoid re-reading fields
Version 1.0.4 – December 29th 2014 (Maven Central)
- [Pull 2156] (ReactiveX#2156) Fix the issue that map may swallow fatal exceptions
- [Pull 1967] (ReactiveX#1967) Fix the issue that GroupBy may not call 'unsubscribe'
- [Pull 2052] (ReactiveX#2052) OperatorDoOnRequest.ParentSubscriber should be static class
- [Pull 2237] (ReactiveX#2237) Make Publish Operator Release RingBuffer
- [Pull 2053] (ReactiveX#2053) Fixed wrong bounded ReplaySubject use in test
Version 1.0.3 – December 15th 2014 (Maven Central)
-
[Pull 1928] (ReactiveX#1928) Experimental: Add onBackpressureBuffer with capacity
-
[Pull 1946] (ReactiveX#1946) Experimental: AbstractOnSubscribe to help build Observables one onNext at a time.
-
[Pull 1960] (ReactiveX#1960) Beta: doOnRequest
-
[Pull 1965] (ReactiveX#1965) Fix the issue that Sample doesn't call 'unsubscribe'
-
[Pull 1966] (ReactiveX#1966) Fix NPE when the key is null in GroupBy
-
[Pull 1964] (ReactiveX#1964) Handle 0 or negative request in Buffer
-
[Pull 1957] (ReactiveX#1957) Fix 'request(0)' issue in Scan
-
[Pull 1950] (ReactiveX#1950) Add "Subscriptions.unsubscribed" to fix the 'isUnsubscribed' issue
-
[Pull 1938] (ReactiveX#1938) Any/All should not unsubscribe downstream.
-
[Pull 1968] (ReactiveX#1968) Upgrade to Gradle 2.2
-
[Pull 1961] (ReactiveX#1961) Remove Request Batching in Merge
-
[Pull 1953] (ReactiveX#1953) Fixed timer cast-to-int crash causing incorrect benchmark.
-
[Pull 1952] (ReactiveX#1952) Remove ActionSubscription
-
[Pull 1951] (ReactiveX#1951) Remove extraneous request(n) and onCompleted() calls when unsubscribed.
-
[Pull 1947] (ReactiveX#1947) Fixed first emission racing with pre and post subscription.
-
[Pull 1937] (ReactiveX#1937) Scheduler.Worker to be finally unsubscribed to avoid interference
-
[Pull 1926] (ReactiveX#1926) Move the codes out of the finally block
-
[Pull 1922] (ReactiveX#1922) Set removeOnCancelPolicy on the threadpool if supported
Version 1.0.2 – December 1st 2014 (Maven Central)
This release adds @Beta
and @Experimental
annotations to mark APIs that are not yet stable.
An example of how this looks in the Javadocs is:
@Experimental
public final Observable<T> onBackpressureBlock()
The lifecycle and stability of these are documented in the README as follows:
APIs marked with the
@Beta
annotation at the class or method level are subject to change. They can be modified in any way, or even removed, at any time. If your code is a library itself (i.e. it is used on the CLASSPATH of users outside your own control), you should not use beta APIs, unless you repackage them (e.g. using ProGuard, shading, etc).
APIs marked with the
@Experimental
annotation at the class or method level will almost certainly change. They can be modified in any way, or even removed, at any time. You should not use or rely on them in any production code. They are purely to allow broad testing and feedback.
- [Pull 1905] (ReactiveX#1905) Beta & Experimental Annotations
- [Pull 1907] (ReactiveX#1907) Experimental: onBackpressureBlock
- [Pull 1903] (ReactiveX#1903) Fix TestScheduler Handling of Immediate vs Virtual Time
- [Pull 1898] (ReactiveX#1898) Scheduled action no interrupt
- [Pull 1904] (ReactiveX#1904) Fix the bug that Scan may request 0 when n is 1
- [Pull 1912] (ReactiveX#1912) Fixed retry without backpressure & test function to support bp
Version 1.0.1 – November 28th 2014 (Maven Central)
- [Pull 1893] (ReactiveX#1893) Fixed incorrect error merging.
- [Pull 1901] (ReactiveX#1901) Fixed redo & groupBy backpressure management
Version 1.0.0 – November 18th 2014 (Maven Central)
After 2+ years of internal and open source development, 3600+ commits, 100+ releases, and with the help of 97 contributors RxJava has hit version 1.0.0.
Thank you @headinthebox @zsxwing @samuelgruetter @akarnokd @quidryan @DavidMGross @abersnaze @jmhofer @mairbek @mttkay @daveray @mattrjacobs @michaeldejong @MarioAriasC @johngmyers @pron @jbripley @davidmoten @gliptak @johnhmarks @jloisel @billyy @prabirshrestha @ragalie @abliss @dpsm @daschl @thegeez and the many other contributors and those who have reported bugs, tweeted, blogged or presented about RxJava.
The quality of this release could not have been achieved without all of your help. Thank you for your involvement and for building a community around this project.
As of 1.0 the JVM language adapters and other subprojects no longer live under RxJava but have been separated out into their own projects with their own release cycles:
- RxAndroid
- RxScala
- RxGroovy
- RxClojure
- RxKotlin
- RxJRuby
- RxJavaReactiveStreams
- RxJavaString
- RxJavaGuava
- RxRoboVM
- RxSwing
- RxQuasar
- RxJavaJoins
- RxJavaComputationExpressions
- RxJavaAsyncUtil
- RxJavaDebug
- RxJavaMath
- RxJavaFX
- RxApacheHttp
Version 1.x is now a stable API and will be supported for several years.
Minor 1.x increments (such as 1.1, 1.2, etc) will occur when non-trivial new functionality is added or significant enhancements or bug fixes occur that may have behavioral changes that may affect some edge cases (such as dependence on behavior resulting from a bug). An example of an enhancement that would classify as this is adding reactive pull backpressure support to an operator that previously did not support it. This should be backwards compatible but does behave differently.
Patch 1.x.y increments (such as 1.0.0 -> 1.0.1, 1.3.1 -> 1.3.2, etc) will occur for bug fixes and trivial functionality (like adding a method overload).
- 1.0.x milestone with known issues
- 1.1 milestone with additional support for reactive pull backpressure
- 1.x is a catch all for other items that may be pursued in 1.2, 1.3 and later versions.
- all deprecated methods and types from v0.20 and earlier are deleted
- now published to groupId
io.reactivex
instead ofcom.netflix.rxjava
- artifactId is now
rxjava
instead ofrxjava-core
io.reactivex:rxjava:1.0.0
Following are specific changes from 0.20 to 1.0 to be aware of:
The groupByUntil
operator was removed by collapsing its behavior into groupBy
. Previously on groupBy
when a child GroupedObservable
was unsubscribed it would internally retain the state and ignore all future onNext
for that key.
This matched behavior in Rx.Net but was found to be non-obvious and almost everyone using groupBy
on long-lived streams actually wanted the behavior of groupByUntil
where an unsubscribed GroupedObservable
would clean up the resources and then if onNext
for that key arrived again a new GroupedObservable
would be emitted.
Adding backpressure (reactive pull) to groupByUntil
was found to not work easily with its signatures so before 1.0 Final it was decided to collapse groupBy
and groupByUntil
. Further details on this can be found in Pull Request 1727.
Here is an example of how groupBy
now behaves when a child GroupedObservable
is unsubscribed (using take
here):
// odd/even into lists of 10
Observable.range(1, 100)
.groupBy(n -> n % 2 == 0)
.flatMap(g -> {
return g.take(10).toList();
}).forEach(System.out::println);
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
[21, 23, 25, 27, 29, 31, 33, 35, 37, 39]
[22, 24, 26, 28, 30, 32, 34, 36, 38, 40]
[41, 43, 45, 47, 49, 51, 53, 55, 57, 59]
[42, 44, 46, 48, 50, 52, 54, 56, 58, 60]
[61, 63, 65, 67, 69, 71, 73, 75, 77, 79]
[62, 64, 66, 68, 70, 72, 74, 76, 78, 80]
[81, 83, 85, 87, 89, 91, 93, 95, 97, 99]
[82, 84, 86, 88, 90, 92, 94, 96, 98, 100]
Previously this would have only emitted 2 groups and ignored all subsequent values:
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
On a finite stream, similar behavior of the previous groupBy
implementation that would filter can be achieved like this:
//odd/even into lists of 10
Observable.range(1, 100)
.groupBy(n -> n % 2 == 0)
.flatMap(g -> {
return g.filter(i -> i <= 20).toList();
}).forEach(System.out::println);
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
That however does allow the stream to complete (which may not be wanted).
To unsubscribe here are some choices that get the same output but efficiently unsubscribe up so the source only emits 40 values:
Observable.timer(0, 1, TimeUnit.MILLISECONDS)
.groupBy(n -> n % 2 == 0)
.flatMap(g -> {
return g.take(10).toList();
}).take(2).toBlocking().forEach(System.out::println);
or
Observable.timer(0, 1, TimeUnit.MILLISECONDS)
.take(20)
.groupBy(n -> n % 2 == 0)
.flatMap(g -> {
return g.toList();
}).toBlocking().forEach(System.out::println);
These show that now groupBy
composes like any other operator without the nuanced and hidden behavior of ignoring values after a child GroupedObservable
is unsubscribed.
Uses of groupByUntil
can now all be done by just using operators like take
, takeWhile
and takeUntil
on the GroupedObservable
directly, such as this:
Observable.from(Arrays.asList("a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c"))
.groupBy(n -> n)
.flatMap(g -> {
return g.take(3).reduce((s, s2) -> s + s2);
}).forEach(System.out::println);
aaa
bbb
ccc
aaa
bbb
ccc
The retryWhen
and repeatWhen
method signatures both emitted a Observable<Notification>
type which could be queried to represent either onError
in the retryWhen
case or onCompleted
in the repeatWhen
case. This was found to be confusing and unnecessary. The signatures were changed to emit Observable<Throwable>
for retryWhen
and Observable<Void>
for repeatWhen
to better signal the type of notification they are emitting without the need to then query the Notification
.
The following contrived examples shows how the Observable<Throwable>
is used to get the error that occurred when deciding to retry:
AtomicInteger count = new AtomicInteger();
Observable.create((Subscriber<? super String> s) -> {
if (count.getAndIncrement() == 0) {
s.onError(new RuntimeException("always fails"));
} else {
s.onError(new IllegalArgumentException("user error"));
}
}).retryWhen(attempts -> {
return attempts.flatMap(throwable -> {
if (throwable instanceof IllegalArgumentException) {
System.out.println("don't retry on IllegalArgumentException... allow failure");
return Observable.error(throwable);
} else {
System.out.println(throwable + " => retry after 1 second");
return Observable.timer(1, TimeUnit.SECONDS);
}
});
})
.toBlocking().forEach(System.out::println);
The collect
operator was changed to require a factory method for the initial value. This allows the Observable
to be executed multiple times and get a new value (typically a mutable data structure) each time. Prior to this the Observable
could only be subscribed to once since it would retain the original mutable data structure and keep mutating it.
Observable.range(0, 10).collect(() -> new ArrayList<Integer>(), (list, i) -> {
list.add(i);
}).forEach(System.out::println);
The Scheduler.parallelism
method was no longer being used by anything so was removed.
The parallel
operator was a failed experiment and almost all uses of it were wrong and led to confusion and often bad performance. Due to this it was removed.
Here is example code to show approaches to adding concurrency:
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class ParallelExecution {
public static void main(String[] args) {
System.out.println("------------ mergingAsync");
mergingAsync();
System.out.println("------------ mergingSync");
mergingSync();
System.out.println("------------ mergingSyncMadeAsync");
mergingSyncMadeAsync();
System.out.println("------------ flatMapExampleSync");
flatMapExampleSync();
System.out.println("------------ flatMapExampleAsync");
flatMapExampleAsync();
System.out.println("------------ flatMapBufferedExampleAsync");
flatMapBufferedExampleAsync();
System.out.println("------------ flatMapWindowedExampleAsync");
flatMapWindowedExampleAsync();
System.out.println("------------");
}
private static void mergingAsync() {
Observable.merge(getDataAsync(1), getDataAsync(2))
.toBlocking().forEach(System.out::println);
}
/**
* Merging async Observables subscribes to all of them concurrently.
*/
private static void mergingSync() {
// here you'll see the delay as each is executed synchronously
Observable.merge(getDataSync(1), getDataSync(2))
.toBlocking().forEach(System.out::println);
}
/**
* If the Observables are synchronous they can be made async with `subscribeOn`
*/
private static void mergingSyncMadeAsync() {
// if you have something synchronous and want to make it async, you can schedule it like this
// so here we see both executed concurrently
Observable.merge(
getDataSync(1).subscribeOn(Schedulers.io()),
getDataSync(2).subscribeOn(Schedulers.io())
)
.toBlocking().forEach(System.out::println);
}
/**
* flatMap uses `merge` so any async Observables it returns will execute concurrently.
*/
private static void flatMapExampleAsync() {
Observable.range(0, 5).flatMap(i -> {
return getDataAsync(i);
}).toBlocking().forEach(System.out::println);
}
/**
* If synchronous Observables are merged (via flatMap here) then it will behave like `concat`
* and execute each Observable (getDataSync here) synchronously one after the other.
*/
private static void flatMapExampleSync() {
Observable.range(0, 5).flatMap(i -> {
return getDataSync(i);
}).toBlocking().forEach(System.out::println);
}
/**
* If a single stream needs to be split across multiple CPUs it is generally more efficient to do it in batches.
*
* The `buffer` operator can be used to batch into chunks that are then each processed on a separate thread.
*/
private static void flatMapBufferedExampleAsync() {
Observable.range(0, 5000).buffer(500).flatMap(i -> {
return Observable.from(i).subscribeOn(Schedulers.computation()).map(item -> {
// simulate computational work
try {
Thread.sleep(1);
} catch (Exception e) {
}
return item + " processed " + Thread.currentThread();
});
}).toBlocking().forEach(System.out::println);
}
/**
* Or the `window` operator can be used instead of buffer to process them as a stream instead of buffered list.
*/
private static void flatMapWindowedExampleAsync() {
Observable.range(0, 5000).window(500).flatMap(work -> {
return work.observeOn(Schedulers.computation()).map(item -> {
// simulate computational work
try {
Thread.sleep(1);
} catch (Exception e) {
}
return item + " processed " + Thread.currentThread();
});
}).toBlocking().forEach(System.out::println);
}
// artificial representations of IO work
static Observable<Integer> getDataAsync(int i) {
return getDataSync(i).subscribeOn(Schedulers.io());
}
static Observable<Integer> getDataSync(int i) {
return Observable.create((Subscriber<? super Integer> s) -> {
// simulate latency
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
s.onNext(i);
s.onCompleted();
});
}
}
Similar to the parallel
operator parallelMerge
was confusing and was a failed experiment so was removed.
Removed publishLast
since it can be done with takeLast(1).publish()
.
Removed any method overload that took an initial value since the startWith
operator already allows that generically.
A handful of utility classes and methods that are not core to the purpose of RxJava were removed. This was preferred over having to constantly make arbitrary decisions about what utility methods should be included versus not while balancing the other goal of keeping the library small and focused.
The changes can be seen here and here
The Transformer
signature used by compose
was changed from:
public static interface Transformer<T, R> extends Func1<Observable<? extends T>, Observable<? extends R>>
to
public static interface Transformer<T, R> extends Func1<Observable<T>, Observable<? extends R>>
This was done after finding issues with the generics that prevented it from easily being used.
The multicast
operator was removed in favor of directly using publish()
, replay()
, share()
or cache()
.
This was done because multicast
could not be made to work with reactive pull backpressure due to its use of Subject
. The publish
and replay
operators on the other hand can have their implementations changed to not use a Subject
and then support backpressure. The publish()
operator has since been updated to support backpressure.
The takeWhileWithIndex
and skipWhileWithIndex
operators were removed since zipWith(Observable.range(Integer.MAX_VALUE))
can be used on any Observable
to get an index value. Any operator may want "withIndex" so it didn't make sense to have API clutter with some and certainly not all would have this variant added.
The longCount
operator was renamed to countLong
so it shows up alphabetically next to count
.
- [Pull 1886] (ReactiveX#1886) Buffer with time and merge fix
- [Pull 1882] (ReactiveX#1882) Remove Unused Scheduler.parallelism
- [Pull 1884] (ReactiveX#1884) Fix Scan/Reduce/Collect Factory Ambiguity
- [Pull 1866] (ReactiveX#1866) Fix memory leak in bounded ReplaySubject due to retaining the node index
- [Pull 1834] (ReactiveX#1834) Subject.toSerialized
- [Pull 1832] (ReactiveX#1832) Fix Take Early Unsubscription Causing Interrupts
- [Pull 1835] (ReactiveX#1835) Scan/Reduce with Seed Factory
- [Pull 1836] (ReactiveX#1836) Reduce Ring Buffer Default Sizes (and lower for Android)
- [Pull 1833] (ReactiveX#1833) Fix Thread Safety for Unsubscribe of Window
- [Pull 1827] (ReactiveX#1827) CacheThreadScheduler Evictor should Check Removal
- [Pull 1830] (ReactiveX#1830) Fix mergeDelayError Handling of Error in Parent Observable
- [Pull 1829] (ReactiveX#1829) Fix Window by Count Unsubscribe Behavior
- [Pull 1788] (ReactiveX#1788) Remove PublishLast/InitialValue
- [Pull 1796] (ReactiveX#1796) Improve TestSubject Javadoc
- [Pull 1803] (ReactiveX#1803) Print full classname (inner class support) and fix enum output
- [Pull 1802] (ReactiveX#1802) add hasObservers method to Subjects
- [Pull 1806] (ReactiveX#1806) Remove Unnecessary Utilities
- [Pull 1809] (ReactiveX#1809) Remove Utility Functions from Public API
- [Pull 1813] (ReactiveX#1813) Fix issue #1812 that zip may swallow requests
- [Pull 1817] (ReactiveX#1817) Fix Synchronous OnSubscribe Exception Skips Operators
- [Pull 1819] (ReactiveX#1819) Fix Concat Breaks with Double onCompleted
- [Pull 1771] (ReactiveX#1771) On error return backpressure
- [Pull 1776] (ReactiveX#1776) Observable.compose Generics
- [Pull 1778] (ReactiveX#1778) Change Transformer to Func1<Observable, Observable>
- [Pull 1775] (ReactiveX#1775) BlockingOperatorNextTest.testSingleSourceManyIterators fix
- [Pull 1784] (ReactiveX#1784) Publish with Backpressure
- [Pull 1786] (ReactiveX#1786) Remove Multicast
- [Pull 1787] (ReactiveX#1787) Remove *withIndex Operators
- [Pull 1789] (ReactiveX#1789) GroupedObservable.from/create
- [Pull 1793] (ReactiveX#1793) Take/Redo Unsubscribe
- [Pull 1767] (ReactiveX#1767) ExecutorScheduler delivers uncaught exceptions
- [Pull 1765] (ReactiveX#1765) backpressure support in onErrorResumeNext* operators
- [Pull 1766] (ReactiveX#1766) Unhandled errors go to UncaughtExceptionHandler
- [Pull 1755] (ReactiveX#1755) OnSubscribeRefCount with Synchronous Support
- [Pull 1750] (ReactiveX#1750) Fix NPE when iterable is null
- [Pull 1745] (ReactiveX#1745) SerializedSubject
- [Pull 1746] (ReactiveX#1746) Fatal System.err Logs on Unhandled Exceptions
- [Pull 1743] (ReactiveX#1743) Subject Error Handling
- [Pull 1742] (ReactiveX#1742) EmptyObserver and TestObserver
- [Pull 1740] (ReactiveX#1740) longCount -> countLong
- [Pull 1736] (ReactiveX#1736) Fix TrampolineScheduler NullPointerException
- [Pull 1738] (ReactiveX#1738) Delay Operator with Reactive Pull Backpressure
- [Pull 1739] (ReactiveX#1739) Fix Slow Non-deterministic Test
- [Pull 1731] (ReactiveX#1731) Remove Unused Code
- [Pull 1733] (ReactiveX#1733) Move To Proper Location
- [Pull 1747] (ReactiveX#1747) Cleanup: final and utility classes
- [Pull 1729] (ReactiveX#1729) CombineLatest: Request Up When Dropping Values
- [Pull 1728] (ReactiveX#1728) ObserveOn Error Propagation
- [Pull 1727] (ReactiveX#1727) Proposed groupBy/groupByUntil Changes
- [Pull 1726] (ReactiveX#1726) Fix Merge: backpressure + scalarValueQueue don't play nicely
- [Pull 1720] (ReactiveX#1720) Change repeatWhen and retryWhen signatures.
- [Pull 1719] (ReactiveX#1719) Fix Bug in the onBackpressure operators
- [Pull 1687] (ReactiveX#1687) Don't allocate an empty ArrayList for each Observable.empty call
- [Pull 1705] (ReactiveX#1705) Fix null-emitting combineLatest
- [Pull 1683] (ReactiveX#1683) ObserveOn Error Handling
- [Pull 1686] (ReactiveX#1686) Fix Rx serialization bug in takeUntil again and the concurrent issue in BufferUntilSubscriber
- [Pull 1701] (ReactiveX#1701) Fix the compose generics
- [Pull 1712] (ReactiveX#1712) Fixing regression in mergeDelayError
- [Pull 1716] (ReactiveX#1716) Remove Observable.Parallel
- [Pull 1667] (ReactiveX#1667) Fix the bug that Switch doesn't propagate 'unsubscribe'
- [Pull 1659] (ReactiveX#1659) OperatorScan should check for MAX_VALUE on request
- [Pull 1657] (ReactiveX#1657) Ignore furthur messages after entering terminate state
- [Pull 1669] (ReactiveX#1669) Error Handling Unsubscribe and Terminal State
- [Pull 1656] (ReactiveX#1656) Make TakeUntil obey Rx serialization contract
- [Pull 1652] (ReactiveX#1652) Operator Scan Backpressure Fix
- [Pull 1645] (ReactiveX#1645) Remove ParallelMerge
Version 1.0.0-rc.12 – November 17th 2014 (Maven Central)
- [Pull 1886] (ReactiveX#1886) Buffer with time and merge fix
Version 1.0.0-rc.11 – November 15th 2014 (Maven Central)
- [Pull 1882] (ReactiveX#1882) Remove Unused Scheduler.parallelism
- [Pull 1884] (ReactiveX#1884) Fix Scan/Reduce/Collect Factory Ambiguity
- [Pull 1866] (ReactiveX#1866) Fix memory leak in bounded ReplaySubject due to retaining the node index
Version 0.20.7 – November 11th 2014 (Maven Central)
- [Pull 1863] (ReactiveX#1863) Fix Concat Breaks with Double onCompleted
Version 1.0.0-rc.10 – November 8th 2014 (Maven Central)
- [Pull 1834] (ReactiveX#1834) Subject.toSerialized
- [Pull 1832] (ReactiveX#1832) Fix Take Early Unsubscription Causing Interrupts
- [Pull 1835] (ReactiveX#1835) Scan/Reduce with Seed Factory
- [Pull 1836] (ReactiveX#1836) Reduce Ring Buffer Default Sizes (and lower for Android)
- [Pull 1833] (ReactiveX#1833) Fix Thread Safety for Unsubscribe of Window
- [Pull 1827] (ReactiveX#1827) CacheThreadScheduler Evictor should Check Removal
- [Pull 1830] (ReactiveX#1830) Fix mergeDelayError Handling of Error in Parent Observable
- [Pull 1829] (ReactiveX#1829) Fix Window by Count Unsubscribe Behavior
Version 1.0.0-rc.9 – November 2nd 2014 (Maven Central)
- [Pull 1788] (ReactiveX#1788) Remove PublishLast/InitialValue
- [Pull 1796] (ReactiveX#1796) Improve TestSubject Javadoc
- [Pull 1803] (ReactiveX#1803) Print full classname (inner class support) and fix enum output
- [Pull 1802] (ReactiveX#1802) add hasObservers method to Subjects
- [Pull 1806] (ReactiveX#1806) Remove Unnecessary Utilities
- [Pull 1809] (ReactiveX#1809) Remove Utility Functions from Public API
- [Pull 1813] (ReactiveX#1813) Fix issue #1812 that zip may swallow requests
- [Pull 1817] (ReactiveX#1817) Fix Synchronous OnSubscribe Exception Skips Operators
- [Pull 1819] (ReactiveX#1819) Fix Concat Breaks with Double onCompleted
Version 1.0.0-rc.8 – October 23rd 2014 (Maven Central)
- [Pull 1771] (ReactiveX#1771) On error return backpressure
- [Pull 1776] (ReactiveX#1776) Observable.compose Generics
- [Pull 1778] (ReactiveX#1778) Change Transformer to Func1<Observable, Observable>
- [Pull 1775] (ReactiveX#1775) BlockingOperatorNextTest.testSingleSourceManyIterators fix
- [Pull 1784] (ReactiveX#1784) Publish with Backpressure
- [Pull 1786] (ReactiveX#1786) Remove Multicast
- [Pull 1787] (ReactiveX#1787) Remove *withIndex Operators
- [Pull 1789] (ReactiveX#1789) GroupedObservable.from/create
- [Pull 1793] (ReactiveX#1793) Take/Redo Unsubscribe
Version 1.0.0-rc.7 – October 16th 2014 (Maven Central)
- [Pull 1767] (ReactiveX#1767) ExecutorScheduler delivers uncaught exceptions
- [Pull 1765] (ReactiveX#1765) backpressure support in onErrorResumeNext* operators
- [Pull 1766] (ReactiveX#1766) Unhandled errors go to UncaughtExceptionHandler
- [Pull 1755] (ReactiveX#1755) OnSubscribeRefCount with Synchronous Support
- [Pull 1750] (ReactiveX#1750) Fix NPE when iterable is null
Version 0.20.6 – October 15th 2014 (Maven Central)
- [Pull 1721] (ReactiveX#1721) Bug in the onBackpressure operators
- [Pull 1695] (ReactiveX#1695) rewrite OnSubscribeRefCount to handle synchronous source
- [Pull 1761] (ReactiveX#1761) Fix null-emitting combineLatest
Version 1.0.0-rc.6 – October 10th 2014 (Maven Central)
This release is primarily bug fixes along with API cleanup by renaming longCount
to countLong
to be alphabetically sorted alongside count
.
- [Pull 1745] (ReactiveX#1745) SerializedSubject
- [Pull 1746] (ReactiveX#1746) Fatal System.err Logs on Unhandled Exceptions
- [Pull 1743] (ReactiveX#1743) Subject Error Handling
- [Pull 1742] (ReactiveX#1742) EmptyObserver and TestObserver
- [Pull 1740] (ReactiveX#1740) longCount -> countLong
- [Pull 1736] (ReactiveX#1736) Fix TrampolineScheduler NullPointerException
- [Pull 1738] (ReactiveX#1738) Delay Operator with Reactive Pull Backpressure
- [Pull 1739] (ReactiveX#1739) Fix Slow Non-deterministic Test
- [Pull 1731] (ReactiveX#1731) Remove Unused Code
- [Pull 1733] (ReactiveX#1733) Move To Proper Location
- [Pull 1747] (ReactiveX#1747) Cleanup: final and utility classes
Version 1.0.0-rc.5 – October 6th 2014 (Maven Central)
- [Pull 1729] (ReactiveX#1729) CombineLatest: Request Up When Dropping Values
- [Pull 1728] (ReactiveX#1728) ObserveOn Error Propagation
- [Pull 1727] (ReactiveX#1727) Proposed groupBy/groupByUntil Changes
- [Pull 1726] (ReactiveX#1726) Fix Merge: backpressure + scalarValueQueue don't play nicely
- [Pull 1720] (ReactiveX#1720) Change repeatWhen and retryWhen signatures.
- [Pull 1719] (ReactiveX#1719) Fix Bug in the onBackpressure operators
The groupByUntil
operator was removed by collapsing its behavior into groupBy
. Previously on groupBy
when a child GroupedObservable
was unsubscribed it would internally retain the state and ignore all future onNext
for that key.
This matched behavior in Rx.Net but was found to be non-obvious and almost everyone using groupBy
on long-lived streams actually wanted the behavior of groupByUntil
where an unsubscribed GroupedObservable
would clean up the resources and then if onNext
for that key arrived again a new GroupedObservable
would be emitted.
Adding backpressure (reactive pull) to groupByUntil
was found to not work easily with its signatures so before 1.0 Final it was decided to collapse groupBy
and groupByUntil
. Further details on this can be found in Pull Request 1727.
Here is an example of how groupBy
now behaves when a child GroupedObservable
is unsubscribed (using take
here):
// odd/even into lists of 10
Observable.range(1, 100)
.groupBy(n -> n % 2 == 0)
.flatMap(g -> {
return g.take(10).toList();
}).forEach(System.out::println);
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
[21, 23, 25, 27, 29, 31, 33, 35, 37, 39]
[22, 24, 26, 28, 30, 32, 34, 36, 38, 40]
[41, 43, 45, 47, 49, 51, 53, 55, 57, 59]
[42, 44, 46, 48, 50, 52, 54, 56, 58, 60]
[61, 63, 65, 67, 69, 71, 73, 75, 77, 79]
[62, 64, 66, 68, 70, 72, 74, 76, 78, 80]
[81, 83, 85, 87, 89, 91, 93, 95, 97, 99]
[82, 84, 86, 88, 90, 92, 94, 96, 98, 100]
Previously this would have only emitted 2 groups and ignored all subsequent values:
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
On a finite stream, similar behavior of the previous groupBy
implementation that would filter can be achieved like this:
//odd/even into lists of 10
Observable.range(1, 100)
.groupBy(n -> n % 2 == 0)
.flatMap(g -> {
return g.filter(i -> i <= 20).toList();
}).forEach(System.out::println);
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
That however does allow the stream to complete (which may not be wanted).
To unsubscribe here are some choices that get the same output but efficiently unsubscribe up so the source only emits 40 values:
Observable.timer(0, 1, TimeUnit.MILLISECONDS)
.groupBy(n -> n % 2 == 0)
.flatMap(g -> {
return g.take(10).toList();
}).take(2).toBlocking().forEach(System.out::println);
or
Observable.timer(0, 1, TimeUnit.MILLISECONDS)
.take(20)
.groupBy(n -> n % 2 == 0)
.flatMap(g -> {
return g.toList();
}).toBlocking().forEach(System.out::println);
These show that now groupBy
composes like any other operator without the nuanced and hidden behavior of ignoring values after a child GroupedObservable
is unsubscribed.
Uses of groupByUntil
can now all be done by just using operators like take
, takeWhile
and takeUntil
on the GroupedObservable
directly, such as this:
Observable.from(Arrays.asList("a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c"))
.groupBy(n -> n)
.flatMap(g -> {
return g.take(3).reduce((s, s2) -> s + s2);
}).forEach(System.out::println);
aaa
bbb
ccc
aaa
bbb
ccc
The retryWhen
and repeatWhen
method signatures both emitted a Observable<Notification>
type which could be queried to represent either onError
in the retryWhen
case or onCompleted
in the repeatWhen
case. This was found to be confusing and unnecessary. The signatures were changed to emit Observable<Throwable>
for retryWhen
and Observable<Void>
for repeatWhen
to better signal the type of notification they are emitting without the need to then query the Notification
.
The following contrived examples shows how the Observable<Throwable>
is used to get the error that occurred when deciding to retry:
AtomicInteger count = new AtomicInteger();
Observable.create((Subscriber<? super String> s) -> {
if (count.getAndIncrement() == 0) {
s.onError(new RuntimeException("always fails"));
} else {
s.onError(new IllegalArgumentException("user error"));
}
}).retryWhen(attempts -> {
return attempts.flatMap(throwable -> {
if (throwable instanceof IllegalArgumentException) {
System.out.println("don't retry on IllegalArgumentException... allow failure");
return Observable.error(throwable);
} else {
System.out.println(throwable + " => retry after 1 second");
return Observable.timer(1, TimeUnit.SECONDS);
}
});
})
.toBlocking().forEach(System.out::println);
Version 1.0.0-rc.4 – October 2nd 2014 (Maven Central)
- [Pull 1687] (ReactiveX#1687) Don't allocate an empty ArrayList for each Observable.empty call
- [Pull 1705] (ReactiveX#1705) Fix null-emitting combineLatest
- [Pull 1683] (ReactiveX#1683) ObserveOn Error Handling
- [Pull 1686] (ReactiveX#1686) Fix Rx serialization bug in takeUntil again and the concurrent issue in BufferUntilSubscriber
- [Pull 1701] (ReactiveX#1701) Fix the compose generics
- [Pull 1712] (ReactiveX#1712) Fixing regression in mergeDelayError
- [Pull 1716] (ReactiveX#1716) Remove Observable.Parallel
Version 0.20.5 – October 2nd 2014 (Maven Central)
- [Pull 1686] (ReactiveX#1686) Fix Rx serialization bug in takeUntil again and the concurrent issue in BufferUntilSubscriber
- [Pull 1701] (ReactiveX#1701) Fix the compose generics
- [Pull 1712] (ReactiveX#1712) Fixing regression in mergeDelayError
- [Pull 1715] (ReactiveX#1715) Deprecate Observable.Parallel
Version 1.0.0-rc.3 (Maven Central)
- Merging fixes from 0.20.4 into 1.x branch
Version 0.20.4 (Maven Central)
- [Pull 1667] (ReactiveX#1667) Fix the bug that Switch doesn't propagate 'unsubscribe'
- [Pull 1659] (ReactiveX#1659) OperatorScan should check for MAX_VALUE on request
- [Pull 1657] (ReactiveX#1657) Ignore furthur messages after entering terminate state
- [Pull 1669] (ReactiveX#1669) Error Handling Unsubscribe and Terminal State
- [Pull 1656] (ReactiveX#1656) Make TakeUntil obey Rx serialization contract
- [Pull 1664] (ReactiveX#1664) StringObservable.split NPE fixes
Version 1.0.0-rc.2 (Maven Central)
- [Pull 1652] (ReactiveX#1652) Operator Scan Backpressure Fix
- [Pull 1645] (ReactiveX#1645) Remove ParallelMerge
Version 0.20.3 (Maven Central)
- [Pull 1648] (ReactiveX#1648) Operator Scan Backpressure Fix
- [Pull 1651] (ReactiveX#1651) RxScala: Fix the problem that Subscriber.onStart isn't called
- [Pull 1641] (ReactiveX#1641) RxScala: Fix infinite recursive onStart call in Subscriber
- [Pull 1646] (ReactiveX#1646) Deprecate ParallelMerge
Version 1.0.0-rc.1 (Maven Central)
The first release candidate for 1.0.0.
This is the same code as version 0.20.2 except:
- all deprecated methods and types are deleted
- now published to groupId
io.reactivex
instead ofcom.netflix.rxjava
- artifactId is now
rxjava
instead ofrxjava-core
io.reactivex:rxjava:1.0.0-rc.1
- all sub-projects are separated into their own projects and no longer released along with RxJava
The artifacts can be found on maven Central at: http://repo1.maven.org/maven2/io/reactivex/rxjava/1.0.0-rc.1/
Version 0.20.2 (Maven Central)
- [Pull 1637] (ReactiveX#1637) Optimize single BlockingObservable operations
Version 0.20.1 (Maven Central)
- [Pull 1631] (ReactiveX#1631) Handle Fatal Exceptions in doOn* operators
- [Pull 1625] (ReactiveX#1625) RxScala: Mark superfluous from/empty methods with scheduler parameter as deprecated
- [Pull 1623] (ReactiveX#1623) RxScala: Add more operators to match RxJava
- [Pull 1632] (ReactiveX#1632) Composite Exception - Circular Reference Handling
Version 0.20.0 (Maven Central)
RxJava 0.20.0 is a major release that adds "reactive pull" support for backpressure along with several other enhancements leading into the 1.0 release.
Solutions for backpressure was the major focus of this release. A "reactive pull" implementation was implemented. Documentation on this and other options for backpressure are found in the wiki: https://github.com/ReactiveX/RxJava/wiki/Backpressure
The reactive pull solution evolved out of several prototypes and interaction with many people over the months.
A new type Producer
has been added:
public interface Producer {
public void request(long n);
}
The Subscriber
type now has these methods added:
public abstract class Subscriber<T> implements Observer<T>, Subscription {
public void onStart();
protected final void request(long n);
public final void setProducer(Producer producer);
}
This trivial example shows requesting values one at a time:
Observable.from(1, 2, 3, 4).subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
// on start this tells it to request 1
// otherwise it defaults to request(Long.MAX_VALUE)
request(1);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer t) {
System.out.println(t);
// as each onNext is consumed, request another
// otherwise the Producer will not send more
request(1);
}
});
The OnSubscribeFromIterable operator shows how an Iterable
is consumed with backpressure.
Some hi-lights (modified for simplicity rather than performance and completeness):
public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {
@Override
public void call(final Subscriber<? super T> o) {
final Iterator<? extends T> it = is.iterator();
// instead of emitting directly to the Subscriber, it emits a Producer
o.setProducer(new IterableProducer<T>(o, it));
}
private static final class IterableProducer<T> implements Producer {
public void request(long n) {
int _c = requested.getAndAdd(n);
if (_c == 0) {
while (it.hasNext()) {
if (o.isUnsubscribed()) {
return;
}
T t = it.next();
o.onNext(t);
if (requested.decrementAndGet() == 0) {
// we're done emitting the number requested so return
return;
}
}
o.onCompleted();
}
}
}
}
The observeOn
operator is a sterotypical example of queuing on one side of a thread and draining on the other, now with backpressure.
private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
@Override
public void onStart() {
// signal that this is an async operator capable of receiving this many
request(RxRingBuffer.SIZE);
}
@Override
public void onNext(final T t) {
try {
// enqueue
queue.onNext(t);
} catch (MissingBackpressureException e) {
// fail if the upstream has not obeyed our backpressure requests
onError(e);
return;
}
// attempt to schedule draining if needed
schedule();
}
// the scheduling polling will then drain the queue and invoke `request(n)` to request more after draining
}
Many use cases will be able to use Observable.from
, Observable.onBackpressureDrop
and Observable.onBackpressureBuffer
to achieve "reactive pull backpressure" without manually implementing Producer
logic. Also, it is optional to make an Observable
support backpressure. It can remain completely reactive and just push events as it always has. Most uses of RxJava this works just fine. If backpressure is needed then it can be migrated to use a Producer
or several other approaches to flow control exist such as throttle
, sample
, debounce
, window
, buffer
, onBackpressureBuffer
, and onBackpressureDrop
.
The wiki provides further documentation.
Contributors to RxJava are involved in defining the Reactive Streams spec. RxJava 1.0 is trying to comply with the semantic rules but is not attempting to comply with the type signatures. It will however have a separate module that acts as a bridge between the RxJava Observable
and the Reactive Stream types.
The reasons for this are:
- Rx has
Observer.onCompleted
whereas Reactive Streams hasonComplete
. This is a massive breaking change to remove a "d". - The RxJava
Subscription
is used also a "Closeable"/"Disposable" and it does not work well to make it now also be used forrequest(n)
, hence the separate typeProducer
in RxJava. It was attempted to reuserx.Subscription
but it couldn't be done without massive breaking changes. - Reactive Streams uses
onSubscribe(Subscription s)
whereas RxJava injects theSubscription
as theSubscriber
. Again, this change could not be done without major breaking changes. - RxJava 1.0 needs to be backwards compatible with the major Rx contracts established during the 0.x roadmap.
Considering these things, the major semantics of request(long n)
for backpressure are compatible and this will allow interop with a bridge between the interfaces.
The compose
operator is similar to lift
but allows custom operator implementations that are chaining Observable
operators whereas lift
is directly implementing the raw Subscriber
logic.
Here is a trival example demonstrating how using compose
is a better option than lift
when existing Observable
operators can be used to achieve the custom behavior.
import rx.Observable;
import rx.Observable.Operator;
import rx.Observable.Transformer;
import rx.Subscriber;
public class ComposeExample {
public static void main(String[] args) {
Observable.just("hello").compose(appendWorldTransformer()).forEach(System.out::println);
Observable.just("hello").lift(appendWorldOperator()).forEach(System.out::println);
}
// if existing operators can be used, compose with Transformer is ideal
private static Transformer<? super String, String> appendWorldTransformer() {
return o -> o.map(s -> s + " world!").finallyDo(() -> {
System.out.println(" some side-effect");
});
}
// whereas lift is more low level
private static Operator<? super String, String> appendWorldOperator() {
return new Operator<String, String>() {
@Override
public Subscriber<? super String> call(Subscriber<? super String> child) {
return new Subscriber<String>(child) {
@Override
public void onCompleted() {
child.onCompleted();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(String t) {
child.onNext(t + " world!");
System.out.println(" some side-effect");
}
};
}
};
}
}
New operators retryWhen
and repeatWhen
were added which offer support for more advanced recursion such as retry with exponential backoff.
Here is an example that increases delay between each retry:
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
System.out.println("delay retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
});
}).toBlocking().forEach(System.out::println);
The use of Producer
has been added in such a way that it is optional and additive, but some operators that used to have unbounded queues are now bounded. This means that if a source Observable
emits faster than the Observer
can consume them, a MissingBackpressureException
can be emitted via onError
.
This semantic change can break existing code.
There are two ways of resolving this:
- Modify the source
Observable
to useProducer
and support backpressure. - Use newly added operators such as
onBackpressureBuffer
oronBackpressureDrop
to choose a strategy for the sourceObservable
of how to behave when it emits more data than the consumingObserver
is capable of handling. Use ofonBackpressureBuffer
effectively returns it to having an unbounded buffer and behaving like version 0.19 or earlier.
Example:
sourceObservable.onBackpressureBuffer().subscribe(slowConsumer);
Various methods, operators or classes have been deprecated and will be removed in 1.0. Primarily they have been done to remove ambiguity, remove nuanced functionality that is easy to use wrong, clear out superfluous methods and eliminate cruft that was added during the 0.x development process but has been replaced.
For example, Observable.from(T)
was deprecated in favor of Observable.just(T)
despite being a painful breaking change so as to solve ambiguity with Observable.from(Iterable)
.
This means that the upgrade from 0.20 to 1.0 will be breaking. This is being done so that the 1.x version can be a long-lived stable API built upon as clean a foundation as possible.
A stable API for RxJava is important because it is intended to be a foundational library that many projects will depend upon. The deprecations are intended to help this be achieved.
The next release will be 1.0 (after a few release candidates). The RxJava project has been split up into many new top-level projects at https://github.com/ReactiveX so each of their release cycles and version strategies can be decoupled.
The 1.x version is intended to be stable for many years and target Java 6, 7 and 8. The expected outcome is for a 2.x version to target Java 8+ but for RxJava 1.x and 2.x to co-exist and both be living, supported versions.
Version 0.20.0-RC6 (Maven Central)
Further fixes and enhancements bringing us close to completing 0.20.0 and almost ready for 1.0.
A more major change in this release is the deprecation of Observable.from(T)
. The full discussion can be seen in #1563.
- [Pull 1575] (ReactiveX#1575) combineLatest with backpressure
- [Pull 1569] (ReactiveX#1569) Compose/Transform Covariance
- [Pull 1577] (ReactiveX#1577) Fix the compose covariance
- [Pull 1581] (ReactiveX#1581) zip(Iterable) -> zipWith(Iterable)
- [Pull 1582] (ReactiveX#1582) Deprecate GroupedObservable.from
- [Pull 1583] (ReactiveX#1583) Redo/Repeat Backpressure
- [Pull 1576] (ReactiveX#1576) from(T) -> just(T)
- [Pull 1545] (ReactiveX#1545) Make Android ViewObservable.input observe TextView instead of String
Version 0.20.0-RC5 (Maven Central)
Version 0.20.0-RC5 updates parallel
, buffer(size)
, switchOnNext
, repeat
, and retry
to support "reactive pull" backpressure. It adds a groupBy
overload with an element selector, a new compose
method as an alternative to lift
for custom operators, fixes bugs and other general improvements.
There are still oustanding items being tracked for 0.20 that need to be completed for the final release.
- [Pull 1573] (ReactiveX#1573) Backpressure: parallel
- [Pull 1572] (ReactiveX#1572) Remove Timeout in Blocking Iterator
- [Pull 1570] (ReactiveX#1570) RxClojure: Fix for mapcat
- [Pull 1568] (ReactiveX#1568) Compose/Transformer
- [Pull 1567] (ReactiveX#1567) groupBy with element selector
- [Pull 1565] (ReactiveX#1565) Fixing Kotlin Defer
- [Pull 1507] (ReactiveX#1507) BufferWithSize with Backpressure Support
- [Pull 1557] (ReactiveX#1557) SwitchOnNext with backpressure support
- [Pull 1562] (ReactiveX#1562) TakeLastTimed with backpressure support
- [Pull 1564] (ReactiveX#1564) RxScala: Fix errors in Completeness.scala and also improve it
- [Pull 1548] (ReactiveX#1548) Adding backpressure to OnSubscribeRedo
- [Pull 1559] (ReactiveX#1559) More consistent hooks for scheduler plugins
- [Pull 1561] (ReactiveX#1561) Remove Variance on Defer
- [Pull 1537] (ReactiveX#1537) recursive scheduling in RxScala
- [Pull 1560] (ReactiveX#1560) flatMap overloads
- [Pull 1558] (ReactiveX#1558) mergeMap generics
- [Pull 1552] (ReactiveX#1552) Fixing a bug and a potential for other concurrency issues
- [Pull 1555] (ReactiveX#1555) RxScala: Add retryWhen/repeatWhen methods
Version 0.20.0-RC4 (Maven Central)
Version 0.20.0-RC4 continues bug fixes and completing work related to "reactive pull" backpressure. This release updates amb
and concat
to connect the backpressure request
.
Internal uses of RxRingBuffer
migrated to using SpmcArrayQueue
which significantly reduces object allocations. See #1526 for details.
The multicast
operators were updated to use a Subject
factory so that Observable
sequences can be reused. See #1515 for details.
- [Pull 1534] (ReactiveX#1534) Concat Backpressure
- [Pull 1533] (ReactiveX#1533) Amb + Backpressure
- [Pull 1527] (ReactiveX#1527) Failing unit test for reduce, showing it does not implement backpressure correctly
- [Pull 1528] (ReactiveX#1528) Add operators to create Observables from BroadcastReceiver (rebased)
- [Pull 1523] (ReactiveX#1523) Fix issue #1522: takeLast
- [Pull 1530] (ReactiveX#1530) Fix the unbounded check for merge
- [Pull 1526] (ReactiveX#1526) Restore use of SpmcArrayQueue in RxRingBuffer
- [Pull 1468] (ReactiveX#1468) RxScala: Update CompletenessTest.scala
- [Pull 1515] (ReactiveX#1515) Support Subject Factory with Multicast
- [Pull 1518] (ReactiveX#1518) Fix typos in javadoc comments
- [Pull 1521] (ReactiveX#1521) Fix toIterator Exception Handling
- [Pull 1520] (ReactiveX#1520) Fix non-deterministic RxRingBuffer test
Version 0.20.0-RC3 (Maven Central)
Version 0.20.0-RC3 preview release fixes several bugs related to backpressure and adds retryWhen
, repeatWhen
for more advanced recursion use cases like retry with exponential backoff.
This version passed the Netflix API production canary process. Please test this against your code to help us find any issues before we release 0.20.0.
- [Pull 1493] (ReactiveX#1493) retryWhen/repeatWhen
- [Pull 1494] (ReactiveX#1494) zipWith
- [Pull 1501] (ReactiveX#1501) blocking synchronous next
- [Pull 1498] (ReactiveX#1498) non-deterministic testUserSubscriberUsingRequestAsync
- [Pull 1497] (ReactiveX#1497) spsc ring buffer concurrency test
- [Pull 1496] (ReactiveX#1496) Change RxRingBuffer Queue Usage
- [Pull 1491] (ReactiveX#1491) Concat Outer Backpressure
- [Pull 1490] (ReactiveX#1490) non-deterministic timeouts on slow machines
- [Pull 1489] (ReactiveX#1489) Backpressure Fixes and Docs
- [Pull 1474] (ReactiveX#1474) Ignore backpressure for OperatorToObservableSortedList
- [Pull 1473] (ReactiveX#1473) OperatorAny needs to handle backpressure
- [Pull 1472] (ReactiveX#1472) Add test of backpressure to OperatorAll
- [Pull 1469] (ReactiveX#1469) ToList operator needs to ignore backpressure
- [Pull 1393] (ReactiveX#1393) Add cache(int capacity) to Observable
- [Pull 1431] (ReactiveX#1431) CompositeException fix for Android
- [Pull 1436] (ReactiveX#1436) Correct warnings
Version 0.20.0-RC2 (Maven Central)
Version 0.20.0-RC2 preview release adds support for backpressure to the zip
operators, fixes bugs and removes the Subscribe.onSetProducer
method.
This means signature changes are modified to be:
The new type Producer
->
public interface Producer {
public void request(long n);
}
New methods added to Subscriber
->
public abstract class Subscriber<T> implements Observer<T>, Subscription {
public void onStart();
protected final void request(long n);
public final void setProducer(Producer producer);
}
- [Pull 1448] (ReactiveX#1448) RxScala: Add Scala idiomatic methods
- [Pull 1446] (ReactiveX#1446) Zip with Backpressure Support
- [Pull 1454] (ReactiveX#1454) doOnEachObserver fix
- [Pull 1457] (ReactiveX#1457) MergeDelayError & OnErrorFlatMap w/ Merge
- [Pull 1458] (ReactiveX#1458) Remove Pivot Operator
- [Pull 1459] (ReactiveX#1459) Remove Subscriber.onSetProducer
- [Pull 1462] (ReactiveX#1462) Merge Perf Fix: Re-enable fast-path
- [Pull 1463] (ReactiveX#1463) Merge Bug: Missing Emissions
Version 0.20.0-RC1 (Maven Central)
Version 0.20.0-RC1 is a preview release that adds backpressure support to RxJava as per issue #1000. It has been done in a way that is mostly additive and most existing code will not be affected by these additions. A section below on "Breaking Changes" will discuss use cases that do break and how to deal with them.
This release has been tested successfully in Netflix production canaries, but that does not exercise all use cases or operators, nor does it leverage the newly added backpressure functionality (though the backpressure code paths are used).
- The
zip
operator has not yet been upgraded to support backpressure. The work is almost done and it will be included in the next release. - Not all operators have yet been reviewed for whether they need to be changed in any way.
- Temporal operators (like
buffer
,window
,sample
, etc) need to be modified to disable backpressure upstream (usingrequest(Long.MAX_VALUE)
) and a decision made about how downstream backpressure requests will be supported. - Ensure all code works on Android. New data structures rely on
sun.misc.Unsafe
but are conditionally used only when it is available. We need to ensure those conditions are working and the alternative implementations are adequate. The default buffer size of 1024 also needs to be reviewed for whether it is a correct default for all systems, or needs to be modified by environment (such as smaller for Android). - Ensure use cases needing backpressure all work.
A new type Producer
has been added:
public interface Producer {
public void request(long n);
}
The Subscriber
type now has these methods added:
public abstract class Subscriber<T> implements Observer<T>, Subscription {
public void onStart();
public final void request(long n);
public final void setProducer(Producer producer);
protected Producer onSetProducer(Producer producer);
}
This trivial example shows requesting values one at a time:
Observable.from(1, 2, 3, 4).subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
request(1);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer t) {
request(1);
}
});
The OnSubscribeFromIterable operator shows how an Iterable
is consumed with backpressure.
Some hi-lights (modified for simplicity rather than performance and completeness):
public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {
@Override
public void call(final Subscriber<? super T> o) {
final Iterator<? extends T> it = is.iterator();
// instead of emitting directly to the Subscriber, it emits a Producer
o.setProducer(new IterableProducer<T>(o, it));
}
private static final class IterableProducer<T> implements Producer {
public void request(long n) {
int _c = requested.getAndAdd(n);
if (_c == 0) {
while (it.hasNext()) {
if (o.isUnsubscribed()) {
return;
}
T t = it.next();
o.onNext(t);
if (requested.decrementAndGet() == 0) {
// we're done emitting the number requested so return
return;
}
}
o.onCompleted();
}
}
}
}
The observeOn
operator is a sterotypical example of queuing on one side of a thread and draining on the other, now with backpressure.
private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
@Override
public void onStart() {
// signal that this is an async operator capable of receiving this many
request(RxRingBuffer.SIZE);
}
@Override
public void onNext(final T t) {
try {
// enqueue
queue.onNext(t);
} catch (MissingBackpressureException e) {
// fail if the upstream has not obeyed our backpressure requests
onError(e);
return;
}
// attempt to schedule draining if needed
schedule();
}
// the scheduling polling will then drain the queue and invoke `request(n)` to request more after draining
}
The use of Producer
has been added in such a way that it is optional and additive, but some operators that used to have unbounded queues are now bounded. This means that if a source Observable
emits faster than the Observer
can consume them, a MissingBackpressureException
can be emitted via onError
.
This semantic change can break existing code.
There are two ways of resolving this:
- Modify the source
Observable
to useProducer
and support backpressure. - Use newly added operators such as
onBackpressureBuffer
oronBackpressureDrop
to choose a strategy for the sourceObservable
of how to behave when it emits more data than the consumingObserver
is capable of handling. Use ofonBackpressureBuffer
effectively returns it to having an unbounded buffer and behaving like version 0.19 or earlier.
Example:
sourceObservable.onBackpressureBuffer().subscribe(slowConsumer);
Contributors to RxJava are involved in defining the Reactive Streams spec. RxJava 1.0 is trying to comply with the semantic rules but is not attempting to comply with the type signatures. It will however have a separate module that acts as a bridge between the RxJava Observable
and the Reactive Stream types.
The reasons for this are:
- Rx has
Observer.onCompleted
whereas Reactive Streams hasonComplete
. This is a massive breaking change to remove a "d". - The RxJava
Subscription
is used also a "Closeable"/"Disposable" and it does not work well to make it now also be used forrequest(n)
, hence the separate typeProducer
in RxJava. It was attempted to reuserx.Subscription
but it couldn't be done without massive breaking changes. - Reactive Streams uses
onSubscribe(Subscription s)
whereas RxJava injects theSubscription
as theSubscriber
. Again, this change could not be done without major breaking changes. - RxJava 1.0 needs to be backwards compatible with the major Rx contracts established during the 0.x roadmap.
- Reactive Streams is not yet 1.0 and despite significant progress, it is a moving target.
Considering these things, the major semantics of request(long n)
for backpressure are compatible and this will allow interop with a bridge between the interfaces. As the Reactive Streams spec matures, RxJava 2.0 may choose to fully adopt the types in the future while RxJava 1.x retains the current signatures.
First, please test this release against your existing code to help us determine if we have broken anything.
Second, try to solve backpressure use cases and provide feedback on what works and what doesn't work.
Thank you!
Version 0.19.6 (Maven Central)
Inclusion of 'rxjava-contrib:rxjava-scalaz' in release.
Upload to Maven Central was corrupted so release is skipped.
Version 0.19.4 (Maven Central)
- [Pull 1401] (ReactiveX#1401) OnError while emitting onNext value: object.toString
- [Pull 1409] (ReactiveX#1409) Avoiding OperatorObserveOn from calling subscriber.onNext(..) after unsubscribe
- [Pull 1406] (ReactiveX#1406) Kotlin M8
- [Pull 1400] (ReactiveX#1400) Internal Data Structures
- [Pull 1399] (ReactiveX#1399) Update Perf Tests
- [Pull 1396] (ReactiveX#1396) RxScala: Fix the compiler warnings
- [Pull 1397] (ReactiveX#1397) Adding the hooks unsafeSubscribe
Upload to Maven Central was corrupted so release is skipped.
Version 0.19.2 (Maven Central)
- [Pull 1388] (ReactiveX#1388) CompositeException stops mutating nested Exceptions
- [Pull 1387] (ReactiveX#1387) Upgrade to JMH 0.9
- [Pull 1297] (ReactiveX#1297) [RxScala] rxjava-scalaz: providing some type class instances
- [Pull 1332] (ReactiveX#1332) IOSSchedulers for RoboVM
- [Pull 1380] (ReactiveX#1380) Variety of Fixes
- [Pull 1379] (ReactiveX#1379) Parallel Operator Rewrite
- [Pull 1378] (ReactiveX#1378) BugFix: Pivot Concurrency
- [Pull 1376] (ReactiveX#1376) Revision of JMH Tests
- [Pull 1375] (ReactiveX#1375) RxScala: Add idiomatic toXXX methods
- [Pull 1367] (ReactiveX#1367) Fix the bug that 'flatMap' swallows OnErrorNotImplementedException
- [Pull 1374] (ReactiveX#1374) Fix head/tail false sharing issues
- [Pull 1369] (ReactiveX#1369) DebugHook got miswired before
- [Pull 1361] (ReactiveX#1361) Fix a race condition if queued actions have been handled already
- [Pull 1336] (ReactiveX#1336) RxScala: Add the rest missing methods to BlockingObservable
- [Pull 1362] (ReactiveX#1362) RxScala: Fix #1340 and #1343
- [Pull 1359] (ReactiveX#1359) Fixed padding of the integer and node classes
Version 0.19.1 (Maven Central)
- [Pull 1357] (ReactiveX#1357) MergeWith, ConcatWith, AmbWith
- [Pull 1345] (ReactiveX#1345) RxScala: Simplify doOnCompleted/Terminate, finallyDo callback usage
- [Pull 1337] (ReactiveX#1337) Make Future receive NoSuchElementException when the BlockingObservable is empty
- [Pull 1335] (ReactiveX#1335) RxAndroid: Bump build tools to 19.1 and android plugin to 0.11
- [Pull 1327] (ReactiveX#1327) Join patterns extension for 4..9 and N arity joins.
- [Pull 1321] (ReactiveX#1321) RxAndroid: Ensuring Runnables posted with delay to a Handler are removed when unsubcribed
- [Pull 1347] (ReactiveX#1347) Allow use of the returned subscription to cancel periodic scheduling
- [Pull 1355] (ReactiveX#1355) Don't add the subscriber to the manager if it unsubscribed during the onStart call
- [Pull 1350] (ReactiveX#1350) Baseline Performance Tests
- [Pull 1316] (ReactiveX#1316) RxScala: Add the rest operators
- [Pull 1324] (ReactiveX#1324) TrampolineScheduler & Unsubscribe
- [Pull 1311] (ReactiveX#1311) Tiny integration test change
Version 0.19.0 (Maven Central)
Fairly significant object allocation improvements are included in this release which reduce GC pressure and improve performance.
Two pull requests (amongst several) with details are:
- ReactiveX#1281 Reduce Subscription Object Allocation
- ReactiveX#1246 Moved to atomic field updaters
With the following simple test code relative performance has increased as shown below:
Observable<Integer> o = Observable.just(1);
o.map(i -> {
return String.valueOf(i);
}).map(i -> {
return Integer.parseInt(i);
}).subscribe(observer);
Run: 10 - 10,692,099 ops/sec
Run: 11 - 10,617,627 ops/sec
Run: 12 - 10,938,405 ops/sec
Run: 13 - 10,917,388 ops/sec
Run: 14 - 10,783,298 ops/sec
Run: 11 - 8,493,506 ops/sec
Run: 12 - 8,403,361 ops/sec
Run: 13 - 8,400,537 ops/sec
Run: 14 - 8,163,998 ops/sec
Run: 10 - 4,930,966 ops/sec
Run: 11 - 6,119,951 ops/sec
Run: 12 - 7,062,146 ops/sec
Run: 13 - 6,514,657 ops/sec
Run: 14 - 6,369,426 ops/sec
Run: 10 - 2,879,355 ops/sec
Run: 11 - 3,236,245 ops/sec
Run: 12 - 4,468,275 ops/sec
Run: 13 - 3,237,293 ops/sec
Run: 14 - 4,683,840 ops/sec
Note that these numbers are relative as they depend on the JVM and hardware.
Many missing operators have been added to the RxScala APIs along with fixes and other maturation.
The toBlockingObservable()
method has been deprecated in favor of toBlocking()
for brevity and fit better with possible future additions such as toParallel()
without always needing the Observable
suffix.
forEach
as added as an alias for subscribe
to match the Java 8 naming convention.
This means code can now be written as:
Observable.from(1, 2, 3).limit(2).forEach(System.out::println);
which is an alias of this:
Observable.from(1, 2, 3).take(2).subscribe(System.out::println);
Since forEach
exists on BlockingObservable
as well, moving from non-blocking to blocking looks like this:
// non-blocking
Observable.from(1, 2, 3).limit(2).forEach(System.out::println);
// blocking
Observable.from(1, 2, 3).limit(2).toBlocking().forEach(System.out::println);
Thread caching is restored to Schedulers.io()
after being lost in v0.18.
A replacement for ExecutorScheduler
(removed in 0.18) is accessible via Schedulers.from(Executor e)
that wraps an Executor
and complies with the Rx contract.
All "replay" functionality now exists directly on the ReplaySubject
rather than in an internal type. This means there are now several different create
methods with the various overloads of size and time.
- Pull 1165 RxScala: Add dropUntil, contains, repeat, doOnTerminate, startWith, publish variants
- Pull 1183 NotificationLite.accept performance improvements
- Pull 1177 GroupByUntil to use BufferUntilSubscriber
- Pull 1182 Add facilities for creating Observables from JavaFX events and ObservableValues
- Pull 1188 RxScala Schedulers changes
- Pull 1175 Fixed synchronous ConnectableObservable.connect problem
- Pull 1172 ObserveOn: Change to batch dequeue
- Pull 1191 Fix attempt for OperatorPivotTest
- Pull 1195 SwingScheduler: allow negative schedule
- Pull 1178 Fix RxScala bug
- Pull 1210 Add more operators to RxScala
- Pull 1216 RxScala: Exposing PublishSubject
- Pull 1208 OperatorToObservableList: use LinkedList to buffer the sequence’s items
- Pull 1185 Behavior subject time gap fix 2
- Pull 1226 Fix bug in
zipWithIndex
and setzip(that, selector)
public in RxScala - Pull 1224 Implement shorter toBlocking as shorter alias for toBlockingObservable.
- Pull 1223 ReplaySubject enhancement with time and/or size bounds
- Pull 1160 Add
replay
andmulticast
variants to RxScala - Pull 1229 Remove Ambiguous Subscribe Overloads with Scheduler
- Pull 1232 Adopt Limit and ForEach Java 8 Naming Conventions
- Pull 1233 Deprecate toBlockingObservable in favor of toBlocking
- Pull 1237 SafeSubscriber memory reduction
- Pull 1236 CompositeSubscription with atomic field updater
- Pull 1243 Remove Subscription Wrapper from Observable.subscribe
- Pull 1244 Observable.from(T) using Observable.just(T)
- Pull 1239 RxScala: Update docs for "apply" and add an example
- Pull 1248 Fixed testConcurrentOnNextFailsValidation
- Pull 1246 Moved to atomic field updaters.
- Pull 1254 ZipIterable unsubscription fix
- Pull 1247 Add zip(iterable, selector) to RxScala
- Pull 1260 Fix the bug that BlockingObservable.singleOrDefault doesn't call unsubscribe
- Pull 1269 Fix the bug that int overflow can bypass the range check
- Pull 1272 ExecutorScheduler to wrap an Executor
- Pull 1264 ObserveOn scheduled unsubscription
- Pull 1271 Operator Retry with predicate
- Pull 1265 Add more operators to RxScala
- Pull 1281 Reduce Subscription Object Allocation
- Pull 1284 Lock-free, MPSC-queue
- Pull 1288 Ensure StringObservable.from() does not perform unnecessary read
- Pull 1286 Rename some Operator* classes to OnSubscribe*
- Pull 1276 CachedThreadScheduler
- Pull 1287 ReplaySubject remove replayState CHM and related SubjectObserver changes
- Pull 1289 Schedulers.from(Executor)
- Pull 1290 Upgrade to JMH 0.7.3
- Pull 1293 Fix and Update JMH Perf Tests
- Pull 1291 Check unsubscribe within observable from future
- Pull 1294 rx.operators -> rx.internal.operators
- Pull 1295 Change
void accept
toboolean accept
- Pull 1296 Move re-used internal Scheduler classes to their own package
- Pull 1298 Remove Bad Perf Test
- Pull 1301 RxScala: Add convenience method for adding unsubscription callback
- Pull 1304 Add flatMap and concatMap to RxScala
- Pull 1306 Hooked RxJavaPlugins errorHandler up within all operators that swallow onErrors
- Pull 1309 Hide ChainedSubscription/SubscriptionList from Public API
Version 0.18.4 (Maven Central)
This is a fix for CompositeSubscription
object allocation problems. Details can be found in issue #1204.
- Pull 1283 Subscription object allocation fix
Version 0.18.3 (Maven Central)
- [Pull 1161] (ReactiveX#1161) Removed use of deprecated API from tests & operators
- [Pull 1162] (ReactiveX#1162) fix to remove drift from schedulePeriodic
- [Pull 1159] (ReactiveX#1159) Rxscala improvement
- [Pull 1164] (ReactiveX#1164) JMH Perf Tests for Schedulers.computation
- [Pull 1158] (ReactiveX#1158) Scheduler correctness improvements.
Version 0.18.2 (Maven Central)
- [Pull 1150] (ReactiveX#1150) Fix ReplaySubject Terminal State Race Condition
- [Pull 1144] (ReactiveX#1144) Operator Delay rebase & fixes
- [Pull 1142] (ReactiveX#1142) Update 'contains' signature to 'contains(Object)'
- [Pull 1134] (ReactiveX#1134) OperatorTakeLast
- [Pull 1135] (ReactiveX#1135) OperatorTakeUntil
- [Pull 1137] (ReactiveX#1137) Random fixes to operators multicast, sample, refCount
- [Pull 1138] (ReactiveX#1138) Operator Window and other changes
- [Pull 1131] (ReactiveX#1131) Operator TakeTimed
- [Pull 1130] (ReactiveX#1130) Operator Switch
- [Pull 1129] (ReactiveX#1129) Conditional statements contribution to Operator
- [Pull 1128] (ReactiveX#1128) Fix for SerializedObserverTest
- [Pull 1126] (ReactiveX#1126) Operator When
- [Pull 1125] (ReactiveX#1125) Operator contrib math
- [Pull 1124] (ReactiveX#1124) Add lift to rxscala
- [Pull 1122] (ReactiveX#1122) OperatorSkipUntil
- [Pull 1121] (ReactiveX#1121) OperatorSkipTimed
- [Pull 1120] (ReactiveX#1120) OperatorSequenceEqual
- [Pull 1119] (ReactiveX#1119) OperatorRefCount
- [Pull 1118] (ReactiveX#1118) Operator ParallelMerge
- [Pull 1117] (ReactiveX#1117) Operator OnExceptionResumeNextViaObservable
- [Pull 1115] (ReactiveX#1115) OperatorTakeWhile
- [Pull 1112] (ReactiveX#1112) OperatorThrottleFirst
- [Pull 1111] (ReactiveX#1111) OperatorTimeInterval
- [Pull 1110] (ReactiveX#1110) OperatorOnErrorReturn
- [Pull 1109] (ReactiveX#1109) OperatorOnErrorResumeNextViaObservable
- [Pull 1108] (ReactiveX#1108) OperatorMulticastAndReplay
- [Pull 1107] (ReactiveX#1107) Fix ReplaySubject's double termination problem.
- [Pull 1106] (ReactiveX#1106) OperatorMergeMaxConcurrent
- [Pull 1104] (ReactiveX#1104) Operator merge delay error
- [Pull 1103] (ReactiveX#1103) OperatorJoin
- [Pull 1101] (ReactiveX#1101) Operator async
- [Pull 1100] (ReactiveX#1100) OperatorUsing
- [Pull 1099] (ReactiveX#1099) OperatorToMap
- [Pull 1098] (ReactiveX#1098) OperatorTimerAndSample
- [Pull 1097] (ReactiveX#1097) OperatorToMultimap
- [Pull 1096] (ReactiveX#1096) OperatorGroupJoin
- [Pull 1095] (ReactiveX#1095) OperatorGroupByUntil
- [Pull 1094] (ReactiveX#1094) Operator debounce
Version 0.18.1 (Maven Central)
- [Pull 1065] (ReactiveX#1065) Optimize OperatorSkipLastTimed
- [Pull 1073] (ReactiveX#1073) OperatorBuffer
- [Pull 1074] (ReactiveX#1074) OperatorConcat
- [Pull 1088] (ReactiveX#1088) OperatorToObservableFuture
- [Pull 1087] (ReactiveX#1087) OperatorMergeMap
- [Pull 1086] (ReactiveX#1086) OperatorFinallyDo
- [Pull 1085] (ReactiveX#1085) OperatorDistinctUntilChanged
- [Pull 1084] (ReactiveX#1084) OperatorDistinct
- [Pull 1083] (ReactiveX#1083) OperatorDematerialize
- [Pull 1081] (ReactiveX#1081) OperatorDefer
- [Pull 1080] (ReactiveX#1080) OperatorDefaultIfEmpty
- [Pull 1079] (ReactiveX#1079) OperatorCombineLatest
- [Pull 1074] (ReactiveX#1074) OperatorConcat
- [Pull 1073] (ReactiveX#1073) OperatorBuffer
- [Pull 1091] (ReactiveX#1091) Handle Thrown Errors with UnsafeSubscribe
- [Pull 1092] (ReactiveX#1092) Restore ObservableExecutionHook.onCreate
Version 0.18.0 (Maven Central)
This release takes us a step closer to 1.0 by completing some of the remaining work on the roadmap.
The first is simplifying the Scheduler API.
The Scheduler API is now simplified to this:
class Scheduler {
public abstract Worker createWorker();
public int parallelism();
public long now();
public abstract static class Worker implements Subscription {
public abstract Subscription schedule(Action0 action, long delayTime, TimeUnit unit);
public abstract Subscription schedule(Action0 action);
public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit);
public long now();
}
}
This is a breaking change if you have a custom Scheduler
implementation or use a Scheduler
directly. If you only ever pass in a Scheduler
via the Schedulers
factory methods, this change does not affect you.
Additionally, the ExecutionScheduler
was removed because a general threadpool does not meet the requirements of sequential execution for an Observable
. It was replaced with rx.schedulers.EventLoopScheduler
which is the new default for Schedulers.computation()
. It is a pool of event loops.
The rx.joins
package and associated when
, and
and then
operators were moved out of rxjava-core into a new module rxjava-joins. This is done as the rx.joins API was not yet matured and is not going to happen before 1.0. It was determined low priority and not worth blocking a 1.0 release. If the API matures inside the separate module to the point where it makes sense to bring it back into the core it can be done in the 1.x series.
This releases removes many of the classes and methods that have been deprecated in previous releases. Most of the removed functionality was migrated in previous releases to contrib modules such as rxjava-math, rxjava-async and rxjava-computation-expressions.
A handful of deprecated items still remain but can not yet be removed until all internal operators are finished migrating to using the lift
/Subscriber
design changes done in 0.17.0.
The full list of changes in 0.18.0:
- [Pull 1047] (ReactiveX#1047) Scheduler Simplification
- [Pull 1072] (ReactiveX#1072) Scheduler.Inner -> Scheduler.Worker
- [Pull 1053] (ReactiveX#1053) Deprecation Cleanup
- [Pull 1052] (ReactiveX#1052) Scheduler Cleanup
- [Pull 1048] (ReactiveX#1048) Remove ExecutorScheduler - New ComputationScheduler
- [Pull 1049] (ReactiveX#1049) Move rx.joins to rxjava-joins module
- [Pull 1068] (ReactiveX#1068) add synchronous test of resubscribe after error
- [Pull 1066] (ReactiveX#1066) CompositeSubscription fix
- [Pull 1071] (ReactiveX#1071) Manual Merge of AsObservable
- [Pull 1063] (ReactiveX#1063) Fix bugs in equals and hashCode of Timestamped
- [Pull 1070] (ReactiveX#1070) OperationAny -> OperatorAny
- [Pull 1069] (ReactiveX#1069) OperationAll -> OperatorAll
- [Pull 1058] (ReactiveX#1058) Typo in javadoc
- [Pull 1056] (ReactiveX#1056) Add drop(skip) and dropRight(skipLast) to rxscala
- [Pull 1057] (ReactiveX#1057) Fix: Retry in Scala adaptor is ambiguous
- [Pull 1055] (ReactiveX#1055) Fix: Missing Quasar instrumentation on Observable$2.call
- [Pull 1050] (ReactiveX#1050) Reimplement the 'SkipLast' operator
- [Pull 967] (ReactiveX#967) Reimplement the 'single' operator
Version 0.17.6 (Maven Central)
- [Pull 1031] (ReactiveX#1031) Fix NPE in SubjectSubscriptionManager
- [Pull 1030] (ReactiveX#1030) Benchmarking: Add JMH benchmark for ReplaySubject
- [Pull 1033] (ReactiveX#1033) isolate subscriber used for retries, cleanup tests
- [Pull 1021] (ReactiveX#1021) OperatorWeakBinding to not use WeakReferences anymore
- [Pull 1005] (ReactiveX#1005) add toMap from Java Observable
- [Pull 1040] (ReactiveX#1040) Fixed deadlock in Subjects + OperatorCache
- [Pull 1042] (ReactiveX#1042) Kotlin M7 and full compatibility with 0.17.0
- [Pull 1035] (ReactiveX#1035) Scala cleanup
- [Pull 1009] (ReactiveX#1009) Android - Adding a new RetainedFragment example
- [Pull 1020] (ReactiveX#1020) Upgrade Gradle wrapper for Android samples to Gradle 1.11
- [Pull 1038] (ReactiveX#1038) rxjava-android: parameterize OperatorViewClick by concrete view type
Version 0.17.5 (Maven Central)
- [Pull 1010] (ReactiveX#1010) Observable.unsafeSubscribe
- [Pull 1015] (ReactiveX#1015) Remove Redundant protectivelyWrap Method
- [Pull 1019] (ReactiveX#1019) Fix: retry() never unsubscribes from source until operator completes
Version 0.17.4 (Maven Central)
- [Pull 990] (ReactiveX#990) Quasar Lightweight Threads/Fibers Contrib Module
- [Pull 1012] (ReactiveX#1012) SerializedObserver: Removed window between the two synchronized blocks
Version 0.17.3 (Maven Central)
- [Pull 991] (ReactiveX#991) JMH Benchmark Build Config
- [Pull 993] (ReactiveX#993) JMH Perf Tests
- [Pull 995] (ReactiveX#995) Support Custom JMH Args
- [Pull 996] (ReactiveX#996) JMH Perfshadowjar
- [Pull 1003] (ReactiveX#1003) Func0 can transparently implement java.util.concurrent.Callable
- [Pull 999] (ReactiveX#999) New Implementation of SerializedObserver
Version 0.17.2 (Maven Central)
- Pull 963 A more robust JMH benchmarking set-up
- Pull 964 SubjectSubscriptionManager fix.
- Pull 970 Notifications for the allocation averse.
- Pull 973 Merge - Handle Bad Observables
- Pull 974 TestSubject, TestObserver and TestScheduler Improvements
- Pull 975 GroupBy & Time Gap Fixes
- Pull 976 parallel-merge unit test assertions
- Pull 977 Dematerialize - handle non-materialized terminal events
- Pull 982 Pivot Operator
- Pull 984 Tests and Javadoc for Pivot
- Pull 966 Reimplement the ElementAt operator and add it to rxjava-scala
- Pull 965 BugFix: Chain Subscription in TimeoutSubscriber and SerializedSubscriber
- Pull 986 Fix SynchronizedObserver.runConcurrencyTest
- Pull 987 Fix Non-Deterministic Pivot Test
- Pull 988 OnErrorFailedException
Version 0.17.1 (Maven Central)
- Pull 953 Make ObserveOnTest.testNonBlockingOuterWhileBlockingOnNext deterministic
- Pull 930 Initial commit of the Android samples module
- Pull 938 OperatorWeakBinding (deprecates OperatorObserveFromAndroidComponent)
- Pull 952 rxjava-scala improvements and reimplemented the
amb
operator - Pull 955 Fixed ReplaySubject leak
- Pull 956 Fixed byLine test to use line.separator system property instead of \n.
- Pull 958 OperatorSkipWhile
- Pull 959 OperationToFuture must throw CancellationException on get() if cancelled
- Pull 928 Fix deadlock in SubscribeOnBounded
- Pull 960 Unit test for "Cannot subscribe to a Retry observable once all subscribers unsubscribed"
- Pull 962 Migrate from SynchronizedObserver to SerializedObserver
Version 0.17.0 (Maven Central)
Version 0.17.0 contains some significant signature changes that allow us to significantly improve handling of synchronous Observables and simplify Schedulers. Many of the changes have backwards compatible deprecated methods to ease the migration while some are breaking.
The new signatures related to Observable
in this release are:
// A new create method takes `OnSubscribe` instead of `OnSubscribeFunc`
public final static <T> Observable<T> create(OnSubscribe<T> f)
// The new OnSubscribe type accepts a Subscriber instead of Observer and does not return a Subscription
public static interface OnSubscribe<T> extends Action1<Subscriber<? super T>>
// Subscriber is an Observer + Subscription
public abstract class Subscriber<T> implements Observer<T>, Subscription
// The main `subscribe` behavior receives a Subscriber instead of Observer
public final Subscription subscribe(Subscriber<? super T> subscriber)
// Subscribing with an Observer however is still appropriate
// and the Observer is automatically converted into a Subscriber
public final Subscription subscribe(Observer<? super T> observer)
// A new 'lift' function allows composing Operator implementations together
public <R> Observable<R> lift(final Operator<? extends R, ? super T> lift)
// The `Operator` used with `lift`
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>
Also changed is the Scheduler
interface which is much simpler:
public abstract class Scheduler {
public Subscription schedule(Action1<Scheduler.Inner> action);
public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
public final Subscription scheduleRecursive(final Action1<Recurse> action)
public long now();
public int degreeOfParallelism();
public static class Inner implements Subscription {
public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
public abstract void schedule(Action1<Scheduler.Inner> action);
public long now();
}
public static final class Recurse {
public final void schedule();
public final void schedule(long delay, TimeUnit unit);
}
}
This release applies many lessons learned over the past year and seeks to streamline the API before we hit 1.0.
As shown in the code above the changes fall into 2 major sections:
Changes that allow unsubscribing from synchronous Observables without needing to add concurrency.
Simplification of the Scheduler
interface and make clearer the concept of "outer" and "inner" Schedulers for recursion.
New types Subscriber
and OnSubscribe
along with the new lift
function have been added. The reasons and benefits are as follows:
RxJava versions up until 0.16.x are unable to unsubscribe from a synchronous Observable such as this:
Observable<Integer> oi = Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Observer<? super Integer> Observer) {
for (int i = 1; i < 1000000; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
});
Subscribing to this Observable
will always emit all 1,000,000 values even if unsubscribed such as via oi.take(10)
.
Version 0.17.0 fixes this issue by injecting the Subscription
into the OnSubscribe
function to allow code like this:
Observable<Integer> oi = Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
// we now receive a Subscriber instead of Observer
for (int i = 1; i < 1000000; i++) {
// the OnSubscribe can now check for isUnsubscribed
if (subscriber.isUnsubscribed()) {
return;
}
subscriber.onNext(i);
}
subscriber.onCompleted();
}
});
Subscribing to this will now correctly only emit 10 onNext
and unsubscribe:
// subscribe with an Observer
oi.take(10).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer t) {
println("Received: " + t);
}
})
Or the new Subscriber
type can be used and the Subscriber
itself can unsubscribe
:
// or subscribe with a Subscriber which supports unsubscribe
oi.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer t) {
println("Received: " + t);
if(t >= 10) {
// a Subscriber can unsubscribe
this.unsubscribe();
}
}
})
Because Java doesn't support extension methods, the only approach to applying custom operators without getting them added to rx.Observable
is using static methods. This has meant code like this:
MyCustomerOperators.operate(observable.map(...).filter(...).take(5)).map(...).subscribe()
In reality we want:
observable.map(...).filter(...).take(5).myCustomOperator().map(...).subscribe()
Using the newly added lift
we can get quite close to this:
observable.map(...).filter(...).take(5).lift(MyCustomOperator.operate()).map(...).subscribe()
Here is how the proposed lift
method looks if all operators were applied with it:
Observable<String> os = OBSERVABLE_OF_INTEGERS.lift(TAKE_5).lift(MAP_INTEGER_TO_STRING);
Along with the lift
function comes a new Operator
signature:
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>
All operator implementations in the rx.operators
package will over time be migrated to this new signature.
NOTE: Operators that have not yet been migrated do not work with synchronous unsubscribe.
The lift
operator injects the necessary Observer
and Subscription
instances (via the new Subscriber
type) and eliminates (for most use cases) the need for manual subscription management. Because the Subscription
is available in-scope there are no awkward coding patterns needed for creating a Subscription
, closing over it and returning and taking into account synchronous vs asynchronous.
For example, the body of fromIterable
is simply:
public void call(Subscriber<? super T> o) {
for (T i : is) {
if (o.isUnsubscribed()) {
return;
}
o.onNext(i);
}
o.onCompleted();
}
The take
operator is:
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final CompositeSubscription parent = new CompositeSubscription();
if (limit == 0) {
child.onCompleted();
parent.unsubscribe();
}
child.add(parent);
return new Subscriber<T>(parent) {
int count = 0;
boolean completed = false;
@Override
public void onCompleted() {
if (!completed) {
child.onCompleted();
}
}
@Override
public void onError(Throwable e) {
if (!completed) {
child.onError(e);
}
}
@Override
public void onNext(T i) {
if (!isUnsubscribed()) {
child.onNext(i);
if (++count >= limit) {
completed = true;
child.onCompleted();
unsubscribe();
}
}
}
};
}
The fromIterable
use case is 20x faster when implemented as a loop instead of recursive scheduler (see https://github.com/ReactiveX/RxJava/commit/a18b8c1a572b7b9509b7a7fe1a5075ce93657771).
Several places we can remove recursive scheduling used originally for unsubscribe support and use a loop instead.
Schedulers were greatly simplified to a design based around Action1<Inner>
.
public abstract class Scheduler {
public Subscription schedule(Action1<Scheduler.Inner> action);
public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
public final Subscription scheduleRecursive(final Action1<Recurse> action)
public long now();
public int degreeOfParallelism();
public static class Inner implements Subscription {
public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
public abstract void schedule(Action1<Scheduler.Inner> action);
public long now();
}
public static final class Recurse {
public final void schedule();
public final void schedule(long delay, TimeUnit unit);
}
}
This design change originated from three findings:
- It was very easy to cause memory leaks or inadvertent parallel execution since the distinction between outer and inner scheduling was not obvious.
To solve this the new design explicitly has the outer Scheduler
and then Scheduler.Inner
for recursion.
- The passing of state is not useful since scheduling over network boundaries with this model does not work.
In this new design all state passing signatures have been removed. This was determined while implementing a RemoteScheduler
that attempted to use observeOn
to transition execution from one machine to another. This does not work because of the requirement for serializing/deserializing the state of the entire execution stack. Migration of work over the network has been bound to be better suited to explicit boundaries established by Subjects. Thus, the complications within the Schedulers are unnecessary.
- The number of overloads with different ways of doing the same things were confusing.
This new design removes all but the essential and simplest methods.
- A scheduled task could not do work in a loop and easily be unsubscribed which generally meant less efficient recursive scheduling.
This new design applies similar principles as done with lift
/create
/OnSubscribe
/Subscriber
and injects the Subscription
via the Inner
interface so a running task can check isUnsubscribed()
.
WIth this new design, the simplest execution of a single task is:
Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
doWork();
}
});
Recursion is easily invoked like this:
Schedulers.newThread().scheduleRecursive(new Action1<Recurse>() {
@Override
public void call(Recurse recurse) {
doWork();
// recurse until unsubscribed (the schedule will do nothing if unsubscribed)
recurse.schedule();
}
});
or like this if the outer and inner actions need different behavior:
Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
doWork();
// recurse until unsubscribed (the schedule will do nothing if unsubscribed)
inner.schedule(this);
}
});
The use of Action1<Inner>
on both the outer and inner levels makes it so recursion that refer to this
and it works easily.
Similar to the new lift
/create
pattern with Subscriber
the Inner
is also a Subscription
so it allows efficient loops with unsubscribe
support:
Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
while(!inner.isUnsubscribed()) {
doWork();
}
}
});
An action can now unsubscribe
the Scheduler.Inner
:
Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
while(!inner.isUnsubscribed()) {
int i = doOtherWork();
if(i > 100) {
// an Action can cause the Scheduler to unsubscribe and stop
inner.unsubscribe();
}
}
}
});
Typically just stopping is sufficient:
Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
int i = doOtherWork();
if (i < 10) {
// recurse until done 10
inner.schedule(this);
}
}
});
but if other work in other tasks is being done and you want to unsubscribe conditionally you could:
Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
int i = doOtherWork();
if (i < 10) {
// recurse until done 10
inner.schedule(this);
} else {
inner.unsubscribe();
}
}
});
and the recursion can be delayed:
Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
doWork();
// recurse until unsubscribed ... but delay the recursion
inner.schedule(this, 500, TimeUnit.MILLISECONDS);
}
});
The same pattern works with the Recurse
signature:
Schedulers.newThread().scheduleRecursive(new Action1<Recurse>() {
@Override
public void call(Recurse recurse) {
doWork();
// recurse until unsubscribed (the schedule will do nothing if unsubscribed)
recurse.schedule(500, TimeUnit.MILLISECONDS);
}
});
The methods on the Inner
never return a Subscription
because they are always a single thread/event-loop/actor/etc and controlled by the Subscription
returned by the initial Scheduler.schedule
method. This is part of clarifying the contract.
Thus an unsubscribe
controlled from the outside would be done like this:
Subscription s = Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
while(!inner.isUnsubscribed()) {
doWork();
}
}
});
// unsubscribe from outside
s.unsubscribe();
The lift
function will not be used by most and is additive so will not affect backwards compatibility. The Subscriber
type is also additive and for most use cases does not need to be used directly, the Observer
interface can continue being used.
The previous create(OnSubscribeFunc f)
signature has been deprecated so code will work but now have warnings. Please begin migrating code as this will be deleted prior to the 1.0 release.
Code such as this:
Observable.create(new OnSubscribeFunc<Integer>() {
@Override
public Subscription onSubscribe(Observer<? super Integer> o) {
o.onNext(1);
o.onCompleted();
return Subscriptions.empty();
}
});
should change to this:
Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onCompleted();
}
});
If concurrency was being injected to allow unsubscribe support:
Observable.create(new OnSubscribeFunc<Integer>() {
@Override
public Subscription onSubscribe(final Observer<? super Integer> o) {
final BooleanSubscription s = new BooleanSubscription();
Thread t = new Thread(new Runnable() {
@Override
public void run() {
int i = 0;
while (!s.isUnsubscribed()) {
o.onNext(i++);
}
}
});
t.start();
return s;
}
});
you may no longer need it and can implement like this instead:
Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
int i = 0;
while (!subscriber.isUnsubscribed()) {
subscriber.onNext(i++);
}
}
});
or if the concurreny is still desired you can simplify the Subscription
management:
Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(final Subscriber<? super Integer> subscriber) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
int i = 0;
while (!subscriber.isUnsubscribed()) {
subscriber.onNext(i++);
}
}
});
t.start();
}
});
or use subscribeOn
which now works to make synchronous Observables
async while supporting unsubscribe
(this didn't work before):
Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
int i = 0;
while (!subscriber.isUnsubscribed()) {
subscriber.onNext(i++);
}
}
}).subscribeOn(Schedulers.newThread());
Custom Scheduler
implementations will need to be re-implemented and any direct use of the Scheduler
interface will also need to be updated.
If you have custom Subscription
implementations you will see they now need an isUnsubscribed()
method.
You can either add this method, or wrap your function using Subscriptions.create
and it will handle the isUnsubscribed
behavior and execute your function when unsubscribe()
is called.
It is recommended to use Subscriptions.create
for most Subscription
usage.
We have most if not all operators from Rx.Net that we want or intend to port. We think we have got the create
/subscribe
signatures as we want and the Subscription
and Scheduler
interfaces are now clean. There is at least one more major topic related to back pressure that may result in signature change in a future release. Beyond that no further major signature changing work is expected prior to 1.0.
We still need to improve on some of the Subject
implementations still, particularly ReplaySubject
. We are beginning to focus after this release on cleaning up all of the operator implementations, stabilizing, fixing bugs and performance tuning.
As we get closer to 1.0 there will be a release that focused on deleting all deprecated methods so it is suggested to start migrating off of them.
We appreciate your usage, feedback and contributions and hope the library is creating value for you!
- Pull 767 Zip fix for multiple onCompleted and moved unsubscribe outside the lock.
- Pull 770 Bind Operator
- Pull 778 Fix zip race condition
- Pull 784 Lift and Observer+Subscription
- Pull 793 Observer + Subscriber
- Pull 796 Add Subscription.isUnsubscribed()
- Pull 797 Scheduler Outer/Inner [Preview]
- Pull 805 Fix CompositeException
- Pull 785 Reimplement Zip Operator Using Lift [Preview]
- Pull 814 RunAsync method for outputting multiple values
- Pull 812 Fixed OperationSubscribeOn so OperationConditionalsTest works again.
- Pull 816 One global onCompleted object
- Pull 818 CompositeSubscription memory reduction
- Pull 817 Scala Scheduler Bindings Fix
- Pull 819 CompositeSubscription performance increase
- Pull 781 Fixed buglet in join binding, simplified types
- Pull 783 Implement some Android UI related operators
- Pull 821 Update to use Subscriber/Subscriptions.create
- Pull 826 Return wrapped Subscription
- Pull 824 Set setDaemon on NewThreadScheduler
- Pull 828 Repeat Operator
- Pull 827 Fixed cut & paster error in io scheduler
- Pull 833 Take operator was breaking the unsubscribe chain
- Pull 822 Reimplement 'subscribeOn' using 'lift'
- Pull 832 Issue #831 Fix for OperationJoin race condition
- Pull 834 Update clojure for 0.17
- Pull 839 Error Handling: OnErrorNotImplemented and java.lang.Error
- Pull 838 Make Scala OnCompleted Notification an object
- Pull 837 Perf with JMH
- Pull 841 Range OnSubscribe
- Pull 842 Test Unsubscribe
- Pull 845 Fix problem with Subscription
- Pull 847 Various Changes While Fixing GroupBy
- Pull 849 Add 'Fragment-Host' to rxjava-contrib modules for OSGi
- Pull 851 Reimplement the timeout operator and fix timeout bugs
- Pull 846 Added overloaded createRequest method that takes an HttpContext instance
- Pull 777 Fixed testSingleSourceManyIterators
- Pull 852 rxjava-debug
- Pull 853 StringObservable Update
- Pull 763 Added support for custom functions in combineLatest.
- Pull 854 The onCreate hook disappeared
- Pull 857 Change Lift to use rx.Observable.Operator
- Pull 859 Add 'Fragment-Host' to rxjava-contrib/debug module for OSGi
- Pull 860 Fixing the generics for merge and lift
- Pull 863 Optimize SwingMouseEventSource.fromRelativeMouseMotion
- Pull 862 Update the timeout docs
- Pull 790 Convert to scan to use lift
- Pull 866 Update OperationScan to OperatorScan
- Pull 870 Add the selector variants of timeout in RxScala
- Pull 874 Update CompositeSubscriptionTest.java
- Pull 869 subscribeOn + groupBy
- Pull 751 Provide Observable.timestamp(Scheduler) to be used in the tests.
- Pull 878 Scheduler.scheduleRecursive
- Pull 877 Correct synchronization guard in groupByUntil
- Pull 880 Force ViewObservable be subscribed and unsubscribed in the UI thread
- Pull 887 Remove Bad Filter Logic
- Pull 890 Split SubscribeOn into SubscribeOn/UnsubscribeOn
- Pull 891 Eliminate rx.util.* dumping grounds
- Pull 881 Lift Performance
- Pull 893 Change Parallel to use Long instead of Int
- Pull 894 Synchronized Operator Check for isTerminated
- Pull 885 Fixed an issue with the from(Reader) added a bunch of unit tests.
- Pull 896 removing java 7 dep
- Pull 883 Make Subscriptions of SwingObservable thread-safe
- Pull 895 Rewrite OperationObserveFromAndroidComponent to OperatorObserveFromAndroid
- Pull 892 onErrorFlatMap + OnErrorThrowable
- Pull 898 Handle illegal errors thrown from plugin
- Pull 901 GroupBy Unit Test from #900
- Pull 902 Fixed NullPointerException that may happen on timeout
- Pull 903 Scheduler.Recurse fields should be private
- Pull 904 Merge: Unsubscribe Completed Inner Observables
- Pull 905 RxJavaSchedulers Plugin
- Pull 909 Scheduler Plugin Refactor
- Pull 910 Remove groupBy with selector
- Pull 918 Operator: doOnTerminate
- Pull 919 BugFix: Zip Never Completes When Zero Observables
- Pull 920 Delete Deprecated onSubscribeStart That Doesn't Work
- Pull 922 Changes made while integrating it with our internal system
- Pull 924 Localized Operator Error Handling
- Pull 925 Rxjava clojure bindings final
- Pull 926 TestSubscriber: Default onError and Terminal Latch Behavior
- Pull 927 TestSubscriber lastSeenThread
- Pull 936 Skip fixed
- Pull 942 MathObservable
- Pull 944 OperationRetry -> OperatorRetry
- Pull 945 refactor the debug hooks before they become a breaking change.
- Pull 934 add Observable.startWith(Observable) method and unit test
- Pull 929 correct link to maven search
- Pull 923 Observable creation from Subscriber[T]=>Unit for Scala
- Pull 931 A number of improvements to OperatorObserveFromAndroidComponent
- Pull 950 Add support for Eclipse PDE
Version 0.16.1 (Maven Central)
- Pull 730 Improve Error Handling and Stacktraces When Unsubscribe Fails
- Pull 720 Added
Observable.timeout
wrappers to scala adapter - Pull 731 Fix non-deterministic unit test
- Pull 742 Build with Gradle 1.10
- Pull 718 Merge overloads
- Pull 733 Buffer with Observable boundary
- Pull 734 Delay with subscription and item delaying observables
- Pull 735 Window with Observable boundary
- Pull 736 MergeMap with Iterable and resultSelector overloads
- Pull 738 Publish and PublishLast overloads
- Pull 739 Debounce with selector
- Pull 740 Timeout with selector overloads
- Pull 745 Fixed
switch
bug - Pull 741 Zip with iterable, removed old aggregator version and updated tests
- Pull 749 Separated Android test code from source
- Pull 732 Ported groupByUntil function to scala-adapter
Version 0.16.0 (Maven Central)
This is a significant release with the following changes:
- Refactor of Subjects and Subscriptions to non-blocking implementations
- Many bug fixes, new operators and behavior changes to match Rx.Net.
- Deprecation of some operators due to renaming or eliminating duplicates
- The
rx.concurrency
package has been renamed torx.schedulers
. Existing classes still remain inrx.concurrency
but are deprecated. Use ofrx.concurrency
should be migrated torx.schedulers
as these deprecated classes will be removed in a future release. - Breaking changes to Scala bindings. See Release Notes for details.
- New modules: rxjava-string, rxjava-async-util and rxjava-computation-expressions for operators deemed not applicable to the core library.
- Pull 516 rxjava-string module with StringObservable
- Pull 533 Operator: ToAsync
- Pull 535 Fix compilation errors due to referencing the Android support library directly
- Pull 545 Fixed Zip issue with infinite streams
- Pull 539 Zipping a finite and an infinite Observable
- Pull 541 Operator: SkipUntil
- Pull 537 Add scala adapters for doOnEach operator
- Pull 560 Add type variances for doOnEach actions
- Pull 562 Scala Adaptor Improvements
- Pull 563 Operator: GroupByUntil
- Pull 561 Revised Approach to Creating Observables in Scala
- Pull 565 Operator: GroupJoin v2
- Pull 567 Operator: Timestamp with Scheduler
- Pull 568 Use lock free strategy for several Subscription implementations
- Pull 571 Operator: Sample with Observable v2
- Pull 572 Multiple Subscriptions to ObserveOn
- Pull 573 Removed Opening and Closing historical artifacts
- Pull 575 Operator: SequenceEqual reimplementation
- Pull 587 Operator: LongCount
- Pull 586 Fix Concat to allow multiple observers
- Pull 598 New Scala Bindings
- Pull 596 Fix for buffer not stopping when unsubscribed
- Pull 576 Operators: Timer and Delay
- Pull 593 Lock-free subscriptions
- Pull 599 Refactor rx.concurrency to rx.schedulers
- Pull 600 BugFix: Replay Subject
- Pull 594 Operator: Start
- Pull 604 StringObservable.join
- Pull 609 Operation: Timer
- Pull 612 Operation: Replay (overloads)
- Pull 628 BugFix: MergeDelayError Synchronization
- Pull 602 BugFix: ObserveOn Subscription leak
- Pull 631 Make NewThreadScheduler create Daemon threads
- Pull 651 Subjects Refactor - Non-Blocking, Common Abstraction, Performance
- Pull 661 Subscriptions Rewrite
- Pull 520 BugFix: blocking/non-blocking
first
- Pull 621 Scala: SerialSubscription & From
- Pull 626 BO.Latest, fixed: BO.next, BO.mostRecent, BO.toIterable
- Pull 633 BugFix: null in toList operator
- Pull 635 Conditional Operators
- Pull 638 Operations: DelaySubscription, TakeLast w/ time, TakeLastBuffer
- Pull 659 Missing fixes from the subject rewrite
- Pull 688 Fix SafeObserver handling of onComplete errors
- Pull 690 Fixed Scala bindings
- Pull 693 Kotlin M6.2
- Pull 689 Removed ObserverBase
- Pull 664 Operation: AsObservable
- Pull 697 Operations: Skip, SkipLast, Take with time
- Pull 698 Operations: Average, Sum
- Pull 699 Operation: Repeat
- Pull 701 Operation: Collect
- Pull 707 Module: rxjava-async-util
- Pull 708 BugFix: combineLatest
- Pull 712 Fix Scheduler Memory Leaks
- Pull 714 Module: rxjava-computation-expressions
- Pull 715 Add missing type hint to clojure example
- Pull 717 Scala: Added ConnectableObservable
- Pull 723 Deprecate multiple arity ‘from’
- Pull 724 Revert use of CurrentThreadScheduler for Observable.from
- Pull 725 Simpler computation/io naming for Schedulers
- Pull 727 ImmediateScheduler optimization for toObservableIterable
Version 0.15.1 (Maven Central)
This release should be additive functionality and bug fixes.
- Pull 510 Operators: And, Then, When
- Pull 514 Operator: Join
- Pull 525 Operators: toMap/toMultiMap
- Pull 510 BugFix: Zip
- Pull 512 Scala Adaptor Details
- Pull 512 Scala fixes
- Pull 508 Empty subscribe
- Pull 522 Unsubscribe from takeLast
- Pull 525 BugFix: Handling of Terminal State for Behavior/Publish Subjects
Version 0.15.0 (Maven Central)
This release contains a refactor of the Scala Bindings by @headinthebox that results in some breaking changes. The previous solution ended up not working well in all cases for idiomatic Scala usage. Java/Scala interop has been changed and is no longer transparent so as to optimize for native Scala usage. Read the rxjava-scala README for more information.
- Pull 503 New Scala Bindings
- Pull 502 Fix ObserveOn and add ParallelMerge Scheduler overload
- Pull 499 ObserveOn Refactor
- Pull 492 Implement the scheduler overloads for Range, From, StartWith
- Pull 496 Add contravariant for min and max
Version 0.14.11 (Maven Central)
This has a very slight breaking change by removing one doOnEach
overload. The version was not bumped from 0.14 to 0.15 as it is so minor and the offending method was just released in the previous version.
Version 0.14.10 (Maven Central)
- Pull 481 Operator: Using
- Pull 480 BugFix: Emit an IllegalArgumentException instead of ArithmeticException if the observable is empty
- Pull 479 Operator: DoOnEach
- Pull 478 Operator: Min, MinBy, Max, MaxBy
- Pull 463 Add Timeout Overloads
Version 0.14.9 (Maven Central)
- Pull 477 BugFix: CompositeSubscription
- Pull 476 BugFix: Don't emit null onComplete when no onNext received in AsyncSubject
- Pull 474 BugFix: Reduce an empty observable
- Pull 474 BugFix: non-deterministic unit test
- Pull 472 BugFix: Issue 431 Unsubscribe with Schedulers.newThread
- Pull 470 Operator: Last
Version 0.14.8 (Maven Central)
Version 0.14.7 (Maven Central)
- Pull 459 Fix multiple unsubscribe behavior
- Pull 458 rxjava-android: OperationObserveFromAndroidComponent
- Pull 453 Fix error handling in map operator
- Pull 450 Operator: TimeInterval
- Pull 452 Scheduler Overload of Just/Return Operator
- Pull 433 Fixes: Next Operator
- Commit d64a8c5 Update rxjava-apache-http to Apache HttpAsyncClient 4.0 GA
Version 0.14.6 (Maven Central)
- Pull 441 Fixed the issue that 'take' does not call 'onError'
- Pull 443 OperationSwitch notify onComplete() too early.
- Pull 434 Timeout operator and SerialSubscription
- Pull 447 Caching the result of 'isInternalImplementation'
Version 0.14.5 (Maven Central)
- Pull 438 Kotlin Language Adaptor
Version 0.14.4 (Maven Central)
- Issue 428 Fix: buffer() using TimeAndSizeBasedChunks incorrectly forces thread into interrupted state
- Pull 435 rx-apache-http recognizes "Transfer-Encoding: chunked" as an HTTP stream
- Pull 437 Fixes: Scheduler and Merge
Version 0.14.3 (Maven Central)
- Pull 407 Operator: RefCount
- Pull 410 Operator: Contains
- Pull 411 Unit Test fix: update counter before triggering latch
- Pull 413 Fixed the issues of takeLast(items, 0) and null values
- Pull 414 Operator: SkipLast
- Pull 415 Operator: Empty with scheduler
- Pull 416 Operator: Throw with scheduler
- Pull 420 Scala Adaptor Improvements
- Pull 422 JRuby function wrapping support
- Pull 424 Operator: IgnoreElements
- Pull 426 PublishSubject ReSubscribe for publish().refCount() Behavior
Version 0.14.2 (Maven Central)
- Pull 403 Operators: Cast and OfType
- Pull 401 Operator: DefaultIfEmpty
- Pull 409 Operator: Synchronize with object
Version 0.14.1 (Maven Central)
- Pull 402 rxjava-apache-http improvements
Version 0.14.0 (Maven Central)
Further progress to the Scala adaptor and a handful of new operators.
Bump to 0.14.0 due to small breaking change to distinct
operator removing overloaded methods with Comparator
. These methods were added in 0.13.2 and determined to be incorrect.
This release also includes a new contrib module, rxjava-apache-http that provides an Observable API to the Apache HttpAsyncClient.
- Pull 396 Add missing methods to Scala Adaptor
- Pull 390 Operators: ElementAt and ElementAtOrDefault
- Pull 398 Operators: IsEmpty and Exists (instead of Any)
- Pull 397 Observable API for Apache HttpAsyncClient 4.0
- Pull 400 Removing
comparator
overloads ofdistinct
- Upload to Sonatype failed so version skipped
Version 0.13.4 (Maven Central)
- Pull 393 Parallel Operator & ObserveOn/ScheduledObserver Fixes
- Pull 394 Change Interval and Sample default Scheduler
- Pull 391 Fix OSGI support for rxjava-scala
- Upload to Sonatype failed so version skipped
Version 0.13.2 (Maven Central)
- Pull 389 Scala Adaptor Improvements
- Pull 382 Removing deprecated RxImplicits from rxjava-scala
- Pull 381 Operator: mapWithIndex
- Pull 380 Implemented
distinct
anddistinctUntilChanged
variants using a comparator - Pull 379 Make
interval
work with multiple subscribers
Version 0.13.1 (Maven Central)
This release includes a new Scala adaptor as part of the effort from issue ReactiveX#336 pursuing idiomatic Scala support.
- Pull 376 Idiomatic Scala Adaptor
- Pull 375 Operator: Distinct
- Pull 374 Operator: DistinctUntilChanged
- Pull 373 Fixes and Cleanup
Version 0.13.0 (Maven Central)
This release has some minor changes related to varargs that could break backwards compatibility if directly passing arrays but for most this release should not be breaking.
- Pull 354 Operators: Count, Sum, Average
- Pull 355 Operators: skipWhile and skipWhileWithIndex
- Pull 356 Operator: Interval
- Pull 357 Operators: first and firstOrDefault
- Pull 368 Operators: Throttle and Debounce
- Pull 371 Operator: Retry
- Pull 370 Change zip method signature from Collection to Iterable
- Pull 369 Generics Improvements: co/contra-variance
- Pull 361 Remove use of varargs from API
Version 0.12.2 (Maven Central)
- Pull 352 Groovy Language Adaptor: Add Func5-9 and N to the wrapper
Version 0.12.1 (Maven Central)
Version 0.12.0 (Maven Central)
This version adds to the static typing changes in 0.11 and adds covariant/contravariant typing via super/extends generics.
Additional cleanup was done, particularly related to BlockingObservable
. Also the window
operator was added.
The largest breaking change is that Observable.create
now accepts an OnSubscribeFunc
rather than a Func1
.
This means that instead of this:
public static <T> Observable<T> create(Func1<? super Observer<? super T>, ? extends Subscription> func)
it is now:
public static <T> Observable<T> create(OnSubscribeFunc<T> func)
This was done to simplify the usage of Observable.create
which was already verbose but made far worse by the ? super
generics.
For example, instead of writing this:
Observable.create(new Func1<Observer<? super SomeType>, Subscription>() {
/// body here
}
it is now written as:
Observable.create(new OnSubscribeFunc<SomeType>() {
/// body here
}
- Pull 343 Covariant Support with super/extends and
OnSubscribeFunc
as type forObservable.create
- Pull 337 Operator:
window
- Pull 348 Rename
switchDo
toswitchOnNext
(deprecateswitchDo
for eventual deletion) - Pull 348 Delete
switchDo
instance method in preference for static - Pull 346 Remove duplicate static methods from
BlockingObservable
- Pull 346
BlockingObservable
no longer extends fromObservable
- Pull 345 Remove unnecessary constructor from
Observable
Version 0.11.2 (Maven Central)
- Commit ccf53e8 Update to Scala 2.10.2
Version 0.11.1 (Maven Central)
- Pull 325 Clojure: Preserve metadata on fn and action macros
Version 0.11.0 (Maven Central)
This is a major refactor of rxjava-core and the language adaptors.
Note that there are breaking changes in this release. Details are below.
After this refactor it is expected that the API will settle down and allow us to stabilize towards a 1.0 release.
- Pull 332 Refactor Core to be Statically Typed
RxJava was written from the beginning to target the JVM, not any specific language.
As a side-effect of Java not having lambdas/clojures yet (and other considerations), Netflix used dynamic languages with it predominantly for the year of its existence prior to open sourcing.
To bridge the rxjava-core written in Java with the various languages a FunctionalLanguageAdaptor was registered at runtime for each language of interest.
To enable these language adaptors methods are overloaded with Object
in the API since Object
is the only super-type that works across all languages for their various implementations of lambdas and closures.
This downside of this has been that it breaks static typing for Java, Scala and other statically-typed languages. More can be read on this issue and discussion of the subject here: https://groups.google.com/forum/#!topic/rxjava/bVZoKSsb1-o
This release:
- removes all
Object
overload methods from rxjava-core so it is statically typed - removes dynamic FunctionalLanguageAdaptors
- uses idiomatic approaches for each language adaptor
- Java core is statically typed and has no knowledge of other languages
- Scala uses implicits
- Groovy uses an ExtensionModule
- Clojure adds a new macro (NOTE: this requires code changes)
- JRuby has been temporarily disabled (discussing new implementation at ReactiveX#320)
- language supports continue to be additive
- the rxjava-core will always be required and then whichever language modules are desired such as rxjava-scala, rxjava-clojure, rxjava-groovy are added to the classpath
- deletes deprecated methods
- deletes redundant static methods on
Observable
that cluttered the API and in some cases caused dynamic languages trouble choosing which method to invoke - deletes redundant methods on
Scheduler
that gave dynamic languages a hard time choosing which method to invoke
The benefits of this are:
- Everything is statically typed so compile-time checks for Java, Scala, etc work correctly
- Method dispatch is now done via native Java bytecode using types rather than going via
Object
which then has to do a lookup in a map. Memoization helped with the performance but each method invocation still required looking in a map for the correct adaptor. With this approach the appropriate methods will be compiled into therx.Observable
class to correctly invoke the right adaptor without lookups. - Interaction from each language should work as expected idiomatically for that language.
Further history on the various discussions and different attempts at solutions can be seen at ReactiveX#304, ReactiveX#204 and ReactiveX#208
Version 0.10.1 (Maven Central)
A new contrib module for Android: https://github.com/ReactiveX/RxJava/tree/master/rxjava-contrib/rxjava-android
- Pull 318 rxjava-android module with Android Schedulers
Version 0.10.0 (Maven Central)
This release includes a breaking change as it changes onError(Exception)
to onError(Throwable)
. This decision was made via discussion at ReactiveX#296.
Any statically-typed Observer
implementations with onError(Exception)
will need to be updated to onError(Throwable)
when moving to this version.
- Pull 312 Fix for OperatorOnErrorResumeNextViaObservable and async Resume
- Pull 314 Map Error Handling
- Pull 315 Change onError(Exception) to onError(Throwable) - Issue #296
Version 0.9.2 (Maven Central)
Version 0.9.1 (Maven Central)
- Pull 303 CombineLatest
- Pull 290 Zip overload with FuncN
- Pull 302 NPE fix when no package on class
- Pull 284 GroupBy fixes (items still oustanding)
- Pull 288 PublishSubject concurrent modification fixes
- Issue 198 Throw if no onError handler specified
- Issue 278 Subscribe argument validation
- Javadoc improvements and many new marble diagrams
Version 0.9.0 (Maven Central)
This release includes breaking changes that move all blocking operators (such as single
, last
, forEach
) to BlockingObservable
.
This means Observable
has only non-blocking operators on it. The blocking operators can now be accessed via .toBlockingObservable()
or BlockingObservable.from(observable)
.
Notes and link to the discussion of this change can be found at ReactiveX#272.
- Pull 272 Move blocking operators into BlockingObservable
- Pull 273 Fix Concat (make non-blocking)
- Issue 13 Operator: Switch
- Pull 274 Remove SLF4J dependency (RxJava is now a single jar with no dependencies)
Version 0.8.4 (Maven Central)
- Pull 269 (Really) Fix concurrency bug in ScheduledObserver
Version 0.8.3 (Maven Central)
- Pull 268 Fix concurrency bug in ScheduledObserver
Version 0.8.2 (Maven Central)
- Issue 74 Operator: Sample
- Issue 93 Operator: Timestamp
- Pull 253 Fix multiple subscription bug on operation filter
- Pull 254 SwingScheduler (new rxjava-swing module)
- Pull 256 BehaviorSubject
- Pull 257 Improved scan, reduce, aggregate
- Pull 262 SwingObservable (new rxjava-swing module)
- Pull 264 Publish, Replay and Cache Operators
Version 0.8.1 (Maven Central)
- Pull 250 AsyncSubject
- Pull 252 ToFuture
- Pull 246 Scheduler.schedulePeriodically
- Pull 247 flatMap aliased to mapMany
Version 0.8.0 (Maven Central)
This is a breaking (non-backwards compatible) release that updates the Scheduler implementation released in 0.7.0.
See ReactiveX#19 for background, discussion and status of Schedulers.
It is believed that the public signatures of Scheduler and related objects is now stabilized but ongoing feedback and review by the community could still result in changes.
- Issue 19 Schedulers improvements, changes and additions
- Issue 202 Fix Concat bugs
- Issue 65 Multicast
- Pull 218 ReplaySubject
Version 0.7.0 (Maven Central)
This release adds the foundations of Rx Schedulers.
There are still open questions, portions not implemented and assuredly bugs and behavior we didn't understand and thus implemented wrong.
Please provide bug reports, pull requests or feedback to help us on the road to version 1.0 and get schedulers implemented correctly.
See ReactiveX#19 (comment) for some known open questions that we could use help answering.
- Issue 19 Schedulers
Version 0.6.3 (Maven Central)
- Pull 224 RxJavaObservableExecutionHook
Version 0.6.2 (Maven Central)
- Issue 101 Operator: Where (alias to filter)
- Pull 197 TakeWhile observables do not properly complete
- Issue 21 Operator: All
- Pull 206 Observable.toList breaks with multiple subscribers
- Issue 29 Operator: CombineLatest
- Issue 211 Remove use of JSR 305 and dependency on com.google.code.findbugs
- Pull 212 Operation take leaks errors
- Pull 220 TakeWhile protect calls to predicate
- Pull 221 Error Handling Improvements - User Provided Observers/Functions
- Pull 201 Synchronize Observer on OperationMerge
- Issue 43 Operator: Finally
Version 0.6.1 (Maven Central)
- Pull 190 Fix generics issue with materialize() that prevented chaining
Version 0.6.0 (Maven Central)
- Issue 154 Add OSGi manifest headers
- Issue 173 Subscription Utilities and Default Implementations
- Pull 184 Convert 'last' from non-blocking to blocking to match Rx.Net (see Issue 57)
NOTE: This is a version bump from 0.5 to 0.6 because Issue 173 and Pull 184 include breaking changes.
These changes are being done in the goal of matching the Rx.Net implementation so breaking changes will be made prior to 1.0 on 0.x releases if necessary.
It was found that the last()
operator was implemented incorrectly (non-blocking instead of blocking) so any use of last()
on version 0.5.x should be changed to use takeLast(1)
. Since the return type needed to change this could not be done via a deprecation.
Also removed were the Observable.createSubscription
/Observable.noOpSubscription
methods which are now on the rx.subscriptions.Subscriptions utility class as Subscriptions.create
/Subscriptions.empty
. These methods could have been deprecated rather than removed but since another breaking change was being done they were just cleanly changed as part of the pre-1.0 process.
Version 0.5.5 (Maven Central)
- Issue 35 Operator: Defer
- Issue 37 Operator: Dematerialize
- Issue 50 Operator: GetEnumerator (GetIterator)
- Issue 64 Operator: MostRecent
- Issue 86 Operator: TakeUntil
Version 0.5.4 (Maven Central)
- Issue 18 Operator: ToIterable
- Issue 58 Operator: LastOrDefault
- Issue 66 Operator: Next
- Issue 77 Operator: Single and SingleOrDefault
- Issue 164 Range.createWithCount bugfix
- Pull 161 Build Status Badges and CI Integration
Version 0.5.3 (Maven Central)
- Issue 45 Operator: ForEach
- Issue 87 Operator: TakeWhile
- Pull 145 IntelliJ IDEA support in Gradle build
Version 0.5.2 (Maven Central)
- Issue 68 Operator: Range
- Issue 76 Operator: SequenceEqual
- Issue 85 Operator: TakeLast
- Issue 139 Plugin System
- Issue 141 Error Handler Plugin
- Pull 134 VideoExample in Clojure
- Pull 135 Idiomatic usage of import in ns macro in rx-examples.
- Pull 136 Add examples for jbundler and sbt
Version 0.5.1 (Maven Central)
- variety of code cleanup commits
- Pull 132 Broke rxjava-examples mo