-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrepairer.rs
147 lines (133 loc) · 4.99 KB
/
repairer.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
use crate::actors::{
backends::{BackendManagerActor, RequestBackends, StateInterest},
meta::{MetaStoreActor, ScanMeta},
zstor::{Rebuild, ZstorActor},
};
use actix::prelude::*;
use log::{debug, error, warn};
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
/// Number of seconds between the start of two consecutive sweeps of the backend objects
/// (10 minutes).
const OBJECT_SWEEP_INTERVAL_SECONDS: u64 = 10 * 60;
#[derive(Message)]
#[rtype(result = "()")]
/// Message asking the [`RepairActor`] to sweep the object metadata held by the metastore.
///
/// For every object returned by the scan, the backends holding its shards are requested in a
/// readable state; if any of them cannot be provided, a rebuild of the object is requested.
/// The message carries no data and its handler produces no result (`()`).
struct SweepObjects;
/// Actor implementation of a repair queue. On a fixed interval it scans the object metadata in the
/// metastore and checks that the backends holding each object's shards can still be provided in a
/// readable state, requesting a rebuild of the object if not.
pub struct RepairActor {
/// Address of the metastore actor, used to scan the stored object metadata.
meta: Addr<MetaStoreActor>,
/// Address of the backend manager actor, used to request the backends referenced by an object.
backend_manager: Addr<BackendManagerActor>,
/// Address of the zstor actor, which receives the rebuild requests.
zstor: Addr<ZstorActor>,
}
impl RepairActor {
/// Create a new [`RepairActor`] checking objects in the provided metastore and using the given
/// zstor to repair them if needed.
pub fn new(
meta: Addr<MetaStoreActor>,
backend_manager: Addr<BackendManagerActor>,
zstor: Addr<ZstorActor>,
) -> RepairActor {
Self {
meta,
backend_manager,
zstor,
}
}
/// Send a [`SweepObjects`] command to the actor.
fn sweep_objects(&mut self, ctx: &mut <Self as Actor>::Context) {
ctx.notify(SweepObjects);
}
}
impl Actor for RepairActor {
    type Context = Context<Self>;

    /// Lifecycle hook invoked when the actor starts: registers a recurring task on the context
    /// which calls `sweep_objects` every `OBJECT_SWEEP_INTERVAL_SECONDS` seconds.
    fn started(&mut self, ctx: &mut Self::Context) {
        let sweep_interval = Duration::from_secs(OBJECT_SWEEP_INTERVAL_SECONDS);
        ctx.run_interval(sweep_interval, |actor, ctx| actor.sweep_objects(ctx));
    }
}
impl Handler<SweepObjects> for RepairActor {
type Result = ResponseFuture<()>;
fn handle(&mut self, _: SweepObjects, _: &mut Self::Context) -> Self::Result {
let meta = self.meta.clone();
let backend_manager = self.backend_manager.clone();
let zstor = self.zstor.clone();
Box::pin(async move {
let start_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
// start scanning from the beginning (cursor == None) and let the metastore choose the backend_id
let mut cursor = None;
let mut backend_idx = None;
loop {
// scan keys from the metastore
let (idx, new_cursor, metas) = match meta
.send(ScanMeta {
cursor: cursor.clone(),
backend_idx,
max_timestamp: Some(start_time),
})
.await
{
Err(e) => {
error!("Could not request meta keys from metastore: {}", e);
return;
}
Ok(result) => match result {
Err(e) => {
error!("Could not get meta keys from metastore: {}", e);
return;
}
Ok(res) => res,
},
};
// iterate over the keys and check if the backends are healthy
// if not, rebuild the object
for (key, metadata) in metas.into_iter() {
let backend_requests = metadata
.shards()
.iter()
.map(|shard_info| shard_info.zdb())
.cloned()
.collect::<Vec<_>>();
let backends = match backend_manager
.send(RequestBackends {
backend_requests,
interest: StateInterest::Readable,
})
.await
{
Err(e) => {
error!("Failed to request backends: {}", e);
return;
}
Ok(backends) => backends,
};
let must_rebuild = backends.into_iter().any(|b| !matches!(b, Ok(Some(_))));
if must_rebuild {
if let Err(e) = zstor
.send(Rebuild {
file: None,
key: Some(key),
metadata: Some(metadata),
})
.await
{
warn!("Failed to rebuild data: {}", e);
}
}
}
if new_cursor.is_none() {
debug!("there is no more old data to rebuild");
break;
}
cursor = new_cursor;
backend_idx = Some(idx);
}
})
}
}