fix: Set up parquet column encoding correctly

pull/24376/head
alamb 2020-06-23 08:29:51 -04:00
parent 943a6cd299
commit eee1e9fe77
2 changed files with 62 additions and 27 deletions

View File

@ -5,7 +5,7 @@ use std::{
rc::Rc,
};
use log::{debug, warn};
use log::debug;
use parquet::{
basic::{Compression, Encoding, LogicalType, Repetition, Type as PhysicalType},
errors::ParquetError,
@ -325,47 +325,79 @@ fn create_writer_props(schema: &delorean_table_schema::Schema) -> Rc<WriterPrope
// Setup encoding as defined in
// https://github.com/influxdata/delorean/blob/alamb/encoding_thoughts/docs/encoding_thoughts.md
//
// Note: the property writer builder's default is to encode
// everything with dictionary encoding, and it turns out that
// dictionary encoding overrides all other encodings. Thus, we
// must explicitly disable dictionary encoding when another
// encoding is desired.
let col_defs = schema.get_col_defs();
for col_def in col_defs {
// locates the column definition in the schema
let col_path = ColumnPath::from(col_def.name.clone());
match col_def.data_type {
data_type @ delorean_table_schema::DataType::Boolean
| data_type @ delorean_table_schema::DataType::Float
| data_type @ delorean_table_schema::DataType::Integer => {
data_type @ delorean_table_schema::DataType::Boolean => {
debug!(
"Setting encoding of {:?} col {} to RLE",
data_type, col_path
);
builder = builder.set_column_encoding(col_path, Encoding::RLE);
builder = builder
.set_column_encoding(col_path.clone(), Encoding::RLE)
.set_column_dictionary_enabled(col_path, false);
}
data_type @ delorean_table_schema::DataType::Integer => {
debug!(
"Setting encoding of {:?} col {} to DELTA_ENCODING",
data_type, col_path
);
builder = builder
.set_column_encoding(col_path.clone(), Encoding::DELTA_BINARY_PACKED)
.set_column_dictionary_enabled(col_path, false);
}
data_type @ delorean_table_schema::DataType::Float => {
debug!(
"Setting encoding of {:?} col {} to PLAIN",
data_type, col_path
);
builder = builder
.set_column_encoding(col_path.clone(), Encoding::PLAIN)
.set_column_dictionary_enabled(col_path, false);
}
// tag values are often very much repeated
delorean_table_schema::DataType::String if schema.is_tag(&col_def) => {
data_type @ delorean_table_schema::DataType::String if schema.is_tag(&col_def) => {
debug!(
"Setting encoding of tag val DataType::String col {} to dictionary",
col_path
"Setting encoding of tag val {:?} col {} to dictionary",
data_type, col_path
);
builder = builder.set_column_dictionary_enabled(col_path, true);
}
delorean_table_schema::DataType::String => {
debug!("Setting encoding of non-tag val DataType::String col {} to DELTA_LENGTH_BYTE_ARRAY", col_path);
builder = builder.set_column_encoding(col_path, Encoding::DELTA_LENGTH_BYTE_ARRAY);
}
delorean_table_schema::DataType::Timestamp => {
data_type @ delorean_table_schema::DataType::String => {
debug!(
"Setting encoding of LPTimestamp col {} to DELTA_BINARY_PACKED",
col_path
"Setting encoding of non-tag val {:?} col {} to DELTA_LENGTH_BYTE_ARRAY",
data_type, col_path
);
builder = builder.set_column_encoding(col_path, Encoding::DELTA_BINARY_PACKED);
builder = builder
.set_column_encoding(col_path.clone(), Encoding::DELTA_LENGTH_BYTE_ARRAY)
.set_column_dictionary_enabled(col_path, false);
}
data_type @ delorean_table_schema::DataType::Timestamp => {
debug!(
"Setting encoding of {:?} col {} to DELTA_BINARY_PACKED",
data_type, col_path
);
builder = builder
.set_column_encoding(col_path.clone(), Encoding::DELTA_BINARY_PACKED)
.set_column_dictionary_enabled(col_path, false);
}
};
}
// Even though the 'set_statistics_enabled()' method is called here, the resulting
// parquet file does not appear to have statistics enabled.
// TODO: file a clear bug in the parquet JIRA project
warn!("WARNING WARNING -- statistics generation does not appear to be working");
//
// This is because the underlying Rust parquet library does not
// support statistics generation at this time.
let props = builder
.set_statistics_enabled(true)
.set_created_by("Delorean".to_string())
@ -451,31 +483,34 @@ mod tests {
writer_props.compression(&string_field_colpath),
Compression::GZIP
);
assert_eq!(writer_props.dictionary_enabled(&string_field_colpath), true);
assert_eq!(
writer_props.dictionary_enabled(&string_field_colpath),
false
);
assert_eq!(writer_props.statistics_enabled(&string_field_colpath), true);
let float_field_colpath = ColumnPath::from("float_field");
assert_eq!(
writer_props.encoding(&float_field_colpath),
Some(Encoding::RLE)
Some(Encoding::PLAIN)
);
assert_eq!(
writer_props.compression(&float_field_colpath),
Compression::GZIP
);
assert_eq!(writer_props.dictionary_enabled(&float_field_colpath), true);
assert_eq!(writer_props.dictionary_enabled(&float_field_colpath), false);
assert_eq!(writer_props.statistics_enabled(&float_field_colpath), true);
let int_field_colpath = ColumnPath::from("int_field");
assert_eq!(
writer_props.encoding(&int_field_colpath),
Some(Encoding::RLE)
Some(Encoding::DELTA_BINARY_PACKED)
);
assert_eq!(
writer_props.compression(&int_field_colpath),
Compression::GZIP
);
assert_eq!(writer_props.dictionary_enabled(&int_field_colpath), true);
assert_eq!(writer_props.dictionary_enabled(&int_field_colpath), false);
assert_eq!(writer_props.statistics_enabled(&int_field_colpath), true);
let bool_field_colpath = ColumnPath::from("bool_field");
@ -487,7 +522,7 @@ mod tests {
writer_props.compression(&bool_field_colpath),
Compression::GZIP
);
assert_eq!(writer_props.dictionary_enabled(&bool_field_colpath), true);
assert_eq!(writer_props.dictionary_enabled(&bool_field_colpath), false);
assert_eq!(writer_props.statistics_enabled(&bool_field_colpath), true);
let timestamp_field_colpath = ColumnPath::from("timestamp");
@ -501,7 +536,7 @@ mod tests {
);
assert_eq!(
writer_props.dictionary_enabled(&timestamp_field_colpath),
true
false
);
assert_eq!(
writer_props.statistics_enabled(&timestamp_field_colpath),

View File

@ -108,8 +108,8 @@ The Apache Parquet file format has several [available encodings](https://github.
| Line Protocol Component | Line Protocol Type | Disk/Parquet Logical Type | Encoding |
| ----------------------- | ------------------ | -------------------------- | -----------------------|
| Tag Value | `String` |`STRING` | Dictionary (`RLE_DICTIONARY=8`) |
| Field Value (float) | `float` | `DOUBLE` | TBD (RLE?, Byte Stream Split: (`BYTE_STREAM_SPLIT = 9`) |
| Field Value (integer) | `integer` | `INT(bitWidth=64, isSigned=true)` | Run Length / Bit-packing hybrid (`RLE=3`)|
| Field Value (float) | `float` | `DOUBLE` | Plain (`PLAIN = 0`) for now; Byte stream split (`BYTE_STREAM_SPLIT = 9`) once it is supported |
| Field Value (integer) | `integer` | `INT(bitWidth=64, isSigned=true)` | Delta binary packed (`DELTA_BINARY_PACKED = 5`)|
| Field Value (strings) | UTF-8 `String` | `STRING` | Delta-length byte array: (`DELTA_LENGTH_BYTE_ARRAY = 6`) |
| Field Value (Boolean) | `Boolean` | `BOOLEAN` | Run Length / Bit-packing hybrid (`RLE=3`) |
| Timestamp | 64-bit Unix Timestamp | `TIMESTAMP(isAdjustedToUTC=true, precision=NANOS)` | Delta Encoding (`DELTA_BINARY_PACKED=5`) |