-
Notifications
You must be signed in to change notification settings - Fork 170
/
Copy pathmaybe_changed_after.rs
418 lines (387 loc) · 18.6 KB
/
maybe_changed_after.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
use crate::{
accumulator::accumulated_map::InputAccumulatedValues,
cycle::{CycleHeads, CycleRecoveryStrategy},
function::sync::ClaimResult,
key::DatabaseKeyIndex,
zalsa::{MemoIngredientIndex, Zalsa, ZalsaDatabase},
zalsa_local::{ActiveQueryGuard, QueryEdge, QueryOrigin},
AsDynDatabase as _, Id, Revision,
};
use std::sync::atomic::Ordering;
use super::{memo::Memo, Configuration, IngredientImpl};
/// Result of memo validation.
pub enum VerifyResult {
/// Memo has changed and needs to be recomputed.
Changed,
/// Memo remains valid.
///
/// The first inner value tracks whether the memo or any of its dependencies have an
/// accumulated value.
///
/// The second is the cycle heads encountered in validation; don't mark
/// memos verified until we've iterated the full cycle to ensure no inputs changed.
Unchanged(InputAccumulatedValues, CycleHeads),
}
impl VerifyResult {
pub(crate) fn changed_if(changed: bool) -> Self {
if changed {
Self::Changed
} else {
Self::unchanged()
}
}
pub(crate) fn unchanged() -> Self {
Self::Unchanged(InputAccumulatedValues::Empty, CycleHeads::default())
}
}
impl<C> IngredientImpl<C>
where
C: Configuration,
{
pub(super) fn maybe_changed_after<'db>(
&'db self,
db: &'db C::DbView,
id: Id,
revision: Revision,
) -> VerifyResult {
let zalsa = db.zalsa();
let memo_ingredient_index = self.memo_ingredient_index(zalsa, id);
zalsa.unwind_if_revision_cancelled(db);
loop {
let database_key_index = self.database_key_index(id);
tracing::debug!("{database_key_index:?}: maybe_changed_after(revision = {revision:?})");
// Check if we have a verified version: this is the hot path.
let memo_guard = self.get_memo_from_table_for(zalsa, id, memo_ingredient_index);
if let Some(memo) = memo_guard {
if self.validate_may_be_provisional(db, zalsa, database_key_index, memo)
&& self.shallow_verify_memo(db, zalsa, database_key_index, memo)
{
return if memo.revisions.changed_at > revision {
VerifyResult::Changed
} else {
VerifyResult::Unchanged(
memo.revisions.accumulated_inputs.load(),
CycleHeads::default(),
)
};
}
if let Some(mcs) =
self.maybe_changed_after_cold(zalsa, db, id, revision, memo_ingredient_index)
{
return mcs;
} else {
// We failed to claim, have to retry.
}
} else {
// No memo? Assume has changed.
return VerifyResult::Changed;
}
}
}
fn maybe_changed_after_cold<'db>(
&'db self,
zalsa: &Zalsa,
db: &'db C::DbView,
key_index: Id,
revision: Revision,
memo_ingredient_index: MemoIngredientIndex,
) -> Option<VerifyResult> {
let database_key_index = self.database_key_index(key_index);
let _claim_guard = match self.sync_table.try_claim(db, zalsa, key_index) {
ClaimResult::Retry => return None,
ClaimResult::Cycle => match C::CYCLE_STRATEGY {
CycleRecoveryStrategy::Panic => panic!(
"dependency graph cycle validating {database_key_index:#?}; \
set cycle_fn/cycle_initial to fixpoint iterate"
),
CycleRecoveryStrategy::Fixpoint => {
return Some(VerifyResult::Unchanged(
InputAccumulatedValues::Empty,
CycleHeads::from(database_key_index),
));
}
},
ClaimResult::Claimed(guard) => guard,
};
// Load the current memo, if any.
let Some(old_memo) = self.get_memo_from_table_for(zalsa, key_index, memo_ingredient_index)
else {
return Some(VerifyResult::Changed);
};
tracing::debug!(
"{database_key_index:?}: maybe_changed_after_cold, successful claim, \
revision = {revision:?}, old_memo = {old_memo:#?}",
old_memo = old_memo.tracing_debug()
);
// Check if the inputs are still valid. We can just compare `changed_at`.
let active_query = db.zalsa_local().push_query(database_key_index);
if let VerifyResult::Unchanged(_, cycle_heads) =
self.deep_verify_memo(db, zalsa, old_memo, &active_query)
{
return Some(if old_memo.revisions.changed_at > revision {
VerifyResult::Changed
} else {
VerifyResult::Unchanged(old_memo.revisions.accumulated_inputs.load(), cycle_heads)
});
}
// If inputs have changed, but we have an old value, we can re-execute.
// It is possible the result will be equal to the old value and hence
// backdated. In that case, although we will have computed a new memo,
// the value has not logically changed.
if old_memo.value.is_some() {
let memo = self.execute(db, active_query, Some(old_memo));
let changed_at = memo.revisions.changed_at;
return Some(if changed_at > revision {
VerifyResult::Changed
} else {
VerifyResult::Unchanged(
match &memo.revisions.accumulated {
Some(_) => InputAccumulatedValues::Any,
None => memo.revisions.accumulated_inputs.load(),
},
CycleHeads::default(),
)
});
}
// Otherwise, nothing for it: have to consider the value to have changed.
Some(VerifyResult::Changed)
}
/// True if the memo's value and `changed_at` time is still valid in this revision.
/// Does only a shallow O(1) check, doesn't walk the dependencies.
///
/// In general, a provisional memo (from cycle iteration) does not verify. Since we don't
/// eagerly finalize all provisional memos in cycle iteration, we have to lazily check here
/// (via `validate_provisional`) whether a may-be-provisional memo should actually be verified
/// final, because its cycle heads are all now final.
#[inline]
pub(super) fn shallow_verify_memo(
&self,
db: &C::DbView,
zalsa: &Zalsa,
database_key_index: DatabaseKeyIndex,
memo: &Memo<C::Output<'_>>,
) -> bool {
tracing::debug!(
"{database_key_index:?}: shallow_verify_memo(memo = {memo:#?})",
memo = memo.tracing_debug()
);
let verified_at = memo.verified_at.load();
let revision_now = zalsa.current_revision();
if verified_at == revision_now {
// Already verified.
return true;
}
let last_changed = zalsa.last_changed_revision(memo.revisions.durability);
tracing::debug!(
"{database_key_index:?}: check_durability(memo = {memo:#?}, last_changed={:?} <= verified_at={:?}) = {:?}",
last_changed,
verified_at,
last_changed <= verified_at,
memo = memo.tracing_debug()
);
if last_changed <= verified_at {
// No input of the suitable durability has changed since last verified.
memo.mark_as_verified(
db,
revision_now,
database_key_index,
memo.revisions.accumulated_inputs.load(),
);
memo.mark_outputs_as_verified(zalsa, db.as_dyn_database(), database_key_index);
return true;
}
false
}
/// Validates this memo if it is a provisional memo. Returns true for non provisional memos or
/// if the provisional memo has been successfully marked as verified final, that is, its
/// cycle heads have all been finalized.
#[inline]
pub(super) fn validate_may_be_provisional(
&self,
db: &C::DbView,
zalsa: &Zalsa,
database_key_index: DatabaseKeyIndex,
memo: &Memo<C::Output<'_>>,
) -> bool {
// Wouldn't it be nice if rust had an implication operator ...
// may_be_provisional -> validate_provisional
!memo.may_be_provisional() || self.validate_provisional(db, zalsa, database_key_index, memo)
}
/// Check if this memo's cycle heads have all been finalized. If so, mark it verified final and
/// return true, if not return false.
#[inline]
fn validate_provisional(
&self,
db: &C::DbView,
zalsa: &Zalsa,
database_key_index: DatabaseKeyIndex,
memo: &Memo<C::Output<'_>>,
) -> bool {
tracing::debug!(
"{database_key_index:?}: validate_provisional(memo = {memo:#?})",
memo = memo.tracing_debug()
);
if (&memo.revisions.cycle_heads).into_iter().any(|cycle_head| {
zalsa
.lookup_ingredient(cycle_head.ingredient_index())
.is_provisional_cycle_head(db.as_dyn_database(), cycle_head.key_index())
}) {
return false;
}
// Relaxed is sufficient here because there are no other writes we need to ensure have
// happened before marking this memo as verified-final.
memo.revisions.verified_final.store(true, Ordering::Relaxed);
true
}
/// VerifyResult::Unchanged if the memo's value and `changed_at` time is up-to-date in the
/// current revision. When this returns Unchanged with no cycle heads, it also updates the
/// memo's `verified_at` field if needed to make future calls cheaper.
///
/// Takes an [`ActiveQueryGuard`] argument because this function recursively
/// walks dependencies of `old_memo` and may even execute them to see if their
/// outputs have changed.
pub(super) fn deep_verify_memo(
&self,
db: &C::DbView,
zalsa: &Zalsa,
old_memo: &Memo<C::Output<'_>>,
active_query: &ActiveQueryGuard<'_>,
) -> VerifyResult {
let database_key_index = active_query.database_key_index;
tracing::debug!(
"{database_key_index:?}: deep_verify_memo(old_memo = {old_memo:#?})",
old_memo = old_memo.tracing_debug()
);
if self.validate_may_be_provisional(db, zalsa, database_key_index, old_memo)
&& self.shallow_verify_memo(db, zalsa, database_key_index, old_memo)
{
return VerifyResult::unchanged();
}
match &old_memo.revisions.origin {
QueryOrigin::Assigned(_) => {
// If the value was assigned by another query,
// and that query were up-to-date,
// then we would have updated the `verified_at` field already.
// So the fact that we are here means that it was not specified
// during this revision or is otherwise stale.
//
// Example of how this can happen:
//
// Conditionally specified queries
// where the value is specified
// in rev 1 but not in rev 2.
VerifyResult::Changed
}
QueryOrigin::FixpointInitial if old_memo.may_be_provisional() => VerifyResult::Changed,
QueryOrigin::FixpointInitial => VerifyResult::unchanged(),
QueryOrigin::DerivedUntracked(_) => {
// Untracked inputs? Have to assume that it changed.
VerifyResult::Changed
}
QueryOrigin::Derived(edges) => {
if old_memo.may_be_provisional() {
return VerifyResult::Changed;
}
let mut cycle_heads = vec![];
'cycle: loop {
// Fully tracked inputs? Iterate over the inputs and check them, one by one.
//
// NB: It's important here that we are iterating the inputs in the order that
// they executed. It's possible that if the value of some input I0 is no longer
// valid, then some later input I1 might never have executed at all, so verifying
// it is still up to date is meaningless.
let last_verified_at = old_memo.verified_at.load();
let mut inputs = InputAccumulatedValues::Empty;
let dyn_db = db.as_dyn_database();
for &edge in edges.input_outputs.iter() {
match edge {
QueryEdge::Input(dependency_index) => {
match dependency_index.maybe_changed_after(dyn_db, last_verified_at)
{
VerifyResult::Changed => break 'cycle VerifyResult::Changed,
VerifyResult::Unchanged(input_accumulated, cycles) => {
cycles.insert_into(&mut cycle_heads);
inputs |= input_accumulated;
}
}
}
QueryEdge::Output(dependency_index) => {
// Subtle: Mark outputs as validated now, even though we may
// later find an input that requires us to re-execute the function.
// Even if it re-execute, the function will wind up writing the same value,
// since all prior inputs were green. It's important to do this during
// this loop, because it's possible that one of our input queries will
// re-execute and may read one of our earlier outputs
// (e.g., in a scenario where we do something like
// `e = Entity::new(..); query(e);` and `query` reads a field of `e`).
//
// NB. Accumulators are also outputs, but the above logic doesn't
// quite apply to them. Since multiple values are pushed, the first value
// may be unchanged, but later values could be different.
// In that case, however, the data accumulated
// by this function cannot be read until this function is marked green,
// so even if we mark them as valid here, the function will re-execute
// and overwrite the contents.
dependency_index.mark_validated_output(
zalsa,
dyn_db,
database_key_index,
);
}
}
}
// Possible scenarios here:
//
// 1. Cycle heads is empty. We traversed our full dependency graph and neither hit any
// cycles, nor found any changed dependencies. We can mark our memo verified and
// return Unchanged with empty cycle heads.
//
// 2. Cycle heads is non-empty, and does not contain our own key index. We are part of
// a cycle, and since we don't know if some other cycle participant that hasn't been
// traversed yet (that is, some other dependency of the cycle head, which is only a
// dependency of ours via the cycle) might still have changed, we can't yet mark our
// memo verified. We can return a provisional Unchanged, with cycle heads.
//
// 3. Cycle heads is non-empty, and contains only our own key index. We are the head of
// a cycle, and we've now traversed the entire cycle and found no changes, but no
// other cycle participants were verified (they would have all hit case 2 above). We
// can now safely mark our own memo as verified. Then we have to traverse the entire
// cycle again. This time, since our own memo is verified, there will be no cycle
// encountered, and the rest of the cycle will be able to verify itself.
//
// 4. Cycle heads is non-empty, and contains our own key index as well as other key
// indices. We are the head of a cycle nested within another cycle. We can't mark
// our own memo verified (for the same reason as in case 2: the full outer cycle
// hasn't been validated unchanged yet). We return Unchanged, with ourself removed
// from cycle heads. We will handle our own memo (and the rest of our cycle) on a
// future iteration; first the outer cycle head needs to verify itself.
let in_heads = cycle_heads
.iter()
.position(|&head| head == database_key_index)
.inspect(|&head| _ = cycle_heads.swap_remove(head))
.is_some();
if cycle_heads.is_empty() {
old_memo.mark_as_verified(
db,
zalsa.current_revision(),
database_key_index,
inputs,
);
if in_heads {
// Iterate our dependency graph again, starting from the top. We clear the
// cycle heads here because we are starting a fresh traversal. (It might be
// logically clearer to create a new HashSet each time, but clearing the
// existing one is more efficient.)
cycle_heads.clear();
continue 'cycle;
}
}
break 'cycle VerifyResult::Unchanged(
InputAccumulatedValues::Empty,
CycleHeads::from(cycle_heads),
);
}
}
}
}
}