-
Notifications
You must be signed in to change notification settings - Fork 1
/
file1.dart
66 lines (59 loc) · 1.64 KB
/
file1.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import 'dart:async';
/// Demo entry point: prints names from a sample stream and reports when the
/// stream stays silent for too long.
///
/// The sample stream emits "A" through "F" with pauses of 1, 2, 1, 4 and 7
/// seconds between consecutive names. It is consumed through `streamTimeWait`
/// with a three-second limit; a [TimerRanoutException] raised during the
/// iteration is caught and its message is printed.
void main() async {
  // Emits "A" right away, then every later name after its own pause.
  Stream<String> getNames() async* {
    const laterNames = ["B", "C", "D", "E", "F"];
    const pausesInSeconds = [1, 2, 1, 4, 7];

    yield "A";
    for (var i = 0; i < laterNames.length; i++) {
      await Future.delayed(Duration(seconds: pausesInSeconds[i]));
      yield laterNames[i];
    }
  }

  try {
    final guardedNames = getNames().streamTimeWait(Duration(seconds: 3));
    await for (final name in guardedNames) {
      print(name);
    }
  } on TimerRanoutException catch (error) {
    // `message` is already a String, so it can be printed directly.
    print(error.message);
  }
}
/// A stream transformer that reports a [TimerRanoutException] when the source
/// stream stays silent for longer than [duration] after a data event.
///
/// Every data event is forwarded unchanged and (re)starts a one-shot
/// countdown of [duration]. If the countdown elapses before the next data
/// event, a single [TimerRanoutException] error is added to the output
/// stream; the source stays subscribed, so later events are still forwarded.
/// The countdown only starts once the first data event has arrived. Error
/// events from the source are forwarded and do not reset the countdown.
///
/// All per-binding state lives inside [bind], so one transformer instance can
/// safely be bound to several streams.
class TimeOutBeetweenEvents<T> extends StreamTransformerBase<T, T> {
  /// Maximum silence allowed after a data event before an error is reported.
  Duration duration;

  /// Output controller created by the most recent [bind] call.
  ///
  /// Retained for backward compatibility only; [bind] does not read it.
  StreamController<T>? controller;

  /// Source subscription of the binding that was listened to most recently.
  ///
  /// Retained for backward compatibility only; [bind] does not read it.
  StreamSubscription<T>? subscription;

  /// Countdown most recently started by any binding.
  ///
  /// Retained for backward compatibility only; [bind] does not read it.
  Timer? timer;

  /// Creates a transformer that allows at most [duration] between data events.
  TimeOutBeetweenEvents(this.duration);

  /// Returns a stream that mirrors [stream] and additionally emits a
  /// [TimerRanoutException] error whenever [duration] passes after a data
  /// event without a further data event.
  ///
  /// The source is subscribed to lazily, when the returned stream gets its
  /// listener, and is cancelled when that listener cancels.
  @override
  Stream<T> bind(Stream<T> stream) {
    // State is kept in locals (not in the instance fields) so that binding
    // the same transformer twice cannot clobber an earlier binding.
    late final StreamController<T> output;
    StreamSubscription<T>? source;
    Timer? countdown;

    void stopCountdown() {
      countdown?.cancel();
      countdown = null;
    }

    void restartCountdown() {
      countdown?.cancel();
      // One-shot timer: a single silent gap must yield a single error, not
      // one error every `duration` as a periodic timer would produce.
      countdown = timer = Timer(duration, () {
        output.addError(TimerRanoutException("Timer ran out"));
      });
    }

    output = StreamController<T>(
      onListen: () {
        source = subscription = stream.listen(
          (data) {
            restartCountdown();
            output.add(data);
          },
          onError: output.addError,
          onDone: () {
            // Stop the countdown first; otherwise it could fire after the
            // controller is closed and `addError` would throw a StateError.
            stopCountdown();
            output.close();
          },
        );
      },
      onCancel: () {
        stopCountdown();
        // Intentionally not returned: cancelling the output must not wait
        // for the source to finish its own cancellation.
        source?.cancel();
      },
    );

    controller = output;
    return output.stream;
  }
}
/// Adds a gap-timeout operator to every [Stream].
extension TimerBetweeenEvents<T> on Stream<T> {
  /// Returns this stream passed through a [TimeOutBeetweenEvents] transformer
  /// configured with [duration].
  ///
  /// The resulting stream emits a [TimerRanoutException] error when no new
  /// data event arrives within [duration] of the previous one.
  Stream<T> streamTimeWait(Duration duration) {
    final gapGuard = TimeOutBeetweenEvents<T>(duration);
    return transform(gapGuard);
  }
}
/// Exception carrying a [message] that describes an expired timer.
class TimerRanoutException implements Exception {
  /// Human-readable description of what timed out.
  String message;

  /// Creates an exception with the given [message].
  TimerRanoutException(this.message);

  /// Returns the type name followed by [message].
  ///
  /// Without this override, printing the exception or leaving it uncaught
  /// would show only "Instance of 'TimerRanoutException'".
  @override
  String toString() => 'TimerRanoutException: $message';
}