Skip to content

Commit

Permalink
fix: use number of partitions as parallilism in region scanner (#4669)
Browse files Browse the repository at this point in the history
* fix: use number of partitions as parallilism in region scanner

Signed-off-by: Ruihang Xia <[email protected]>

* add sqlness

Signed-off-by: Ruihang Xia <[email protected]>

Co-authored-by: Lei HUANG <[email protected]>

* order by ts

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Lei HUANG <[email protected]>
  • Loading branch information
waynexia and v0y4g3r authored Sep 3, 2024
1 parent 93f2026 commit 66cb403
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 12 deletions.
19 changes: 12 additions & 7 deletions src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl SeqScan {
self.semaphore.clone(),
&mut metrics,
self.compaction,
self.properties.num_partitions(),
)
.await?;
// Safety: `build_merge_reader()` always returns a reader if partition is None.
Expand Down Expand Up @@ -184,10 +185,11 @@ impl SeqScan {
semaphore: Arc<Semaphore>,
metrics: &mut ScannerMetrics,
compaction: bool,
parallelism: usize,
) -> Result<Option<BoxedBatchReader>> {
// initialize parts list
let mut parts = stream_ctx.parts.lock().await;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics, parallelism).await?;
let parts_len = parts.0.len();

let mut sources = Vec::with_capacity(parts_len);
Expand All @@ -211,11 +213,12 @@ impl SeqScan {
semaphore: Arc<Semaphore>,
metrics: &mut ScannerMetrics,
compaction: bool,
parallelism: usize,
) -> Result<Option<BoxedBatchReader>> {
let mut sources = Vec::new();
let build_start = {
let mut parts = stream_ctx.parts.lock().await;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics, parallelism).await?;

let Some(part) = parts.0.get_part(range_id) else {
return Ok(None);
Expand Down Expand Up @@ -311,12 +314,13 @@ impl SeqScan {
let semaphore = self.semaphore.clone();
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.compaction;
let parallelism = self.properties.num_partitions();
let stream = try_stream! {
let first_poll = stream_ctx.query_start.elapsed();

for partition_range in partition_ranges {
let maybe_reader =
Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics, compaction)
Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics, compaction, parallelism)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Expand Down Expand Up @@ -390,6 +394,7 @@ impl SeqScan {
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.semaphore.clone();
let compaction = self.compaction;
let parallelism = self.properties.num_partitions();

// build stream
let stream = try_stream! {
Expand All @@ -398,7 +403,7 @@ impl SeqScan {
// init parts
let parts_len = {
let mut parts = stream_ctx.parts.lock().await;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics).await
Self::maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics, parallelism).await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
parts.0.len()
Expand All @@ -411,6 +416,7 @@ impl SeqScan {
semaphore.clone(),
&mut metrics,
compaction,
parallelism
)
.await
.map_err(BoxedError::new)
Expand Down Expand Up @@ -467,6 +473,7 @@ impl SeqScan {
input: &ScanInput,
part_list: &mut (ScanPartList, Duration),
metrics: &mut ScannerMetrics,
parallelism: usize,
) -> Result<()> {
if part_list.0.is_none() {
let now = Instant::now();
Expand All @@ -477,9 +484,7 @@ impl SeqScan {
Some(input.mapper.column_ids()),
input.predicate.clone(),
);
part_list
.0
.set_parts(distributor.build_parts(input.parallelism.parallelism));
part_list.0.set_parts(distributor.build_parts(parallelism));
let build_part_cost = now.elapsed();
part_list.1 = build_part_cost;

Expand Down
11 changes: 6 additions & 5 deletions src/mito2/src/read/unordered_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ impl UnorderedScan {
/// Creates a new [UnorderedScan].
pub(crate) fn new(input: ScanInput) -> Self {
let parallelism = input.parallelism.parallelism.max(1);
let properties = ScannerProperties::default()
let mut properties = ScannerProperties::default()
.with_parallelism(parallelism)
.with_append_mode(input.append_mode)
.with_total_rows(input.total_rows());
properties.partitions = vec![input.partition_ranges()];
let stream_ctx = Arc::new(StreamContext::new(input));

Self {
Expand Down Expand Up @@ -148,12 +149,13 @@ impl RegionScanner for UnorderedScan {
..Default::default()
};
let stream_ctx = self.stream_ctx.clone();
let parallelism = self.properties.num_partitions();
let stream = try_stream! {
let first_poll = stream_ctx.query_start.elapsed();

let part = {
let mut parts = stream_ctx.parts.lock().await;
maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics)
maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics, parallelism)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Expand Down Expand Up @@ -260,6 +262,7 @@ async fn maybe_init_parts(
input: &ScanInput,
part_list: &mut (ScanPartList, Duration),
metrics: &mut ScannerMetrics,
parallelism: usize,
) -> Result<()> {
if part_list.0.is_none() {
let now = Instant::now();
Expand All @@ -270,9 +273,7 @@ async fn maybe_init_parts(
Some(input.mapper.column_ids()),
input.predicate.clone(),
);
part_list
.0
.set_parts(distributor.build_parts(input.parallelism.parallelism));
part_list.0.set_parts(distributor.build_parts(parallelism));
let build_part_cost = now.elapsed();
part_list.1 = build_part_cost;

Expand Down
108 changes: 108 additions & 0 deletions tests/cases/standalone/common/select/flush_append_only.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
create table
t (
ts timestamp time index,
host string primary key,
not_pk string,
val double,
)
with
(
append_mode = 'true',
'compaction.type' = 'twcs',
'compaction.twcs.max_active_window_files' = '8',
'compaction.twcs.max_inactive_window_files' = '8'
);

Affected Rows: 0

insert into
t
values
(0, 'a', '🌕', 1.0),
(1, 'b', '🌖', 2.0),
(1, 'a', '🌗', 3.0),
(1, 'c', '🌘', 4.0),
(2, 'a', '🌑', 5.0),
(2, 'b', '🌒', 6.0),
(2, 'a', '🌓', 7.0),
(3, 'c', '🌔', 8.0),
(3, 'd', '🌕', 9.0);

Affected Rows: 9

admin flush_table ('t');

+------------------------+
| ADMIN flush_table('t') |
+------------------------+
| 0 |
+------------------------+

insert into
t
values
(10, 'a', '🌕', 1.0),
(11, 'b', '🌖', 2.0),
(11, 'a', '🌗', 3.0),
(11, 'c', '🌘', 4.0),
(12, 'a', '🌑', 5.0),
(12, 'b', '🌒', 6.0),
(12, 'a', '🌓', 7.0),
(13, 'c', '🌔', 8.0),
(13, 'd', '🌕', 9.0);

Affected Rows: 9

admin flush_table ('t');

+------------------------+
| ADMIN flush_table('t') |
+------------------------+
| 0 |
+------------------------+

select
count(ts)
from
t;

+-------------+
| COUNT(t.ts) |
+-------------+
| 18 |
+-------------+

select
ts
from
t
order by
ts;

+-------------------------+
| ts |
+-------------------------+
| 1970-01-01T00:00:00 |
| 1970-01-01T00:00:00.001 |
| 1970-01-01T00:00:00.001 |
| 1970-01-01T00:00:00.001 |
| 1970-01-01T00:00:00.002 |
| 1970-01-01T00:00:00.002 |
| 1970-01-01T00:00:00.002 |
| 1970-01-01T00:00:00.003 |
| 1970-01-01T00:00:00.003 |
| 1970-01-01T00:00:00.010 |
| 1970-01-01T00:00:00.011 |
| 1970-01-01T00:00:00.011 |
| 1970-01-01T00:00:00.011 |
| 1970-01-01T00:00:00.012 |
| 1970-01-01T00:00:00.012 |
| 1970-01-01T00:00:00.012 |
| 1970-01-01T00:00:00.013 |
| 1970-01-01T00:00:00.013 |
+-------------------------+

drop table t;

Affected Rows: 0

58 changes: 58 additions & 0 deletions tests/cases/standalone/common/select/flush_append_only.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
create table
t (
ts timestamp time index,
host string primary key,
not_pk string,
val double,
)
with
(
append_mode = 'true',
'compaction.type' = 'twcs',
'compaction.twcs.max_active_window_files' = '8',
'compaction.twcs.max_inactive_window_files' = '8'
);

insert into
t
values
(0, 'a', '🌕', 1.0),
(1, 'b', '🌖', 2.0),
(1, 'a', '🌗', 3.0),
(1, 'c', '🌘', 4.0),
(2, 'a', '🌑', 5.0),
(2, 'b', '🌒', 6.0),
(2, 'a', '🌓', 7.0),
(3, 'c', '🌔', 8.0),
(3, 'd', '🌕', 9.0);

admin flush_table ('t');

insert into
t
values
(10, 'a', '🌕', 1.0),
(11, 'b', '🌖', 2.0),
(11, 'a', '🌗', 3.0),
(11, 'c', '🌘', 4.0),
(12, 'a', '🌑', 5.0),
(12, 'b', '🌒', 6.0),
(12, 'a', '🌓', 7.0),
(13, 'c', '🌔', 8.0),
(13, 'd', '🌕', 9.0);

admin flush_table ('t');

select
count(ts)
from
t;

select
ts
from
t
order by
ts;

drop table t;

0 comments on commit 66cb403

Please sign in to comment.