forked from TimelyDataflow/differential-dataflow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtrace.rs
65 lines (52 loc) · 2.06 KB
/
trace.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
extern crate timely;
extern crate differential_dataflow;
use std::rc::Rc;
use timely::dataflow::operators::generic::OperatorInfo;
use timely::progress::{Antichain, frontier::AntichainRef};
use differential_dataflow::trace::implementations::ord::OrdValBatch;
use differential_dataflow::trace::{Trace, TraceReader, Batch, Batcher};
use differential_dataflow::trace::cursor::Cursor;
use differential_dataflow::trace::implementations::spine_fueled::Spine;
/// A `Spine` whose batches are reference-counted (`Rc`) `OrdValBatch`es
/// parameterized by `K, V, T, R`.
///
/// NOTE(review): `K, V, T, R` are presumably key, value, time and difference;
/// the updates pushed in `get_trace` have the shape `((K, V), T, R)` — confirm
/// against the `OrdValBatch` documentation.
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<K, V, T, R>>>;
/// `OrdValSpine` instantiated with `u64, u64, usize, i64`; this is the trace
/// type constructed by `get_trace`.
type IntegerTrace = OrdValSpine<u64, u64, usize, i64>;
/// Builds the small integer trace that `test_trace` reads from.
///
/// Three updates are pushed into a single batcher, which is then sealed at the
/// upper frontiers `1`, `2` and `3` in turn. Each sealed batch is inserted into
/// the trace immediately, in that order.
fn get_trace() -> Spine<Rc<OrdValBatch<u64, u64, usize, i64>>> {
    let mut spine = IntegerTrace::new(OperatorInfo::new(0, 0, &[]), None, None);

    let mut batcher = <<IntegerTrace as TraceReader>::Batch as Batch>::Batcher::new();
    // Updates have the shape `((key, value), time, diff)`.
    let mut updates = vec![
        ((1, 2), 0, 1),
        ((2, 3), 1, 1),
        ((2, 3), 2, -1),
    ];
    batcher.push_batch(&mut updates);

    // Seal one batch per upper frontier and hand it straight to the trace.
    for &upper in [1, 2, 3].iter() {
        let sealed = batcher.seal(Antichain::from_elem(upper));
        spine.insert(sealed);
    }

    spine
}
/// Reads the fixture trace through the upper frontiers `1`, `2` and `3` and
/// checks the contents at each step, then checks that `trace.cursor()` yields
/// the same contents as reading through `3`.
#[test]
fn test_trace() {
    let mut trace = get_trace();

    // Through frontier 1: only the update at time 0 is expected.
    let through_1 = {
        let (mut cursor, storage) = trace.cursor_through(AntichainRef::new(&[1])).unwrap();
        cursor.to_vec(&storage)
    };
    assert_eq!(through_1, vec![((1, 2), vec![(0, 1)])]);

    // Through frontier 2: the insertion of (2, 3) at time 1 is expected as well.
    let through_2 = {
        let (mut cursor, storage) = trace.cursor_through(AntichainRef::new(&[2])).unwrap();
        cursor.to_vec(&storage)
    };
    println!("--> {:?}", through_2);
    assert_eq!(
        through_2,
        vec![
            ((1, 2), vec![(0, 1)]),
            ((2, 3), vec![(1, 1)]),
        ]
    );

    // Through frontier 3: the retraction of (2, 3) at time 2 is expected too.
    let through_3 = {
        let (mut cursor, storage) = trace.cursor_through(AntichainRef::new(&[3])).unwrap();
        cursor.to_vec(&storage)
    };
    assert_eq!(
        through_3,
        vec![
            ((1, 2), vec![(0, 1)]),
            ((2, 3), vec![(1, 1), (2, -1)]),
        ]
    );

    // The plain cursor must agree with the read through frontier 3.
    let everything = {
        let (mut cursor, storage) = trace.cursor();
        cursor.to_vec(&storage)
    };
    assert_eq!(everything, through_3);
}