Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed May 31, 2024
1 parent 1e73a04 commit 5a9765a
Show file tree
Hide file tree
Showing 43 changed files with 1,228 additions and 1,100 deletions.
2 changes: 2 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions quickwit/quickwit-common/src/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl ThreadPool {
/// but is not running yet "cancellable".
pub fn run_cpu_intensive<F, R>(
&self,
cpu_heavy_task: F,
cpu_intensive_fn: F,
) -> impl Future<Output = Result<R, Panicked>>
where
F: FnOnce() -> R + Send + 'static,
Expand All @@ -103,7 +103,7 @@ impl ThreadPool {
let _guard = span.enter();
let mut ongoing_task_guard = GaugeGuard::from_gauge(&ongoing_tasks);
ongoing_task_guard.add(1i64);
let result = cpu_heavy_task();
let result = cpu_intensive_fn();
let _ = tx.send(result);
});
rx.map_err(|_| Panicked)
Expand All @@ -118,7 +118,7 @@ impl ThreadPool {
///
/// Disclaimer: The function will no be executed if the Future is dropped.
#[must_use = "run_cpu_intensive will not run if the future it returns is dropped"]
pub fn run_cpu_intensive<F, R>(cpu_heavy_task: F) -> impl Future<Output = Result<R, Panicked>>
pub fn run_cpu_intensive<F, R>(cpu_intensive_fn: F) -> impl Future<Output = Result<R, Panicked>>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
Expand All @@ -129,7 +129,7 @@ where
let num_threads: usize = (crate::num_cpus() / 3).max(2);
ThreadPool::new("small_tasks", Some(num_threads))
})
.run_cpu_intensive(cpu_heavy_task)
.run_cpu_intensive(cpu_intensive_fn)
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/doc_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ pub struct DocProcessorCounters {
/// into 4 categories:
/// - number of docs that could not be parsed.
/// - number of docs that could not be transformed.
/// - number of docs for which the doc mapper returnd an error.
/// - number of docs for which the doc mapper returned an error.
/// - number of valid docs.
pub num_doc_parse_errors: AtomicU64,
pub num_transform_errors: AtomicU64,
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ quickwit-actors = { workspace = true }
quickwit-cluster = { workspace = true }
quickwit-common = { workspace = true }
quickwit-config = { workspace = true }
quickwit-doc-mapper = { workspace = true, features = ["testsuite"] }
quickwit-proto = { workspace = true }

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ mod tests {
ShardState::Open,
Position::Beginning,
Position::Beginning,
None,
Instant::now(),
);
state_guard.shards.insert(queue_id_01.clone(), shard);
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ mod tests {
ShardState::Open,
Position::Beginning,
Position::Beginning,
None,
now - idle_shard_timeout,
);
let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1));
Expand All @@ -113,6 +114,7 @@ mod tests {
ShardState::Open,
Position::Beginning,
Position::Beginning,
None,
now - idle_shard_timeout / 2,
);
let queue_id_02 = queue_id(&index_uid, "test-source", &ShardId::from(2));
Expand Down
Loading

0 comments on commit 5a9765a

Please sign in to comment.