Minor: Cleaned physical-plan Comments #13055

Merged: 2 commits, Oct 22, 2024

8 changes: 4 additions & 4 deletions datafusion/physical-plan/src/analyze.rs
@@ -40,9 +40,9 @@ use futures::StreamExt;
 /// discards the results, and then prints out an annotated plan with metrics
 #[derive(Debug, Clone)]
 pub struct AnalyzeExec {
-    /// control how much extra to print
+    /// Control how much extra to print
     verbose: bool,
-    /// if statistics should be displayed
+    /// If statistics should be displayed
     show_statistics: bool,
     /// The input plan (the plan being analyzed)
     pub(crate) input: Arc<dyn ExecutionPlan>,
@@ -69,12 +69,12 @@ impl AnalyzeExec {
         }
     }
 
-    /// access to verbose
+    /// Access to verbose
     pub fn verbose(&self) -> bool {
         self.verbose
     }
 
-    /// access to show_statistics
+    /// Access to show_statistics
     pub fn show_statistics(&self) -> bool {
         self.show_statistics
     }
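
For context on how the `verbose` and `show_statistics` flags above surface to users: `AnalyzeExec` backs the `EXPLAIN ANALYZE` statement. Below is a minimal usage sketch, not part of this diff; the table name and CSV path are placeholders, and it assumes the `SessionContext` API of the DataFusion version under review.

```rust
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // "example.csv" is a placeholder path for illustration only.
    ctx.register_csv("t", "example.csv", CsvReadOptions::new()).await?;

    // EXPLAIN ANALYZE runs the plan through AnalyzeExec, discards the result
    // rows, and prints the plan annotated with runtime metrics.
    let df = ctx.sql("EXPLAIN ANALYZE SELECT count(*) FROM t").await?;
    df.show().await?;
    Ok(())
}
```
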
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/coalesce_batches.rs
@@ -52,7 +52,7 @@ use futures::stream::{Stream, StreamExt};
 pub struct CoalesceBatchesExec {
     /// The input plan
     input: Arc<dyn ExecutionPlan>,
-    /// Minimum number of rows for coalesces batches
+    /// Minimum number of rows for coalescing batches
     target_batch_size: usize,
     /// Maximum number of rows to fetch, `None` means fetching all rows
     fetch: Option<usize>,
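
As a note on what `target_batch_size` controls: the operator buffers small input batches and emits a combined batch once enough rows have accumulated. A minimal sketch of that idea using arrow's `concat_batches` kernel follows; it is an illustration of the concept, not the operator's actual streaming implementation.

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::compute::concat_batches;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let batch = |vals: Vec<i32>| {
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(Int32Array::from(vals))])
    };

    // Three small batches, as a heavily filtered or repartitioned input
    // might produce them.
    let buffered = vec![batch(vec![1, 2])?, batch(vec![3])?, batch(vec![4, 5])?];

    // Once the buffered row count reaches target_batch_size, emit one
    // concatenated batch instead of many tiny ones.
    let combined = concat_batches(&schema, &buffered)?;
    assert_eq!(combined.num_rows(), 5);
    Ok(())
}
```
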
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/coalesce_partitions.rs
@@ -236,10 +236,10 @@ mod tests {

         let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 2));
         let refs = blocking_exec.refs();
-        let coaelesce_partitions_exec =
+        let coalesce_partitions_exec =
             Arc::new(CoalescePartitionsExec::new(blocking_exec));
 
-        let fut = collect(coaelesce_partitions_exec, task_ctx);
+        let fut = collect(coalesce_partitions_exec, task_ctx);
         let mut fut = fut.boxed();
 
         assert_is_pending(&mut fut);
14 changes: 7 additions & 7 deletions datafusion/physical-plan/src/common.rs
@@ -109,7 +109,7 @@ pub(crate) fn spawn_buffered(
     builder.spawn(async move {
         while let Some(item) = input.next().await {
             if sender.send(item).await.is_err() {
-                // receiver dropped when query is shutdown early (e.g., limit) or error,
+                // Receiver dropped when query is shutdown early (e.g., limit) or error,
                 // no need to return propagate the send error.
                 return Ok(());
             }
@@ -182,15 +182,15 @@ pub fn compute_record_batch_statistics(

 /// Write in Arrow IPC format.
 pub struct IPCWriter {
-    /// path
+    /// Path
     pub path: PathBuf,
-    /// inner writer
+    /// Inner writer
     pub writer: FileWriter<File>,
-    /// batches written
+    /// Batches written
     pub num_batches: usize,
-    /// rows written
+    /// Rows written
     pub num_rows: usize,
-    /// bytes written
+    /// Bytes written
     pub num_bytes: usize,
 }

@@ -315,7 +315,7 @@ mod tests {
             ],
         )?;
 
-        // just select f32,f64
+        // Just select f32,f64
         let select_projection = Some(vec![0, 1]);
         let byte_size = batch
             .project(&select_projection.clone().unwrap())
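
Since `IPCWriter` is essentially arrow's IPC `FileWriter` plus the `num_batches`/`num_rows`/`num_bytes` counters documented above, here is a hedged sketch of the underlying write path. The output path is a placeholder, and the `?` on `File::create` assumes the `From<io::Error>` conversion that `ArrowError` provides.

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::ipc::writer::FileWriter;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // IPCWriter::write would also bump num_batches and num_rows here.
    let file = File::create("/tmp/example.arrow")?;
    let mut writer = FileWriter::try_new(file, &schema)?;
    writer.write(&batch)?;
    writer.finish()?; // Required: writes the IPC file footer
    Ok(())
}
```
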
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/display.rs
@@ -231,6 +231,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
     }
 }
 
+/// Enum representing the different levels of metrics to display
 #[derive(Debug, Clone, Copy)]
 enum ShowMetrics {
     /// Do not show any metrics
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/empty.rs
@@ -173,7 +173,7 @@ mod tests {
         let empty = EmptyExec::new(Arc::clone(&schema));
         assert_eq!(empty.schema(), schema);
 
-        // we should have no results
+        // We should have no results
         let iter = empty.execute(0, task_ctx)?;
         let batches = common::collect(iter).await?;
         assert!(batches.is_empty());
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/explain.rs
@@ -67,7 +67,7 @@ impl ExplainExec {
         &self.stringified_plans
     }
 
-    /// access to verbose
+    /// Access to verbose
     pub fn verbose(&self) -> bool {
         self.verbose
     }
@@ -112,7 +112,7 @@ impl ExecutionPlan for ExplainExec {
     }
 
     fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
-        // this is a leaf node and has no children
+        // This is a leaf node and has no children
         vec![]
     }

18 changes: 9 additions & 9 deletions datafusion/physical-plan/src/filter.rs
@@ -115,7 +115,7 @@ impl FilterExec {

     /// Return new instance of [FilterExec] with the given projection.
     pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
-        // check if the projection is valid
+        // Check if the projection is valid
         can_project(&self.schema(), projection.as_ref())?;
 
         let projection = match projection {
@@ -157,7 +157,7 @@ impl FilterExec {
         self.default_selectivity
     }
 
-    /// projection
+    /// Projection
     pub fn projection(&self) -> Option<&Vec<usize>> {
         self.projection.as_ref()
     }
@@ -255,9 +255,9 @@ impl FilterExec {
             let expr = Arc::new(column) as _;
             ConstExpr::new(expr).with_across_partitions(true)
         });
-        // this is for statistics
+        // This is for statistics
         eq_properties = eq_properties.with_constants(constants);
-        // this is for logical constant (for example: a = '1', then a could be marked as a constant)
+        // This is for logical constant (for example: a = '1', then a could be marked as a constant)
         // to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0)
         eq_properties =
             eq_properties.with_constants(Self::extend_constants(input, predicate));
@@ -331,7 +331,7 @@ impl ExecutionPlan for FilterExec {
     }
 
     fn maintains_input_order(&self) -> Vec<bool> {
-        // tell optimizer this operator doesn't reorder its input
+        // Tell optimizer this operator doesn't reorder its input
         vec![true]
     }

@@ -425,7 +425,7 @@ struct FilterExecStream {
     predicate: Arc<dyn PhysicalExpr>,
     /// The input partition to filter.
     input: SendableRecordBatchStream,
-    /// runtime metrics recording
+    /// Runtime metrics recording
     baseline_metrics: BaselineMetrics,
     /// The projection indices of the columns in the input schema
     projection: Option<Vec<usize>>,
@@ -449,7 +449,7 @@ fn filter_and_project(
         .and_then(|v| v.into_array(batch.num_rows()))
         .and_then(|array| {
             Ok(match (as_boolean_array(&array), projection) {
-                // apply filter array to record batch
+                // Apply filter array to record batch
                 (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
                 (Ok(filter_array), Some(projection)) => {
                     let projected_columns = projection
@@ -490,7 +490,7 @@ impl Stream for FilterExecStream {
                             &self.schema,
                         )?;
                         timer.done();
-                        // skip entirely filtered batches
+                        // Skip entirely filtered batches
                         if filtered_batch.num_rows() == 0 {
                             continue;
                         }
                 }
 
     fn size_hint(&self) -> (usize, Option<usize>) {
-        // same number of record batches
+        // Same number of record batches
         self.input.size_hint()
     }
 }
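
For readers following `filter_and_project` above: the `(Ok(filter_array), None)` arm is plain arrow kernel usage. A self-contained sketch of that step follows; the mask values are arbitrary stand-ins for an evaluated predicate.

```rust
use std::sync::Arc;

use arrow::array::{BooleanArray, Int32Array};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4]))],
    )?;

    // Stand-in for the evaluated predicate, e.g. `a > 2`.
    let mask = BooleanArray::from(vec![false, false, true, true]);

    // Apply the filter array to the record batch, as in the
    // (Ok(filter_array), None) arm above.
    let filtered = filter_record_batch(&batch, &mask)?;
    assert_eq!(filtered.num_rows(), 2);
    Ok(())
}
```
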
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/insert.rs
@@ -271,7 +271,7 @@ fn make_count_batch(count: u64) -> RecordBatch {
 }
 
 fn make_count_schema() -> SchemaRef {
-    // define a schema.
+    // Define a schema.
     Arc::new(Schema::new(vec![Field::new(
         "count",
         DataType::UInt64,
34 changes: 17 additions & 17 deletions datafusion/physical-plan/src/limit.rs
@@ -398,7 +398,7 @@ impl LimitStream {
                 if batch.num_rows() > 0 {
                     break poll;
                 } else {
-                    // continue to poll input stream
+                    // Continue to poll input stream
                 }
             }
             Poll::Ready(Some(Err(_e))) => break poll,
         }
     }
 
-    /// fetches from the batch
+    /// Fetches from the batch
     fn stream_limit(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
         // records time on drop
         let _timer = self.baseline_metrics.elapsed_compute().timer();
         if self.fetch == 0 {
-            self.input = None; // clear input so it can be dropped early
+            self.input = None; // Clear input so it can be dropped early
             None
         } else if batch.num_rows() < self.fetch {
             //
         } else if batch.num_rows() >= self.fetch {
             let batch_rows = self.fetch;
             self.fetch = 0;
-            self.input = None; // clear input so it can be dropped early
+            self.input = None; // Clear input so it can be dropped early
 
             // It is guaranteed that batch_rows is <= batch.num_rows
             Some(batch.slice(0, batch_rows))
@@ -453,7 +453,7 @@ impl Stream for LimitStream {
                     other => other,
                 })
             }
-            // input has been cleared
+            // Input has been cleared
             None => Poll::Ready(None),
         };

@@ -489,17 +489,17 @@ mod tests {
         let num_partitions = 4;
         let csv = test::scan_partitioned(num_partitions);
 
-        // input should have 4 partitions
+        // Input should have 4 partitions
         assert_eq!(csv.output_partitioning().partition_count(), num_partitions);
 
         let limit =
             GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), 0, Some(7));
 
-        // the result should contain 4 batches (one per input partition)
+        // The result should contain 4 batches (one per input partition)
         let iter = limit.execute(0, task_ctx)?;
         let batches = common::collect(iter).await?;
 
-        // there should be a total of 100 rows
+        // There should be a total of 100 rows
         let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
         assert_eq!(row_count, 7);

@@ -520,7 +520,7 @@ mod tests {
         let index = input.index();
         assert_eq!(index.value(), 0);
 
-        // limit of six needs to consume the entire first record batch
+        // Limit of six needs to consume the entire first record batch
         // (5 rows) and 1 row from the second (1 row)
         let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
         let limit_stream =
@@ -550,7 +550,7 @@ mod tests {
         let index = input.index();
         assert_eq!(index.value(), 0);
 
-        // limit of six needs to consume the entire first record batch
+        // Limit of six needs to consume the entire first record batch
         // (6 rows) and stop immediately
         let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
         let limit_stream =
@@ -580,7 +580,7 @@ mod tests {
         let index = input.index();
         assert_eq!(index.value(), 0);
 
-        // limit of six needs to consume the entire first record batch
+        // Limit of six needs to consume the entire first record batch
         // (6 rows) and stop immediately
         let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
         let limit_stream =
         Ok(())
     }
 
-    // test cases for "skip"
+    // Test cases for "skip"
     async fn skip_and_fetch(skip: usize, fetch: Option<usize>) -> Result<usize> {
         let task_ctx = Arc::new(TaskContext::default());

         let offset =
             GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), skip, fetch);
 
-        // the result should contain 4 batches (one per input partition)
+        // The result should contain 4 batches (one per input partition)
         let iter = offset.execute(0, task_ctx)?;
         let batches = common::collect(iter).await?;
         Ok(batches.iter().map(|batch| batch.num_rows()).sum())

     #[tokio::test]
     async fn skip_3_fetch_none() -> Result<()> {
-        // there are total of 400 rows, we skipped 3 rows (offset = 3)
+        // There are total of 400 rows, we skipped 3 rows (offset = 3)
         let row_count = skip_and_fetch(3, None).await?;
         assert_eq!(row_count, 397);
         Ok(())
     }
 
     #[tokio::test]
     async fn skip_3_fetch_10_stats() -> Result<()> {
-        // there are total of 100 rows, we skipped 3 rows (offset = 3)
+        // There are total of 100 rows, we skipped 3 rows (offset = 3)
         let row_count = skip_and_fetch(3, Some(10)).await?;
         assert_eq!(row_count, 10);
         Ok(())

     #[tokio::test]
     async fn skip_400_fetch_1() -> Result<()> {
-        // there are a total of 400 rows
+        // There are a total of 400 rows
         let row_count = skip_and_fetch(400, Some(1)).await?;
         assert_eq!(row_count, 0);
         Ok(())
     }
 
     #[tokio::test]
     async fn skip_401_fetch_none() -> Result<()> {
-        // there are total of 400 rows, we skipped 401 rows (offset = 3)
+        // There are total of 400 rows, we skipped 401 rows (offset = 3)
         let row_count = skip_and_fetch(401, None).await?;
         assert_eq!(row_count, 0);
         Ok(())
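
The tests above exercise `GlobalLimitExec::new(input, skip, fetch)`, which applies `skip` first and then `fetch`. To make the `stream_limit` control flow earlier in this file easier to follow, here is a simplified, self-contained restatement of its slicing rule; `apply_fetch` is a hypothetical name for illustration, not a DataFusion API.

```rust
use arrow::record_batch::RecordBatch;

/// Simplified sketch of LimitStream::stream_limit: pass whole batches
/// through while more than a batch's worth of `fetch` rows remain, and
/// slice the batch that crosses the limit boundary.
fn apply_fetch(batch: RecordBatch, fetch: &mut usize) -> Option<RecordBatch> {
    if *fetch == 0 {
        // Limit already satisfied; the real stream also drops its input here.
        None
    } else if batch.num_rows() < *fetch {
        *fetch -= batch.num_rows();
        Some(batch)
    } else {
        let rows = *fetch;
        *fetch = 0;
        // slice(0, rows) is zero-copy, and rows <= batch.num_rows() holds
        Some(batch.slice(0, rows))
    }
}
```

Feeding batches of 5, 5, and 5 rows through `apply_fetch` with `fetch = 7` would yield 5, then 2, then nothing, matching the 7-row totals asserted in the tests above.
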
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/memory.rs
@@ -119,7 +119,7 @@ impl ExecutionPlan for MemoryExec {
     }
 
     fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
-        // this is a leaf node and has no children
+        // This is a leaf node and has no children
         vec![]
     }

@@ -179,7 +179,7 @@ impl MemoryExec {
         })
     }
 
-    /// set `show_sizes` to determine whether to display partition sizes
+    /// Set `show_sizes` to determine whether to display partition sizes
     pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
         self.show_sizes = show_sizes;
         self
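
A hedged usage sketch of the `with_show_sizes` builder above, assuming the `MemoryExec::try_new` constructor as it exists in the DataFusion version under review:

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::memory::MemoryExec;

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // One partition holding one batch, with no projection; turning
    // show_sizes off yields terser EXPLAIN output for this plan node.
    let exec = MemoryExec::try_new(&[vec![batch]], schema, None)?.with_show_sizes(false);
    let _plan = Arc::new(exec);
    Ok(())
}
```
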
6 changes: 3 additions & 3 deletions datafusion/physical-plan/src/placeholder_row.rs
@@ -208,7 +208,7 @@ mod tests {
         let schema = test::aggr_test_schema();
         let placeholder = PlaceholderRowExec::new(schema);
 
-        // ask for the wrong partition
+        // Ask for the wrong partition
         assert!(placeholder.execute(1, Arc::clone(&task_ctx)).is_err());
         assert!(placeholder.execute(20, task_ctx).is_err());
         Ok(())
         let iter = placeholder.execute(0, task_ctx)?;
         let batches = common::collect(iter).await?;
 
-        // should have one item
+        // Should have one item
         assert_eq!(batches.len(), 1);
 
         Ok(())
             let iter = placeholder.execute(n, Arc::clone(&task_ctx))?;
             let batches = common::collect(iter).await?;
 
-            // should have one item
+            // Should have one item
             assert_eq!(batches.len(), 1);
         }
