diff --git a/Cargo.lock b/Cargo.lock index 7017be0293..ae374f6931 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2060,6 +2060,7 @@ dependencies = [ "once_cell", "panic_logging", "parquet_file", + "parquet_to_line_protocol", "predicate", "predicates", "rustyline", @@ -3343,6 +3344,24 @@ dependencies = [ "zstd", ] +[[package]] +name = "parquet_to_line_protocol" +version = "0.1.0" +dependencies = [ + "datafusion 0.1.0", + "futures", + "influxdb_line_protocol", + "mutable_batch", + "mutable_batch_lp", + "num_cpus", + "object_store", + "parquet_file", + "schema", + "snafu", + "tokio", + "workspace-hack", +] + [[package]] name = "paste" version = "1.0.9" diff --git a/Cargo.toml b/Cargo.toml index b15236d658..37f7b6b2c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ members = [ "packers", "panic_logging", "parquet_file", + "parquet_to_line_protocol", "predicate", "querier", "query_functions", diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index d42a243bb8..19fca1318d 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -30,6 +30,7 @@ object_store_metrics = { path = "../object_store_metrics" } observability_deps = { path = "../observability_deps" } panic_logging = { path = "../panic_logging" } parquet_file = { path = "../parquet_file" } +parquet_to_line_protocol = { path = "../parquet_to_line_protocol" } iox_query = { path = "../iox_query" } schema = { path = "../schema" } sharder = { path = "../sharder" } diff --git a/influxdb_iox/src/commands/debug/mod.rs b/influxdb_iox/src/commands/debug/mod.rs index 4d08660412..cd121e7f5a 100644 --- a/influxdb_iox/src/commands/debug/mod.rs +++ b/influxdb_iox/src/commands/debug/mod.rs @@ -3,6 +3,7 @@ use influxdb_iox_client::connection::Connection; use snafu::prelude::*; mod namespace; +mod parquet_to_lp; mod print_cpu; mod schema; @@ -10,16 +11,20 @@ mod schema; pub enum Error { #[snafu(context(false))] #[snafu(display("Error in schema subcommand: {}", source))] - SchemaError { source: 
schema::Error }, + Schema { source: schema::Error }, #[snafu(context(false))] #[snafu(display("Error in namespace subcommand: {}", source))] - NamespaceError { source: namespace::Error }, + Namespace { source: namespace::Error }, + + #[snafu(context(false))] + #[snafu(display("Error in parquet_to_lp subcommand: {}", source))] + ParquetToLp { source: parquet_to_lp::Error }, } pub type Result = std::result::Result; -/// Interrogate internal database data +/// Debugging commands #[derive(Debug, clap::Parser)] pub struct Config { #[clap(subcommand)] @@ -36,6 +41,9 @@ enum Command { /// Interrogate the schema of a namespace Schema(schema::Config), + + /// Convert IOx Parquet files back into line protocol format + ParquetToLp(parquet_to_lp::Config), } pub async fn command(connection: C, config: Config) -> Result<()> @@ -53,6 +61,7 @@ where let connection = connection().await; schema::command(connection, config).await? } + Command::ParquetToLp(config) => parquet_to_lp::command(config).await?, } Ok(()) diff --git a/influxdb_iox/src/commands/debug/parquet_to_lp.rs b/influxdb_iox/src/commands/debug/parquet_to_lp.rs new file mode 100644 index 0000000000..8ed03311fd --- /dev/null +++ b/influxdb_iox/src/commands/debug/parquet_to_lp.rs @@ -0,0 +1,73 @@ +//! 
This module implements the `parquet_to_lp` CLI command +use std::{io::BufWriter, path::PathBuf}; + +use observability_deps::tracing::info; +use snafu::{ResultExt, Snafu}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Cannot {} output file '{:?}': {}", operation, path, source))] + File { + operation: String, + path: PathBuf, + source: std::io::Error, + }, + #[snafu(display("Error converting: {}", source))] + Conversion { + source: parquet_to_line_protocol::Error, + }, + #[snafu(display("Cannot flush output: {}", message))] + Flush { + // flush error has the W writer in it, all we care about is the error + message: String, + }, +} + +/// Convert IOx Parquet files into InfluxDB line protocol format +#[derive(Debug, clap::Parser)] +pub struct Config { + /// Input file name + #[clap(value_parser)] + input: PathBuf, + + #[clap(long, short)] + /// The path to which to write. If not specified writes to stdout + output: Option, +} + +pub async fn command(config: Config) -> Result<(), Error> { + let Config { input, output } = config; + info!(?input, ?output, "Exporting parquet as line protocol"); + + if let Some(output) = output { + let path = &output; + let file = std::fs::File::create(path).context(FileSnafu { + operation: "open", + path, + })?; + + let file = convert(input, file).await?; + + file.sync_all().context(FileSnafu { + operation: "close", + path, + })?; + } else { + convert(input, std::io::stdout()).await?; + } + + Ok(()) +} + +/// Does the actual conversion, returning the writer when done +async fn convert(input: PathBuf, writer: W) -> Result { + // use a buffered writer and ensure it is flushed + parquet_to_line_protocol::convert_file(input, BufWriter::new(writer)) + .await + .context(ConversionSnafu)? 
+ // flush the buffered writer + .into_inner() + .map_err(|e| Error::Flush { + message: e.to_string(), + }) +} diff --git a/influxdb_iox/tests/end_to_end_cases/cli.rs b/influxdb_iox/tests/end_to_end_cases/cli.rs index daee4eddf6..b45c956178 100644 --- a/influxdb_iox/tests/end_to_end_cases/cli.rs +++ b/influxdb_iox/tests/end_to_end_cases/cli.rs @@ -113,14 +113,7 @@ async fn remote_partition_and_get_from_store_and_pull() { .stdout .clone(); - let v: Value = serde_json::from_slice(&out).unwrap(); - let id = v.as_array().unwrap()[0] - .as_object() - .unwrap() - .get("objectStoreId") - .unwrap() - .as_str() - .unwrap(); + let object_store_id = get_object_store_id(&out); let dir = tempdir().unwrap(); let f = dir.path().join("tmp.parquet"); @@ -133,7 +126,7 @@ async fn remote_partition_and_get_from_store_and_pull() { .arg("remote") .arg("store") .arg("get") - .arg(id) + .arg(&object_store_id) .arg(filename) .assert() .success() @@ -185,7 +178,7 @@ async fn remote_partition_and_get_from_store_and_pull() { .success() .stdout( predicate::str::contains("wrote file") - .and(predicate::str::contains(id)), + .and(predicate::str::contains(&object_store_id)), ); } .boxed() @@ -196,6 +189,132 @@ async fn remote_partition_and_get_from_store_and_pull() { .await } +#[tokio::test] +async fn parquet_to_lp() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + + // The test below assumes a specific partition id, so use a + // non-shared one here so concurrent tests don't interfere with + // each other + let mut cluster = MiniCluster::create_non_shared_standard(database_url).await; + + let line_protocol = "my_awesome_table,tag1=A,tag2=B val=42i 123456"; + + StepTest::new( + &mut cluster, + vec![ + Step::WriteLineProtocol(String::from(line_protocol)), + // wait for partitions to be persisted + Step::WaitForPersisted, + // Run the 'remote partition' command + Step::Custom(Box::new(move |state: &mut StepTestState| { + async move { + let router_addr = 
state.cluster().router().router_grpc_base().to_string(); + + // Validate the output of the remote partition CLI command + // + // Looks like: + // { + // "id": "1", + // "shardId": 1, + // "namespaceId": 1, + // "tableId": 1, + // "partitionId": "1", + // "objectStoreId": "fa6cdcd1-cbc2-4fb7-8b51-4773079124dd", + // "minTime": "123456", + // "maxTime": "123456", + // "fileSizeBytes": "2029", + // "rowCount": "1", + // "compactionLevel": "1", + // "createdAt": "1650019674289347000" + // } + + let out = Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("-h") + .arg(&router_addr) + .arg("remote") + .arg("partition") + .arg("show") + .arg("1") + .assert() + .success() + .get_output() + .stdout + .clone(); + + let object_store_id = get_object_store_id(&out); + let dir = tempdir().unwrap(); + let f = dir.path().join("tmp.parquet"); + let filename = f.as_os_str().to_str().unwrap(); + + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("-h") + .arg(&router_addr) + .arg("remote") + .arg("store") + .arg("get") + .arg(&object_store_id) + .arg(filename) + .assert() + .success() + .stdout( + predicate::str::contains("wrote") + .and(predicate::str::contains(filename)), + ); + + // convert to line protocol (stdout) + let output = Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("debug") + .arg("parquet-to-lp") + .arg(filename) + .assert() + .success() + .stdout(predicate::str::contains(line_protocol)) + .get_output() + .stdout + .clone(); + + println!("Got output {:?}", output); + + // test writing to output file as well + // Ensure files are actually wrote to the filesystem + let output_file = + tempfile::NamedTempFile::new().expect("Error making temp file"); + println!("Writing to {:?}", output_file); + + // convert to line protocol (to a file) + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("debug") + .arg("parquet-to-lp") + .arg(filename) + .arg("--output") + .arg(output_file.path()) + .assert() + .success(); + + let file_contents = + 
std::fs::read(output_file.path()).expect("can not read data from tempfile"); + let file_contents = String::from_utf8_lossy(&file_contents); + assert!( + predicate::str::contains(line_protocol).eval(&file_contents), + "Could not find {} in {}", + line_protocol, + file_contents + ); + } + .boxed() + })), + ], + ) + .run() + .await +} + /// Write, compact, then use the remote partition command #[tokio::test] async fn compact_and_get_remote_partition() { @@ -261,15 +380,7 @@ async fn compact_and_get_remote_partition() { .stdout .clone(); - let v: Value = serde_json::from_slice(&out).unwrap(); - let id = v.as_array().unwrap()[0] - .as_object() - .unwrap() - .get("objectStoreId") - .unwrap() - .as_str() - .unwrap(); - + let object_store_id = get_object_store_id(&out); let dir = tempdir().unwrap(); let f = dir.path().join("tmp.parquet"); let filename = f.as_os_str().to_str().unwrap(); @@ -281,7 +392,7 @@ .arg("remote") .arg("store") .arg("get") - .arg(id) + .arg(&object_store_id) .arg(filename) .assert() .success() @@ -333,7 +444,7 @@ .success() .stdout( predicate::str::contains("wrote file") - .and(predicate::str::contains(id)), + .and(predicate::str::contains(object_store_id)), ); } .boxed() @@ -540,3 +651,28 @@ async fn query_ingester() { .run() .await } + +/// extracts the parquet object store id from JSON that looks like +/// ```text +/// { +/// "id": "1", +/// ... +/// "objectStoreId": "fa6cdcd1-cbc2-4fb7-8b51-4773079124dd", +/// ... 
+/// } +/// ``` +fn get_object_store_id(output: &[u8]) -> String { + let v: Value = serde_json::from_slice(output).unwrap(); + // We only process the first value, so panic if it isn't there + let arr = v.as_array().unwrap(); + assert_eq!(arr.len(), 1); + let id = arr[0] + .as_object() + .unwrap() + .get("objectStoreId") + .unwrap() + .as_str() + .unwrap(); + + id.to_string() +} diff --git a/influxdb_line_protocol/src/builder.rs b/influxdb_line_protocol/src/builder.rs index 77bba9418d..102af7a66a 100644 --- a/influxdb_line_protocol/src/builder.rs +++ b/influxdb_line_protocol/src/builder.rs @@ -329,7 +329,7 @@ impl FieldValue for u64 { #[cfg(test)] mod tests { - use crate::{parse_lines, ParsedLine}; + use crate::{parse_lines, FieldSet, ParsedLine}; use super::*; @@ -507,4 +507,31 @@ mod tests { assert_eq!(get_timestamp(11), None); assert_eq!(get_timestamp(12), Some(1234)); } + + #[test] + fn test_float_formatting() { + // ensure that my_float is printed in a way that it is parsed + // as a float (not an int) + let builder = LineProtocolBuilder::new() + .measurement("tag_keys") + .tag("foo", "bar") + .field("my_float", 3.0) + .close_line(); + + let lp = String::from_utf8(builder.build()).unwrap(); + println!("-----\n{lp}-----"); + + let parsed_lines = parse_lines(&lp) + .collect::>, _>>() + .unwrap(); + + assert_eq!(parsed_lines.len(), 1); + let parsed_line = &parsed_lines[0]; + + let expected_fields = vec![("my_float".into(), crate::FieldValue::F64(3.0))] + .into_iter() + .collect::>(); + + assert_eq!(parsed_line.field_set, expected_fields) + } } diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index b2f4ed10d3..e72acd333c 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -297,6 +297,20 @@ pub struct IoxMetadata { } impl IoxMetadata { + /// Convert to base64 encoded protobuf format + pub fn to_base64(&self) -> std::result::Result { + Ok(base64::encode(&self.to_protobuf()?)) + } + + /// Read from base64 encoded 
protobuf format + pub fn from_base64(proto_base64: &[u8]) -> Result { + let proto_bytes = base64::decode(proto_base64) + .map_err(|err| Box::new(err) as _) + .context(IoxMetadataBrokenSnafu)?; + + Self::from_protobuf(&proto_bytes) + } + /// Convert to protobuf v3 message. pub(crate) fn to_protobuf(&self) -> std::result::Result, prost::EncodeError> { let sort_key = self.sort_key.as_ref().map(|key| proto::SortKey { @@ -718,12 +732,8 @@ impl DecodedIoxParquetMetaData { // extract protobuf message from key-value entry let proto_base64 = kv.value.as_ref().context(IoxMetadataMissingSnafu)?; - let proto_bytes = base64::decode(proto_base64) - .map_err(|err| Box::new(err) as _) - .context(IoxMetadataBrokenSnafu)?; - - // convert to Rust object - IoxMetadata::from_protobuf(proto_bytes.as_slice()) + // read to rust object + IoxMetadata::from_base64(proto_base64.as_bytes()) } /// Read IOx schema from parquet metadata. diff --git a/parquet_file/src/serialize.rs b/parquet_file/src/serialize.rs index 2a452cacb7..ad1b667241 100644 --- a/parquet_file/src/serialize.rs +++ b/parquet_file/src/serialize.rs @@ -174,12 +174,10 @@ where /// serialising the given [`IoxMetadata`] and embedding it as a key=value /// property keyed by [`METADATA_KEY`]. 
fn writer_props(meta: &IoxMetadata) -> Result { - let bytes = meta.to_protobuf()?; - let builder = WriterProperties::builder() .set_key_value_metadata(Some(vec![KeyValue { key: METADATA_KEY.to_string(), - value: Some(base64::encode(&bytes)), + value: Some(meta.to_base64()?), }])) .set_compression(Compression::ZSTD) .set_max_row_group_size(ROW_GROUP_WRITE_SIZE); diff --git a/parquet_to_line_protocol/Cargo.toml b/parquet_to_line_protocol/Cargo.toml new file mode 100644 index 0000000000..9b4cc08004 --- /dev/null +++ b/parquet_to_line_protocol/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "parquet_to_line_protocol" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +datafusion = { path = "../datafusion" } +influxdb_line_protocol = { path = "../influxdb_line_protocol" } +futures = {version = "0.3"} +num_cpus = "1.13.1" +object_store = { version = "0.5.0" } +parquet_file = { path = "../parquet_file" } +schema = { path = "../schema" } +tokio = "1.0" +snafu = "0.7" +workspace-hack = { path = "../workspace-hack"} + + +[dev-dependencies] +mutable_batch = { path = "../mutable_batch" } +mutable_batch_lp = { path = "../mutable_batch_lp" } diff --git a/parquet_to_line_protocol/src/batch.rs b/parquet_to_line_protocol/src/batch.rs new file mode 100644 index 0000000000..faef8a7c60 --- /dev/null +++ b/parquet_to_line_protocol/src/batch.rs @@ -0,0 +1,250 @@ +use datafusion::arrow::{ + array::{ + as_boolean_array, as_dictionary_array, as_primitive_array, as_string_array, Array, + ArrayAccessor, StringArray, + }, + datatypes::{Float64Type, Int32Type, Int64Type, TimestampNanosecondType, UInt64Type}, + record_batch::RecordBatch, +}; +use influxdb_line_protocol::{builder::FieldValue, FieldValue as LPFieldValue}; +use schema::{InfluxColumnType, InfluxFieldType, Schema}; + +/// Converts a [`RecordBatch`] into line protocol lines. 
+pub(crate) fn convert_to_lines( + measurement_name: &str, + iox_schema: &Schema, + batch: &RecordBatch, +) -> Result, String> { + let mut lp_builder = influxdb_line_protocol::LineProtocolBuilder::new(); + + for index in 0..batch.num_rows() { + let lp_tags = lp_builder.measurement(measurement_name); + + // Add all tags + let lp_tags = tags_values_iter(iox_schema, index, batch) + .into_iter() + .fold(lp_tags, |lp_tags, tag_column| { + lp_tags.tag(tag_column.name, tag_column.value) + }); + + // add fields + let mut fields = field_values_iter(iox_schema, index, batch).into_iter(); + + // need at least one field (to put builder into "AfterTag" mode) + let first_field = fields + .next() + .ok_or_else(|| format!("Need at least one field, schema had none: {:?}", iox_schema))?; + + let lp_fields = lp_tags.field(first_field.name, first_field); + + // add rest of fields + let lp_fields = fields.fold(lp_fields, |lp_fields, field| { + lp_fields.field(field.name, field) + }); + + let ts = timestamp_value(iox_schema, index, batch)?; + lp_builder = lp_fields.timestamp(ts).close_line(); + } + + Ok(lp_builder.build()) +} + +/// Return an iterator over all non null tags in a batch +fn tags_values_iter<'a>( + iox_schema: &'a Schema, + row_index: usize, + batch: &'a RecordBatch, +) -> impl IntoIterator> { + iox_schema + .iter() + .enumerate() + .filter_map(move |(column_index, (influx_column_type, field))| { + if matches!(influx_column_type, Some(InfluxColumnType::Tag)) { + // tags are always dictionaries + let arr = as_dictionary_array::(batch.column(column_index)) + .downcast_dict::() + .expect("Tag was not a string dictionary array"); + + // If the value of this column is not null, return it. + if arr.is_valid(row_index) { + return Some(TagColumn { + name: field.name(), + value: arr.value(row_index), + }); + } + } + None + }) +} + +/// Represents a particular column along with code that knows how to add that value to a builder. 
+struct TagColumn<'a> { + name: &'a str, + value: &'a str, +} + +/// Return an iterator over all non null fields in a batch +fn field_values_iter<'a>( + iox_schema: &'a Schema, + row_index: usize, + batch: &'a RecordBatch, +) -> impl IntoIterator> { + iox_schema + .iter() + .enumerate() + .filter_map(move |(column_index, (influx_column_type, field))| { + // Skip any value that is NULL + let arr = batch.column(column_index); + if !arr.is_valid(row_index) { + return None; + } + + let name = field.name(); + + // Extract the value from the relevant array and convert it + let value = match influx_column_type { + Some(InfluxColumnType::Field(InfluxFieldType::Float)) => { + LPFieldValue::F64(as_primitive_array::(arr).value(row_index)) + } + Some(InfluxColumnType::Field(InfluxFieldType::Integer)) => { + LPFieldValue::I64(as_primitive_array::(arr).value(row_index)) + } + Some(InfluxColumnType::Field(InfluxFieldType::UInteger)) => { + LPFieldValue::U64(as_primitive_array::(arr).value(row_index)) + } + Some(InfluxColumnType::Field(InfluxFieldType::String)) => { + LPFieldValue::String(as_string_array(arr).value(row_index).into()) + } + Some(InfluxColumnType::Field(InfluxFieldType::Boolean)) => { + LPFieldValue::Boolean(as_boolean_array(arr).value(row_index)) + } + // not a field + Some(InfluxColumnType::Tag) | Some(InfluxColumnType::Timestamp) | None => { + return None + } + }; + + Some(FieldColumn { name, value }) + }) +} + +/// Represents a particular Field column's value in a way that knows how to format +struct FieldColumn<'a> { + name: &'a str, + value: LPFieldValue<'a>, +} + +impl<'a> FieldValue for FieldColumn<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self.value { + LPFieldValue::I64(v) => v.fmt(f), + LPFieldValue::U64(v) => v.fmt(f), + LPFieldValue::F64(v) => v.fmt(f), + LPFieldValue::String(v) => v.as_str().fmt(f), + LPFieldValue::Boolean(v) => v.fmt(f), + } + } +} + +/// Find the timestamp value for the specified row +fn 
timestamp_value<'a>( + iox_schema: &'a Schema, + row_index: usize, + batch: &'a RecordBatch, +) -> Result { + let column_index = iox_schema + .iter() + .enumerate() + .filter_map(move |(column_index, (influx_column_type, _))| { + if matches!(influx_column_type, Some(InfluxColumnType::Timestamp)) { + Some(column_index) + } else { + None + } + }) + .next() + .ok_or_else(|| "No timestamp column found in schema".to_string())?; + + // timestamps are always TimestampNanosecondArray's and should always have a timestamp value filled in + let arr = as_primitive_array::(batch.column(column_index)); + + if !arr.is_valid(row_index) { + Err(format!( + "TimestampValue was unexpectedly null at row {}", + row_index + )) + } else { + Ok(arr.value(row_index)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use mutable_batch_lp::lines_to_batches; + use schema::selection::Selection; + + #[test] + fn basic() { + round_trip("my_measurement_name,tag=foo value=4 1000"); + } + + #[test] + fn no_tags() { + round_trip("my_no_tag_measurement_name value=4 1000"); + } + + #[test] + #[should_panic = "Error parsing line protocol: LineProtocol { source: FieldSetMissing, line: 1 }"] + fn no_fields() { + round_trip("my_no_tag_measurement_name,tag=4 1000"); + } + + #[test] + fn all_types() { + // Note we use canonical format (e.g. 'true' instead of 't') + round_trip( + r#"m,tag=row1 float_field=64 450 +m,tag2=row2 float_field=65 550 +m,tag2=row3 int_field=65i 560 +m,tag2=row4 uint_field=5u 580 +m,tag2=row5 bool_field=true 590 +m,tag2=row6 str_field="blargh" 600 +m,tag2=multi_field bool_field=false,str_field="blargh" 610 +"#, + ); + } + + /// ensures that parsing line protocol to record batches and then + /// converting it back to line protocol results in the same output + /// + /// Note it must use canonical format (e.g. 
'true' instead of 't') + fn round_trip(lp: &str) { + let default_time = 0; + let mutable_batches = + lines_to_batches(lp, default_time).expect("Error parsing line protocol"); + assert_eq!( + mutable_batches.len(), + 1, + "round trip only supports one measurement" + ); + let (table_name, mutable_batch) = mutable_batches.into_iter().next().unwrap(); + + let selection = Selection::All; + let record_batch = mutable_batch.to_arrow(selection).unwrap(); + let iox_schema = mutable_batch.schema(selection).unwrap(); + + let output_lp = convert_to_lines(&table_name, &iox_schema, &record_batch) + .expect("error converting lines"); + let output_lp = String::from_utf8_lossy(&output_lp); + + let lp = lp.trim(); + let output_lp = output_lp.trim(); + + assert_eq!( + lp, output_lp, + "\n\nInput:\n\n{}\n\nOutput:\n\n{}\n", + lp, output_lp + ) + } +} diff --git a/parquet_to_line_protocol/src/lib.rs b/parquet_to_line_protocol/src/lib.rs new file mode 100644 index 0000000000..5ba867d831 --- /dev/null +++ b/parquet_to_line_protocol/src/lib.rs @@ -0,0 +1,237 @@ +//! 
Code that can convert between parquet files and line protocol + +use std::{ + io::Write, + path::{Path, PathBuf}, + result::Result, + sync::Arc, +}; + +use datafusion::{ + arrow::datatypes::SchemaRef as ArrowSchemaRef, + datasource::{ + file_format::{parquet::ParquetFormat, FileFormat}, + listing::PartitionedFile, + object_store::ObjectStoreUrl, + }, + execution::context::TaskContext, + physical_plan::{ + execute_stream, + file_format::{FileScanConfig, ParquetExec}, + SendableRecordBatchStream, Statistics, + }, + prelude::{SessionConfig, SessionContext}, +}; +use futures::StreamExt; +use object_store::{ + local::LocalFileSystem, path::Path as ObjectStorePath, ObjectMeta, ObjectStore, +}; +use parquet_file::metadata::{IoxMetadata, METADATA_KEY}; +use schema::Schema; + +use snafu::{OptionExt, ResultExt, Snafu}; + +mod batch; + +use batch::convert_to_lines; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Invalid path: {:?}: {}", path, source))] + Path { + path: PathBuf, + source: object_store::path::Error, + }, + + #[snafu(display("Error listing: {:?}: {}", object_store_path, source))] + ObjectStorePath { + object_store_path: ObjectStorePath, + source: object_store::Error, + }, + + #[snafu(display( + "Can not find IOx metadata in parquet metadata. 
Could not find {}", + METADATA_KEY + ))] + MissingMetadata {}, + + #[snafu(display("Error reading IOx metadata: {}", source))] + Metadata { + source: parquet_file::metadata::Error, + }, + + #[snafu(display("Error inferring IOx schema: {}", source))] + InferringSchema { + source: datafusion::error::DataFusionError, + }, + + #[snafu(display("Error reading IOx schema: {}", source))] + Schema { source: schema::Error }, + + #[snafu(display("Error in processing task: {}", source))] + Task { source: tokio::task::JoinError }, + + #[snafu(display("Error converting: {}", message))] + Conversion { message: String }, + + #[snafu(display("Error executing: {}", source))] + ExecutingStream { + source: datafusion::error::DataFusionError, + }, + + #[snafu(display("IO Error: {}", source))] + IO { source: std::io::Error }, +} + +/// Converts a parquet file that was written by IOx from the local +/// file system path specified to line protocol and writes those bytes +/// to `output`, returning the writer on success +pub async fn convert_file(path: P, mut output: W) -> Result +where + P: AsRef, + W: Write, +{ + let path = path.as_ref(); + let object_store_path = + ObjectStorePath::from_filesystem_path(path).context(PathSnafu { path })?; + + // Fire up a parquet reader, read the batches, and then convert + // them asynchronously in parallel + + let object_store = Arc::new(LocalFileSystem::new()) as Arc; + let object_store_url = ObjectStoreUrl::local_filesystem(); + + let object_meta = object_store + .head(&object_store_path) + .await + .context(ObjectStorePathSnafu { object_store_path })?; + + let reader = ParquetFileReader::try_new(object_store, object_store_url, object_meta).await?; + + // Determines the measurement name from the IOx metadata + let schema = reader.schema(); + let encoded_meta = schema + .metadata + .get(METADATA_KEY) + .context(MissingMetadataSnafu)?; + + let iox_meta = IoxMetadata::from_base64(encoded_meta.as_bytes()).context(MetadataSnafu)?; + + // Attempt to 
extract the IOx schema from the schema stored in the + // parquet file. This schema is where information such as what + // columns are tags and fields is stored + let iox_schema: Schema = schema.try_into().context(SchemaSnafu)?; + + let iox_schema = Arc::new(iox_schema); + + let measurement_name = iox_meta.table_name; + + // now convert the record batches to line protocol, in parallel + let mut lp_stream = reader + .read() + .await? + .map(|batch| { + let iox_schema = Arc::clone(&iox_schema); + let measurement_name = Arc::clone(&measurement_name); + tokio::task::spawn(async move { + batch + .map_err(|e| format!("Something bad happened reading batch: {}", e)) + .and_then(|batch| convert_to_lines(&measurement_name, &iox_schema, &batch)) + }) + }) + // run some number of futures in parallel + .buffered(num_cpus::get()); + + // but print them to the output stream in the same order + while let Some(data) = lp_stream.next().await { + let data = data + .context(TaskSnafu)? + .map_err(|message| Error::Conversion { message })?; + + output.write_all(&data).context(IOSnafu)?; + } + Ok(output) +} + +/// Handles the details of interacting with parquet libraries / +/// readers. Tries not to have any IOx specific logic +struct ParquetFileReader { + object_store: Arc, + object_store_url: ObjectStoreUrl, + /// Name / path information of the object to read + object_meta: ObjectMeta, + + /// Parquet file metadata + schema: ArrowSchemaRef, + + /// number of rows to read in each batch (can pick small to + /// increase parallelism). 
Defaults to 1000 + batch_size: usize, +} + +impl ParquetFileReader { + /// Find and open the specified parquet file, and read its metadata / schema + async fn try_new( + object_store: Arc, + object_store_url: ObjectStoreUrl, + object_meta: ObjectMeta, + ) -> Result { + // Keep metadata so we can find the measurement name + let format = ParquetFormat::default().with_skip_metadata(false); + + // Use datafusion parquet reader to read the metadata from the + // file. + let schema = format + .infer_schema(&object_store, &[object_meta.clone()]) + .await + .context(InferringSchemaSnafu)?; + + Ok(Self { + object_store, + object_store_url, + object_meta, + schema, + batch_size: 1000, + }) + } + + // retrieves the Arrow schema for this file + fn schema(&self) -> ArrowSchemaRef { + Arc::clone(&self.schema) + } + + /// read the parquet file as a stream + async fn read(&self) -> Result { + let base_config = FileScanConfig { + object_store_url: self.object_store_url.clone(), + file_schema: self.schema(), + file_groups: vec![vec![PartitionedFile { + object_meta: self.object_meta.clone(), + partition_values: vec![], + range: None, + extensions: None, + }]], + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + }; + + // set up enough datafusion context to do the real read session + let predicate = None; + let metadata_size_hint = None; + let exec = ParquetExec::new(base_config, predicate, metadata_size_hint); + let session_config = SessionConfig::new().with_batch_size(self.batch_size); + let session_ctx = SessionContext::with_config(session_config); + + let object_store = Arc::clone(&self.object_store); + let task_ctx = Arc::new(TaskContext::from(&session_ctx)); + task_ctx + .runtime_env() + .register_object_store("iox", "iox", object_store); + + execute_stream(Arc::new(exec), task_ctx) + .await + .context(ExecutingStreamSnafu) + } +}