Merge pull request #60 from influxdata/cn-remove-dead-code

refactor: Remove code that's now unused
pull/24376/head
Carol (Nichols || Goulding) 2020-04-06 17:00:56 -04:00 committed by GitHub
commit e60ae51c91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 19 additions and 465 deletions

View File

@@ -2,15 +2,28 @@ use std::convert::TryFrom;
use std::error; use std::error;
use std::fmt; use std::fmt;
pub mod config_store;
pub mod database; pub mod database;
pub mod inverted_index;
pub mod memdb; pub mod memdb;
pub mod partitioned_store; pub mod partitioned_store;
pub mod predicate; pub mod predicate;
pub mod remote_partition; pub mod remote_partition;
pub mod s3_partition; pub mod s3_partition;
pub mod series_store;
/// A single timestamped value read back from storage.
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct ReadPoint<T: Clone> {
    // Timestamp of the point; unit (presumably nanoseconds) is not
    // established in this file — TODO confirm against the write path.
    pub time: i64,
    // The point's value at `time`.
    pub value: T,
}
impl<T: Copy + Clone> From<&'_ crate::line_parser::Point<T>> for ReadPoint<T> {
fn from(other: &'_ crate::line_parser::Point<T>) -> Self {
let crate::line_parser::Point { time, value, .. } = other;
Self {
time: *time,
value: *value,
}
}
}
// The values for these enum variants have no real meaning, but they // The values for these enum variants have no real meaning, but they
// are serialized to disk. Revisit these whenever it's time to decide // are serialized to disk. Revisit these whenever it's time to decide

View File

@@ -1,19 +0,0 @@
use crate::delorean::Bucket;
use crate::storage::StorageError;
use std::sync::Arc;
/// Storage for per-organization bucket configuration.
///
/// Implementations must be shareable across threads (`Sync + Send`).
pub trait ConfigStore: Sync + Send {
    /// Creates `bucket` under `org_id` if it does not already exist.
    /// Returns a `u32` on success — presumably the bucket's id; confirm
    /// against implementations.
    fn create_bucket_if_not_exists(
        &self,
        org_id: u32,
        bucket: &Bucket,
    ) -> Result<u32, StorageError>;

    /// Looks up a bucket by name within `org_id`; `Ok(None)` when no such
    /// bucket exists.
    fn get_bucket_by_name(
        &self,
        org_id: u32,
        bucket_name: &str,
    ) -> Result<Option<Arc<Bucket>>, StorageError>;

    /// Looks up a bucket by its id; `Ok(None)` when no such bucket exists.
    fn get_bucket_by_id(&self, bucket_id: u32) -> Result<Option<Arc<Bucket>>, StorageError>;
}

View File

@@ -223,7 +223,7 @@ mod tests {
use crate::storage::database::Database; use crate::storage::database::Database;
use crate::storage::partitioned_store::ReadValues; use crate::storage::partitioned_store::ReadValues;
use crate::storage::predicate::parse_predicate; use crate::storage::predicate::parse_predicate;
use crate::storage::series_store::ReadPoint; use crate::storage::ReadPoint;
#[tokio::test] #[tokio::test]
async fn create_bucket() { async fn create_bucket() {

View File

@@ -1,264 +0,0 @@
use crate::delorean::{Predicate, Tag};
use crate::line_parser::PointType;
use crate::storage::{SeriesDataType, StorageError};
/// Index mapping series keys and tag metadata to series ids within a bucket.
///
/// Implementations must be shareable across threads (`Sync + Send`).
pub trait InvertedIndex: Sync + Send {
    /// Assigns a series id to each point in `points`, in place, creating new
    /// ids for series not yet known to `bucket_id`. Ids are scoped per
    /// bucket (see the `series_id_indexing` test helper below).
    fn get_or_create_series_ids_for_points(
        &self,
        bucket_id: u32,
        points: &mut [PointType],
    ) -> Result<(), StorageError>;

    /// Returns the series in `bucket_id` that match `predicate` — presumably
    /// all series when `predicate` is `None`; confirm against implementations.
    fn read_series_matching(
        &self,
        bucket_id: u32,
        predicate: Option<&Predicate>,
    ) -> Result<Box<dyn Iterator<Item = SeriesFilter> + Send>, StorageError>;

    /// Returns the tag keys present in `bucket_id`, optionally restricted by
    /// `predicate`.
    fn get_tag_keys(
        &self,
        bucket_id: u32,
        predicate: Option<&Predicate>,
    ) -> Result<Box<dyn Iterator<Item = String> + Send>, StorageError>;

    /// Returns the values stored under `tag_key` in `bucket_id`, optionally
    /// restricted by `predicate`.
    fn get_tag_values(
        &self,
        bucket_id: u32,
        tag_key: &str,
        predicate: Option<&Predicate>,
    ) -> Result<Box<dyn Iterator<Item = String> + Send>, StorageError>;
}
/// One series matched by a predicate: its id, its full series key, an
/// optional per-value predicate, and the type of its values.
#[derive(Debug, PartialEq, Clone)]
pub struct SeriesFilter {
    // Series id assigned by the inverted index.
    pub id: u64,
    // Full series key, e.g. "cpu,host=a,region=west\tusage_system"
    // (measurement and tags, then a tab, then the field name).
    pub key: String,
    // Optional predicate applied to the series' values.
    pub value_predicate: Option<Predicate>,
    // Data type of the series' values.
    pub series_type: SeriesDataType,
}
impl SeriesFilter {
    // TODO: Handle escaping of ',', '=', and '\t'
    // TODO: Better error handling
    /// Parses the tag key/value pairs embedded in `self.key`.
    ///
    /// The key is assumed to have the (unescaped) shape
    /// `measurement,tag1=val1,tag2=val2\tfield`: everything before the first
    /// tab is split on ',', the leading measurement name is skipped, and
    /// each remaining `tag=value` segment becomes a `Tag`.
    ///
    /// # Panics
    ///
    /// Panics if a comma-separated segment contains no '='.
    pub fn tags(&self) -> Vec<Tag> {
        // `splitn(2, '\t').next()` always yields at least one item (the whole
        // key when no tab is present), so the previous
        // `.expect("SeriesFilter key did not contain a tab")` was a dead
        // panic path with a misleading message. `unwrap_or` preserves the
        // exact same behavior without claiming the call can fail.
        let before_tab = self.key.splitn(2, '\t').next().unwrap_or(&self.key);

        before_tab
            .split(',')
            .skip(1) // the first segment is the measurement name, not a tag
            .map(|kv| {
                let mut parts = kv.splitn(2, '=');
                Tag {
                    // The first `next()` cannot fail; the second panics when
                    // the segment has no '=' separator.
                    key: parts
                        .next()
                        .expect("SeriesFilter did not contain expected parts")
                        .bytes()
                        .collect(),
                    value: parts
                        .next()
                        .expect("SeriesFilter did not contain expected parts")
                        .bytes()
                        .collect(),
                }
            })
            .collect()
    }
}
#[cfg(test)]
pub mod tests {
    //! Shared test helpers: concrete `InvertedIndex` implementations call
    //! these to verify id assignment and metadata/predicate queries, plus
    //! unit tests for `SeriesFilter` itself.

    use crate::delorean::Tag;
    use crate::line_parser::PointType;
    use crate::storage::inverted_index::{InvertedIndex, SeriesFilter};
    use crate::storage::predicate::parse_predicate;
    use crate::storage::SeriesDataType;
    use std::str;

    // Test helpers for other implementations to run

    /// Verifies that series ids are assigned per bucket starting from 1 and
    /// that already-known series keep their existing ids.
    pub fn series_id_indexing(index: Box<dyn InvertedIndex>) {
        let bucket_id = 1;
        let bucket_2 = 2;
        let p1 = PointType::new_i64("one".to_string(), 1, 0);
        let p2 = PointType::new_i64("two".to_string(), 23, 40);
        let p3 = PointType::new_i64("three".to_string(), 33, 86);

        let mut points = vec![p1.clone(), p2];
        index
            .get_or_create_series_ids_for_points(bucket_id, &mut points)
            .unwrap();
        assert_eq!(points[0].series_id(), Some(1));
        assert_eq!(points[1].series_id(), Some(2));

        // now put series in a different bucket, but make sure the IDs start from the beginning
        let mut points = vec![p1.clone()];
        index
            .get_or_create_series_ids_for_points(bucket_2, &mut points)
            .unwrap();
        assert_eq!(points[0].series_id(), Some(1));

        // now insert a new series in the first bucket and make sure it shows up
        let mut points = vec![p1, p3];
        index
            .get_or_create_series_ids_for_points(bucket_id, &mut points)
            .unwrap();
        assert_eq!(points[0].series_id(), Some(1));
        assert_eq!(points[1].series_id(), Some(3));
    }

    /// Verifies tag-key/tag-value listing and predicate-based series lookup
    /// (`=`, `and`, `OR`) against four freshly indexed series.
    pub fn series_metadata_indexing(index: Box<dyn InvertedIndex>) {
        let bucket_id = 1;
        let p1 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 0);
        let p2 = PointType::new_i64("cpu,host=a,region=west\tusage_system".to_string(), 1, 0);
        let p3 = PointType::new_i64("cpu,host=a,region=west\tusage_user".to_string(), 1, 0);
        let p4 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 0);

        let mut points = vec![p1, p2, p3, p4];
        index
            .get_or_create_series_ids_for_points(bucket_id, &mut points)
            .unwrap();

        // tag keys include the synthetic "_f" (field) and "_m" (measurement) keys
        let tag_keys: Vec<String> = index.get_tag_keys(bucket_id, None).unwrap().collect();
        assert_eq!(tag_keys, vec!["_f", "_m", "host", "region"]);

        let tag_values: Vec<String> = index
            .get_tag_values(bucket_id, "host", None)
            .unwrap()
            .collect();
        assert_eq!(tag_values, vec!["a", "b"]);

        // get all series

        // get series with measurement = cpu
        let pred = parse_predicate(r#"_m = "cpu""#).unwrap();
        let series: Vec<SeriesFilter> = index
            .read_series_matching(bucket_id, Some(&pred))
            .unwrap()
            .collect();
        assert_eq!(
            series,
            vec![
                SeriesFilter {
                    id: 1,
                    key: "cpu,host=b,region=west\tusage_system".to_string(),
                    value_predicate: None,
                    series_type: SeriesDataType::I64
                },
                SeriesFilter {
                    id: 2,
                    key: "cpu,host=a,region=west\tusage_system".to_string(),
                    value_predicate: None,
                    series_type: SeriesDataType::I64
                },
                SeriesFilter {
                    id: 3,
                    key: "cpu,host=a,region=west\tusage_user".to_string(),
                    value_predicate: None,
                    series_type: SeriesDataType::I64
                },
            ]
        );

        // get series with host = a
        let pred = parse_predicate(r#"host = "a""#).unwrap();
        let series: Vec<SeriesFilter> = index
            .read_series_matching(bucket_id, Some(&pred))
            .unwrap()
            .collect();
        assert_eq!(
            series,
            vec![
                SeriesFilter {
                    id: 2,
                    key: "cpu,host=a,region=west\tusage_system".to_string(),
                    value_predicate: None,
                    series_type: SeriesDataType::I64
                },
                SeriesFilter {
                    id: 3,
                    key: "cpu,host=a,region=west\tusage_user".to_string(),
                    value_predicate: None,
                    series_type: SeriesDataType::I64
                },
            ]
        );

        // get series with measurement = cpu and host = b
        let pred = parse_predicate(r#"_m = "cpu" and host = "b""#).unwrap();
        let series: Vec<SeriesFilter> = index
            .read_series_matching(bucket_id, Some(&pred))
            .unwrap()
            .collect();
        assert_eq!(
            series,
            vec![SeriesFilter {
                id: 1,
                key: "cpu,host=b,region=west\tusage_system".to_string(),
                value_predicate: None,
                series_type: SeriesDataType::I64
            },]
        );

        // disjunction: host = a OR measurement = mem
        let pred = parse_predicate(r#"host = "a" OR _m = "mem""#).unwrap();
        let series: Vec<SeriesFilter> = index
            .read_series_matching(bucket_id, Some(&pred))
            .unwrap()
            .collect();
        assert_eq!(
            series,
            vec![
                SeriesFilter {
                    id: 2,
                    key: "cpu,host=a,region=west\tusage_system".to_string(),
                    value_predicate: None,
                    series_type: SeriesDataType::I64
                },
                SeriesFilter {
                    id: 3,
                    key: "cpu,host=a,region=west\tusage_user".to_string(),
                    value_predicate: None,
                    series_type: SeriesDataType::I64
                },
                SeriesFilter {
                    id: 4,
                    key: "mem,host=b,region=west\tfree".to_string(),
                    value_predicate: None,
                    series_type: SeriesDataType::I64
                },
            ]
        );
    }

    /// Renders tags as `(key, value)` UTF-8 string pairs for easy assertions.
    pub fn tags_as_strings(tags: &[Tag]) -> Vec<(&str, &str)> {
        tags.iter()
            .map(|t| {
                (
                    str::from_utf8(&t.key).unwrap(),
                    str::from_utf8(&t.value).unwrap(),
                )
            })
            .collect()
    }

    // Unit tests for SeriesFilter
    #[test]
    fn series_filter_tag_parsing() {
        let sf = SeriesFilter {
            id: 1,
            key: "cpu,host=b,region=west\tusage_system".to_string(),
            value_predicate: None,
            series_type: SeriesDataType::I64,
        };

        assert_eq!(
            tags_as_strings(&sf.tags()),
            vec![("host", "b"), ("region", "west")]
        );
    }
}

View File

@@ -1,11 +1,8 @@
#![allow(dead_code)]
use crate::delorean::{Node, Predicate, TimestampRange}; use crate::delorean::{Node, Predicate, TimestampRange};
use crate::line_parser::{self, Point, PointType}; use crate::line_parser::{self, Point, PointType};
use crate::storage::partitioned_store::{ReadBatch, ReadValues}; use crate::storage::partitioned_store::{ReadBatch, ReadValues};
use crate::storage::predicate::{Evaluate, EvaluateVisitor}; use crate::storage::predicate::{Evaluate, EvaluateVisitor};
use crate::storage::series_store::ReadPoint; use crate::storage::{ReadPoint, SeriesDataType, StorageError};
use crate::storage::{SeriesDataType, StorageError};
use croaring::Treemap; use croaring::Treemap;
use futures::stream::{self, BoxStream}; use futures::stream::{self, BoxStream};

View File

@@ -5,8 +5,7 @@ use crate::line_parser::{self, PointType};
use crate::storage::memdb::MemDB; use crate::storage::memdb::MemDB;
use crate::storage::remote_partition::RemotePartition; use crate::storage::remote_partition::RemotePartition;
use crate::storage::s3_partition::S3Partition; use crate::storage::s3_partition::S3Partition;
use crate::storage::series_store::ReadPoint; use crate::storage::{ReadPoint, StorageError};
use crate::storage::StorageError;
use futures::stream::{BoxStream, Stream}; use futures::stream::{BoxStream, Stream};
use std::cmp::Ordering; use std::cmp::Ordering;

View File

@@ -1,172 +0,0 @@
use crate::delorean::TimestampRange;
use crate::line_parser::PointType;
use crate::storage::StorageError;
/// Storage for series data points, addressed by bucket id and series id.
///
/// Implementations must be shareable across threads (`Sync + Send`).
pub trait SeriesStore: Sync + Send {
    /// Writes `points` into `bucket_id`. Points are expected to already
    /// carry series ids (see the test helpers below, which call
    /// `set_series_id` before writing).
    fn write_points_with_series_ids(
        &self,
        bucket_id: u32,
        points: &[PointType],
    ) -> Result<(), StorageError>;

    /// Reads the i64 points of `series_id` in `bucket_id` whose timestamps
    /// fall within `range`, yielded as batches of at most `batch_size`
    /// points each.
    fn read_i64_range(
        &self,
        bucket_id: u32,
        series_id: u64,
        range: &TimestampRange,
        batch_size: usize,
    ) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<i64>>> + Send>, StorageError>;

    /// Reads the f64 points of `series_id` in `bucket_id` whose timestamps
    /// fall within `range`, yielded as batches of at most `batch_size`
    /// points each.
    fn read_f64_range(
        &self,
        bucket_id: u32,
        series_id: u64,
        range: &TimestampRange,
        batch_size: usize,
    ) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>> + Send>, StorageError>;
}
/// A single timestamped value read back from storage.
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct ReadPoint<T: Clone> {
    // Timestamp of the point; unit (presumably nanoseconds) is not
    // established in this file — TODO confirm against the write path.
    pub time: i64,
    // The point's value at `time`.
    pub value: T,
}
/// Builds an owned `ReadPoint` from a borrowed parser `Point` by copying its
/// timestamp and value; any other fields of the point are ignored.
impl<T: Copy + Clone> From<&'_ crate::line_parser::Point<T>> for ReadPoint<T> {
    fn from(src: &'_ crate::line_parser::Point<T>) -> Self {
        let time = src.time;
        let value = src.value; // `T: Copy`, so this is a bitwise copy
        Self { time, value }
    }
}
// Test helpers for other implementations to run
#[cfg(test)]
pub mod tests {
    //! Shared test helpers: concrete `SeriesStore` implementations call
    //! these to verify write/read round-trips, bucket isolation, batch
    //! sizing, and time-range filtering.

    use crate::delorean::TimestampRange;
    use crate::line_parser::PointType;
    use crate::storage::series_store::{ReadPoint, SeriesStore};

    /// Round-trips i64 points: checks bucket isolation, multi-series reads,
    /// batch-size limits, and time-range filtering.
    pub fn write_and_read_i64(store: Box<dyn SeriesStore>) {
        let b1_id = 1;
        let b2_id = 2;
        let mut p1 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 1);
        p1.set_series_id(1);
        let mut p2 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 2);
        p2.set_series_id(1);
        let mut p3 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 2);
        p3.set_series_id(2);
        let mut p4 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 4);
        p4.set_series_id(2);

        // bucket 1 gets only series 1; bucket 2 gets both series
        let b1_points = vec![p1.clone(), p2.clone()];
        store
            .write_points_with_series_ids(b1_id, &b1_points)
            .unwrap();

        let b2_points = vec![p1.clone(), p2, p3.clone(), p4];
        store
            .write_points_with_series_ids(b2_id, &b2_points)
            .unwrap();

        // test that we'll only read from the bucket we wrote points into
        let range = TimestampRange { start: 1, end: 4 };
        let mut points_iter = store
            .read_i64_range(b1_id, p1.series_id().unwrap(), &range, 10)
            .unwrap();
        let points = points_iter.next().unwrap();
        assert_eq!(
            points,
            vec![
                ReadPoint { time: 1, value: 1 },
                ReadPoint { time: 2, value: 1 },
            ]
        );
        assert_eq!(points_iter.next(), None);

        // test that we'll read multiple series
        let mut points_iter = store
            .read_i64_range(b2_id, p1.series_id().unwrap(), &range, 10)
            .unwrap();
        let points = points_iter.next().unwrap();
        assert_eq!(
            points,
            vec![
                ReadPoint { time: 1, value: 1 },
                ReadPoint { time: 2, value: 1 },
            ]
        );

        let mut points_iter = store
            .read_i64_range(b2_id, p3.series_id().unwrap(), &range, 10)
            .unwrap();
        let points = points_iter.next().unwrap();
        assert_eq!(
            points,
            vec![
                ReadPoint { time: 2, value: 1 },
                ReadPoint { time: 4, value: 1 },
            ]
        );

        // test that the batch size is honored (batch_size 1 => one point per batch)
        let mut points_iter = store
            .read_i64_range(b1_id, p1.series_id().unwrap(), &range, 1)
            .unwrap();
        let points = points_iter.next().unwrap();
        assert_eq!(points, vec![ReadPoint { time: 1, value: 1 },]);
        let points = points_iter.next().unwrap();
        assert_eq!(points, vec![ReadPoint { time: 2, value: 1 },]);
        assert_eq!(points_iter.next(), None);

        // test that the time range is properly limiting
        let range = TimestampRange { start: 2, end: 3 };
        let mut points_iter = store
            .read_i64_range(b2_id, p1.series_id().unwrap(), &range, 10)
            .unwrap();
        let points = points_iter.next().unwrap();
        assert_eq!(points, vec![ReadPoint { time: 2, value: 1 },]);

        let mut points_iter = store
            .read_i64_range(b2_id, p3.series_id().unwrap(), &range, 10)
            .unwrap();
        let points = points_iter.next().unwrap();
        assert_eq!(points, vec![ReadPoint { time: 2, value: 1 },]);
    }

    /// Round-trips f64 points through a single bucket and reads them back.
    pub fn write_and_read_f64(store: Box<dyn SeriesStore>) {
        let bucket_id = 1;
        let mut p1 = PointType::new_f64("cpu,host=b,region=west\tusage_system".to_string(), 1.0, 1);
        p1.set_series_id(1);
        let mut p2 = PointType::new_f64("cpu,host=b,region=west\tusage_system".to_string(), 2.2, 2);
        p2.set_series_id(1);

        let points = vec![p1.clone(), p2];
        store
            .write_points_with_series_ids(bucket_id, &points)
            .unwrap();

        // test that we'll only read from the bucket we wrote points into
        let range = TimestampRange { start: 0, end: 4 };
        let mut points_iter = store
            .read_f64_range(bucket_id, p1.series_id().unwrap(), &range, 10)
            .unwrap();
        let points = points_iter.next().unwrap();
        assert_eq!(
            points,
            vec![
                ReadPoint {
                    time: 1,
                    value: 1.0
                },
                ReadPoint {
                    time: 2,
                    value: 2.2
                },
            ]
        );
        assert_eq!(points_iter.next(), None);
    }
}