perf: minimize allocation in min/max emit_to(First(n))#22416
Open
RyanJamesStewart wants to merge 1 commit into
Open
perf: minimize allocation in min/max emit_to(First(n))#22416RyanJamesStewart wants to merge 1 commit into
RyanJamesStewart wants to merge 1 commit into
Conversation
`MinMaxStructAccumulator` and `MinMaxBytesAccumulator` both emit a prefix of their `min_max` group buffer with `drain(..n).collect()`. That always allocates `n` elements and leaves the retained buffer at its pre-emit capacity. Under an OOM-triggered `EmitTo::First(n)` emit, `n` is close to the buffer length, so this copies the largest allocation -- the same problem apache#22165 fixed for the multi-group-by `GroupColumn` builders. This applies the same fix: a `split_vec_min_alloc` helper that allocates `min(n, len - n)` by choosing `drain+collect` or `split_off+replace` depending on which side is smaller. Both min/max accumulators route through it. The helper is duplicated from `datafusion-physical-plan`'s `split_vec_min_alloc` (added in apache#22165) because that one is `pub(super)`-scoped to a different crate. It is a generic `Vec<T>` utility with no aggregate-specific logic; if maintainers prefer, it could instead be hoisted into `datafusion-common` and shared by both crates -- happy to do that here or as a follow-up. Behavior is unchanged. Adds unit tests covering both branches and the n == len / n == 0 boundaries.
| /// Mirrors `split_vec_min_alloc` in `datafusion-physical-plan`'s | ||
| /// `aggregates::group_values::multi_group_by` module (added in #22165); kept | ||
| /// local here because that helper is `pub(super)` to a different crate. | ||
| fn split_vec_min_alloc<T>(vec: &mut Vec<T>, n: usize) -> Vec<T> { |
Contributor
There was a problem hiding this comment.
please move this in datafusion-common
Contributor
There was a problem hiding this comment.
I have another use case for it in datafusion/expr-common/src/groups_accumulator.rs so it would be nice to have it in one place
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Related to #22164 and #22165. This extends the same fix to the min/max accumulators.
Rationale for this change
#22165 fixed an allocation problem in
EmitTo::First(n):drain(..n).collect()always allocatesnelements and leaves the retained buffer at its pre-emit capacity, so an OOM-triggered emit (wherenis close to the buffer length) ends up copying the largest allocation.The same idiom is still present in two accumulators that #22165 did not touch:
MinMaxStructAccumulator::emit_toindatafusion/functions-aggregate/src/min_max/min_max_struct.rsMinMaxBytesAccumulator::emit_toindatafusion/functions-aggregate/src/min_max/min_max_bytes.rsBoth emit a prefix of their
min_maxgroup buffer withself.min_max.drain(..n).collect().What changes are included in this PR?
Adds a
split_vec_min_allochelper inmin_max.rsthat allocatesmin(n, len - n)by choosingdrain+collectorsplit_off+replacedepending on which side is smaller, and routes both accumulators through it.The helper is duplicated from
datafusion-physical-plan'ssplit_vec_min_alloc(added in #22165) because that one ispub(super)-scoped to a different crate. It is a genericVec<T>utility with no aggregate-specific logic. If maintainers prefer, it could instead be hoisted intodatafusion-commonand shared by both crates; happy to do that in this PR or as a follow-up. I went with the local copy to keep the change minimal and avoid touching recently merged code.Are these changes tested?
Yes. Unit tests cover both branches of the helper plus the
n == lenandn == 0boundaries. Behavior ofemit_tois unchanged.cargo test -p datafusion-functions-aggregatepasses (128 tests).Are there any user-facing changes?
No.