[data ingestion] avoid ls in collocated setup (#19960)
## Description 

Avoid listing all files in the local directory on every read iteration, as this
can increase CPU usage in some setups. Instead, read checkpoint files directly
by sequence number.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
phoenix-o authored Oct 22, 2024
1 parent 5c6968b commit aca7269
Showing 1 changed file with 10 additions and 16 deletions.
26 changes: 10 additions & 16 deletions crates/sui-data-ingestion-core/src/reader.rs
@@ -79,25 +79,19 @@ impl CheckpointReader {
     /// Represents a single iteration of the reader.
     /// Reads files in a local directory, validates them, and forwards `CheckpointData` to the executor.
     async fn read_local_files(&self) -> Result<Vec<Arc<CheckpointData>>> {
-        let mut files = vec![];
-        for entry in fs::read_dir(self.path.clone())? {
-            let entry = entry?;
-            let filename = entry.file_name();
-            if let Some(sequence_number) = Self::checkpoint_number_from_file_path(&filename) {
-                if sequence_number >= self.current_checkpoint_number {
-                    files.push((sequence_number, entry.path()));
-                }
-            }
-        }
-        files.sort();
-        debug!("unprocessed local files {:?}", files);
         let mut checkpoints = vec![];
-        for (_, filename) in files.iter().take(MAX_CHECKPOINTS_IN_PROGRESS) {
-            let checkpoint = Blob::from_bytes::<Arc<CheckpointData>>(&fs::read(filename)?)?;
-            if self.exceeds_capacity(checkpoint.checkpoint_summary.sequence_number) {
+        for offset in 0..MAX_CHECKPOINTS_IN_PROGRESS {
+            let sequence_number = self.current_checkpoint_number + offset as u64;
+            if self.exceeds_capacity(sequence_number) {
                 break;
             }
-            checkpoints.push(checkpoint);
+            match fs::read(self.path.join(format!("{}.chk", sequence_number))) {
+                Ok(bytes) => checkpoints.push(Blob::from_bytes::<Arc<CheckpointData>>(&bytes)?),
+                Err(err) => match err.kind() {
+                    std::io::ErrorKind::NotFound => break,
+                    _ => Err(err)?,
+                },
+            }
         }
         Ok(checkpoints)
     }
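
The pattern this change adopts can be illustrated in isolation. The following is a minimal standalone sketch, not the crate's code: `read_consecutive` and `MAX_IN_PROGRESS` are hypothetical names, the `Blob` decoding and capacity check are omitted, and raw bytes are returned instead of `CheckpointData`. It probes for `<sequence_number>.chk` files by name and stops at the first missing one, so it never enumerates the directory.

```rust
use std::fs;
use std::io::ErrorKind;
use std::path::Path;

/// Hypothetical cap for this sketch, standing in for `MAX_CHECKPOINTS_IN_PROGRESS`.
const MAX_IN_PROGRESS: u64 = 4;

/// Reads up to `MAX_IN_PROGRESS` consecutive `<n>.chk` files beginning at `start`.
/// Each file is opened directly by its expected name; the directory is never listed.
fn read_consecutive(dir: &Path, start: u64) -> std::io::Result<Vec<(u64, Vec<u8>)>> {
    let mut found = Vec::new();
    for offset in 0..MAX_IN_PROGRESS {
        let sequence_number = start + offset;
        match fs::read(dir.join(format!("{}.chk", sequence_number))) {
            Ok(bytes) => found.push((sequence_number, bytes)),
            // The next expected file is not there yet: stop and retry on a later iteration.
            Err(err) if err.kind() == ErrorKind::NotFound => break,
            // Any other I/O failure is reported to the caller.
            Err(err) => return Err(err),
        }
    }
    Ok(found)
}
```

With files `5.chk`, `6.chk`, and `8.chk` present, calling `read_consecutive(dir, 5)` in this sketch returns entries for 5 and 6 only. Because it stops at the first gap, `8.chk` is not picked up until `7.chk` exists. Each call costs at most `MAX_IN_PROGRESS` file opens regardless of how many files the directory holds.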