
Commit 04f217b

XiangpengHao and alamb authored
Speed up Parquet filter pushdown v4 (Predicate evaluation cache for async_reader) (#7850)
This is my latest attempt to make pushdown faster. Prior art: #6921. cc @alamb @zhuqi-lucas

- Part of #8000
- Related to apache/datafusion#3463
- Related to #7456
- Closes #7363
- Closes #8003

## Problems of #6921

1. It proactively loads the entire row group into memory, rather than only loading the pages that pass the filter predicate.
2. It only caches decompressed pages, so the decoding cost is still paid twice.

This PR takes a different approach: it does not change the decoding pipeline, which avoids problem 1, and it caches decoded Arrow record batches, which avoids problem 2. The trade-off is that more memory is needed to hold the cached batches.

## How it works?

1. It instruments the `array_reader`s with a transparent `cached_array_reader`.
2. The cache layer first consults the `RowGroupCache` to look for a batch, and only reads from the underlying reader on a cache miss.
3. There is a cache producer and a cache consumer: while building filters (the producer) we insert Arrow arrays into the cache; while building the output (the consumer) we remove them. Memory usage therefore looks like this:

```
▲
│     ╭─╮
│    ╱   ╲
│   ╱     ╲
│  ╱       ╲
│ ╱         ╲
│╱           ╲
└─────────────╲──────► Time
  │      │       │
Filter  Peak  Consume
Phase  (Built) (Decrease)
```

In a concurrent setup, not all readers reach their peak at the same time, so peak system memory usage may be lower.

4. There is a `max_cache_size` knob, a per-row-group setting. Once a row group has used up its budget, the cache stops accepting new data and the `cached_array_reader` falls back to reading and decoding from Parquet. (A minimal, illustrative sketch of this producer/consumer flow appears at the end of this description.)

## Other benefits

1. This architecture allows nested columns (not implemented in this PR), i.e., it is future-proof.
2. There are many performance optimizations left to squeeze out, but even in its current state it shows no regressions.

## How does it perform?

Criterion somehow would not produce a result from `--save-baseline`, so I asked an LLM to generate a table from this benchmark:

```
cargo bench --bench arrow_reader_clickbench --features "arrow async" "async"
```

`Baseline` is the implementation on the current main branch. `New Unlimited` is the new pushdown with an unlimited memory budget. `New 100MB` is the new pushdown with a 100 MB memory budget per row group cache.

```
Query  | Baseline (ms) | New Unlimited (ms) | Diff (ms) | New 100MB (ms) | Diff (ms)
-------+---------------+--------------------+-----------+----------------+-----------
Q1     | 0.847         | 0.803              | -0.044    | 0.812          | -0.035
Q10    | 4.060         | 6.273              | +2.213    | 6.216          | +2.156
Q11    | 5.088         | 7.152              | +2.064    | 7.193          | +2.105
Q12    | 18.485        | 14.937             | -3.548    | 14.904         | -3.581
Q13    | 24.859        | 21.908             | -2.951    | 21.705         | -3.154
Q14    | 23.994        | 20.691             | -3.303    | 20.467         | -3.527
Q19    | 1.894         | 1.980              | +0.086    | 1.996          | +0.102
Q20    | 90.325        | 64.689             | -25.636   | 74.478         | -15.847
Q21    | 106.610       | 74.766             | -31.844   | 99.557         | -7.053
Q22    | 232.730       | 101.660            | -131.070  | 204.800        | -27.930
Q23    | 222.800       | 186.320            | -36.480   | 186.590        | -36.210
Q24    | 24.840        | 19.762             | -5.078    | 19.908         | -4.932
Q27    | 80.463        | 47.118             | -33.345   | 49.597         | -30.866
Q28    | 78.999        | 47.583             | -31.416   | 51.432         | -27.567
Q30    | 28.587        | 28.710             | +0.123    | 28.926         | +0.339
Q36    | 80.157        | 57.954             | -22.203   | 58.012         | -22.145
Q37    | 46.962        | 45.901             | -1.061    | 45.386         | -1.576
Q38    | 16.324        | 16.492             | +0.168    | 16.522         | +0.198
Q39    | 20.754        | 20.734             | -0.020    | 20.648         | -0.106
Q40    | 22.554        | 21.707             | -0.847    | 21.995         | -0.559
Q41    | 16.430        | 16.391             | -0.039    | 16.581         | +0.151
Q42    | 6.045         | 6.157              | +0.112    | 6.120          | +0.075
```

1. If we consider a diff within 5 ms to be noise, we are never worse than the current implementation.
2. We see significant improvements for string-heavy queries: string columns are large, so they take time to decompress and decode.
3. A 100 MB cache budget appears to have only a small performance impact.

## Limitations

1. It only works for async readers, because the sync reader does not follow the same row-group-by-row-group structure.
2. It is memory hungry compared to #6921. But changing the decoding pipeline without eagerly loading the entire row group would require significant changes to the current decoding infrastructure, e.g., making the page iterator an async function.
3. It currently doesn't support nested columns; more specifically, it doesn't support nested columns with nullable parents. Supporting them is straightforward and requires no big changes.
4. The current memory accounting is not accurate: it overestimates memory usage, especially when reading string view arrays, where multiple string views may share the same underlying buffer and that buffer's size is counted more than once. Either way, we never exceed the user-configured memory limit.
5. If one row passes the filter, the entire batch is cached. We can probably optimize this.

## Next steps?

This PR is largely a proof of concept; I want to collect feedback before sending a multi-thousand-line PR :) Some items I can think of:

1. Design an interface for users to specify the cache size limit; currently it is hard-coded.
2. Don't instrument nested array readers if the Parquet file has a nullable parent; currently this panics.
3. More testing, and integration tests/benchmarks with DataFusion.

---------

Co-authored-by: Andrew Lamb <[email protected]>
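To make the producer/consumer flow above concrete, here is a minimal, std-only Rust sketch of the idea. The names here (`RowGroupCacheSketch`, `Batch`, the `(column, row offset)` key) are illustrative stand-ins rather than the actual types in this PR; the real cache stores Arrow arrays and is wired into the array readers.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Stand-in for a decoded Arrow batch of one column (not the real type).
struct Batch {
    /// Approximate in-memory size, used for budget accounting.
    bytes: usize,
}

/// Cache keyed by (column index, starting row), bounded by a per-row-group budget.
struct RowGroupCacheSketch {
    entries: HashMap<(usize, usize), Batch>,
    used: usize,
    max_cache_size: usize,
}

impl RowGroupCacheSketch {
    fn new(max_cache_size: usize) -> Self {
        Self {
            entries: HashMap::new(),
            used: 0,
            max_cache_size,
        }
    }

    /// Producer side: called while filter predicates are evaluated.
    /// Once the budget is used up, the cache stops accepting new batches.
    fn insert(&mut self, key: (usize, usize), batch: Batch) -> bool {
        if self.used + batch.bytes > self.max_cache_size {
            return false; // the reader falls back to decoding from Parquet later
        }
        self.used += batch.bytes;
        self.entries.insert(key, batch);
        true
    }

    /// Consumer side: called while the output is built. Removing the entry
    /// releases its memory, giving the rise-then-fall curve shown above.
    fn take(&mut self, key: &(usize, usize)) -> Option<Batch> {
        let batch = self.entries.remove(key)?;
        self.used -= batch.bytes;
        Some(batch)
    }
}

fn main() {
    // 100 MB per-row-group budget, matching the "New 100MB" configuration above.
    let cache = Arc::new(Mutex::new(RowGroupCacheSketch::new(100 * 1024 * 1024)));

    // Filter phase (producer): a batch decoded for column 3, rows starting at 0,
    // is stored while the predicate is evaluated.
    let stored = cache.lock().unwrap().insert((3, 0), Batch { bytes: 8192 });
    assert!(stored);

    // Output phase (consumer): the batch is taken back out instead of being
    // decoded a second time; on a miss the reader would decode from Parquet again.
    let reused = cache.lock().unwrap().take(&(3, 0));
    assert!(reused.is_some());
}
```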
1 parent 4a21443 commit 04f217b

File tree

12 files changed: +1869 −29 lines changed

parquet/src/arrow/array_reader/builder.rs

Lines changed: 89 additions & 5 deletions

```diff
@@ -15,33 +15,97 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 use arrow_schema::{DataType, Fields, SchemaBuilder};
 
 use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
+use crate::arrow::array_reader::cached_array_reader::CacheRole;
+use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
 use crate::arrow::array_reader::empty_array::make_empty_array_reader;
 use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
+use crate::arrow::array_reader::row_group_cache::RowGroupCache;
 use crate::arrow::array_reader::{
     make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
     FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
     PrimitiveArrayReader, RowGroups, StructArrayReader,
 };
+use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
 use crate::arrow::schema::{ParquetField, ParquetFieldType};
 use crate::arrow::ProjectionMask;
 use crate::basic::Type as PhysicalType;
 use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type};
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
 
+/// Builder for [`CacheOptions`]
+#[derive(Debug, Clone)]
+pub struct CacheOptionsBuilder<'a> {
+    /// Projection mask to apply to the cache
+    pub projection_mask: &'a ProjectionMask,
+    /// Cache to use for storing row groups
+    pub cache: Arc<Mutex<RowGroupCache>>,
+}
+
+impl<'a> CacheOptionsBuilder<'a> {
+    /// create a new cache options builder
+    pub fn new(projection_mask: &'a ProjectionMask, cache: Arc<Mutex<RowGroupCache>>) -> Self {
+        Self {
+            projection_mask,
+            cache,
+        }
+    }
+
+    /// Return a new [`CacheOptions`] for producing (populating) the cache
+    pub fn producer(self) -> CacheOptions<'a> {
+        CacheOptions {
+            projection_mask: self.projection_mask,
+            cache: self.cache,
+            role: CacheRole::Producer,
+        }
+    }
+
+    /// return a new [`CacheOptions`] for consuming (reading) the cache
+    pub fn consumer(self) -> CacheOptions<'a> {
+        CacheOptions {
+            projection_mask: self.projection_mask,
+            cache: self.cache,
+            role: CacheRole::Consumer,
+        }
+    }
+}
+
+/// Cache options containing projection mask, cache, and role
+#[derive(Clone)]
+pub struct CacheOptions<'a> {
+    pub projection_mask: &'a ProjectionMask,
+    pub cache: Arc<Mutex<RowGroupCache>>,
+    pub role: CacheRole,
+}
+
 /// Builds [`ArrayReader`]s from parquet schema, projection mask, and RowGroups reader
 pub struct ArrayReaderBuilder<'a> {
+    /// Source of row group data
     row_groups: &'a dyn RowGroups,
+    /// Optional cache options for the array reader
+    cache_options: Option<&'a CacheOptions<'a>>,
+    /// metrics
+    metrics: &'a ArrowReaderMetrics,
 }
 
 impl<'a> ArrayReaderBuilder<'a> {
-    pub fn new(row_groups: &'a dyn RowGroups) -> Self {
-        Self { row_groups }
+    pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics) -> Self {
+        Self {
+            row_groups,
+            cache_options: None,
+            metrics,
+        }
+    }
+
+    /// Add cache options to the builder
+    pub fn with_cache_options(mut self, cache_options: Option<&'a CacheOptions<'a>>) -> Self {
+        self.cache_options = cache_options;
+        self
     }
 
     /// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader.
@@ -69,7 +133,26 @@ impl<'a> ArrayReaderBuilder<'a> {
         mask: &ProjectionMask,
     ) -> Result<Option<Box<dyn ArrayReader>>> {
         match field.field_type {
-            ParquetFieldType::Primitive { .. } => self.build_primitive_reader(field, mask),
+            ParquetFieldType::Primitive { col_idx, .. } => {
+                let Some(reader) = self.build_primitive_reader(field, mask)? else {
+                    return Ok(None);
+                };
+                let Some(cache_options) = self.cache_options.as_ref() else {
+                    return Ok(Some(reader));
+                };
+
+                if cache_options.projection_mask.leaf_included(col_idx) {
+                    Ok(Some(Box::new(CachedArrayReader::new(
+                        reader,
+                        Arc::clone(&cache_options.cache),
+                        col_idx,
+                        cache_options.role,
+                        self.metrics.clone(), // cheap clone
+                    ))))
+                } else {
+                    Ok(Some(reader))
+                }
+            }
             ParquetFieldType::Group { .. } => match &field.arrow_type {
                 DataType::Map(_, _) => self.build_map_reader(field, mask),
                 DataType::Struct(_) => self.build_struct_reader(field, mask),
@@ -375,7 +458,8 @@ mod tests {
         )
         .unwrap();
 
-        let array_reader = ArrayReaderBuilder::new(&file_reader)
+        let metrics = ArrowReaderMetrics::disabled();
+        let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
            .build_array_reader(fields.as_ref(), &mask)
            .unwrap();
 
```
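Below is a hedged sketch of how the new builder API from this diff might be wired: one `ArrayReaderBuilder` per phase, sharing a single `RowGroupCache` via `CacheOptionsBuilder::producer()` / `::consumer()`. These are crate-internal types, so this is an in-crate illustration rather than a public-API example; the `build_cached_readers` helper, its parameters, and the exact import paths are assumptions, and the real async reader wiring differs.

```rust
use std::sync::{Arc, Mutex};

// Import paths are assumed; the items themselves are the ones added in this diff.
use crate::arrow::array_reader::row_group_cache::RowGroupCache;
use crate::arrow::array_reader::{
    ArrayReader, ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder, RowGroups,
};
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use crate::arrow::schema::ParquetField;
use crate::arrow::ProjectionMask;
use crate::errors::Result;

/// Hypothetical helper: build one reader for the filter phase (cache producer)
/// and one for the output phase (cache consumer) over the same row group.
fn build_cached_readers(
    row_groups: &dyn RowGroups,
    fields: Option<&ParquetField>,
    predicate_mask: &ProjectionMask, // columns referenced by the filter
    output_mask: &ProjectionMask,    // columns in the final output
    cache: Arc<Mutex<RowGroupCache>>,
    metrics: &ArrowReaderMetrics,
) -> Result<(Box<dyn ArrayReader>, Box<dyn ArrayReader>)> {
    // Filter phase: batches decoded while evaluating predicates are inserted
    // into the shared cache.
    let producer_opts: CacheOptions<'_> =
        CacheOptionsBuilder::new(predicate_mask, Arc::clone(&cache)).producer();
    let filter_reader = ArrayReaderBuilder::new(row_groups, metrics)
        .with_cache_options(Some(&producer_opts))
        .build_array_reader(fields, predicate_mask)?;

    // Output phase: cached batches are taken back out; on a miss the reader
    // decodes from Parquet as it did before this change.
    let consumer_opts: CacheOptions<'_> =
        CacheOptionsBuilder::new(output_mask, cache).consumer();
    let output_reader = ArrayReaderBuilder::new(row_groups, metrics)
        .with_cache_options(Some(&consumer_opts))
        .build_array_reader(fields, output_mask)?;

    Ok((filter_reader, output_reader))
}
```

This mirrors the producer/consumer split described in the commit message: the filter phase populates the cache and output construction drains it.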
