Skip to content

Commit

Permalink
Fix clippy lints (#1885)
Browse files Browse the repository at this point in the history
* fix some lints

Signed-off-by: remzi <[email protected]>

* fix some lints

Signed-off-by: remzi <[email protected]>

* fix some lints

Signed-off-by: remzi <[email protected]>

* fix some lints

Signed-off-by: remzi <[email protected]>

* fix all lints

Signed-off-by: remzi <[email protected]>
  • Loading branch information
HaoYang670 authored Feb 25, 2022
1 parent 9e75ff5 commit 879b754
Show file tree
Hide file tree
Showing 12 changed files with 21 additions and 31 deletions.
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
DataType::Timestamp(time_unit, timezone) => {
ArrowTypeEnum::Timestamp(protobuf::Timestamp {
time_unit: protobuf::TimeUnit::from_arrow_time_unit(time_unit) as i32,
timezone: timezone.to_owned().unwrap_or_else(String::new),
timezone: timezone.to_owned().unwrap_or_default(),
})
}
DataType::Date32 => ArrowTypeEnum::Date32(EmptyMessage {}),
Expand Down
9 changes: 3 additions & 6 deletions datafusion/fuzz-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub use env_logger;
pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
batches
.iter()
.map(|batch| {
.flat_map(|batch| {
assert_eq!(batch.num_columns(), 1);
batch
.column(0)
Expand All @@ -35,16 +35,14 @@ pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
.unwrap()
.iter()
})
.flatten()
.collect()
}

/// extract values from batches and sort them
pub fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) -> Vec<Option<i32>> {
let mut values: Vec<_> = partitions
.iter()
.map(|batches| batches_to_vec(batches).into_iter())
.flatten()
.flat_map(|batches| batches_to_vec(batches).into_iter())
.collect();

values.sort_unstable();
Expand All @@ -60,14 +58,13 @@ pub fn add_empty_batches(

batches
.into_iter()
.map(|batch| {
.flat_map(|batch| {
// insert 0, or 1 empty batches before and after the current batch
let empty_batch = RecordBatch::new_empty(schema.clone());
std::iter::repeat(empty_batch.clone())
.take(rng.gen_range(0..2))
.chain(std::iter::once(batch))
.chain(std::iter::repeat(empty_batch).take(rng.gen_range(0..2)))
})
.flatten()
.collect()
}
6 changes: 3 additions & 3 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1151,7 +1151,7 @@ impl ExecutionProps {
var_type: VarType,
provider: Arc<dyn VarProvider + Send + Sync>,
) -> Option<Arc<dyn VarProvider + Send + Sync>> {
let mut var_providers = self.var_providers.take().unwrap_or_else(HashMap::new);
let mut var_providers = self.var_providers.take().unwrap_or_default();

let old_provider = var_providers.insert(var_type, provider);

Expand Down Expand Up @@ -3262,7 +3262,7 @@ mod tests {
let logical_plan = ctx.create_logical_plan(sql)?;
let logical_plan = ctx.optimize(&logical_plan)?;
let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
ctx.write_csv(physical_plan, out_dir.to_string()).await
ctx.write_csv(physical_plan, out_dir).await
}

/// Execute SQL and write results to partitioned parquet files
Expand All @@ -3275,7 +3275,7 @@ mod tests {
let logical_plan = ctx.create_logical_plan(sql)?;
let logical_plan = ctx.optimize(&logical_plan)?;
let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
ctx.write_parquet(physical_plan, out_dir.to_string(), writer_properties)
ctx.write_parquet(physical_plan, out_dir, writer_properties)
.await
}

Expand Down
3 changes: 1 addition & 2 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1041,14 +1041,13 @@ pub(crate) fn expand_wildcard(
let columns_to_skip = using_columns
.into_iter()
// For each USING JOIN condition, only expand to one column in projection
.map(|cols| {
.flat_map(|cols| {
let mut cols = cols.into_iter().collect::<Vec<_>>();
// sort join columns to make sure we consistently keep the same
// qualified column
cols.sort();
cols.into_iter().skip(1)
})
.flatten()
.collect::<HashSet<_>>();

if columns_to_skip.is_empty() {
Expand Down
3 changes: 1 addition & 2 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,7 @@ impl LogicalPlan {
{
self.using_columns.push(
on.iter()
.map(|entry| [&entry.0, &entry.1])
.flatten()
.flat_map(|entry| [&entry.0, &entry.1])
.cloned()
.collect::<HashSet<Column>>(),
);
Expand Down
3 changes: 1 addition & 2 deletions datafusion/src/optimizer/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,13 @@ fn get_pushable_join_predicates<'a>(
let schema_columns = schema
.fields()
.iter()
.map(|f| {
.flat_map(|f| {
[
f.qualified_column(),
// we need to push down filter using unqualified column as well
f.unqualified_column(),
]
})
.flatten()
.collect::<HashSet<_>>();

state
Expand Down
10 changes: 5 additions & 5 deletions datafusion/src/physical_plan/hash_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1176,12 +1176,12 @@ mod tests {
_partition: usize,
_runtime: Arc<RuntimeEnv>,
) -> Result<SendableRecordBatchStream> {
let stream;
if self.yield_first {
stream = TestYieldingStream::New;
let stream = if self.yield_first {
TestYieldingStream::New
} else {
stream = TestYieldingStream::Yielded;
}
TestYieldingStream::Yielded
};

Ok(Box::pin(stream))
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/regex_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub fn regexp_replace<T: StringOffsetSizeTrait>(args: &[ArrayRef]) -> Result<Arr
let (pattern, replace_all) = if flags == "g" {
(pattern.to_string(), true)
} else if flags.contains('g') {
(format!("(?{}){}", flags.to_string().replace("g", ""), pattern), true)
(format!("(?{}){}", flags.to_string().replace('g', ""), pattern), true)
} else {
(format!("(?{}){}", flags, pattern), false)
};
Expand Down
6 changes: 2 additions & 4 deletions datafusion/src/physical_plan/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,12 @@ fn col_stats_union(
.min_value
.zip(right.min_value)
.map(|(a, b)| expressions::helpers::min(&a, &b))
.map(Result::ok)
.flatten();
.and_then(Result::ok);
left.max_value = left
.max_value
.zip(right.max_value)
.map(|(a, b)| expressions::helpers::max(&a, &b))
.map(Result::ok)
.flatten();
.and_then(Result::ok);
left.null_count = left.null_count.zip(right.null_count).map(|(a, b)| a + b);

left
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ mod tests {
async fn values_empty_case() -> Result<()> {
let schema = test_util::aggr_test_schema();
let empty = ValuesExec::try_new(schema, vec![]);
assert!(!empty.is_ok());
assert!(empty.is_err());
Ok(())
}
}
3 changes: 1 addition & 2 deletions datafusion/tests/merge_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ async fn run_merge_test(input: Vec<Vec<RecordBatch>>) {
for batch_size in batch_sizes {
let first_batch = input
.iter()
.map(|p| p.iter())
.flatten()
.flat_map(|p| p.iter())
.next()
.expect("at least one batch");
let schema = first_batch.schema();
Expand Down
3 changes: 1 addition & 2 deletions datafusion/tests/order_spill_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
let input = vec![make_staggered_batches(size)];
let first_batch = input
.iter()
.map(|p| p.iter())
.flatten()
.flat_map(|p| p.iter())
.next()
.expect("at least one batch");
let schema = first_batch.schema();
Expand Down

0 comments on commit 879b754

Please sign in to comment.