refactor: use Arc<str> instead of Arc<String> (#1442)
parent
49c0b8b90c
commit
44de42906f
|
@ -41,10 +41,10 @@ impl ChunkStorage {
|
|||
/// A chunk can contain one or more tables.
|
||||
pub struct ChunkSummary {
|
||||
/// The partition key of this chunk
|
||||
pub partition_key: Arc<String>,
|
||||
pub partition_key: Arc<str>,
|
||||
|
||||
/// The table of this chunk
|
||||
pub table_name: Arc<String>,
|
||||
pub table_name: Arc<str>,
|
||||
|
||||
/// The id of this chunk
|
||||
pub id: u32,
|
||||
|
@ -75,8 +75,8 @@ pub struct ChunkSummary {
|
|||
impl ChunkSummary {
|
||||
/// Construct a ChunkSummary that has None for all timestamps
|
||||
pub fn new_without_timestamps(
|
||||
partition_key: Arc<String>,
|
||||
table_name: Arc<String>,
|
||||
partition_key: Arc<str>,
|
||||
table_name: Arc<str>,
|
||||
id: u32,
|
||||
storage: ChunkStorage,
|
||||
estimated_bytes: usize,
|
||||
|
|
|
@ -26,7 +26,7 @@ pub trait AsExpr {
|
|||
}
|
||||
}
|
||||
|
||||
impl AsExpr for Arc<String> {
|
||||
impl AsExpr for Arc<str> {
|
||||
fn as_expr(&self) -> Expr {
|
||||
col(self.as_ref())
|
||||
}
|
||||
|
|
|
@ -25,18 +25,8 @@ impl From<ChunkSummary> for management::Chunk {
|
|||
let estimated_bytes = estimated_bytes as u64;
|
||||
let row_count = row_count as u64;
|
||||
|
||||
let partition_key = match Arc::try_unwrap(partition_key) {
|
||||
// no one else has a reference so take the string
|
||||
Ok(partition_key) => partition_key,
|
||||
// some other reference exists to this string, so clone it
|
||||
Err(partition_key) => partition_key.as_ref().clone(),
|
||||
};
|
||||
let table_name = match Arc::try_unwrap(table_name) {
|
||||
// no one else has a reference so take the string
|
||||
Ok(table_name) => table_name,
|
||||
// some other reference exists to this string, so clone it
|
||||
Err(table_name) => table_name.as_ref().clone(),
|
||||
};
|
||||
let partition_key = partition_key.to_string();
|
||||
let table_name = table_name.to_string();
|
||||
|
||||
let time_of_first_write = time_of_first_write.map(|t| t.into());
|
||||
let time_of_last_write = time_of_last_write.map(|t| t.into());
|
||||
|
@ -114,8 +104,8 @@ impl TryFrom<management::Chunk> for ChunkSummary {
|
|||
|
||||
let estimated_bytes = estimated_bytes as usize;
|
||||
let row_count = row_count as usize;
|
||||
let partition_key = Arc::new(partition_key);
|
||||
let table_name = Arc::new(table_name);
|
||||
let partition_key = Arc::from(partition_key.as_str());
|
||||
let table_name = Arc::from(table_name.as_str());
|
||||
|
||||
Ok(Self {
|
||||
partition_key,
|
||||
|
@ -168,8 +158,8 @@ mod test {
|
|||
|
||||
let summary = ChunkSummary::try_from(proto).expect("conversion successful");
|
||||
let expected = ChunkSummary {
|
||||
partition_key: Arc::new("foo".to_string()),
|
||||
table_name: Arc::new("bar".to_string()),
|
||||
partition_key: Arc::from("foo"),
|
||||
table_name: Arc::from("bar"),
|
||||
id: 42,
|
||||
estimated_bytes: 1234,
|
||||
row_count: 321,
|
||||
|
@ -189,8 +179,8 @@ mod test {
|
|||
#[test]
|
||||
fn valid_summary_to_proto() {
|
||||
let summary = ChunkSummary {
|
||||
partition_key: Arc::new("foo".to_string()),
|
||||
table_name: Arc::new("bar".to_string()),
|
||||
partition_key: Arc::from("foo"),
|
||||
table_name: Arc::from("bar"),
|
||||
id: 42,
|
||||
estimated_bytes: 1234,
|
||||
row_count: 321,
|
||||
|
|
|
@ -19,28 +19,28 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
#[derive(Debug, PartialEq)]
|
||||
pub enum FieldColumns {
|
||||
/// All field columns share a timestamp column, named TIME_COLUMN_NAME
|
||||
SharedTimestamp(Vec<Arc<String>>),
|
||||
SharedTimestamp(Vec<Arc<str>>),
|
||||
|
||||
/// Each field has a potentially different timestamp column
|
||||
// (value_name, timestamp_name)
|
||||
DifferentTimestamp(Vec<(Arc<String>, Arc<String>)>),
|
||||
DifferentTimestamp(Vec<(Arc<str>, Arc<str>)>),
|
||||
}
|
||||
|
||||
impl From<Vec<Arc<String>>> for FieldColumns {
|
||||
fn from(v: Vec<Arc<String>>) -> Self {
|
||||
impl From<Vec<Arc<str>>> for FieldColumns {
|
||||
fn from(v: Vec<Arc<str>>) -> Self {
|
||||
Self::SharedTimestamp(v)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Vec<(Arc<String>, Arc<String>)>> for FieldColumns {
|
||||
fn from(v: Vec<(Arc<String>, Arc<String>)>) -> Self {
|
||||
impl From<Vec<(Arc<str>, Arc<str>)>> for FieldColumns {
|
||||
fn from(v: Vec<(Arc<str>, Arc<str>)>) -> Self {
|
||||
Self::DifferentTimestamp(v)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Vec<&str>> for FieldColumns {
|
||||
fn from(v: Vec<&str>) -> Self {
|
||||
let v = v.into_iter().map(|v| Arc::new(v.to_string())).collect();
|
||||
let v = v.into_iter().map(Arc::from).collect();
|
||||
|
||||
Self::SharedTimestamp(v)
|
||||
}
|
||||
|
@ -48,7 +48,7 @@ impl From<Vec<&str>> for FieldColumns {
|
|||
|
||||
impl From<&[&str]> for FieldColumns {
|
||||
fn from(v: &[&str]) -> Self {
|
||||
let v = v.iter().map(|v| Arc::new(v.to_string())).collect();
|
||||
let v = v.iter().map(|v| Arc::from(*v)).collect();
|
||||
|
||||
Self::SharedTimestamp(v)
|
||||
}
|
||||
|
@ -114,10 +114,7 @@ impl From<Vec<FieldIndex>> for FieldIndexes {
|
|||
|
||||
impl FieldIndexes {
|
||||
// look up which column index correponds to each column name
|
||||
pub fn names_to_indexes(
|
||||
schema: &SchemaRef,
|
||||
column_names: &[Arc<String>],
|
||||
) -> Result<Vec<usize>> {
|
||||
pub fn names_to_indexes(schema: &SchemaRef, column_names: &[Arc<str>]) -> Result<Vec<usize>> {
|
||||
column_names
|
||||
.iter()
|
||||
.map(|column_name| {
|
||||
|
|
|
@ -73,10 +73,10 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
/// table name.
|
||||
pub struct SeriesSet {
|
||||
/// The table name this series came from
|
||||
pub table_name: Arc<String>,
|
||||
pub table_name: Arc<str>,
|
||||
|
||||
/// key = value pairs that define this series
|
||||
pub tags: Vec<(Arc<String>, Arc<String>)>,
|
||||
pub tags: Vec<(Arc<str>, Arc<str>)>,
|
||||
|
||||
/// the column index of each "field" of the time series. For
|
||||
/// example, if there are two field indexes then this series set
|
||||
|
@ -103,7 +103,7 @@ pub struct SeriesSet {
|
|||
#[derive(Debug)]
|
||||
pub struct GroupDescription {
|
||||
/// key = value pairs that define the group
|
||||
pub tags: Vec<(Arc<String>, Arc<String>)>,
|
||||
pub tags: Vec<(Arc<str>, Arc<str>)>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -135,8 +135,8 @@ impl SeriesSetConverter {
|
|||
/// it: record batch iterator that produces data in the desired order
|
||||
pub async fn convert(
|
||||
&mut self,
|
||||
table_name: Arc<String>,
|
||||
tag_columns: Arc<Vec<Arc<String>>>,
|
||||
table_name: Arc<str>,
|
||||
tag_columns: Arc<Vec<Arc<str>>>,
|
||||
field_columns: FieldColumns,
|
||||
num_prefix_tag_group_columns: Option<usize>,
|
||||
mut it: SendableRecordBatchStream,
|
||||
|
@ -299,9 +299,9 @@ impl SeriesSetConverter {
|
|||
fn get_tag_keys(
|
||||
batch: &RecordBatch,
|
||||
row: usize,
|
||||
tag_column_names: &[Arc<String>],
|
||||
tag_column_names: &[Arc<str>],
|
||||
tag_indexes: &[usize],
|
||||
) -> Vec<(Arc<String>, Arc<String>)> {
|
||||
) -> Vec<(Arc<str>, Arc<str>)> {
|
||||
assert_eq!(tag_column_names.len(), tag_indexes.len());
|
||||
|
||||
tag_column_names
|
||||
|
@ -349,7 +349,7 @@ impl SeriesSetConverter {
|
|||
),
|
||||
};
|
||||
if let Some(tag_value) = tag_value {
|
||||
Some((Arc::clone(&column_name), Arc::new(tag_value)))
|
||||
Some((Arc::clone(&column_name), Arc::from(tag_value.as_str())))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
|
@ -363,7 +363,7 @@ struct GroupGenerator {
|
|||
num_prefix_tag_group_columns: Option<usize>,
|
||||
|
||||
// vec of num_prefix_tag_group_columns, if any
|
||||
last_group_tags: Option<Vec<(Arc<String>, Arc<String>)>>,
|
||||
last_group_tags: Option<Vec<(Arc<str>, Arc<str>)>>,
|
||||
}
|
||||
|
||||
impl GroupGenerator {
|
||||
|
@ -455,7 +455,7 @@ mod tests {
|
|||
assert_eq!(results.len(), 1);
|
||||
let series_set = &results[0];
|
||||
|
||||
assert_eq!(*series_set.table_name, "foo");
|
||||
assert_eq!(series_set.table_name.as_ref(), "foo");
|
||||
assert!(series_set.tags.is_empty());
|
||||
assert_eq!(
|
||||
series_set.field_indexes,
|
||||
|
@ -509,7 +509,7 @@ mod tests {
|
|||
assert_eq!(results.len(), 1);
|
||||
let series_set = &results[0];
|
||||
|
||||
assert_eq!(*series_set.table_name, "foo");
|
||||
assert_eq!(series_set.table_name.as_ref(), "foo");
|
||||
assert!(series_set.tags.is_empty());
|
||||
assert_eq!(
|
||||
series_set.field_indexes,
|
||||
|
@ -563,7 +563,7 @@ mod tests {
|
|||
assert_eq!(results.len(), 1);
|
||||
let series_set = &results[0];
|
||||
|
||||
assert_eq!(*series_set.table_name, "bar");
|
||||
assert_eq!(series_set.table_name.as_ref(), "bar");
|
||||
assert_eq!(series_set.tags, str_pair_vec_to_vec(&[("tag_a", "one")]));
|
||||
assert_eq!(
|
||||
series_set.field_indexes,
|
||||
|
@ -600,7 +600,7 @@ mod tests {
|
|||
assert_eq!(results.len(), 2);
|
||||
let series_set1 = &results[0];
|
||||
|
||||
assert_eq!(*series_set1.table_name, "foo");
|
||||
assert_eq!(series_set1.table_name.as_ref(), "foo");
|
||||
assert_eq!(series_set1.tags, str_pair_vec_to_vec(&[("tag_a", "one")]));
|
||||
assert_eq!(
|
||||
series_set1.field_indexes,
|
||||
|
@ -611,7 +611,7 @@ mod tests {
|
|||
|
||||
let series_set2 = &results[1];
|
||||
|
||||
assert_eq!(*series_set2.table_name, "foo");
|
||||
assert_eq!(series_set2.table_name.as_ref(), "foo");
|
||||
assert_eq!(series_set2.tags, str_pair_vec_to_vec(&[("tag_a", "two")]));
|
||||
assert_eq!(
|
||||
series_set2.field_indexes,
|
||||
|
@ -649,7 +649,7 @@ mod tests {
|
|||
assert_eq!(results.len(), 3);
|
||||
let series_set1 = &results[0];
|
||||
|
||||
assert_eq!(*series_set1.table_name, "foo");
|
||||
assert_eq!(series_set1.table_name.as_ref(), "foo");
|
||||
assert_eq!(
|
||||
series_set1.tags,
|
||||
str_pair_vec_to_vec(&[("tag_a", "one"), ("tag_b", "ten")])
|
||||
|
@ -659,7 +659,7 @@ mod tests {
|
|||
|
||||
let series_set2 = &results[1];
|
||||
|
||||
assert_eq!(*series_set2.table_name, "foo");
|
||||
assert_eq!(series_set2.table_name.as_ref(), "foo");
|
||||
assert_eq!(
|
||||
series_set2.tags,
|
||||
str_pair_vec_to_vec(&[("tag_a", "one"), ("tag_b", "eleven")])
|
||||
|
@ -669,7 +669,7 @@ mod tests {
|
|||
|
||||
let series_set3 = &results[2];
|
||||
|
||||
assert_eq!(*series_set3.table_name, "foo");
|
||||
assert_eq!(series_set3.table_name.as_ref(), "foo");
|
||||
assert_eq!(
|
||||
series_set3.tags,
|
||||
str_pair_vec_to_vec(&[("tag_a", "two"), ("tag_b", "eleven")])
|
||||
|
@ -706,7 +706,7 @@ mod tests {
|
|||
assert_eq!(results.len(), 2);
|
||||
let series_set1 = &results[0];
|
||||
|
||||
assert_eq!(*series_set1.table_name, "foo");
|
||||
assert_eq!(series_set1.table_name.as_ref(), "foo");
|
||||
assert_eq!(
|
||||
series_set1.tags,
|
||||
str_pair_vec_to_vec(&[("tag_a", "one"), ("tag_b", "ten")])
|
||||
|
@ -716,7 +716,7 @@ mod tests {
|
|||
|
||||
let series_set2 = &results[1];
|
||||
|
||||
assert_eq!(*series_set2.table_name, "foo");
|
||||
assert_eq!(series_set2.table_name.as_ref(), "foo");
|
||||
assert_eq!(
|
||||
series_set2.tags,
|
||||
str_pair_vec_to_vec(&[("tag_a", "one")]) // note no value for tag_b, only one tag
|
||||
|
@ -771,7 +771,7 @@ mod tests {
|
|||
|
||||
assert_eq!(group_1.tags, str_pair_vec_to_vec(&[("tag_a", "one")]));
|
||||
|
||||
assert_eq!(*series_set1.table_name, "foo");
|
||||
assert_eq!(series_set1.table_name.as_ref(), "foo");
|
||||
assert_eq!(
|
||||
series_set1.tags,
|
||||
str_pair_vec_to_vec(&[("tag_a", "one"), ("tag_b", "ten")])
|
||||
|
@ -779,7 +779,7 @@ mod tests {
|
|||
assert_eq!(series_set1.start_row, 0);
|
||||
assert_eq!(series_set1.num_rows, 1);
|
||||
|
||||
assert_eq!(*series_set2.table_name, "foo");
|
||||
assert_eq!(series_set2.table_name.as_ref(), "foo");
|
||||
assert_eq!(
|
||||
series_set2.tags,
|
||||
str_pair_vec_to_vec(&[("tag_a", "one"), ("tag_b", "eleven")])
|
||||
|
@ -789,7 +789,7 @@ mod tests {
|
|||
|
||||
assert_eq!(group_2.tags, str_pair_vec_to_vec(&[("tag_a", "two")]));
|
||||
|
||||
assert_eq!(*series_set3.table_name, "foo");
|
||||
assert_eq!(series_set3.table_name.as_ref(), "foo");
|
||||
assert_eq!(
|
||||
series_set3.tags,
|
||||
str_pair_vec_to_vec(&[("tag_a", "two"), ("tag_b", "eleven")])
|
||||
|
@ -832,7 +832,7 @@ mod tests {
|
|||
|
||||
assert_eq!(group_1.tags, &[]);
|
||||
|
||||
assert_eq!(*series_set1.table_name, "foo");
|
||||
assert_eq!(series_set1.table_name.as_ref(), "foo");
|
||||
assert_eq!(
|
||||
series_set1.tags,
|
||||
str_pair_vec_to_vec(&[("tag_a", "one"), ("tag_b", "ten")])
|
||||
|
@ -840,7 +840,7 @@ mod tests {
|
|||
assert_eq!(series_set1.start_row, 0);
|
||||
assert_eq!(series_set1.num_rows, 1);
|
||||
|
||||
assert_eq!(*series_set2.table_name, "foo");
|
||||
assert_eq!(series_set2.table_name.as_ref(), "foo");
|
||||
assert_eq!(
|
||||
series_set2.tags,
|
||||
str_pair_vec_to_vec(&[("tag_a", "one"), ("tag_b", "eleven")])
|
||||
|
@ -872,7 +872,7 @@ mod tests {
|
|||
) -> Vec<SeriesSet> {
|
||||
let mut converter = SeriesSetConverter::default();
|
||||
|
||||
let table_name = Arc::new(table_name.into());
|
||||
let table_name = Arc::from(table_name);
|
||||
let tag_columns = str_vec_to_arc_vec(tag_columns);
|
||||
let field_columns = FieldColumns::from(field_columns);
|
||||
|
||||
|
@ -899,7 +899,7 @@ mod tests {
|
|||
) -> Vec<SeriesSetItem> {
|
||||
let mut converter = SeriesSetConverter::default();
|
||||
|
||||
let table_name = Arc::new(table_name.into());
|
||||
let table_name = Arc::from(table_name);
|
||||
let tag_columns = str_vec_to_arc_vec(tag_columns);
|
||||
let field_columns = FieldColumns::from(field_columns);
|
||||
|
||||
|
|
|
@ -797,7 +797,7 @@ impl InfluxRpcPlanner {
|
|||
/// Scan
|
||||
fn read_filter_plan<C>(
|
||||
&self,
|
||||
table_name: impl Into<String>,
|
||||
table_name: impl AsRef<str>,
|
||||
prefix_columns: Option<&[impl AsRef<str>]>,
|
||||
predicate: &Predicate,
|
||||
chunks: Vec<Arc<C>>,
|
||||
|
@ -805,8 +805,8 @@ impl InfluxRpcPlanner {
|
|||
where
|
||||
C: PartitionChunk + 'static,
|
||||
{
|
||||
let table_name = table_name.into();
|
||||
let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?;
|
||||
let table_name = table_name.as_ref();
|
||||
let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?;
|
||||
|
||||
let TableScanAndFilter {
|
||||
plan_builder,
|
||||
|
@ -855,17 +855,17 @@ impl InfluxRpcPlanner {
|
|||
|
||||
let tag_columns = schema
|
||||
.tags_iter()
|
||||
.map(|field| Arc::new(field.name().to_string()))
|
||||
.map(|field| Arc::from(field.name().as_str()))
|
||||
.collect();
|
||||
|
||||
let field_columns = filtered_fields_iter(&schema, predicate)
|
||||
.map(|field| Arc::new(field.name().to_string()))
|
||||
.map(|field| Arc::from(field.name().as_str()))
|
||||
.collect();
|
||||
|
||||
// TODO: remove the use of tag_columns and field_column names
|
||||
// and instead use the schema directly)
|
||||
let ss_plan = SeriesSetPlan::new_from_shared_timestamp(
|
||||
Arc::new(table_name),
|
||||
Arc::from(table_name),
|
||||
plan,
|
||||
tag_columns,
|
||||
field_columns,
|
||||
|
@ -942,9 +942,9 @@ impl InfluxRpcPlanner {
|
|||
// order in the same order)
|
||||
let tag_columns: Vec<_> = schema.tags_iter().map(|f| f.name() as &str).collect();
|
||||
|
||||
let tag_columns: Vec<_> = reorder_prefix(group_columns, tag_columns)?
|
||||
let tag_columns: Vec<Arc<str>> = reorder_prefix(group_columns, tag_columns)?
|
||||
.into_iter()
|
||||
.map(|name| Arc::new(name.to_string()))
|
||||
.map(Arc::from)
|
||||
.collect();
|
||||
|
||||
// Group by all tag columns
|
||||
|
@ -972,7 +972,7 @@ impl InfluxRpcPlanner {
|
|||
// and finally create the plan
|
||||
let plan = plan_builder.build().context(BuildingPlan)?;
|
||||
|
||||
let ss_plan = SeriesSetPlan::new(Arc::new(table_name), plan, tag_columns, field_columns);
|
||||
let ss_plan = SeriesSetPlan::new(Arc::from(table_name), plan, tag_columns, field_columns);
|
||||
|
||||
Ok(Some(ss_plan))
|
||||
}
|
||||
|
@ -1060,18 +1060,18 @@ impl InfluxRpcPlanner {
|
|||
|
||||
let tag_columns = schema
|
||||
.tags_iter()
|
||||
.map(|field| Arc::new(field.name().to_string()))
|
||||
.map(|field| Arc::from(field.name().as_str()))
|
||||
.collect();
|
||||
|
||||
let field_columns = filtered_fields_iter(&schema, predicate)
|
||||
.map(|field| Arc::new(field.name().to_string()))
|
||||
.map(|field| Arc::from(field.name().as_str()))
|
||||
.collect();
|
||||
|
||||
// TODO: remove the use of tag_columns and field_column names
|
||||
// and instead use the schema directly)
|
||||
|
||||
let ss_plan = SeriesSetPlan::new_from_shared_timestamp(
|
||||
Arc::new(table_name),
|
||||
Arc::from(table_name),
|
||||
plan,
|
||||
tag_columns,
|
||||
field_columns,
|
||||
|
@ -1316,7 +1316,7 @@ impl AggExprs {
|
|||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
let field_columns = filtered_fields_iter(schema, predicate)
|
||||
.map(|field| Arc::new(field.name().to_string()))
|
||||
.map(|field| Arc::from(field.name().as_str()))
|
||||
.collect::<Vec<_>>()
|
||||
.into();
|
||||
|
||||
|
@ -1356,8 +1356,8 @@ impl AggExprs {
|
|||
)?);
|
||||
|
||||
field_list.push((
|
||||
Arc::new(field.name().to_string()), // value name
|
||||
Arc::new(time_column_name),
|
||||
Arc::from(field.name().as_str()), // value name
|
||||
Arc::from(time_column_name.as_str()),
|
||||
));
|
||||
}
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@ use crate::exec::field::FieldColumns;
|
|||
#[derive(Debug)]
|
||||
pub struct SeriesSetPlan {
|
||||
/// The table name this came from
|
||||
pub table_name: Arc<String>,
|
||||
pub table_name: Arc<str>,
|
||||
|
||||
/// Datafusion plan to execute. The plan must produce
|
||||
/// RecordBatches that have:
|
||||
|
@ -29,7 +29,7 @@ pub struct SeriesSetPlan {
|
|||
/// Note these are `Arc` strings because they are duplicated for
|
||||
/// *each* resulting `SeriesSet` that is produced when this type
|
||||
/// of plan is executed.
|
||||
pub tag_columns: Vec<Arc<String>>,
|
||||
pub tag_columns: Vec<Arc<str>>,
|
||||
|
||||
/// The names of the columns which are "fields"
|
||||
pub field_columns: FieldColumns,
|
||||
|
@ -42,19 +42,19 @@ pub struct SeriesSetPlan {
|
|||
impl SeriesSetPlan {
|
||||
/// Create a SeriesSetPlan that will not produce any Group items
|
||||
pub fn new_from_shared_timestamp(
|
||||
table_name: Arc<String>,
|
||||
table_name: Arc<str>,
|
||||
plan: LogicalPlan,
|
||||
tag_columns: Vec<Arc<String>>,
|
||||
field_columns: Vec<Arc<String>>,
|
||||
tag_columns: Vec<Arc<str>>,
|
||||
field_columns: Vec<Arc<str>>,
|
||||
) -> Self {
|
||||
Self::new(table_name, plan, tag_columns, field_columns.into())
|
||||
}
|
||||
|
||||
/// Create a SeriesSetPlan that will not produce any Group items
|
||||
pub fn new(
|
||||
table_name: Arc<String>,
|
||||
table_name: Arc<str>,
|
||||
plan: LogicalPlan,
|
||||
tag_columns: Vec<Arc<String>>,
|
||||
tag_columns: Vec<Arc<str>>,
|
||||
field_columns: FieldColumns,
|
||||
) -> Self {
|
||||
let num_prefix_tag_group_columns = None;
|
||||
|
|
|
@ -50,7 +50,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
/// consistent.
|
||||
#[derive(Debug)]
|
||||
pub struct ProviderBuilder<C: PartitionChunk + 'static> {
|
||||
table_name: Arc<String>,
|
||||
table_name: Arc<str>,
|
||||
schema_merger: SchemaMerger,
|
||||
chunk_and_infos: Vec<ChunkInfo<C>>,
|
||||
}
|
||||
|
@ -84,9 +84,9 @@ where
|
|||
}
|
||||
|
||||
impl<C: PartitionChunk> ProviderBuilder<C> {
|
||||
pub fn new(table_name: impl Into<String>) -> Self {
|
||||
pub fn new(table_name: impl AsRef<str>) -> Self {
|
||||
Self {
|
||||
table_name: Arc::new(table_name.into()),
|
||||
table_name: Arc::from(table_name.as_ref()),
|
||||
schema_merger: SchemaMerger::new(),
|
||||
chunk_and_infos: Vec::new(),
|
||||
}
|
||||
|
@ -157,7 +157,7 @@ impl<C: PartitionChunk> ProviderBuilder<C> {
|
|||
/// push predicates and selections down to chunks
|
||||
#[derive(Debug)]
|
||||
pub struct ChunkTableProvider<C: PartitionChunk + 'static> {
|
||||
table_name: Arc<String>,
|
||||
table_name: Arc<str>,
|
||||
/// The IOx schema (wrapper around Arrow Schemaref) for this table
|
||||
iox_schema: Schema,
|
||||
// The chunks and their corresponding schema
|
||||
|
|
|
@ -18,7 +18,7 @@ use super::{adapter::SchemaAdapterStream, ChunkInfo};
|
|||
/// Implements the DataFusion physical plan interface
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct IOxReadFilterNode<C: PartitionChunk + 'static> {
|
||||
table_name: Arc<String>,
|
||||
table_name: Arc<str>,
|
||||
/// The desired output schema (includes selection_
|
||||
/// note that the chunk may not have all these columns.
|
||||
schema: SchemaRef,
|
||||
|
@ -28,7 +28,7 @@ pub(crate) struct IOxReadFilterNode<C: PartitionChunk + 'static> {
|
|||
|
||||
impl<C: PartitionChunk + 'static> IOxReadFilterNode<C> {
|
||||
pub fn new(
|
||||
table_name: Arc<String>,
|
||||
table_name: Arc<str>,
|
||||
schema: SchemaRef,
|
||||
chunk_and_infos: Vec<ChunkInfo<C>>,
|
||||
predicate: Predicate,
|
||||
|
|
|
@ -2261,16 +2261,12 @@ mod tests {
|
|||
|
||||
print!("Partitions: {:?}", db.partition_keys().unwrap());
|
||||
|
||||
fn to_arc(s: &str) -> Arc<String> {
|
||||
Arc::new(s.to_string())
|
||||
}
|
||||
|
||||
let chunk_summaries = db.partition_chunk_summaries("1970-01-05T15");
|
||||
let chunk_summaries = normalize_summaries(chunk_summaries);
|
||||
|
||||
let expected = vec![ChunkSummary::new_without_timestamps(
|
||||
to_arc("1970-01-05T15"),
|
||||
to_arc("cpu"),
|
||||
Arc::from("1970-01-05T15"),
|
||||
Arc::from("cpu"),
|
||||
0,
|
||||
ChunkStorage::OpenMutableBuffer,
|
||||
106,
|
||||
|
@ -2370,41 +2366,37 @@ mod tests {
|
|||
db.rollover_partition("1970-01-05T15", "cpu").await.unwrap();
|
||||
write_lp(&db, "cpu bar=1,baz=3,blargh=3 400000000000000");
|
||||
|
||||
fn to_arc(s: &str) -> Arc<String> {
|
||||
Arc::new(s.to_string())
|
||||
}
|
||||
|
||||
let chunk_summaries = db.chunk_summaries().expect("expected summary to return");
|
||||
let chunk_summaries = normalize_summaries(chunk_summaries);
|
||||
|
||||
let expected = vec![
|
||||
ChunkSummary::new_without_timestamps(
|
||||
to_arc("1970-01-01T00"),
|
||||
to_arc("cpu"),
|
||||
Arc::from("1970-01-01T00"),
|
||||
Arc::from("cpu"),
|
||||
0,
|
||||
ChunkStorage::ReadBufferAndObjectStore,
|
||||
1904, // size of RB and OS chunks
|
||||
1,
|
||||
),
|
||||
ChunkSummary::new_without_timestamps(
|
||||
to_arc("1970-01-01T00"),
|
||||
to_arc("cpu"),
|
||||
Arc::from("1970-01-01T00"),
|
||||
Arc::from("cpu"),
|
||||
1,
|
||||
ChunkStorage::OpenMutableBuffer,
|
||||
100,
|
||||
1,
|
||||
),
|
||||
ChunkSummary::new_without_timestamps(
|
||||
to_arc("1970-01-05T15"),
|
||||
to_arc("cpu"),
|
||||
Arc::from("1970-01-05T15"),
|
||||
Arc::from("cpu"),
|
||||
0,
|
||||
ChunkStorage::ClosedMutableBuffer,
|
||||
129,
|
||||
1,
|
||||
),
|
||||
ChunkSummary::new_without_timestamps(
|
||||
to_arc("1970-01-05T15"),
|
||||
to_arc("cpu"),
|
||||
Arc::from("1970-01-05T15"),
|
||||
Arc::from("cpu"),
|
||||
1,
|
||||
ChunkStorage::OpenMutableBuffer,
|
||||
131,
|
||||
|
|
|
@ -81,10 +81,10 @@ impl ChunkState {
|
|||
#[derive(Debug)]
|
||||
pub struct Chunk {
|
||||
/// What partition does the chunk belong to?
|
||||
partition_key: Arc<String>,
|
||||
partition_key: Arc<str>,
|
||||
|
||||
/// What table does the chunk belong to?
|
||||
table_name: Arc<String>,
|
||||
table_name: Arc<str>,
|
||||
|
||||
/// The ID of the chunk
|
||||
id: u32,
|
||||
|
@ -109,8 +109,8 @@ pub struct Chunk {
|
|||
macro_rules! unexpected_state {
|
||||
($SELF: expr, $OP: expr, $EXPECTED: expr, $STATE: expr) => {
|
||||
InternalChunkState {
|
||||
partition_key: $SELF.partition_key.as_str(),
|
||||
table_name: $SELF.table_name.as_str(),
|
||||
partition_key: $SELF.partition_key.as_ref(),
|
||||
table_name: $SELF.table_name.as_ref(),
|
||||
chunk_id: $SELF.id,
|
||||
operation: $OP,
|
||||
expected: $EXPECTED,
|
||||
|
@ -131,26 +131,25 @@ impl Chunk {
|
|||
/// 4. a write is recorded (see [`record_write`](Self::record_write))
|
||||
pub(crate) fn new_open(
|
||||
batch: TableBatch<'_>,
|
||||
partition_key: impl Into<String>,
|
||||
partition_key: impl AsRef<str>,
|
||||
id: u32,
|
||||
clock_value: ClockValue,
|
||||
server_id: ServerId,
|
||||
memory_registry: &MemRegistry,
|
||||
) -> Result<Self> {
|
||||
let table_name = batch.name().to_string();
|
||||
let partition_key: String = partition_key.into();
|
||||
let table_name = Arc::from(batch.name());
|
||||
|
||||
let mut mb = mutable_buffer::chunk::Chunk::new(id, memory_registry);
|
||||
mb.write_table_batches(clock_value, server_id, &[batch])
|
||||
.context(OpenChunk {
|
||||
partition_key: partition_key.clone(),
|
||||
partition_key: partition_key.as_ref(),
|
||||
chunk_id: id,
|
||||
})?;
|
||||
|
||||
let state = ChunkState::Open(mb);
|
||||
let mut chunk = Self {
|
||||
partition_key: Arc::new(partition_key),
|
||||
table_name: Arc::new(table_name),
|
||||
partition_key: Arc::from(partition_key.as_ref()),
|
||||
table_name,
|
||||
id,
|
||||
state,
|
||||
time_of_first_write: None,
|
||||
|
|
|
@ -72,7 +72,7 @@ pub enum DbChunk {
|
|||
},
|
||||
ReadBuffer {
|
||||
chunk: Arc<ReadBufferChunk>,
|
||||
partition_key: Arc<String>,
|
||||
partition_key: Arc<str>,
|
||||
},
|
||||
ParquetFile {
|
||||
chunk: Arc<ParquetChunk>,
|
||||
|
@ -82,7 +82,7 @@ pub enum DbChunk {
|
|||
impl DbChunk {
|
||||
/// Create a DBChunk snapshot of the catalog chunk
|
||||
pub fn snapshot(chunk: &super::catalog::chunk::Chunk) -> Arc<Self> {
|
||||
let partition_key = Arc::new(chunk.key().to_string());
|
||||
let partition_key = Arc::from(chunk.key());
|
||||
|
||||
use super::catalog::chunk::ChunkState;
|
||||
|
||||
|
|
|
@ -213,8 +213,8 @@ mod tests {
|
|||
fn test_from_chunk_summaries() {
|
||||
let chunks = vec![
|
||||
ChunkSummary {
|
||||
partition_key: Arc::new("p1".to_string()),
|
||||
table_name: Arc::new("table1".to_string()),
|
||||
partition_key: Arc::from("p1"),
|
||||
table_name: Arc::from("table1"),
|
||||
id: 0,
|
||||
storage: ChunkStorage::OpenMutableBuffer,
|
||||
estimated_bytes: 23754,
|
||||
|
@ -227,8 +227,8 @@ mod tests {
|
|||
time_closed: None,
|
||||
},
|
||||
ChunkSummary {
|
||||
partition_key: Arc::new("p1".to_string()),
|
||||
table_name: Arc::new("table1".to_string()),
|
||||
partition_key: Arc::from("p1"),
|
||||
table_name: Arc::from("table1"),
|
||||
id: 0,
|
||||
storage: ChunkStorage::OpenMutableBuffer,
|
||||
estimated_bytes: 23454,
|
||||
|
|
|
@ -26,7 +26,7 @@ pub fn dump_field_indexes(f: FieldIndexes) -> Vec<String> {
|
|||
}
|
||||
|
||||
/// Format a the vec of Arc strings paris into strings
|
||||
pub fn dump_arc_vec(v: Vec<(Arc<String>, Arc<String>)>) -> Vec<String> {
|
||||
pub fn dump_arc_vec(v: Vec<(Arc<str>, Arc<str>)>) -> Vec<String> {
|
||||
v.into_iter()
|
||||
.map(|(k, v)| format!(" ({}, {})", k, v))
|
||||
.collect()
|
||||
|
|
|
@ -224,11 +224,7 @@ fn field_to_data(
|
|||
|
||||
// Convert the tag=value pairs from the series set to the correct gRPC
|
||||
// format, and add the _f and _m tags for the field name and measurement
|
||||
fn convert_tags(
|
||||
table_name: &str,
|
||||
field_name: &str,
|
||||
tags: &[(Arc<String>, Arc<String>)],
|
||||
) -> Vec<Tag> {
|
||||
fn convert_tags(table_name: &str, field_name: &str, tags: &[(Arc<str>, Arc<str>)]) -> Vec<Tag> {
|
||||
// Special case "measurement" name which is modeled as a tag of
|
||||
// "_measurement" and "field" which is modeled as a tag of "_field"
|
||||
let mut converted_tags = vec![
|
||||
|
@ -370,8 +366,8 @@ mod tests {
|
|||
#[test]
|
||||
fn test_series_set_conversion() {
|
||||
let series_set = SeriesSet {
|
||||
table_name: Arc::new("the_table".into()),
|
||||
tags: vec![(Arc::new("tag1".into()), Arc::new("val1".into()))],
|
||||
table_name: Arc::from("the_table"),
|
||||
tags: vec![(Arc::from("tag1"), Arc::from("val1"))],
|
||||
field_indexes: FieldIndexes::from_timestamp_and_value_indexes(4, &[0, 1, 2, 3]),
|
||||
start_row: 1,
|
||||
num_rows: 2,
|
||||
|
@ -425,8 +421,8 @@ mod tests {
|
|||
.expect("created new record batch");
|
||||
|
||||
let series_set = SeriesSet {
|
||||
table_name: Arc::new("the_table".into()),
|
||||
tags: vec![(Arc::new("tag1".into()), Arc::new("val1".into()))],
|
||||
table_name: Arc::from("the_table"),
|
||||
tags: vec![(Arc::from("tag1"), Arc::from("val1"))],
|
||||
// field indexes are (value, time)
|
||||
field_indexes: FieldIndexes::from_slice(&[(3, 2), (1, 0)]),
|
||||
start_row: 1,
|
||||
|
@ -483,8 +479,8 @@ mod tests {
|
|||
.expect("created new record batch");
|
||||
|
||||
let series_set = SeriesSet {
|
||||
table_name: Arc::new("the_table".into()),
|
||||
tags: vec![(Arc::new("state".into()), Arc::new("MA".into()))],
|
||||
table_name: Arc::from("the_table"),
|
||||
tags: vec![(Arc::from("state"), Arc::from("MA"))],
|
||||
field_indexes: FieldIndexes::from_timestamp_and_value_indexes(3, &[1, 2]),
|
||||
start_row: 0,
|
||||
num_rows: batch.num_rows(),
|
||||
|
@ -519,8 +515,8 @@ mod tests {
|
|||
fn test_group_group_conversion() {
|
||||
let group_description = GroupDescription {
|
||||
tags: vec![
|
||||
(Arc::new("tag1".into()), Arc::new("val1".into())),
|
||||
(Arc::new("tag2".into()), Arc::new("val2".into())),
|
||||
(Arc::from("tag1"), Arc::from("val1")),
|
||||
(Arc::from("tag2"), Arc::from("val2")),
|
||||
],
|
||||
};
|
||||
|
||||
|
@ -560,8 +556,8 @@ mod tests {
|
|||
.expect("created new record batch");
|
||||
|
||||
let series_set = SeriesSet {
|
||||
table_name: Arc::new("the_table".into()),
|
||||
tags: vec![(Arc::new("tag1".into()), Arc::new("val1".into()))],
|
||||
table_name: Arc::from("the_table"),
|
||||
tags: vec![(Arc::from("tag1"), Arc::from("val1"))],
|
||||
field_indexes: FieldIndexes::from_timestamp_and_value_indexes(1, &[0]),
|
||||
start_row: 1,
|
||||
num_rows: 2,
|
||||
|
|
|
@ -60,15 +60,15 @@ pub fn make_temp_file<C: AsRef<[u8]>>(contents: C) -> tempfile::NamedTempFile {
|
|||
}
|
||||
|
||||
/// convert form that is easier to type in tests to what some code needs
|
||||
pub fn str_vec_to_arc_vec(str_vec: &[&str]) -> Arc<Vec<Arc<String>>> {
|
||||
Arc::new(str_vec.iter().map(|s| Arc::new(String::from(*s))).collect())
|
||||
pub fn str_vec_to_arc_vec(str_vec: &[&str]) -> Arc<Vec<Arc<str>>> {
|
||||
Arc::new(str_vec.iter().map(|s| Arc::from(*s)).collect())
|
||||
}
|
||||
|
||||
/// convert form that is easier to type in tests to what some code needs
|
||||
pub fn str_pair_vec_to_vec(str_vec: &[(&str, &str)]) -> Vec<(Arc<String>, Arc<String>)> {
|
||||
pub fn str_pair_vec_to_vec(str_vec: &[(&str, &str)]) -> Vec<(Arc<str>, Arc<str>)> {
|
||||
str_vec
|
||||
.iter()
|
||||
.map(|(s1, s2)| (Arc::new(String::from(*s1)), Arc::new(String::from(*s2))))
|
||||
.map(|(s1, s2)| (Arc::from(*s1), Arc::from(*s2)))
|
||||
.collect()
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue