feat: remove path parsing functionality
Paths to parquet files are an implementation detail and should not be parsed. Closes #1506.pull/24376/head
parent
250ccdcdcd
commit
776b6c011c
|
@ -180,7 +180,8 @@ impl Storage {
|
||||||
/// Return full path including filename in the object store to save a chunk
|
/// Return full path including filename in the object store to save a chunk
|
||||||
/// table file.
|
/// table file.
|
||||||
///
|
///
|
||||||
/// See [`parse_location`](Self::parse_location) for parsing.
|
/// **Important: The resulting path should be treated as a black box. It might vary over time and is an
|
||||||
|
/// implementation detail. Do NOT attempt to parse it.**
|
||||||
pub fn location(
|
pub fn location(
|
||||||
&self,
|
&self,
|
||||||
partition_key: String,
|
partition_key: String,
|
||||||
|
@ -200,37 +201,6 @@ impl Storage {
|
||||||
path
|
path
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse locations and return partition key, chunk ID and table name.
|
|
||||||
///
|
|
||||||
/// See [`location`](Self::location) for path generation.
|
|
||||||
pub fn parse_location(
|
|
||||||
&self,
|
|
||||||
path: impl Into<DirsAndFileName>,
|
|
||||||
) -> Result<(String, u32, String)> {
|
|
||||||
let path: DirsAndFileName = path.into();
|
|
||||||
|
|
||||||
let dirs: Vec<_> = path.directories.iter().map(|part| part.encoded()).collect();
|
|
||||||
match (dirs.as_slice(), &path.file_name) {
|
|
||||||
([server_id, db_name, "data", partition_key, chunk_id], Some(filename))
|
|
||||||
if (server_id == &self.server_id.to_string()) && (db_name == &self.db_name) =>
|
|
||||||
{
|
|
||||||
let chunk_id: u32 = match chunk_id.parse() {
|
|
||||||
Ok(x) => x,
|
|
||||||
Err(_) => return Err(Error::LocationParsingFailure { path }),
|
|
||||||
};
|
|
||||||
|
|
||||||
let parts: Vec<_> = filename.encoded().split('.').collect();
|
|
||||||
let table_name = match parts[..] {
|
|
||||||
[name, "parquet"] => name,
|
|
||||||
_ => return Err(Error::LocationParsingFailure { path }),
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok((partition_key.to_string(), chunk_id, table_name.to_string()))
|
|
||||||
}
|
|
||||||
_ => Err(Error::LocationParsingFailure { path }),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Write the given stream of data of a specified table of
|
/// Write the given stream of data of a specified table of
|
||||||
/// a specified partitioned chunk to a parquet file of this storage
|
/// a specified partitioned chunk to a parquet file of this storage
|
||||||
pub async fn write_to_object_store(
|
pub async fn write_to_object_store(
|
||||||
|
@ -517,7 +487,6 @@ mod tests {
|
||||||
use arrow_util::assert_batches_eq;
|
use arrow_util::assert_batches_eq;
|
||||||
use datafusion::physical_plan::common::SizedRecordBatchStream;
|
use datafusion::physical_plan::common::SizedRecordBatchStream;
|
||||||
use datafusion_util::MemoryStream;
|
use datafusion_util::MemoryStream;
|
||||||
use object_store::parsed_path;
|
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
@ -549,65 +518,6 @@ mod tests {
|
||||||
assert_eq!(metadata_roundtrip, metadata);
|
assert_eq!(metadata_roundtrip, metadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_location_to_from_path() {
|
|
||||||
let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
|
|
||||||
let store = Storage::new(make_object_store(), server_id, "my_db");
|
|
||||||
|
|
||||||
// happy roundtrip
|
|
||||||
let path = store.location("p1".to_string(), 42, "my_table".to_string());
|
|
||||||
assert_eq!(path.display(), "1/my_db/data/p1/42/my_table.parquet");
|
|
||||||
assert_eq!(
|
|
||||||
store.parse_location(path).unwrap(),
|
|
||||||
("p1".to_string(), 42, "my_table".to_string())
|
|
||||||
);
|
|
||||||
|
|
||||||
// error cases
|
|
||||||
assert!(store.parse_location(parsed_path!()).is_err());
|
|
||||||
assert!(store
|
|
||||||
.parse_location(parsed_path!(["too", "short"], "my_table.parquet"))
|
|
||||||
.is_err());
|
|
||||||
assert!(store
|
|
||||||
.parse_location(parsed_path!(
|
|
||||||
["this", "is", "way", "way", "too", "long"],
|
|
||||||
"my_table.parquet"
|
|
||||||
))
|
|
||||||
.is_err());
|
|
||||||
assert!(store
|
|
||||||
.parse_location(parsed_path!(
|
|
||||||
["1", "my_db", "data", "p1", "not_a_number"],
|
|
||||||
"my_table.parquet"
|
|
||||||
))
|
|
||||||
.is_err());
|
|
||||||
assert!(store
|
|
||||||
.parse_location(parsed_path!(
|
|
||||||
["1", "my_db", "not_data", "p1", "42"],
|
|
||||||
"my_table.parquet"
|
|
||||||
))
|
|
||||||
.is_err());
|
|
||||||
assert!(store
|
|
||||||
.parse_location(parsed_path!(
|
|
||||||
["1", "other_db", "data", "p1", "42"],
|
|
||||||
"my_table.parquet"
|
|
||||||
))
|
|
||||||
.is_err());
|
|
||||||
assert!(store
|
|
||||||
.parse_location(parsed_path!(
|
|
||||||
["2", "my_db", "data", "p1", "42"],
|
|
||||||
"my_table.parquet"
|
|
||||||
))
|
|
||||||
.is_err());
|
|
||||||
assert!(store
|
|
||||||
.parse_location(parsed_path!(["1", "my_db", "data", "p1", "42"], "my_table"))
|
|
||||||
.is_err());
|
|
||||||
assert!(store
|
|
||||||
.parse_location(parsed_path!(
|
|
||||||
["1", "my_db", "data", "p1", "42"],
|
|
||||||
"my_table.parquet.tmp"
|
|
||||||
))
|
|
||||||
.is_err());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_roundtrip() {
|
async fn test_roundtrip() {
|
||||||
test_helpers::maybe_start_logging();
|
test_helpers::maybe_start_logging();
|
||||||
|
|
Loading…
Reference in New Issue