Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sketch for aggregation intermediate results blocked management #11943

Closed

Conversation

Rachelint
Copy link
Contributor

@Rachelint Rachelint commented Aug 12, 2024

Which issue does this PR close?

Part of #11931 , part of #7065

Rationale for this change

Detail can see #11931

Are these changes tested?

By exist tests.

Are there any user-facing changes?

Two functions are added to GroupValues and GroupAccumulator trait.

    /// Returns `true` if this group values supports blocked mode.
    fn supports_blocked_mode(&self) -> bool;

    /// Switch the group values/accumulators to flat or blocked mode.
    ///
    /// After switching mode, all data in previous mode will be cleared.
    fn switch_to_mode(&mut self, mode: GroupStatesMode) -> Result<()>;

@github-actions github-actions bot added core Core DataFusion crate physical-expr Physical Expressions functions labels Aug 12, 2024
@Rachelint Rachelint force-pushed the sketch-blocked-aggr-state-management branch 2 times, most recently from fdb1789 to f087efe Compare August 12, 2024 19:27
@Rachelint Rachelint marked this pull request as ready for review August 12, 2024 19:59
@alamb
Copy link
Contributor

alamb commented Aug 12, 2024

Thank you @Rachelint -- I hope to look at this more carefully tomorrow

@Rachelint
Copy link
Contributor Author

Rachelint commented Aug 13, 2024

The benchmark after impl blocked version GroupValuesRows and, blocked version count, sum, avg accumulators.
It make about 20% queries in clickbench 1.10~1.24x faster.

And after we impl blocked version for all group values and accumulators, more queries can be faster!

Comparing main and sketch-blocked-aggr-state-management
--------------------
Benchmark clickbench_1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃ sketch-blocked-aggr-state-management ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │     0.72ms │                               0.69ms │     no change │
│ QQuery 1     │    67.71ms │                              69.75ms │     no change │
│ QQuery 2     │   169.92ms │                             170.43ms │     no change │
│ QQuery 3     │   185.76ms │                             186.77ms │     no change │
│ QQuery 4     │  1623.50ms │                            1649.09ms │     no change │
│ QQuery 5     │  1603.76ms │                            1622.42ms │     no change │
│ QQuery 6     │    61.56ms │                              62.45ms │     no change │
│ QQuery 7     │    67.97ms │                              70.10ms │     no change │
│ QQuery 8     │  2332.37ms │                            2407.51ms │     no change │
│ QQuery 9     │  1952.18ms │                            2010.02ms │     no change │
│ QQuery 10    │   551.12ms │                             549.55ms │     no change │
│ QQuery 11    │   606.73ms │                             634.02ms │     no change │
│ QQuery 12    │  1822.70ms │                            1779.43ms │     no change │
│ QQuery 13    │  3414.56ms │                            3445.55ms │     no change │
│ QQuery 14    │  2595.05ms │                            2309.24ms │ +1.12x faster │
│ QQuery 15    │  1836.69ms │                            1845.98ms │     no change │
│ QQuery 16    │  5056.23ms │                            4432.96ms │ +1.14x faster │
│ QQuery 17    │  4903.18ms │                            4182.80ms │ +1.17x faster │
│ QQuery 18    │ 10383.16ms │                            8529.29ms │ +1.22x faster │
│ QQuery 19    │   152.06ms │                             152.31ms │     no change │
│ QQuery 20    │  3364.93ms │                            3370.44ms │     no change │
│ QQuery 21    │  3972.92ms │                            3974.43ms │     no change │
│ QQuery 22    │  9594.65ms │                            9596.55ms │     no change │
│ QQuery 23    │ 23807.10ms │                           23718.25ms │     no change │
│ QQuery 24    │  1163.30ms │                            1187.58ms │     no change │
│ QQuery 25    │  1055.71ms │                            1061.81ms │     no change │
│ QQuery 26    │  1366.61ms │                            1363.28ms │     no change │
│ QQuery 27    │  4821.72ms │                            4843.56ms │     no change │
│ QQuery 28    │ 23710.55ms │                           22313.88ms │ +1.06x faster │
│ QQuery 29    │   910.31ms │                             932.51ms │     no change │
│ QQuery 30    │  2092.15ms │                            2028.27ms │     no change │
│ QQuery 31    │  2328.77ms │                            2189.39ms │ +1.06x faster │
│ QQuery 32    │  8687.27ms │                            7065.45ms │ +1.23x faster │
│ QQuery 33    │  9701.59ms │                            9577.62ms │     no change │
│ QQuery 34    │  9628.66ms │                            9658.56ms │     no change │
│ QQuery 35    │  3116.66ms │                            2809.27ms │ +1.11x faster │
│ QQuery 36    │   261.19ms │                             266.92ms │     no change │
│ QQuery 37    │   173.76ms │                             171.29ms │     no change │
│ QQuery 38    │   162.87ms │                             161.34ms │     no change │
│ QQuery 39    │   855.42ms │                             690.67ms │ +1.24x faster │
│ QQuery 40    │    62.04ms │                              62.04ms │     no change │
│ QQuery 41    │    57.62ms │                              54.97ms │     no change │
│ QQuery 42    │    72.14ms │                              70.00ms │     no change │
└──────────────┴────────────┴──────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary                                   ┃             ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (main)                                   │ 150354.87ms │
│ Total Time (sketch-blocked-aggr-state-management)   │ 143278.44ms │
│ Average Time (main)                                 │   3496.62ms │
│ Average Time (sketch-blocked-aggr-state-management) │   3332.06ms │
│ Queries Faster                                      │           9 │
│ Queries Slower                                      │           0 │
│ Queries with No Change                              │          34 │
└─────────────────────────────────────────────────────┴─────────────┘
--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃ sketch-blocked-aggr-state-management ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │     2.03ms │                               2.10ms │     no change │
│ QQuery 1     │    57.64ms │                              57.78ms │     no change │
│ QQuery 2     │   154.25ms │                             163.80ms │  1.06x slower │
│ QQuery 3     │   167.63ms │                             173.56ms │     no change │
│ QQuery 4     │  1672.39ms │                            1649.33ms │     no change │
│ QQuery 5     │  1510.56ms │                            1527.82ms │     no change │
│ QQuery 6     │    50.93ms │                              48.64ms │     no change │
│ QQuery 7     │    58.24ms │                              59.35ms │     no change │
│ QQuery 8     │  2390.67ms │                            2432.79ms │     no change │
│ QQuery 9     │  1931.85ms │                            1994.58ms │     no change │
│ QQuery 10    │   533.01ms │                             532.21ms │     no change │
│ QQuery 11    │   586.52ms │                             589.53ms │     no change │
│ QQuery 12    │  1702.88ms │                            1716.73ms │     no change │
│ QQuery 13    │  3282.15ms │                            3287.57ms │     no change │
│ QQuery 14    │  2462.18ms │                            2199.73ms │ +1.12x faster │
│ QQuery 15    │  1866.22ms │                            1876.52ms │     no change │
│ QQuery 16    │  4925.49ms │                            4330.47ms │ +1.14x faster │
│ QQuery 17    │  4805.47ms │                            4062.59ms │ +1.18x faster │
│ QQuery 18    │  9854.37ms │                            8161.64ms │ +1.21x faster │
│ QQuery 19    │   141.73ms │                             139.98ms │     no change │
│ QQuery 20    │  3663.83ms │                            3654.61ms │     no change │
│ QQuery 21    │  4188.63ms │                            4192.88ms │     no change │
│ QQuery 22    │  9622.93ms │                            9588.40ms │     no change │
│ QQuery 23    │ 21982.07ms │                           22106.21ms │     no change │
│ QQuery 24    │  1063.25ms │                            1063.79ms │     no change │
│ QQuery 25    │   853.54ms │                             858.32ms │     no change │
│ QQuery 26    │  1238.92ms │                            1249.95ms │     no change │
│ QQuery 27    │  5148.86ms │                            5144.44ms │     no change │
│ QQuery 28    │ 22077.36ms │                           21721.22ms │     no change │
│ QQuery 29    │   835.61ms │                             844.81ms │     no change │
│ QQuery 30    │  2009.91ms │                            1917.08ms │     no change │
│ QQuery 31    │  2223.99ms │                            2074.39ms │ +1.07x faster │
│ QQuery 32    │  8526.46ms │                            7018.38ms │ +1.21x faster │
│ QQuery 33    │  9774.11ms │                            9725.42ms │     no change │
│ QQuery 34    │  9496.05ms │                            9650.09ms │     no change │
│ QQuery 35    │  3134.31ms │                            2859.13ms │ +1.10x faster │
│ QQuery 36    │   240.50ms │                             251.96ms │     no change │
│ QQuery 37    │   111.59ms │                             111.42ms │     no change │
│ QQuery 38    │   137.54ms │                             140.59ms │     no change │
│ QQuery 39    │   807.76ms │                             664.45ms │ +1.22x faster │
│ QQuery 40    │    52.18ms │                              52.43ms │     no change │
│ QQuery 41    │    46.33ms │                              47.65ms │     no change │
│ QQuery 42    │    61.40ms │                              62.31ms │     no change │
└──────────────┴────────────┴──────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary                                   ┃             ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (main)                                   │ 145453.36ms │
│ Total Time (sketch-blocked-aggr-state-management)   │ 140006.65ms │
│ Average Time (main)                                 │   3382.64ms │
│ Average Time (sketch-blocked-aggr-state-management) │   3255.97ms │
│ Queries Faster                                      │           8 │
│ Queries Slower                                      │           1 │
│ Queries with No Change                              │          34 │
└─────────────────────────────────────────────────────┴─────────────┘
Note: Skipping /home/db/datafusion/benchmarks/results/main/clickbench_partitioned.json.bak as /home/db/datafusion/benchmarks/results/sketch-blocked-aggr-state-management/clickbench_partitioned.json.bak does not exist
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ sketch-blocked-aggr-state-management ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 208.92ms │                             214.15ms │     no change │
│ QQuery 2     │  31.25ms │                              31.41ms │     no change │
│ QQuery 3     │  83.52ms │                              83.58ms │     no change │
│ QQuery 4     │  57.93ms │                              60.68ms │     no change │
│ QQuery 5     │ 122.64ms │                             121.96ms │     no change │
│ QQuery 6     │  12.89ms │                              12.74ms │     no change │
│ QQuery 7     │ 250.84ms │                             250.33ms │     no change │
│ QQuery 8     │  26.35ms │                              25.97ms │     no change │
│ QQuery 9     │ 118.93ms │                             119.39ms │     no change │
│ QQuery 10    │ 117.13ms │                             116.33ms │     no change │
│ QQuery 11    │  57.04ms │                              56.61ms │     no change │
│ QQuery 12    │  34.96ms │                              35.79ms │     no change │
│ QQuery 13    │  77.05ms │                              77.11ms │     no change │
│ QQuery 14    │  15.60ms │                              14.86ms │     no change │
│ QQuery 15    │  23.89ms │                              24.24ms │     no change │
│ QQuery 16    │  36.37ms │                              37.05ms │     no change │
│ QQuery 17    │ 173.75ms │                             179.35ms │     no change │
│ QQuery 18    │ 491.96ms │                             490.21ms │     no change │
│ QQuery 19    │  36.08ms │                              36.41ms │     no change │
│ QQuery 20    │  79.92ms │                              69.56ms │ +1.15x faster │
│ QQuery 21    │ 284.69ms │                             285.27ms │     no change │
│ QQuery 22    │  19.66ms │                              19.05ms │     no change │
└──────────────┴──────────┴──────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                                   ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main)                                   │ 2361.37ms │
│ Total Time (sketch-blocked-aggr-state-management)   │ 2362.03ms │
│ Average Time (main)                                 │  107.34ms │
│ Average Time (sketch-blocked-aggr-state-management) │  107.37ms │
│ Queries Faster                                      │         1 │
│ Queries Slower                                      │         0 │
│ Queries with No Change                              │        21 │
└─────────────────────────────────────────────────────┴───────────┘
Note: Skipping /home/db/datafusion/benchmarks/results/main/tpch_sf10.json as /home/db/datafusion/benchmarks/results/sketch-blocked-aggr-state-management/tpch_sf10.json does not exist
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ sketch-blocked-aggr-state-management ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 291.46ms │                             292.07ms │     no change │
│ QQuery 2     │  44.78ms │                              46.78ms │     no change │
│ QQuery 3     │ 109.87ms │                             113.97ms │     no change │
│ QQuery 4     │  60.18ms │                              60.21ms │     no change │
│ QQuery 5     │ 196.33ms │                             192.04ms │     no change │
│ QQuery 6     │  57.52ms │                              55.79ms │     no change │
│ QQuery 7     │ 305.37ms │                             301.56ms │     no change │
│ QQuery 8     │ 124.96ms │                             125.75ms │     no change │
│ QQuery 9     │ 226.72ms │                             228.80ms │     no change │
│ QQuery 10    │ 196.97ms │                             193.90ms │     no change │
│ QQuery 11    │  32.67ms │                              31.29ms │     no change │
│ QQuery 12    │  85.23ms │                              83.33ms │     no change │
│ QQuery 13    │ 128.19ms │                             125.37ms │     no change │
│ QQuery 14    │  78.30ms │                              79.75ms │     no change │
│ QQuery 15    │ 107.56ms │                             108.60ms │     no change │
│ QQuery 16    │  42.01ms │                              43.48ms │     no change │
│ QQuery 17    │ 274.14ms │                             282.84ms │     no change │
│ QQuery 18    │ 453.89ms │                             451.47ms │     no change │
│ QQuery 19    │ 141.27ms │                             140.59ms │     no change │
│ QQuery 20    │ 134.95ms │                             123.06ms │ +1.10x faster │
│ QQuery 21    │ 295.07ms │                             297.03ms │     no change │
│ QQuery 22    │  25.33ms │                              25.97ms │     no change │
└──────────────┴──────────┴──────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                                   ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main)                                   │ 3412.78ms │
│ Total Time (sketch-blocked-aggr-state-management)   │ 3403.63ms │
│ Average Time (main)                                 │  155.13ms │
│ Average Time (sketch-blocked-aggr-state-management) │  154.71ms │
│ Queries Faster                                      │         1 │
│ Queries Slower                                      │         0 │
│ Queries with No Change                              │        21 │
└─────────────────────────────────────────────────────┴───────────┘

@jayzhan211
Copy link
Contributor

jayzhan211 commented Aug 13, 2024

Does this improve memory usage I forgot it is sketch

What is the difference between blocked approach and Emit::First with block size? At the end, there are only AllBlocks and FirstBlocks?

}
}

// emit reduces the memory usage. Ignore Err from update_memory_reservation. Even if it is
// over the target memory size after emission, we can emit again rather than returning Err.
let _ = self.update_memory_reservation();
let batch = RecordBatch::try_new(schema, output)?;
Ok(batch)
let batches = outputs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let batches = outputs
outputs
.into_iter()
.map(|o| {
RecordBatch::try_new(Arc::clone(&schema), o)
.map_err(|e| DataFusionError::ArrowError(e, None))
})
.collect::<Result<VecDeque<_>>>()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Such codes may be stale now.

.into_iter()
.map(|o| {
RecordBatch::try_new(Arc::clone(&schema), o)
.map_err(|e| DataFusionError::ArrowError(e, None))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use macro for error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Such codes may be stale now.

@@ -353,7 +355,7 @@ pub(crate) struct GroupedHashAggregateStream {

/// scratch space for the current input [`RecordBatch`] being
/// processed. Reused across batches here to avoid reallocations
current_group_indices: Vec<usize>,
current_group_indices: Vec<u64>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the reason to use u64 instead of usize

Copy link
Contributor Author

@Rachelint Rachelint Aug 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the reason to use u64 instead of usize

I think the clearer u64 is needed when we make the blocked impls,
we need to split the group_idx to two parts:

  • high 32bits used to represent the block id
  • low 32bits used to represent the block offset

So for reusing the same current_group_indices buffer in both flat and blocked mode, I modify all related group_idx to u64.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another alternative might be to ensure that the block size is aligned across the aggegators and group values -- that way there would be no stitching arrays together into batches during emission

@Rachelint
Copy link
Contributor Author

Rachelint commented Aug 13, 2024

@jayzhan211 Yes, still not any detailed blocked impls now, and just make benchmark for ensuring the sketch will not decrease the performance.

I guess the blocked apporach may be not related much to Emit::First. And their relationship may be that the blocked emission is just something can be used to impl the optimization mentioned in #9562 .

Actually the blocked approach's main traget is that it manages the data block by block in the GroupValues and GroupAccumulator(need the new blocked impl for reaching this), which can avoid the copying cost compared to the single vector approach similar as #7065 .

@jayzhan211
Copy link
Contributor

jayzhan211 commented Aug 13, 2024

I think I'm not so familiar with the Emit::First and there is no block implementation done yet. Could we emit every block size of values we have? Something like Emit::First(block size).

We have emit_early_if_necessary that do First(n) emission when condition met.

    fn emit_early_if_necessary(&mut self) -> Result<()> {
        if self.group_values.len() >= self.batch_size
            && matches!(self.group_ordering, GroupOrdering::None)
            && matches!(self.mode, AggregateMode::Partial)
            && self.update_memory_reservation().is_err()
        {
            let n = self.group_values.len() / self.batch_size * self.batch_size;
            let batch = self.emit(EmitTo::First(n), false)?;
            self.exec_state = ExecutionState::ProducingOutput(batch);
        }
        Ok(())
    }

If we emit every block size we accumulated, is it something similar to the block approach? If not, what is the difference?

Upd: One difference I can think of is that in block approach, we have all the accumulated values, and we can optimize it based on all the values we have, while in Emit::First mode, we early emit partial values, therefore, we loss the change if we want to do optimization based on all the values 🤔 ?

@Rachelint
Copy link
Contributor Author

Rachelint commented Aug 13, 2024

I think I'm not so familiar with the Emit::First and there is no block implementation done yet. Could we emit every block size of values we have? Something like Emit::First(block size).

We have emit_early_if_necessary that do First(n) emission when condition met.

    fn emit_early_if_necessary(&mut self) -> Result<()> {
        if self.group_values.len() >= self.batch_size
            && matches!(self.group_ordering, GroupOrdering::None)
            && matches!(self.mode, AggregateMode::Partial)
            && self.update_memory_reservation().is_err()
        {
            let n = self.group_values.len() / self.batch_size * self.batch_size;
            let batch = self.emit(EmitTo::First(n), false)?;
            self.exec_state = ExecutionState::ProducingOutput(batch);
        }
        Ok(())
    }

If we emit every block size we accumulated, is it something similar to the block approach? If not, what is the difference?

Upd: One difference I can think of is that in block approach, we have all the accumulated values, and we can optimize it based on all the values we have, while in Emit::First mode, we early emit partial values, therefore, we loss the change if we want to do optimization based on all the values 🤔 ?

Ok, I think I got it now, if we constantly emit first n when the group len just equal to batch size, it is actually equal to blocked approach.

But emit first n will be just triggered in some special cases in my knowledge:

  • Streaming aggr (the really constantly emit first n case, and I forced to disable blocked mode in this case)
  • In Partial operator if found the memory exceeded limit (emit_early_if_necessary)

And in others, we need to poll to end, then emit all and use slice method to split it and return now:

  • For example, in the FinalPartitioned operator
  • In the Partial operator when memory under the memory limit
  • other cases...

And in such cases, blocked approach may be effecient for both memory and cpu as stated above?

@Rachelint
Copy link
Contributor Author

Rachelint commented Aug 13, 2024

Does this improve memory usage I forgot it is sketch

What is the difference between blocked approach and Emit::First with block size? At the end, there are only AllBlocks and FirstBlocks?

I think maybe we should keep them both, the blocked emission is just optimized version of Emit::All and Emit::First to call, when we found the blocked mode is enabled.

The flat mode impls should be kept, because some abilities are expensive to support in blocked mode (such as emit exact first n needed in streaming). But it may be easy to keep them both, because flat impl is just the special case of blocked impl which just always holds a single big block.

When will the blocked mode be enabled maybe can see in:
#11931 (comment)

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

THank you @Rachelint -- I took a look at this PR and here is some feedback:

  1. I think it is important to spend time actually showing this approach makes some queries faster (e.g. we should try and update one accumulator and one implementation of groups to show it makes a difference)

  2. I think it is important to actually chunk saving the intermediate state (e.g. in a Vec<...> rather than ... to realize the benefit of this chunked approach

  3. Thank you for working on this. Very cool

@@ -123,7 +151,7 @@ pub trait GroupsAccumulator: Send {
/// future use. The group_indices on subsequent calls to
/// `update_batch` or `merge_batch` will be shifted down by
/// `n`. See [`EmitTo::First`] for more details.
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef>;
fn evaluate(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would help to document what expectations are on the Vec of array refs

Comment on lines 34 to 40
/// Emit all groups managed by blocks
AllBlocks,
/// Emit only the first `n` group blocks,
/// similar as `First`, but used in blocked `GroupValues` and `GroupAccumulator`.
///
/// For example, `n=3`, `block size=4`, finally 12 groups will be returned.
FirstBlocks(usize),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than having two parallel emission modes for blocked output, I wonder if we could have some sort of "take" mode whose semantics did not shift the existing values down

For example, what if we introduced a notion of "block" across the group keys and and aggregators

pub enum EmitTo {
    /// Same
    All,
    /// Same
    First(usize),
    /// Takes the N'th block of rows from this accumulator
    /// it is an error to take the same batch twice or to emit `All` or `First` 
    /// after any TakeBatch(usize)
    TakeBlock(usize)
}

And then we would for example, make sure the group values and aggregators all saved data using blocks of 100K rows

Then to emit 1M rows, the accumulators would emit like

EmitTo::TakeBlock(0)
EmitTo::TakeBlock(1)
EmitTo::TakeBlock(2)
...
EmitTo::TakeBlock(9)

Copy link
Contributor Author

@Rachelint Rachelint Aug 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be a great idea for reducing code changes(we dont need to refactor the returned emit value for T to Vec).

But with Emit::TakeBlock(idx), seems we need to record the current block id in outer, maybe a bit complicated?

we just define the Emit::CurrentBlock, and use the iterator approach to impl AllBlocks and FirstBlocks defined now?

If this makes sense, I can try switch the sketch to this way.

pub enum EmitTo {
    /// Same
    All,
    /// Same
    First(usize),
    /// Return the current block of rows from this accumulator
    /// We should always use this emit mode in blocked mode accumulator.
    CurrentBlock,
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use the iterator approach to impl AllBlocks and FirstBlocks defined now

Not sure how does this work, but it looks like a neat idea. If we apply the same idea to "element" (First and All), and consider it as a specialized case with block_size = 1, I think we could end up a pretty nice abstraction. Probably we just need EmitTo::Block(block_size) 🤔 However, it is too far way from now. 😆

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use the iterator approach to impl AllBlocks and FirstBlocks defined now

Not sure how does this work, but it looks like a neat idea. If we apply the same idea to "element" (First and All), and consider it as a specialized case with block_size = 1, I think we could end up a pretty nice abstraction. Probably we just need EmitTo::Block(block_size) 🤔 However, it is too far way from now. 😆

🤔 Yes, other emit mode can indeed seen as a case with specialized blocke size in the iterator approach. But considered about performance, it is better to let batch_size == block_size.

After introduce the iterator approach, just 200+ codes to finished the sketch, compared to the stale version sketch with 600+.
The main work is just to add a stream state ExecutionState::ProducingBlocks(blocks) .
https://github.com/Rachelint/arrow-datafusion/blob/d79d912d1677549c825cafc405911973ace0df46/datafusion/physical-plan/src/aggregates/row_hash.rs#L728

Maybe it can show how the blocked optimzation works.

@@ -68,11 +70,21 @@ where
fn update_batch(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to realize the benefit of this blocked implementation I think you will need to change the state of the accumulators so that instead of a single large buffer

    /// values per group
    values: BooleanBufferBuilder,

The state is held in chunks like

    /// blocks of values per group
    values: Vec<BooleanBufferBuilder>

(or possibly this to support taking them out individually)

    /// blocks of values per group, None when taken
    values: Vec<Option<BooleanBufferBuilder>>

Copy link
Contributor Author

@Rachelint Rachelint Aug 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree.

This sketch originally want to support some combinations like:

  • Single GroupValues + single GroupAccumulator
  • Blocked GroupValues + single GroupAccumulator
  • Blocked GroupValues + blocked GroupAccumulator

But after considering, it may just make the codes so complicated, and maybe can't have obvious improvement in Blocked GroupValues + single GroupAccumulator mode (constantly slice call still exist, or some even more expansive operations will be introduced if we impl it without slice).

@@ -353,7 +355,7 @@ pub(crate) struct GroupedHashAggregateStream {

/// scratch space for the current input [`RecordBatch`] being
/// processed. Reused across batches here to avoid reallocations
current_group_indices: Vec<usize>,
current_group_indices: Vec<u64>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another alternative might be to ensure that the block size is aligned across the aggegators and group values -- that way there would be no stitching arrays together into batches during emission

@2010YOUY01
Copy link
Contributor

I think I'm not so familiar with the Emit::First and there is no block implementation done yet. Could we emit every block size of values we have? Something like Emit::First(block size).
We have emit_early_if_necessary that do First(n) emission when condition met.

    fn emit_early_if_necessary(&mut self) -> Result<()> {
        if self.group_values.len() >= self.batch_size
            && matches!(self.group_ordering, GroupOrdering::None)
            && matches!(self.mode, AggregateMode::Partial)
            && self.update_memory_reservation().is_err()
        {
            let n = self.group_values.len() / self.batch_size * self.batch_size;
            let batch = self.emit(EmitTo::First(n), false)?;
            self.exec_state = ExecutionState::ProducingOutput(batch);
        }
        Ok(())
    }

If we emit every block size we accumulated, is it something similar to the block approach? If not, what is the difference?
Upd: One difference I can think of is that in block approach, we have all the accumulated values, and we can optimize it based on all the values we have, while in Emit::First mode, we early emit partial values, therefore, we loss the change if we want to do optimization based on all the values 🤔 ?

Ok, I think I got it now, if we constantly emit first n when the group len just equal to batch size, it is actually equal to blocked approach.

But emit first n will be just triggered in some special cases in my knowledge:

  • Streaming aggr (the really constantly emit first n case, and I forced to disable blocked mode in this case)
  • In Partial operator if found the memory exceeded limit (emit_early_if_necessary)

And in others, we need to poll to end, then emit all and use slice method to split it and return now:

  • For example, in the FinalPartitioned operator
  • In the Partial operator when memory under the memory limit
  • other cases...

And in such cases, blocked approach may be effecient for both memory and cpu as stated above?

I still don't understand this FirstBlocks variant, if partial aggregate runs out of memory, output first N blocks and let final aggregation process them first looks like won't help much regarding total memory usage (since it's high cardinality aggregation)
Is it related to the spilling implementation? I will check it later.

Also thanks for pushing this forward, I think this approach is promising for performance

@Rachelint
Copy link
Contributor Author

Rachelint commented Aug 14, 2024

@2010YOUY01 make sense, it seems emit_early_if_necessary function is actually introduced in the spilling pr #7400.
I am checking the related codes about memory control, too.

But the FirstBlocks is just the blocked version First, because it is too expansive to emit exact first n groups in blocked impls.

For example:

  • block size = 4, blocks=2, and emit first 3 groups,
  • After emitting, groups in first block = 1, groups in second block = 4
  • And for keep the logic correct, we need to move groups to fill the first blocks to make:
    groups in first block = 4, groups in second block = 1

Such a groups movement will obviously lead to much cpu cost...

Actually we can remove the FirstBlocks and AllBlocks mode, and we impl the related logics just out of GroupValues and GroupAccumultor.

For example, in emit_early_if_necessary, we check if the GroupValues and GroupAccumultor are in blocked modes first. If so, we get the block_size, do the aligned work, and finally pass the aligned results in First(aligned) to them

But I think if we impl like this, it will be so confused, and for making it clear, I introduce the two new blocked emission mode FirstBlocks and AllBlocks.

@Rachelint
Copy link
Contributor Author

Rachelint commented Aug 14, 2024

THank you @Rachelint -- I took a look at this PR and here is some feedback:

1. I think it is important to spend time actually showing this approach makes some queries faster (e.g. we should try and update one accumulator and one implementation of groups to show it makes a difference)

2. I think it is important to actually chunk saving the intermediate state (e.g. in a `Vec<...>` rather than `...` to realize the benefit of this chunked approach

3. Thank you for working on this. Very cool

Thanks, I have finished a blocked style GroupValuesRows impl today, working on impl blocked style GroupAccumulator.

@Rachelint
Copy link
Contributor Author

Rachelint commented Aug 14, 2024

@2010YOUY01 After checking the codes about memory contorl, I think I got it.

  • emit_early_if_necessary is used in Partial
  • and spill_previous_if_necessary is used in the final phases

They all serve for the spilling. And the logic may be like this:

  • After reaching the memory limit, force the Partial to submit batches to Final as soon as possible
  • And the Final will spill them to disk for avoid oom
  • After all batches are submitted to Final, the Final merged the spilled batches and in-memory batches to get the final results (in streaming agg way, batches will be sorted before spilling).

@2010YOUY01
Copy link
Contributor

@2010YOUY01 After checking the codes about memory contorl, I think I got it.

  • emit_early_if_necessary is used in Partial
  • and spill_previous_if_necessary is used in the final phases

They all serve for the spilling. And the logic may be like this:

  • After reaching the memory limit, force the Partial to submit batches to Final as soon as possible
  • And the Final will spill them to disk for avoid oom
  • After all batches are submitted to Final, the Final merged the spilled batches and in-memory batches to get the final results (in streaming agg way, batches will be sorted before spilling).

Thanks, now I figured out the high-level idea of spilling in aggregation and how emit works in its implementation.

However there exists other code that does early emit in aggregation, and I'm still trying to figure out how they work, do you have any pointer for that? I'm guessing it's used in streaming aggregation or some pushed-down limits

if let Some(to_emit) = self.group_ordering.emit_to() {
let batch = extract_ok!(self.emit(to_emit, false));
self.exec_state = ExecutionState::ProducingOutput(batch);
timer.done();
// make sure the exec_state just set is not overwritten below
break 'reading_input;
}

@Rachelint
Copy link
Contributor Author

@2010YOUY01 After checking the codes about memory contorl, I think I got it.

  • emit_early_if_necessary is used in Partial
  • and spill_previous_if_necessary is used in the final phases

They all serve for the spilling. And the logic may be like this:

  • After reaching the memory limit, force the Partial to submit batches to Final as soon as possible
  • And the Final will spill them to disk for avoid oom
  • After all batches are submitted to Final, the Final merged the spilled batches and in-memory batches to get the final results (in streaming agg way, batches will be sorted before spilling).

Thanks, now I figured out the high-level idea of spilling in aggregation and how emit works in its implementation.

However there exists other code that does early emit in aggregation, and I'm still trying to figure out how they work, do you have any pointer for that? I'm guessing it's used in streaming aggregation or some pushed-down limits

if let Some(to_emit) = self.group_ordering.emit_to() {
let batch = extract_ok!(self.emit(to_emit, false));
self.exec_state = ExecutionState::ProducingOutput(batch);
timer.done();
// make sure the exec_state just set is not overwritten below
break 'reading_input;
}

Yes, you are right, there are two early emission cases, one is for spilling mentioned above, and another here is about streaming.

@Rachelint Rachelint force-pushed the sketch-blocked-aggr-state-management branch from 076d88e to ab92626 Compare August 14, 2024 16:25
@github-actions github-actions bot removed core Core DataFusion crate functions labels Aug 14, 2024
@Rachelint Rachelint marked this pull request as draft August 14, 2024 18:13
@Rachelint Rachelint marked this pull request as ready for review August 14, 2024 18:13
@Rachelint Rachelint marked this pull request as draft August 14, 2024 18:14
@Rachelint Rachelint force-pushed the sketch-blocked-aggr-state-management branch from 2a1be53 to 318c650 Compare September 1, 2024 12:12
@Rachelint
Copy link
Contributor Author

Rachelint commented Sep 1, 2024

Hi @alamb main comments #11943 (review) for this pr have been fixed, minding have a quick look? It would be appreciated.

The detail about main progress:

Things planned in next prs:

  • Improve the impl, now the BlockedGroupIndex will be computed multiple times... I want to avoid it, but it will lead to api change, and not a trivial ting... The change may be like:
// current
    fn update_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()>;

// new
    fn update_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[BlockedGroupIndex],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()>;

@alamb
Copy link
Contributor

alamb commented Sep 5, 2024

Hi @alamb main comments #11943 (review) for this pr have been fixed, minding have a quick look? It would be appreciated.

Sorry for the delay -- I am back now full time and will review this PR over the next few days

@alamb
Copy link
Contributor

alamb commented Sep 25, 2024

Marking as draft as I don't think this is waiting on review and I am trying to keep the review backlog under control

Copy link

Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.

@github-actions github-actions bot added the Stale PR has not had any activity for some time label Nov 25, 2024
@Rachelint
Copy link
Contributor Author

Codes here are stale, I will submit a new pr when starting to push it forward.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
common Related to common crate core Core DataFusion crate documentation Improvements or additions to documentation functions physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt) Stale PR has not had any activity for some time
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants