Skip to content

Commit 5574657

Browse files
author
WanKun
committed
ORC-1986: Trigger flush stripe for large input rows
For large input rows, the stripe may be excessively large, requiring more memory for both reading and writing one stripe. We can check the tree writer size in bytes and flush the stripe even when the input row count is less than 5000.
1 parent 6e2aa6d commit 5574657

File tree

3 files changed

+18
-3
lines changed

3 files changed

+18
-3
lines changed

java/core/src/java/org/apache/orc/OrcConf.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@ public enum OrcConf {
118118
"If the number of distinct keys in a dictionary is greater than this\n" +
119119
"fraction of the total number of non-null rows, turn off \n" +
120120
"dictionary encoding. Use 1 to always use dictionary encoding."),
121+
DICTIONARY_MAX_SIZE_IN_BYTES("orc.dictionary.maxSizeInBytes",
122+
"hive.exec.orc.dictionary.maxSizeInBytes",
123+
16 * 1024 * 1024,
124+
"If the total size of the dictionary is greater than this\n" +
125+
", turn off dictionary encoding. Use 0 to disable this check."),
121126
ROW_INDEX_STRIDE_DICTIONARY_CHECK("orc.dictionary.early.check",
122127
"hive.orc.row.index.stride.dictionary.check",
123128
true,
@@ -182,6 +187,10 @@ public enum OrcConf {
182187
"added to all of the writers. Valid range is [1,10000] and is primarily meant for" +
183188
"testing. Setting this too low may negatively affect performance."
184189
+ " Use orc.stripe.row.count instead if the value larger than orc.stripe.row.count."),
190+
STRIPE_SIZE_CHECK("orc.stripe.size.check", "hive.exec.orc.default.stripe.size.check",
191+
128L * 1024 * 1024,
192+
"Flush stripe if the tree writer size in bytes is larger than this, " +
193+
"use 0 to disable this check."),
185194
OVERWRITE_OUTPUT_FILE("orc.overwrite.output.file", "orc.overwrite.output.file", false,
186195
"A boolean flag to enable overwriting of the output file if it already exists.\n"),
187196
IS_SCHEMA_EVOLUTION_CASE_SENSITIVE("orc.schema.evolution.case.sensitive",

java/core/src/java/org/apache/orc/impl/WriterImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
112112
private long previousAllocation = -1;
113113
private long memoryLimit;
114114
private final long ROWS_PER_CHECK;
115+
private final long STRIPE_SIZE_CHECK;
115116
private long rowsSinceCheck = 0;
116117
private final OrcFile.Version version;
117118
private final Configuration conf;
@@ -221,6 +222,7 @@ public WriterImpl(FileSystem fs,
221222
// ensure that we are able to handle callbacks before we register ourselves
222223
ROWS_PER_CHECK = Math.min(opts.getStripeRowCountValue(),
223224
OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf));
225+
STRIPE_SIZE_CHECK = OrcConf.STRIPE_SIZE_CHECK.getLong(conf);
224226
this.stripeRowCount= opts.getStripeRowCountValue();
225227
this.stripeSize = opts.getStripeSize();
226228
memoryLimit = stripeSize;
@@ -325,9 +327,9 @@ public boolean checkMemory(double newScale) throws IOException {
325327
}
326328

327329
private boolean checkMemory() throws IOException {
328-
if (rowsSinceCheck >= ROWS_PER_CHECK) {
330+
long size = treeWriter.estimateMemory();
331+
if (rowsSinceCheck >= ROWS_PER_CHECK || (STRIPE_SIZE_CHECK > 0 && size > STRIPE_SIZE_CHECK)) {
329332
rowsSinceCheck = 0;
330-
long size = treeWriter.estimateMemory();
331333
if (LOG.isDebugEnabled()) {
332334
LOG.debug("ORC writer " + physicalWriter + " size = " + size +
333335
" memoryLimit = " + memoryLimit + " rowsInStripe = " + rowsInStripe +

java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public abstract class StringBaseTreeWriter extends TreeWriterBase {
5959
// If the number of keys in a dictionary is greater than this fraction of
6060
//the total number of non-null rows, turn off dictionary encoding
6161
private final double dictionaryKeySizeThreshold;
62+
private final long dictionaryMaxSizeInBytes;
6263
protected Dictionary dictionary;
6364
protected boolean useDictionaryEncoding = true;
6465
private boolean isDirectV2 = true;
@@ -101,6 +102,7 @@ private static Dictionary createDict(Configuration conf) {
101102
rowIndexValueCount.add(0L);
102103
buildIndex = context.buildIndex();
103104
dictionaryKeySizeThreshold = context.getDictionaryKeySizeThreshold(id);
105+
dictionaryMaxSizeInBytes = OrcConf.DICTIONARY_MAX_SIZE_IN_BYTES.getLong(conf);
104106
strideDictionaryCheck =
105107
OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf);
106108
if (dictionaryKeySizeThreshold <= 0.0) {
@@ -118,7 +120,9 @@ private void checkDictionaryEncoding() {
118120
// based on whether or not the fraction of distinct keys over number of
119121
// non-null rows is less than the configured threshold
120122
float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
121-
useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
123+
useDictionaryEncoding = !isDirectV2 || (ratio <= dictionaryKeySizeThreshold &&
124+
(dictionaryMaxSizeInBytes <= 0 ||
125+
dictionary.getSizeInBytes() <= dictionaryMaxSizeInBytes));
122126
doneDictionaryCheck = true;
123127
}
124128
}

0 commit comments

Comments
 (0)