Skip to content

Commit

Permalink
Minor: Cleaned physical-plan Comments (#13055)
Browse files Browse the repository at this point in the history
* Fixed documentations

* small fixes
  • Loading branch information
jonathanc-n authored Oct 22, 2024
1 parent cf60da9 commit d3920f3
Show file tree
Hide file tree
Showing 19 changed files with 99 additions and 98 deletions.
8 changes: 4 additions & 4 deletions datafusion/physical-plan/src/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ use futures::StreamExt;
/// discards the results, and then prints out an annotated plan with metrics
#[derive(Debug, Clone)]
pub struct AnalyzeExec {
/// control how much extra to print
/// Control how much extra to print
verbose: bool,
/// if statistics should be displayed
/// If statistics should be displayed
show_statistics: bool,
/// The input plan (the plan being analyzed)
pub(crate) input: Arc<dyn ExecutionPlan>,
Expand All @@ -69,12 +69,12 @@ impl AnalyzeExec {
}
}

/// access to verbose
/// Access to verbose
pub fn verbose(&self) -> bool {
self.verbose
}

/// access to show_statistics
/// Access to show_statistics
pub fn show_statistics(&self) -> bool {
self.show_statistics
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use futures::stream::{Stream, StreamExt};
pub struct CoalesceBatchesExec {
/// The input plan
input: Arc<dyn ExecutionPlan>,
/// Minimum number of rows for coalesces batches
/// Minimum number of rows for coalescing batches
target_batch_size: usize,
/// Maximum number of rows to fetch, `None` means fetching all rows
fetch: Option<usize>,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,10 @@ mod tests {

let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 2));
let refs = blocking_exec.refs();
let coaelesce_partitions_exec =
let coalesce_partitions_exec =
Arc::new(CoalescePartitionsExec::new(blocking_exec));

let fut = collect(coaelesce_partitions_exec, task_ctx);
let fut = collect(coalesce_partitions_exec, task_ctx);
let mut fut = fut.boxed();

assert_is_pending(&mut fut);
Expand Down
14 changes: 7 additions & 7 deletions datafusion/physical-plan/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ pub(crate) fn spawn_buffered(
builder.spawn(async move {
while let Some(item) = input.next().await {
if sender.send(item).await.is_err() {
// receiver dropped when query is shutdown early (e.g., limit) or error,
// Receiver dropped when query is shutdown early (e.g., limit) or error,
// no need to return propagate the send error.
return Ok(());
}
Expand Down Expand Up @@ -182,15 +182,15 @@ pub fn compute_record_batch_statistics(

/// Write in Arrow IPC format.
pub struct IPCWriter {
/// path
/// Path
pub path: PathBuf,
/// inner writer
/// Inner writer
pub writer: FileWriter<File>,
/// batches written
/// Batches written
pub num_batches: usize,
/// rows written
/// Rows written
pub num_rows: usize,
/// bytes written
/// Bytes written
pub num_bytes: usize,
}

Expand Down Expand Up @@ -315,7 +315,7 @@ mod tests {
],
)?;

// just select f32,f64
// Just select f32,f64
let select_projection = Some(vec![0, 1]);
let byte_size = batch
.project(&select_projection.clone().unwrap())
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
}
}

/// Enum representing the different levels of metrics to display
#[derive(Debug, Clone, Copy)]
enum ShowMetrics {
/// Do not show any metrics
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ mod tests {
let empty = EmptyExec::new(Arc::clone(&schema));
assert_eq!(empty.schema(), schema);

// we should have no results
// We should have no results
let iter = empty.execute(0, task_ctx)?;
let batches = common::collect(iter).await?;
assert!(batches.is_empty());
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl ExplainExec {
&self.stringified_plans
}

/// access to verbose
/// Access to verbose
pub fn verbose(&self) -> bool {
self.verbose
}
Expand Down Expand Up @@ -112,7 +112,7 @@ impl ExecutionPlan for ExplainExec {
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
// this is a leaf node and has no children
// This is a leaf node and has no children
vec![]
}

Expand Down
18 changes: 9 additions & 9 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl FilterExec {

/// Return new instance of [FilterExec] with the given projection.
pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
// check if the projection is valid
// Check if the projection is valid
can_project(&self.schema(), projection.as_ref())?;

let projection = match projection {
Expand Down Expand Up @@ -157,7 +157,7 @@ impl FilterExec {
self.default_selectivity
}

/// projection
/// Projection
pub fn projection(&self) -> Option<&Vec<usize>> {
self.projection.as_ref()
}
Expand Down Expand Up @@ -255,9 +255,9 @@ impl FilterExec {
let expr = Arc::new(column) as _;
ConstExpr::new(expr).with_across_partitions(true)
});
// this is for statistics
// This is for statistics
eq_properties = eq_properties.with_constants(constants);
// this is for logical constant (for example: a = '1', then a could be marked as a constant)
// This is for logical constant (for example: a = '1', then a could be marked as a constant)
// to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0)
eq_properties =
eq_properties.with_constants(Self::extend_constants(input, predicate));
Expand Down Expand Up @@ -331,7 +331,7 @@ impl ExecutionPlan for FilterExec {
}

fn maintains_input_order(&self) -> Vec<bool> {
// tell optimizer this operator doesn't reorder its input
// Tell optimizer this operator doesn't reorder its input
vec![true]
}

Expand Down Expand Up @@ -425,7 +425,7 @@ struct FilterExecStream {
predicate: Arc<dyn PhysicalExpr>,
/// The input partition to filter.
input: SendableRecordBatchStream,
/// runtime metrics recording
/// Runtime metrics recording
baseline_metrics: BaselineMetrics,
/// The projection indices of the columns in the input schema
projection: Option<Vec<usize>>,
Expand All @@ -449,7 +449,7 @@ fn filter_and_project(
.and_then(|v| v.into_array(batch.num_rows()))
.and_then(|array| {
Ok(match (as_boolean_array(&array), projection) {
// apply filter array to record batch
// Apply filter array to record batch
(Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
(Ok(filter_array), Some(projection)) => {
let projected_columns = projection
Expand Down Expand Up @@ -490,7 +490,7 @@ impl Stream for FilterExecStream {
&self.schema,
)?;
timer.done();
// skip entirely filtered batches
// Skip entirely filtered batches
if filtered_batch.num_rows() == 0 {
continue;
}
Expand All @@ -507,7 +507,7 @@ impl Stream for FilterExecStream {
}

fn size_hint(&self) -> (usize, Option<usize>) {
// same number of record batches
// Same number of record batches
self.input.size_hint()
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ fn make_count_batch(count: u64) -> RecordBatch {
}

fn make_count_schema() -> SchemaRef {
// define a schema.
// Define a schema.
Arc::new(Schema::new(vec![Field::new(
"count",
DataType::UInt64,
Expand Down
34 changes: 17 additions & 17 deletions datafusion/physical-plan/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ impl LimitStream {
if batch.num_rows() > 0 {
break poll;
} else {
// continue to poll input stream
// Continue to poll input stream
}
}
Poll::Ready(Some(Err(_e))) => break poll,
Expand All @@ -408,12 +408,12 @@ impl LimitStream {
}
}

/// fetches from the batch
/// Fetches from the batch
fn stream_limit(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
// records time on drop
let _timer = self.baseline_metrics.elapsed_compute().timer();
if self.fetch == 0 {
self.input = None; // clear input so it can be dropped early
self.input = None; // Clear input so it can be dropped early
None
} else if batch.num_rows() < self.fetch {
//
Expand All @@ -422,7 +422,7 @@ impl LimitStream {
} else if batch.num_rows() >= self.fetch {
let batch_rows = self.fetch;
self.fetch = 0;
self.input = None; // clear input so it can be dropped early
self.input = None; // Clear input so it can be dropped early

// It is guaranteed that batch_rows is <= batch.num_rows
Some(batch.slice(0, batch_rows))
Expand Down Expand Up @@ -453,7 +453,7 @@ impl Stream for LimitStream {
other => other,
})
}
// input has been cleared
// Input has been cleared
None => Poll::Ready(None),
};

Expand Down Expand Up @@ -489,17 +489,17 @@ mod tests {
let num_partitions = 4;
let csv = test::scan_partitioned(num_partitions);

// input should have 4 partitions
// Input should have 4 partitions
assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

let limit =
GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), 0, Some(7));

// the result should contain 4 batches (one per input partition)
// The result should contain 4 batches (one per input partition)
let iter = limit.execute(0, task_ctx)?;
let batches = common::collect(iter).await?;

// there should be a total of 100 rows
// There should be a total of 100 rows
let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
assert_eq!(row_count, 7);

Expand All @@ -520,7 +520,7 @@ mod tests {
let index = input.index();
assert_eq!(index.value(), 0);

// limit of six needs to consume the entire first record batch
// Limit of six needs to consume the entire first record batch
// (5 rows) and 1 row from the second (1 row)
let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
let limit_stream =
Expand Down Expand Up @@ -550,7 +550,7 @@ mod tests {
let index = input.index();
assert_eq!(index.value(), 0);

// limit of six needs to consume the entire first record batch
// Limit of six needs to consume the entire first record batch
// (6 rows) and stop immediately
let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
let limit_stream =
Expand Down Expand Up @@ -580,7 +580,7 @@ mod tests {
let index = input.index();
assert_eq!(index.value(), 0);

// limit of six needs to consume the entire first record batch
// Limit of six needs to consume the entire first record batch
// (6 rows) and stop immediately
let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
let limit_stream =
Expand All @@ -598,7 +598,7 @@ mod tests {
Ok(())
}

// test cases for "skip"
// Test cases for "skip"
async fn skip_and_fetch(skip: usize, fetch: Option<usize>) -> Result<usize> {
let task_ctx = Arc::new(TaskContext::default());

Expand All @@ -611,7 +611,7 @@ mod tests {
let offset =
GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), skip, fetch);

// the result should contain 4 batches (one per input partition)
// The result should contain 4 batches (one per input partition)
let iter = offset.execute(0, task_ctx)?;
let batches = common::collect(iter).await?;
Ok(batches.iter().map(|batch| batch.num_rows()).sum())
Expand All @@ -633,15 +633,15 @@ mod tests {

#[tokio::test]
async fn skip_3_fetch_none() -> Result<()> {
// there are total of 400 rows, we skipped 3 rows (offset = 3)
// There are total of 400 rows, we skipped 3 rows (offset = 3)
let row_count = skip_and_fetch(3, None).await?;
assert_eq!(row_count, 397);
Ok(())
}

#[tokio::test]
async fn skip_3_fetch_10_stats() -> Result<()> {
// there are total of 100 rows, we skipped 3 rows (offset = 3)
// There are total of 100 rows, we skipped 3 rows (offset = 3)
let row_count = skip_and_fetch(3, Some(10)).await?;
assert_eq!(row_count, 10);
Ok(())
Expand All @@ -656,15 +656,15 @@ mod tests {

#[tokio::test]
async fn skip_400_fetch_1() -> Result<()> {
// there are a total of 400 rows
// There are a total of 400 rows
let row_count = skip_and_fetch(400, Some(1)).await?;
assert_eq!(row_count, 0);
Ok(())
}

#[tokio::test]
async fn skip_401_fetch_none() -> Result<()> {
// there are total of 400 rows, we skipped 401 rows (offset = 3)
// There are total of 400 rows, we skipped 401 rows (offset = 3)
let row_count = skip_and_fetch(401, None).await?;
assert_eq!(row_count, 0);
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl ExecutionPlan for MemoryExec {
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
// this is a leaf node and has no children
// This is a leaf node and has no children
vec![]
}

Expand Down Expand Up @@ -179,7 +179,7 @@ impl MemoryExec {
})
}

/// set `show_sizes` to determine whether to display partition sizes
/// Set `show_sizes` to determine whether to display partition sizes
pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
self.show_sizes = show_sizes;
self
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-plan/src/placeholder_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ mod tests {
let schema = test::aggr_test_schema();
let placeholder = PlaceholderRowExec::new(schema);

// ask for the wrong partition
// Ask for the wrong partition
assert!(placeholder.execute(1, Arc::clone(&task_ctx)).is_err());
assert!(placeholder.execute(20, task_ctx).is_err());
Ok(())
Expand All @@ -223,7 +223,7 @@ mod tests {
let iter = placeholder.execute(0, task_ctx)?;
let batches = common::collect(iter).await?;

// should have one item
// Should have one item
assert_eq!(batches.len(), 1);

Ok(())
Expand All @@ -240,7 +240,7 @@ mod tests {
let iter = placeholder.execute(n, Arc::clone(&task_ctx))?;
let batches = common::collect(iter).await?;

// should have one item
// Should have one item
assert_eq!(batches.len(), 1);
}

Expand Down
Loading

0 comments on commit d3920f3

Please sign in to comment.