From 701e0ddc3b3ff04544b9679f189eb42ac982a8a4 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 8 Feb 2024 20:27:40 -0500
Subject: [PATCH] Minor: Improve `DataFrame` docs, add examples (#9159)

* Minor: Improve DataFrame docs, add examples

* More better docs

* more

* Fix docs

* Apply suggestions from code review

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>

---------

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
---
 datafusion/core/src/dataframe/mod.rs         | 226 ++++++++++++++-----
 datafusion/core/src/dataframe/parquet.rs     |  22 +-
 datafusion/core/src/execution/context/mod.rs |   4 +-
 datafusion/physical-plan/src/lib.rs          |   4 +-
 4 files changed, 201 insertions(+), 55 deletions(-)

diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index f4023642ef04..2397d011d4aa 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -109,27 +109,54 @@ impl Default for DataFrameWriteOptions {
     }
 }
 
-/// DataFrame represents a logical set of rows with the same named columns.
-/// Similar to a [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) or
-/// [Spark DataFrame](https://spark.apache.org/docs/latest/sql-programming-guide.html)
+/// Represents a logical set of rows with the same named columns.
 ///
-/// DataFrames are typically created by the `read_csv` and `read_parquet` methods on the
-/// [SessionContext](../execution/context/struct.SessionContext.html) and can then be modified
-/// by calling the transformation methods, such as `filter`, `select`, `aggregate`, and `limit`
-/// to build up a query definition.
+/// Similar to a [Pandas DataFrame] or [Spark DataFrame], a DataFusion DataFrame
+/// represents a two-dimensional table of rows and columns.
 ///
-/// The query can be executed by calling the `collect` method.
+/// The typical workflow using DataFrames looks like:
 ///
+/// 1. Create a DataFrame via methods on [SessionContext], such as [`read_csv`]
+///    and [`read_parquet`].
+///
+/// 2. Build a desired calculation by calling methods such as [`filter`],
+///    [`select`], [`aggregate`], and [`limit`].
+///
+/// 3. Execute into [`RecordBatch`]es by calling [`collect`].
+///
+/// A `DataFrame` is a wrapper around a [`LogicalPlan`] and the [`SessionState`]
+/// required for execution.
+///
+/// DataFrames are "lazy" in the sense that most methods do not actually compute
+/// anything; they just build up a plan. Calling [`collect`] executes the plan
+/// using the same DataFusion planning and execution process used to execute SQL
+/// and other queries.
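The laziness described in the added docs can be made concrete. The following sketch is an editorial addition (not part of the commit), assuming the `tests/data/example.csv` file used throughout these doc examples: transformation methods only grow the `LogicalPlan`, and nothing is read or computed until `collect` runs.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Each call below only builds up a LogicalPlan; no data is read yet.
    let df = ctx
        .read_csv("tests/data/example.csv", CsvReadOptions::new())
        .await?
        .filter(col("a").lt_eq(col("b")))?
        .limit(0, Some(10))?;
    // Inspecting the plan does not execute it either.
    println!("{}", df.logical_plan().display_indent());
    // Only now is the plan optimized, planned, and run.
    let _batches = df.collect().await?;
    Ok(())
}
```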
+///
+/// [Pandas DataFrame]: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
+/// [Spark DataFrame]: https://spark.apache.org/docs/latest/sql-programming-guide.html
+/// [`read_csv`]: SessionContext::read_csv
+/// [`read_parquet`]: SessionContext::read_parquet
+/// [`filter`]: DataFrame::filter
+/// [`select`]: DataFrame::select
+/// [`aggregate`]: DataFrame::aggregate
+/// [`limit`]: DataFrame::limit
+/// [`collect`]: DataFrame::collect
+///
+/// # Example
 /// ```
 /// # use datafusion::prelude::*;
 /// # use datafusion::error::Result;
 /// # #[tokio::main]
 /// # async fn main() -> Result<()> {
 /// let ctx = SessionContext::new();
+/// // Read the data from a CSV file
 /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
+/// // create a new dataframe that computes the equivalent of
+/// // `SELECT a, MIN(b) FROM df WHERE a <= b GROUP BY a LIMIT 100;`
 /// let df = df.filter(col("a").lt_eq(col("b")))?
 ///            .aggregate(vec![col("a")], vec![min(col("b"))])?
 ///            .limit(0, Some(100))?;
+/// // Perform the actual computation
 /// let results = df.collect().await?;
 /// # Ok(())
 /// # }
@@ -141,7 +168,11 @@ pub struct DataFrame {
 }
 
 impl DataFrame {
-    /// Create a new Table based on an existing logical plan
+    /// Create a new `DataFrame` based on an existing `LogicalPlan`
+    ///
+    /// This is a low-level method and is not typically used by end users. See
+    /// [`SessionContext::read_csv`] and other methods for creating a
+    /// `DataFrame` from an existing datasource.
     pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self {
         Self {
             session_state,
@@ -149,7 +180,7 @@ impl DataFrame {
         }
     }
 
-    /// Create a physical plan
+    /// Consume the DataFrame and produce a physical plan
     pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>> {
         self.session_state.create_physical_plan(&self.plan).await
     }
 
@@ -180,8 +211,12 @@ impl DataFrame {
         self.select(expr)
     }
 
-    /// Create a projection based on arbitrary expressions.
+    /// Project arbitrary expressions (like SQL SELECT expressions) into a new
+    /// `DataFrame`.
+    ///
+    /// The output `DataFrame` has one column for each element in `expr_list`.
     ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -207,11 +242,12 @@ impl DataFrame {
     /// Expand each list element of a column to multiple rows.
     ///
-    /// Seee also:
+    /// See also:
     ///
     /// 1. [`UnnestOptions`] documentation for the behavior of `unnest`
     /// 2. [`Self::unnest_column_with_options`]
     ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -243,8 +279,13 @@ impl DataFrame {
         Ok(DataFrame::new(self.session_state, plan))
     }
 
-    /// Filter a DataFrame to only include rows that match the specified filter expression.
+    /// Return a DataFrame with only rows for which `predicate` evaluates to
+    /// `true`.
     ///
+    /// Rows for which `predicate` evaluates to `false` or `null`
+    /// are filtered out.
+    ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -263,8 +304,10 @@ impl DataFrame {
         Ok(DataFrame::new(self.session_state, plan))
     }
 
-    /// Perform an aggregate query with optional grouping expressions.
+    /// Return a new `DataFrame` that aggregates the rows of the current
+    /// `DataFrame`, first optionally grouping by the given expressions.
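The hunk above documents `create_physical_plan` but the patch gives it no example. As an editorial aside (not part of the commit), a minimal sketch of one plausible use, again assuming the example CSV file; the patch's `aggregate` example continues below:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx
        .read_csv("tests/data/example.csv", CsvReadOptions::new())
        .await?;
    // Consumes the DataFrame and plans it into an Arc<dyn ExecutionPlan>.
    let physical_plan = df.create_physical_plan().await?;
    // The physical plan reports its output schema without executing anything.
    println!("output schema: {:?}", physical_plan.schema());
    Ok(())
}
```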
     ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -292,7 +335,8 @@ impl DataFrame {
         Ok(DataFrame::new(self.session_state, plan))
     }
 
-    /// Apply one or more window functions ([`Expr::WindowFunction`]) to extend the schema
+    /// Return a new DataFrame that adds the result of evaluating one or more
+    /// window functions ([`Expr::WindowFunction`]) to the existing columns
     pub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame> {
         let plan = LogicalPlanBuilder::from(self.plan)
             .window(window_exprs)?
@@ -300,11 +344,13 @@ impl DataFrame {
         Ok(DataFrame::new(self.session_state, plan))
     }
 
-    /// Limit the number of rows returned from this DataFrame.
+    /// Returns a new `DataFrame` with a limited number of rows.
     ///
+    /// # Arguments
     /// `skip` - Number of rows to skip before fetching any rows
+    /// `fetch` - Maximum number of rows to return, after skipping `skip` rows.
     ///
-    /// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows.
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -323,9 +369,11 @@ impl DataFrame {
         Ok(DataFrame::new(self.session_state, plan))
     }
 
-    /// Calculate the union of two [`DataFrame`]s, preserving duplicate rows.The
-    /// two [`DataFrame`]s must have exactly the same schema
+    /// Calculate the union of two [`DataFrame`]s, preserving duplicate rows.
+    ///
+    /// The two [`DataFrame`]s must have exactly the same schema.
     ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -345,9 +393,12 @@ impl DataFrame {
         Ok(DataFrame::new(self.session_state, plan))
     }
 
-    /// Calculate the distinct union of two [`DataFrame`]s. The
-    /// two [`DataFrame`]s must have exactly the same schema
+    /// Calculate the distinct union of two [`DataFrame`]s.
+    ///
+    /// The two [`DataFrame`]s must have exactly the same schema. Any duplicate
+    /// rows are discarded.
     ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -369,8 +420,9 @@ impl DataFrame {
         ))
     }
 
-    /// Filter out duplicate rows
+    /// Return a new `DataFrame` with all duplicated rows removed.
     ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -389,9 +441,12 @@ impl DataFrame {
         ))
     }
 
-    /// Summary statistics for a DataFrame. Only summarizes numeric datatypes at the moment and
-    /// returns nulls for non numeric datatypes. Try in keep output similar to pandas
+    /// Return a new `DataFrame` containing summary statistics for this `DataFrame`.
+    ///
+    /// Only summarizes numeric datatypes at the moment and returns nulls for
+    /// non-numeric datatypes. The output format is modeled after pandas.
+    ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -562,8 +617,12 @@ impl DataFrame {
         ))
     }
 
-    /// Sort the DataFrame by the specified sorting expressions. Any expression can be turned into
-    /// a sort expression by calling its [sort](../logical_plan/enum.Expr.html#method.sort) method.
+    /// Sort the DataFrame by the specified sorting expressions.
+    ///
+    /// Note that any expression can be turned into
+    /// a sort expression by calling its [sort](Expr::sort) method.
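Several of the methods changed above compose naturally. As an editorial aside (not part of the commit), a hedged sketch that builds the equivalent of `SELECT DISTINCT a FROM t ORDER BY a ASC LIMIT 5` against the example CSV; the patch's own `sort` example continues below:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx
        .read_csv("tests/data/example.csv", CsvReadOptions::new())
        .await?
        .select(vec![col("a")])?                  // SELECT a
        .distinct()?                              // DISTINCT
        .sort(vec![col("a").sort(true, false)])?  // ORDER BY a ASC NULLS LAST
        .limit(0, Some(5))?;                      // LIMIT 5
    // Execute and print the result to the console.
    df.show().await?;
    Ok(())
}
```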
+    ///
+    /// # Example
     ///
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
     /// # #[tokio::main]
     /// # async fn main() -> Result<()> {
     /// let ctx = SessionContext::new();
     /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
-    /// let df = df.sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?;
+    /// let df = df.sort(vec![
+    ///     col("a").sort(true, true),   // a ASC, nulls first
+    ///     col("b").sort(false, false), // b DESC, nulls last
+    /// ])?;
     /// # Ok(())
     /// # }
     /// ```
@@ -642,7 +704,6 @@ impl DataFrame {
     /// identifying and optimizing equality predicates.
     ///
     /// # Example
-    ///
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -688,6 +749,7 @@ impl DataFrame {
 
     /// Repartition a DataFrame based on a logical partitioning scheme.
     ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -706,9 +768,12 @@ impl DataFrame {
         Ok(DataFrame::new(self.session_state, plan))
     }
 
-    /// Run a count aggregate on the DataFrame and execute the DataFrame to collect this
-    /// count and return it as a usize, to find the total number of rows after executing
-    /// the DataFrame.
+    /// Return the total number of rows in this `DataFrame`.
+    ///
+    /// Note that this method will actually run a plan to calculate the count,
+    /// which may be slow for large or complicated DataFrames.
+    ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -739,9 +804,14 @@ impl DataFrame {
         Ok(len)
     }
 
-    /// Convert the logical plan represented by this DataFrame into a physical plan and
-    /// execute it, collecting all resulting batches into memory
-    /// Executes this DataFrame and collects all results into a vector of RecordBatch.
+    /// Execute this `DataFrame` and buffer all resulting `RecordBatch`es into memory.
+    ///
+    /// Prior to calling `collect`, modifying a DataFrame simply updates a plan
+    /// (no actual computation is performed). `collect` triggers the computation.
+    ///
+    /// See [`Self::execute_stream`] to execute a DataFrame without buffering.
+    ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -759,8 +829,9 @@ impl DataFrame {
         collect(plan, task_ctx).await
     }
 
-    /// Print results.
+    /// Execute the `DataFrame` and print the results to the console.
     ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -777,8 +848,10 @@ impl DataFrame {
         Ok(pretty::print_batches(&results)?)
     }
 
-    /// Print results and limit rows.
+    /// Execute the `DataFrame` and print only the first `num` rows of the
+    /// result to the console.
     ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -795,13 +868,15 @@ impl DataFrame {
         Ok(pretty::print_batches(&results)?)
     }
 
-    /// Get a new TaskContext to run in this session
+    /// Return a new [`TaskContext`] that can be used to execute this DataFrame
     pub fn task_ctx(&self) -> TaskContext {
         TaskContext::from(&self.session_state)
    }
 
     /// Executes this DataFrame and returns a stream over a single partition
     ///
+    /// See [`Self::collect`] to buffer the `RecordBatch`es in memory.
+    ///
     /// # Example
     /// ```
     /// # use datafusion::prelude::*;
@@ -828,6 +903,7 @@ impl DataFrame {
 
     /// Executes this DataFrame and collects all results into a vector of vectors of `RecordBatch`,
     /// maintaining the input partitioning.
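The body of the `execute_stream` example is elided by the diff context above. As a usage note (an editorial addition, not part of the commit), the returned stream is typically consumed with `futures::StreamExt`, which the sketch below assumes as a dependency:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx
        .read_csv("tests/data/example.csv", CsvReadOptions::new())
        .await?;
    // Incrementally pull RecordBatches instead of buffering them all.
    let mut stream = df.execute_stream().await?;
    while let Some(batch) = stream.next().await {
        println!("got {} rows", batch?.num_rows());
    }
    Ok(())
}
```

This keeps memory use bounded by a few batches, which is the reason the added docs point readers here as the alternative to `collect`.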
     ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -871,9 +947,12 @@ impl DataFrame {
         execute_stream_partitioned(plan, task_ctx)
     }
 
-    /// Returns the schema describing the output of this DataFrame in terms of columns returned,
-    /// where each column has a name, data type, and nullability attribute.
-
+    /// Returns the `DFSchema` describing the output of this DataFrame.
+    ///
+    /// The output `DFSchema` contains information on the name, data type, and
+    /// nullability for each column.
+    ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -889,7 +968,8 @@ impl DataFrame {
         self.plan.schema()
     }
 
-    /// Return the unoptimized logical plan
+    /// Return a reference to the unoptimized [`LogicalPlan`] that comprises
+    /// this DataFrame. See [`Self::into_unoptimized_plan`] for more details.
     pub fn logical_plan(&self) -> &LogicalPlan {
         &self.plan
     }
 
@@ -899,20 +979,21 @@ impl DataFrame {
         (self.session_state, self.plan)
     }
 
-    /// Return the logical plan represented by this DataFrame without running the optimizers
+    /// Return the [`LogicalPlan`] represented by this DataFrame without running
+    /// any optimizers
     ///
-    /// Note: This method should not be used outside testing, as it loses the snapshot
-    /// of the [`SessionState`] attached to this [`DataFrame`] and consequently subsequent
-    /// operations may take place against a different state
+    /// Note: This method should not be used outside testing, as it loses the
+    /// snapshot of the [`SessionState`] attached to this [`DataFrame`] and
+    /// consequently subsequent operations may take place against a different
+    /// state (e.g. a different value of `now()`)
     pub fn into_unoptimized_plan(self) -> LogicalPlan {
         self.plan
     }
 
-    /// Return the optimized logical plan represented by this DataFrame.
+    /// Return the optimized [`LogicalPlan`] represented by this DataFrame.
     ///
-    /// Note: This method should not be used outside testing, as it loses the snapshot
-    /// of the [`SessionState`] attached to this [`DataFrame`] and consequently subsequent
-    /// operations may take place against a different state
+    /// Note: This method should not be used outside testing -- see
+    /// [`Self::into_unoptimized_plan`] for more details.
     pub fn into_optimized_plan(self) -> Result<LogicalPlan> {
         // Optimize the plan first for better UX
         self.session_state.optimize(&self.plan)
@@ -961,6 +1042,7 @@ impl DataFrame {
 
     /// Return a `FunctionRegistry` used to plan UDF calls
     ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -1025,6 +1107,7 @@ impl DataFrame {
     }
 
     /// Write this DataFrame to the referenced table by name.
+    ///
     /// This method uses the same underlying implementation
     /// as the SQL Insert Into statement. Unlike most other DataFrame methods,
     /// this method executes eagerly. Data is written to the table using an
@@ -1050,7 +1133,27 @@ impl DataFrame {
         DataFrame::new(self.session_state, plan).collect().await
     }
 
-    /// Write a `DataFrame` to a CSV file.
+    /// Execute the `DataFrame` and write the results to CSV file(s).
+    ///
+    /// # Example
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// use datafusion::dataframe::DataFrameWriteOptions;
+    /// let ctx = SessionContext::new();
+    /// // Sort the data by column "b" and write it to a new location
+    /// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
+    ///    .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
+    ///    .write_csv(
+    ///        "output.csv",
+    ///        DataFrameWriteOptions::new(),
+    ///        None, // can also specify CSV writing options here
+    ///    ).await?;
+    /// # Ok(())
+    /// # }
+    /// ```
     pub async fn write_csv(
         self,
         path: &str,
@@ -1081,7 +1184,26 @@ impl DataFrame {
         DataFrame::new(self.session_state, plan).collect().await
     }
 
-    /// Executes a query and writes the results to a partitioned JSON file.
+    /// Execute the `DataFrame` and write the results to JSON file(s).
+    ///
+    /// # Example
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// use datafusion::dataframe::DataFrameWriteOptions;
+    /// let ctx = SessionContext::new();
+    /// // Sort the data by column "b" and write it to a new location
+    /// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
+    ///    .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
+    ///    .write_json(
+    ///        "output.json",
+    ///        DataFrameWriteOptions::new(),
+    ///    ).await?;
+    /// # Ok(())
+    /// # }
+    /// ```
     pub async fn write_json(
         self,
         path: &str,
@@ -1107,6 +1229,7 @@ impl DataFrame {
 
     /// Add an additional column to the DataFrame.
     ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -1159,6 +1282,7 @@ impl DataFrame {
     /// Alternatively, setting the DataFusion param `datafusion.sql_parser.enable_ident_normalization` to `false` will enable
     /// case-sensitive renames without the need to wrap the column name in special symbols
     ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 7c0ec1edc2c0..00a0e780d51f 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -26,7 +26,27 @@ use super::{
 };
 
 impl DataFrame {
-    /// Write a `DataFrame` to a Parquet file.
+    /// Execute the `DataFrame` and write the results to Parquet file(s).
+    ///
+    /// # Example
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// use datafusion::dataframe::DataFrameWriteOptions;
+    /// let ctx = SessionContext::new();
+    /// // Sort the data by column "b" and write it to a new location
+    /// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
+    ///    .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
+    ///    .write_parquet(
+    ///        "output.parquet",
+    ///        DataFrameWriteOptions::new(),
+    ///        None, // can also specify parquet writing options here
+    ///    ).await?;
+    /// # Ok(())
+    /// # }
+    /// ```
     pub async fn write_parquet(
         self,
         path: &str,
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index b9039c5c9273..0bc75720e7c8 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -164,7 +164,7 @@ where
 ///
 /// [`SessionContext`] provides the following functionality:
 ///
-/// * Create a DataFrame from a CSV or Parquet data source.
+/// * Create a [`DataFrame`] from a CSV or Parquet data source.
 /// * Register a CSV or Parquet data source as a table that can be referenced from a SQL query.
 /// * Register a custom data source that can be referenced from a SQL query.
 /// * Execute a SQL query
 ///
 /// # Example: DataFrame API
 ///
 /// The following example demonstrates how to use the context to execute a query against a CSV
-/// data source using the DataFrame API:
+/// data source using the [`DataFrame`] API:
 ///
 /// ```
 /// use datafusion::prelude::*;
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index 9c8ad03bbea0..1c4a6ac0ecaf 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -511,7 +511,9 @@ pub async fn collect(
     common::collect(stream).await
 }
 
-/// Execute the [ExecutionPlan] and return a single stream of results.
+/// Execute the [ExecutionPlan] and return a single stream of `RecordBatch`es.
+///
+/// See [collect] to buffer the `RecordBatch`es in memory.
 ///
 /// # Aborting Execution
 ///
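One method touched by this patch, `write_table`, receives no example anywhere in the commit. As an editorial aside (not part of the commit), a hedged sketch of one way it might be used: the target table name `"t"` is hypothetical, and the sketch assumes an empty `MemTable` registered with the source's schema so the eager insert has somewhere to go.

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx
        .read_csv("tests/data/example.csv", CsvReadOptions::new())
        .await?;
    // Register an empty in-memory table "t" (hypothetical name) with the
    // same schema as the source data.
    let schema = Arc::new(Schema::from(df.schema()));
    let table = MemTable::try_new(schema, vec![vec![]])?;
    ctx.register_table("t", Arc::new(table))?;
    // write_table executes eagerly, as the patched docs note: the DataFrame
    // runs and its rows are inserted into "t".
    df.write_table("t", DataFrameWriteOptions::new()).await?;
    Ok(())
}
```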