feat: Implement `debug parquet_to_lp` command to convert parquet to line protocol (#5734)

* feat: add `influxdb_iox debug parquet_to_lp` command

* chore: Run cargo hakari tasks

* fix: update command description

* fix: remove unnecessary Result import

* fix: Apply suggestions from code review

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>

Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-09-26 10:17:27 -04:00 committed by GitHub
parent b11da1d98b
commit 65f1550126
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 818 additions and 34 deletions

19
Cargo.lock generated
View File

@ -2060,6 +2060,7 @@ dependencies = [
"once_cell",
"panic_logging",
"parquet_file",
"parquet_to_line_protocol",
"predicate",
"predicates",
"rustyline",
@ -3343,6 +3344,24 @@ dependencies = [
"zstd",
]
[[package]]
name = "parquet_to_line_protocol"
version = "0.1.0"
dependencies = [
"datafusion 0.1.0",
"futures",
"influxdb_line_protocol",
"mutable_batch",
"mutable_batch_lp",
"num_cpus",
"object_store",
"parquet_file",
"schema",
"snafu",
"tokio",
"workspace-hack",
]
[[package]]
name = "paste"
version = "1.0.9"

View File

@ -51,6 +51,7 @@ members = [
"packers",
"panic_logging",
"parquet_file",
"parquet_to_line_protocol",
"predicate",
"querier",
"query_functions",

View File

@ -30,6 +30,7 @@ object_store_metrics = { path = "../object_store_metrics" }
observability_deps = { path = "../observability_deps" }
panic_logging = { path = "../panic_logging" }
parquet_file = { path = "../parquet_file" }
parquet_to_line_protocol = { path = "../parquet_to_line_protocol" }
iox_query = { path = "../iox_query" }
schema = { path = "../schema" }
sharder = { path = "../sharder" }

View File

@ -3,6 +3,7 @@ use influxdb_iox_client::connection::Connection;
use snafu::prelude::*;
mod namespace;
mod parquet_to_lp;
mod print_cpu;
mod schema;
@ -10,16 +11,20 @@ mod schema;
pub enum Error {
#[snafu(context(false))]
#[snafu(display("Error in schema subcommand: {}", source))]
SchemaError { source: schema::Error },
Schema { source: schema::Error },
#[snafu(context(false))]
#[snafu(display("Error in namespace subcommand: {}", source))]
NamespaceError { source: namespace::Error },
Namespace { source: namespace::Error },
#[snafu(context(false))]
#[snafu(display("Error in parquet_to_lp subcommand: {}", source))]
ParquetToLp { source: parquet_to_lp::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Interrogate internal database data
/// Debugging commands
#[derive(Debug, clap::Parser)]
pub struct Config {
#[clap(subcommand)]
@ -36,6 +41,9 @@ enum Command {
/// Interrogate the schema of a namespace
Schema(schema::Config),
/// Convert IOx Parquet files back into line protocol format
ParquetToLp(parquet_to_lp::Config),
}
pub async fn command<C, CFut>(connection: C, config: Config) -> Result<()>
@ -53,6 +61,7 @@ where
let connection = connection().await;
schema::command(connection, config).await?
}
Command::ParquetToLp(config) => parquet_to_lp::command(config).await?,
}
Ok(())

View File

@ -0,0 +1,73 @@
//! This module implements the `parquet_to_lp` CLI command
use std::{io::BufWriter, path::PathBuf};
use observability_deps::tracing::info;
use snafu::{ResultExt, Snafu};
/// Errors for the `debug parquet_to_lp` command
#[derive(Debug, Snafu)]
pub enum Error {
    /// Creating or syncing the output file failed
    #[snafu(display("Cannot {} output file '{:?}': {}", operation, path, source))]
    File {
        /// Which file operation failed (e.g. "open" or "close")
        operation: String,
        /// Path of the output file involved
        path: PathBuf,
        source: std::io::Error,
    },

    /// Wrapper around errors from the `parquet_to_line_protocol` library
    #[snafu(display("Error converting: {}", source))]
    Conversion {
        source: parquet_to_line_protocol::Error,
    },

    /// Flushing the buffered output writer failed
    #[snafu(display("Cannot flush output: {}", message))]
    Flush {
        // flush error has the W writer in it, all we care about is the error
        message: String,
    },
}
/// Convert IOx Parquet files into InfluxDB line protocol format
#[derive(Debug, clap::Parser)]
pub struct Config {
    // NOTE: the `///` comments on the fields below become the CLI help
    // text rendered by clap — do not reword them casually
    /// Input file name
    #[clap(value_parser)]
    input: PathBuf,

    #[clap(long, short)]
    /// The path to which to write. If not specified writes to stdout
    output: Option<PathBuf>,
}
/// Entry point for `influxdb_iox debug parquet-to-lp`: reads the
/// parquet file named by `config.input` and writes its contents as
/// line protocol to `config.output` (or stdout if no output is given).
pub async fn command(config: Config) -> Result<(), Error> {
    let Config { input, output } = config;
    info!(?input, ?output, "Exporting parquet as line protocol");

    match output {
        // Write to the named file, then sync it to disk
        Some(output) => {
            let path = &output;
            let file = std::fs::File::create(path).context(FileSnafu {
                operation: "open",
                path,
            })?;
            let file = convert(input, file).await?;
            file.sync_all().context(FileSnafu {
                operation: "close",
                path,
            })?;
        }
        // No output path specified: stream to stdout
        None => {
            convert(input, std::io::stdout()).await?;
        }
    }
    Ok(())
}
/// Does the actual conversion, returning the writer when done
async fn convert<W: std::io::Write + Send>(input: PathBuf, writer: W) -> Result<W, Error> {
// use a buffered writer and ensure it is flushed
parquet_to_line_protocol::convert_file(input, BufWriter::new(writer))
.await
.context(ConversionSnafu)?
// flush the buffered writer
.into_inner()
.map_err(|e| Error::Flush {
message: e.to_string(),
})
}

View File

@ -113,14 +113,7 @@ async fn remote_partition_and_get_from_store_and_pull() {
.stdout
.clone();
let v: Value = serde_json::from_slice(&out).unwrap();
let id = v.as_array().unwrap()[0]
.as_object()
.unwrap()
.get("objectStoreId")
.unwrap()
.as_str()
.unwrap();
let object_store_id = get_object_store_id(&out);
let dir = tempdir().unwrap();
let f = dir.path().join("tmp.parquet");
@ -133,7 +126,7 @@ async fn remote_partition_and_get_from_store_and_pull() {
.arg("remote")
.arg("store")
.arg("get")
.arg(id)
.arg(&object_store_id)
.arg(filename)
.assert()
.success()
@ -185,7 +178,7 @@ async fn remote_partition_and_get_from_store_and_pull() {
.success()
.stdout(
predicate::str::contains("wrote file")
.and(predicate::str::contains(id)),
.and(predicate::str::contains(&object_store_id)),
);
}
.boxed()
@ -196,6 +189,132 @@ async fn remote_partition_and_get_from_store_and_pull() {
.await
}
/// End to end test of `debug parquet-to-lp`: writes line protocol,
/// waits for it to be persisted as parquet, fetches the parquet file,
/// and converts it back to line protocol (both to stdout and to a file)
#[tokio::test]
async fn parquet_to_lp() {
    test_helpers::maybe_start_logging();
    let database_url = maybe_skip_integration!();

    // The test below assumes a specific partition id, so use a
    // non-shared one here so concurrent tests don't interfere with
    // each other
    let mut cluster = MiniCluster::create_non_shared_standard(database_url).await;

    let line_protocol = "my_awesome_table,tag1=A,tag2=B val=42i 123456";

    StepTest::new(
        &mut cluster,
        vec![
            Step::WriteLineProtocol(String::from(line_protocol)),
            // wait for partitions to be persisted
            Step::WaitForPersisted,
            // Run the 'remote partition' command
            Step::Custom(Box::new(move |state: &mut StepTestState| {
                async move {
                    let router_addr = state.cluster().router().router_grpc_base().to_string();

                    // Validate the output of the remote partition CLI command
                    //
                    // Looks like:
                    // {
                    //     "id": "1",
                    //     "shardId": 1,
                    //     "namespaceId": 1,
                    //     "tableId": 1,
                    //     "partitionId": "1",
                    //     "objectStoreId": "fa6cdcd1-cbc2-4fb7-8b51-4773079124dd",
                    //     "minTime": "123456",
                    //     "maxTime": "123456",
                    //     "fileSizeBytes": "2029",
                    //     "rowCount": "1",
                    //     "compactionLevel": "1",
                    //     "createdAt": "1650019674289347000"
                    // }
                    let out = Command::cargo_bin("influxdb_iox")
                        .unwrap()
                        .arg("-h")
                        .arg(&router_addr)
                        .arg("remote")
                        .arg("partition")
                        .arg("show")
                        .arg("1")
                        .assert()
                        .success()
                        .get_output()
                        .stdout
                        .clone();

                    let object_store_id = get_object_store_id(&out);

                    let dir = tempdir().unwrap();
                    let f = dir.path().join("tmp.parquet");
                    let filename = f.as_os_str().to_str().unwrap();

                    // fetch the parquet file from object storage
                    Command::cargo_bin("influxdb_iox")
                        .unwrap()
                        .arg("-h")
                        .arg(&router_addr)
                        .arg("remote")
                        .arg("store")
                        .arg("get")
                        .arg(&object_store_id)
                        .arg(filename)
                        .assert()
                        .success()
                        .stdout(
                            predicate::str::contains("wrote")
                                .and(predicate::str::contains(filename)),
                        );

                    // convert to line protocol (stdout)
                    let output = Command::cargo_bin("influxdb_iox")
                        .unwrap()
                        .arg("debug")
                        .arg("parquet-to-lp")
                        .arg(filename)
                        .assert()
                        .success()
                        .stdout(predicate::str::contains(line_protocol))
                        .get_output()
                        .stdout
                        .clone();

                    println!("Got output {:?}", output);

                    // test writing to an output file as well
                    // Ensure the data is actually written to the filesystem
                    let output_file =
                        tempfile::NamedTempFile::new().expect("Error making temp file");
                    println!("Writing to {:?}", output_file);

                    // convert to line protocol (to a file)
                    Command::cargo_bin("influxdb_iox")
                        .unwrap()
                        .arg("debug")
                        .arg("parquet-to-lp")
                        .arg(filename)
                        .arg("--output")
                        .arg(output_file.path())
                        .assert()
                        .success();

                    let file_contents =
                        std::fs::read(output_file.path()).expect("can not read data from tempfile");
                    let file_contents = String::from_utf8_lossy(&file_contents);
                    assert!(
                        predicate::str::contains(line_protocol).eval(&file_contents),
                        // fixed: message previously read "Could not file"
                        "Could not find {} in {}",
                        line_protocol,
                        file_contents
                    );
                }
                .boxed()
            })),
        ],
    )
    .run()
    .await
}
/// Write, compact, then use the remote partition command
#[tokio::test]
async fn compact_and_get_remote_partition() {
@ -261,15 +380,7 @@ async fn compact_and_get_remote_partition() {
.stdout
.clone();
let v: Value = serde_json::from_slice(&out).unwrap();
let id = v.as_array().unwrap()[0]
.as_object()
.unwrap()
.get("objectStoreId")
.unwrap()
.as_str()
.unwrap();
let object_store_id = get_object_store_id(&out);
let dir = tempdir().unwrap();
let f = dir.path().join("tmp.parquet");
let filename = f.as_os_str().to_str().unwrap();
@ -281,7 +392,7 @@ async fn compact_and_get_remote_partition() {
.arg("remote")
.arg("store")
.arg("get")
.arg(id)
.arg(&object_store_id)
.arg(filename)
.assert()
.success()
@ -333,7 +444,7 @@ async fn compact_and_get_remote_partition() {
.success()
.stdout(
predicate::str::contains("wrote file")
.and(predicate::str::contains(id)),
.and(predicate::str::contains(object_store_id)),
);
}
.boxed()
@ -540,3 +651,28 @@ async fn query_ingester() {
.run()
.await
}
/// Extracts the `objectStoreId` (object store UUID) from CLI JSON
/// output that looks like
/// ```text
/// [{
///   "id": "1",
///   ...
///   "objectStoreId": "fa6cdcd1-cbc2-4fb7-8b51-4773079124dd",
///   ...
/// }]
/// ```
///
/// Panics if the JSON is malformed or the array does not contain
/// exactly one object with a string `objectStoreId`.
fn get_object_store_id(output: &[u8]) -> String {
    let v: Value = serde_json::from_slice(output).unwrap();
    // We only process the first value, so panic if it isn't there
    let arr = v.as_array().unwrap();
    assert_eq!(arr.len(), 1);
    let id = arr[0]
        .as_object()
        .unwrap()
        .get("objectStoreId")
        .unwrap()
        .as_str()
        .unwrap();
    id.to_string()
}

View File

@ -329,7 +329,7 @@ impl FieldValue for u64 {
#[cfg(test)]
mod tests {
use crate::{parse_lines, ParsedLine};
use crate::{parse_lines, FieldSet, ParsedLine};
use super::*;
@ -507,4 +507,31 @@ mod tests {
assert_eq!(get_timestamp(11), None);
assert_eq!(get_timestamp(12), Some(1234));
}
#[test]
fn test_float_formatting() {
    // ensure that my_float is printed in a way that it is parsed
    // as a float (not an int): 3.0 must not round-trip as "3"
    let builder = LineProtocolBuilder::new()
        .measurement("tag_keys")
        .tag("foo", "bar")
        .field("my_float", 3.0)
        .close_line();

    let lp = String::from_utf8(builder.build()).unwrap();
    println!("-----\n{lp}-----");

    // re-parse the built line protocol and check the field came back as F64
    let parsed_lines = parse_lines(&lp)
        .collect::<Result<Vec<ParsedLine<'_>>, _>>()
        .unwrap();
    assert_eq!(parsed_lines.len(), 1);
    let parsed_line = &parsed_lines[0];

    let expected_fields = vec![("my_float".into(), crate::FieldValue::F64(3.0))]
        .into_iter()
        .collect::<FieldSet<'_>>();
    assert_eq!(parsed_line.field_set, expected_fields)
}
}

View File

@ -297,6 +297,20 @@ pub struct IoxMetadata {
}
impl IoxMetadata {
/// Convert to base64 encoded protobuf format (the encoding used to
/// embed [`IoxMetadata`] into parquet key/value file metadata)
pub fn to_base64(&self) -> std::result::Result<String, prost::EncodeError> {
    Ok(base64::encode(&self.to_protobuf()?))
}
/// Read from base64 encoded protobuf format (the inverse of
/// [`Self::to_base64`])
pub fn from_base64(proto_base64: &[u8]) -> Result<Self> {
    // box the decode error so it fits the generic source type of
    // `IoxMetadataBrokenSnafu`
    let proto_bytes = base64::decode(proto_base64)
        .map_err(|err| Box::new(err) as _)
        .context(IoxMetadataBrokenSnafu)?;
    Self::from_protobuf(&proto_bytes)
}
/// Convert to protobuf v3 message.
pub(crate) fn to_protobuf(&self) -> std::result::Result<Vec<u8>, prost::EncodeError> {
let sort_key = self.sort_key.as_ref().map(|key| proto::SortKey {
@ -718,12 +732,8 @@ impl DecodedIoxParquetMetaData {
// extract protobuf message from key-value entry
let proto_base64 = kv.value.as_ref().context(IoxMetadataMissingSnafu)?;
let proto_bytes = base64::decode(proto_base64)
.map_err(|err| Box::new(err) as _)
.context(IoxMetadataBrokenSnafu)?;
// convert to Rust object
IoxMetadata::from_protobuf(proto_bytes.as_slice())
// read to rust object
IoxMetadata::from_base64(proto_base64.as_bytes())
}
/// Read IOx schema from parquet metadata.

View File

@ -174,12 +174,10 @@ where
/// serialising the given [`IoxMetadata`] and embedding it as a key=value
/// property keyed by [`METADATA_KEY`].
fn writer_props(meta: &IoxMetadata) -> Result<WriterProperties, prost::EncodeError> {
let bytes = meta.to_protobuf()?;
let builder = WriterProperties::builder()
.set_key_value_metadata(Some(vec![KeyValue {
key: METADATA_KEY.to_string(),
value: Some(base64::encode(&bytes)),
value: Some(meta.to_base64()?),
}]))
.set_compression(Compression::ZSTD)
.set_max_row_group_size(ROW_GROUP_WRITE_SIZE);

View File

@ -0,0 +1,23 @@
[package]
name = "parquet_to_line_protocol"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
datafusion = { path = "../datafusion" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
futures = {version = "0.3"}
num_cpus = "1.13.1"
object_store = { version = "0.5.0" }
parquet_file = { path = "../parquet_file" }
schema = { path = "../schema" }
tokio = "1.0"
snafu = "0.7"
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
mutable_batch = { path = "../mutable_batch" }
mutable_batch_lp = { path = "../mutable_batch_lp" }

View File

@ -0,0 +1,250 @@
use datafusion::arrow::{
array::{
as_boolean_array, as_dictionary_array, as_primitive_array, as_string_array, Array,
ArrayAccessor, StringArray,
},
datatypes::{Float64Type, Int32Type, Int64Type, TimestampNanosecondType, UInt64Type},
record_batch::RecordBatch,
};
use influxdb_line_protocol::{builder::FieldValue, FieldValue as LPFieldValue};
use schema::{InfluxColumnType, InfluxFieldType, Schema};
/// Converts a [`RecordBatch`] into line protocol lines, one line per row.
///
/// Returns the line protocol bytes, or an error `String` if a row has
/// no fields or the schema has no timestamp column.
pub(crate) fn convert_to_lines(
    measurement_name: &str,
    iox_schema: &Schema,
    batch: &RecordBatch,
) -> Result<Vec<u8>, String> {
    let mut lp_builder = influxdb_line_protocol::LineProtocolBuilder::new();

    for index in 0..batch.num_rows() {
        let lp_tags = lp_builder.measurement(measurement_name);

        // Add all tags
        let lp_tags = tags_values_iter(iox_schema, index, batch)
            .into_iter()
            .fold(lp_tags, |lp_tags, tag_column| {
                lp_tags.tag(tag_column.name, tag_column.value)
            });

        // add fields
        let mut fields = field_values_iter(iox_schema, index, batch).into_iter();

        // need at least one field (to put the builder into "AfterTag" mode)
        let first_field = fields
            .next()
            .ok_or_else(|| format!("Need at least one field, schema had none: {:?}", iox_schema))?;
        let lp_fields = lp_tags.field(first_field.name, first_field);

        // add the rest of the fields
        let lp_fields = fields.fold(lp_fields, |lp_fields, field| {
            lp_fields.field(field.name, field)
        });

        let ts = timestamp_value(iox_schema, index, batch)?;
        lp_builder = lp_fields.timestamp(ts).close_line();
    }

    Ok(lp_builder.build())
}
/// Return an iterator over all non null tags in a batch for the given row
///
/// # Panics
///
/// Panics if a column typed as a tag is not a string dictionary array.
fn tags_values_iter<'a>(
    iox_schema: &'a Schema,
    row_index: usize,
    batch: &'a RecordBatch,
) -> impl IntoIterator<Item = TagColumn<'a>> {
    iox_schema
        .iter()
        .enumerate()
        .filter_map(move |(column_index, (influx_column_type, field))| {
            if matches!(influx_column_type, Some(InfluxColumnType::Tag)) {
                // tags are always dictionaries
                let arr = as_dictionary_array::<Int32Type>(batch.column(column_index))
                    .downcast_dict::<StringArray>()
                    .expect("Tag was not a string dictionary array");

                // If the value of this column is not null, return it.
                if arr.is_valid(row_index) {
                    return Some(TagColumn {
                        name: field.name(),
                        value: arr.value(row_index),
                    });
                }
            }
            // non-tag columns and null tag values produce nothing
            None
        })
}
/// Represents a particular tag column's (name, value) pair for one row,
/// ready to be added to a line protocol builder.
struct TagColumn<'a> {
    /// Name of the tag column
    name: &'a str,
    /// Tag value for the row, borrowed from the dictionary array
    value: &'a str,
}
/// Return an iterator over all non null fields in a batch for the given row
fn field_values_iter<'a>(
    iox_schema: &'a Schema,
    row_index: usize,
    batch: &'a RecordBatch,
) -> impl IntoIterator<Item = FieldColumn<'a>> {
    iox_schema
        .iter()
        .enumerate()
        .filter_map(move |(column_index, (influx_column_type, field))| {
            let array = batch.column(column_index);

            // NULL cells contribute no field for this row
            if !array.is_valid(row_index) {
                return None;
            }

            // Pull the value out of the concretely typed arrow array.
            // Tag / timestamp / unknown columns are not fields, so they
            // yield `None` and are filtered out by the trailing `?`.
            let value = match influx_column_type {
                Some(InfluxColumnType::Field(InfluxFieldType::Float)) => Some(LPFieldValue::F64(
                    as_primitive_array::<Float64Type>(array).value(row_index),
                )),
                Some(InfluxColumnType::Field(InfluxFieldType::Integer)) => Some(LPFieldValue::I64(
                    as_primitive_array::<Int64Type>(array).value(row_index),
                )),
                Some(InfluxColumnType::Field(InfluxFieldType::UInteger)) => Some(LPFieldValue::U64(
                    as_primitive_array::<UInt64Type>(array).value(row_index),
                )),
                Some(InfluxColumnType::Field(InfluxFieldType::String)) => Some(
                    LPFieldValue::String(as_string_array(array).value(row_index).into()),
                ),
                Some(InfluxColumnType::Field(InfluxFieldType::Boolean)) => Some(
                    LPFieldValue::Boolean(as_boolean_array(array).value(row_index)),
                ),
                Some(InfluxColumnType::Tag) | Some(InfluxColumnType::Timestamp) | None => None,
            }?;

            Some(FieldColumn {
                name: field.name(),
                value,
            })
        })
}
/// Represents a particular field column's (name, value) pair for one row
/// in a way that knows how to format itself into line protocol.
struct FieldColumn<'a> {
    /// Name of the field column
    name: &'a str,
    /// The row's value, tagged with its InfluxDB field type
    value: LPFieldValue<'a>,
}
impl<'a> FieldValue for FieldColumn<'a> {
    /// Formats the wrapped value by delegating to the `Display` impl of
    /// the underlying primitive (strings via their `&str` form).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.value {
            LPFieldValue::I64(v) => std::fmt::Display::fmt(v, f),
            LPFieldValue::U64(v) => std::fmt::Display::fmt(v, f),
            LPFieldValue::F64(v) => std::fmt::Display::fmt(v, f),
            LPFieldValue::String(v) => std::fmt::Display::fmt(&v.as_str(), f),
            LPFieldValue::Boolean(v) => std::fmt::Display::fmt(v, f),
        }
    }
}
/// Find the timestamp value for the specified row
///
/// Returns the nanosecond timestamp, or an error `String` if the schema
/// has no timestamp column or the value is unexpectedly null.
fn timestamp_value<'a>(
    iox_schema: &'a Schema,
    row_index: usize,
    batch: &'a RecordBatch,
) -> Result<i64, String> {
    // Locate the timestamp column in the schema
    // (idiom: `position` replaces the previous enumerate/filter_map/next chain)
    let column_index = iox_schema
        .iter()
        .position(|(influx_column_type, _)| {
            matches!(influx_column_type, Some(InfluxColumnType::Timestamp))
        })
        .ok_or_else(|| "No timestamp column found in schema".to_string())?;

    // timestamps are always TimestampNanosecondArray's and should always
    // have a timestamp value filled in
    let arr = as_primitive_array::<TimestampNanosecondType>(batch.column(column_index));
    if arr.is_valid(row_index) {
        Ok(arr.value(row_index))
    } else {
        Err(format!(
            "TimestampValue was unexpectedly null at row {}",
            row_index
        ))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use mutable_batch_lp::lines_to_batches;
    use schema::selection::Selection;

    #[test]
    fn basic() {
        round_trip("my_measurement_name,tag=foo value=4 1000");
    }

    #[test]
    fn no_tags() {
        round_trip("my_no_tag_measurement_name value=4 1000");
    }

    #[test]
    #[should_panic = "Error parsing line protocol: LineProtocol { source: FieldSetMissing, line: 1 }"]
    fn no_fields() {
        round_trip("my_no_tag_measurement_name,tag=4 1000");
    }

    #[test]
    fn all_types() {
        // Note we use canonical format (e.g. 'true' instead of 't')
        round_trip(
            r#"m,tag=row1 float_field=64 450
m,tag2=row2 float_field=65 550
m,tag2=row3 int_field=65i 560
m,tag2=row4 uint_field=5u 580
m,tag2=row5 bool_field=true 590
m,tag2=row6 str_field="blargh" 600
m,tag2=multi_field bool_field=false,str_field="blargh" 610
"#,
        );
    }

    /// ensures that parsing line protocol to record batches and then
    /// converting it back to line protocol results in the same output
    ///
    /// Note it must use canonical format (e.g. 'true' instead of 't')
    fn round_trip(lp: &str) {
        let default_time = 0;
        let mutable_batches =
            lines_to_batches(lp, default_time).expect("Error parsing line protocol");
        assert_eq!(
            mutable_batches.len(),
            1,
            "round trip only supports one measurement"
        );
        let (table_name, mutable_batch) = mutable_batches.into_iter().next().unwrap();

        let selection = Selection::All;
        let record_batch = mutable_batch.to_arrow(selection).unwrap();
        let iox_schema = mutable_batch.schema(selection).unwrap();

        let output_lp = convert_to_lines(&table_name, &iox_schema, &record_batch)
            .expect("error converting lines");
        let output_lp = String::from_utf8_lossy(&output_lp);

        // compare ignoring leading/trailing whitespace
        let lp = lp.trim();
        let output_lp = output_lp.trim();
        assert_eq!(
            lp, output_lp,
            "\n\nInput:\n\n{}\n\nOutput:\n\n{}\n",
            lp, output_lp
        )
    }
}

View File

@ -0,0 +1,237 @@
//! Code that can convert between parquet files and line protocol
use std::{
io::Write,
path::{Path, PathBuf},
result::Result,
sync::Arc,
};
use datafusion::{
arrow::datatypes::SchemaRef as ArrowSchemaRef,
datasource::{
file_format::{parquet::ParquetFormat, FileFormat},
listing::PartitionedFile,
object_store::ObjectStoreUrl,
},
execution::context::TaskContext,
physical_plan::{
execute_stream,
file_format::{FileScanConfig, ParquetExec},
SendableRecordBatchStream, Statistics,
},
prelude::{SessionConfig, SessionContext},
};
use futures::StreamExt;
use object_store::{
local::LocalFileSystem, path::Path as ObjectStorePath, ObjectMeta, ObjectStore,
};
use parquet_file::metadata::{IoxMetadata, METADATA_KEY};
use schema::Schema;
use snafu::{OptionExt, ResultExt, Snafu};
mod batch;
use batch::convert_to_lines;
/// Errors that can occur while converting a parquet file to line protocol
#[derive(Debug, Snafu)]
pub enum Error {
    /// The local filesystem path could not be converted to an object store path
    #[snafu(display("Invalid path: {:?}: {}", path, source))]
    Path {
        path: PathBuf,
        source: object_store::path::Error,
    },

    /// Looking up the file in the object store (`head`) failed
    #[snafu(display("Error listing: {:?}: {}", object_store_path, source))]
    ObjectStorePath {
        object_store_path: ObjectStorePath,
        source: object_store::Error,
    },

    /// The parquet schema metadata did not contain the IOx metadata entry
    #[snafu(display(
        "Can not find IOx metadata in parquet metadata. Could not find {}",
        METADATA_KEY
    ))]
    MissingMetadata {},

    /// The embedded IOx metadata could not be decoded
    #[snafu(display("Error reading IOx metadata: {}", source))]
    Metadata {
        source: parquet_file::metadata::Error,
    },

    /// DataFusion could not infer the file's Arrow schema
    #[snafu(display("Error inferring IOx schema: {}", source))]
    InferringSchema {
        source: datafusion::error::DataFusionError,
    },

    /// The Arrow schema could not be interpreted as an IOx schema
    #[snafu(display("Error reading IOx schema: {}", source))]
    Schema { source: schema::Error },

    /// A spawned conversion task failed to complete
    #[snafu(display("Error in processing task: {}", source))]
    Task { source: tokio::task::JoinError },

    /// Converting record batches to line protocol failed
    #[snafu(display("Error converting: {}", message))]
    Conversion { message: String },

    /// Executing the DataFusion parquet scan failed
    #[snafu(display("Error executing: {}", source))]
    ExecutingStream {
        source: datafusion::error::DataFusionError,
    },

    /// Writing converted bytes to the output writer failed
    #[snafu(display("IO Error: {}", source))]
    IO { source: std::io::Error },
}
/// Converts a parquet file that was written by IOx from the local
/// file system path specified to line protocol and writes those bytes
/// to `output`, returning the writer on success.
///
/// Batches are converted concurrently (one tokio task per record
/// batch) but written to `output` in their original order.
pub async fn convert_file<W, P>(path: P, mut output: W) -> Result<W, Error>
where
    P: AsRef<Path>,
    W: Write,
{
    let path = path.as_ref();
    let object_store_path =
        ObjectStorePath::from_filesystem_path(path).context(PathSnafu { path })?;

    // Fire up a parquet reader, read the batches, and then convert
    // them asynchronously in parallel
    let object_store = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
    let object_store_url = ObjectStoreUrl::local_filesystem();

    // `head` also confirms the file exists before we try to scan it
    let object_meta = object_store
        .head(&object_store_path)
        .await
        .context(ObjectStorePathSnafu { object_store_path })?;

    let reader = ParquetFileReader::try_new(object_store, object_store_url, object_meta).await?;

    // Determines the measurement name from the IOx metadata
    // (stored base64-encoded under METADATA_KEY in the Arrow schema metadata)
    let schema = reader.schema();
    let encoded_meta = schema
        .metadata
        .get(METADATA_KEY)
        .context(MissingMetadataSnafu)?;
    let iox_meta = IoxMetadata::from_base64(encoded_meta.as_bytes()).context(MetadataSnafu)?;

    // Attempt to extract the IOx schema from the schema stored in the
    // parquet file. This schema is where information such as what
    // columns are tags and fields is stored
    let iox_schema: Schema = schema.try_into().context(SchemaSnafu)?;
    let iox_schema = Arc::new(iox_schema);

    let measurement_name = iox_meta.table_name;

    // now convert the record batches to line protocol, in parallel
    let mut lp_stream = reader
        .read()
        .await?
        .map(|batch| {
            // each task gets its own Arc clones of the shared schema/name
            let iox_schema = Arc::clone(&iox_schema);
            let measurement_name = Arc::clone(&measurement_name);
            tokio::task::spawn(async move {
                batch
                    .map_err(|e| format!("Something bad happened reading batch: {}", e))
                    .and_then(|batch| convert_to_lines(&measurement_name, &iox_schema, &batch))
            })
        })
        // run some number of futures in parallel
        .buffered(num_cpus::get());

    // but print them to the output stream in the same order
    while let Some(data) = lp_stream.next().await {
        let data = data
            .context(TaskSnafu)?
            .map_err(|message| Error::Conversion { message })?;
        output.write_all(&data).context(IOSnafu)?;
    }

    Ok(output)
}
/// Handles the details of interacting with parquet libraries /
/// readers. Tries not to have any IOx specific logic
struct ParquetFileReader {
    /// Object store from which the file is read
    object_store: Arc<dyn ObjectStore>,
    /// Root URL DataFusion uses to resolve `object_meta`
    object_store_url: ObjectStoreUrl,
    /// Name / path information of the object to read
    object_meta: ObjectMeta,
    /// Arrow schema of this file, as inferred by DataFusion
    /// (its key/value metadata carries the IOx metadata entry)
    schema: ArrowSchemaRef,
    /// number of rows to read in each batch (can pick small to
    /// increase parallelism). Defaults to 1000
    batch_size: usize,
}
impl ParquetFileReader {
    /// Find and open the specified parquet file, and read its metadata / schema
    async fn try_new(
        object_store: Arc<dyn ObjectStore>,
        object_store_url: ObjectStoreUrl,
        object_meta: ObjectMeta,
    ) -> Result<Self, Error> {
        // Keep metadata so we can find the measurement name
        let format = ParquetFormat::default().with_skip_metadata(false);

        // Use datafusion parquet reader to read the metadata from the
        // file.
        let schema = format
            .infer_schema(&object_store, &[object_meta.clone()])
            .await
            .context(InferringSchemaSnafu)?;

        Ok(Self {
            object_store,
            object_store_url,
            object_meta,
            schema,
            batch_size: 1000,
        })
    }

    /// retrieves the Arrow schema for this file
    fn schema(&self) -> ArrowSchemaRef {
        Arc::clone(&self.schema)
    }

    /// read the parquet file as a stream of record batches
    async fn read(&self) -> Result<SendableRecordBatchStream, Error> {
        // Scan a single file with no projection, predicate, limit, or
        // partition columns
        let base_config = FileScanConfig {
            object_store_url: self.object_store_url.clone(),
            file_schema: self.schema(),
            file_groups: vec![vec![PartitionedFile {
                object_meta: self.object_meta.clone(),
                partition_values: vec![],
                range: None,
                extensions: None,
            }]],
            statistics: Statistics::default(),
            projection: None,
            limit: None,
            table_partition_cols: vec![],
        };

        // set up enough datafusion context to do the real read session
        let predicate = None;
        let metadata_size_hint = None;
        let exec = ParquetExec::new(base_config, predicate, metadata_size_hint);

        // `batch_size` bounds the number of rows per emitted RecordBatch
        let session_config = SessionConfig::new().with_batch_size(self.batch_size);
        let session_ctx = SessionContext::with_config(session_config);

        let object_store = Arc::clone(&self.object_store);
        let task_ctx = Arc::new(TaskContext::from(&session_ctx));
        // NOTE(review): registered under scheme/host "iox" while the scan
        // uses the local filesystem URL — confirm this registration is the
        // one the exec plan actually resolves
        task_ctx
            .runtime_env()
            .register_object_store("iox", "iox", object_store);

        execute_stream(Arc::new(exec), task_ctx)
            .await
            .context(ExecutingStreamSnafu)
    }
}