refactor: apply clippy

parent b2cdd299f5
commit 92baa3d7e8
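A batch of mechanical lint fixes suggested by clippy: slice parameters (`&[PointType]`) in place of `&Vec<PointType>`, lazy `or_insert_with`, `cloned()` instead of `map(|x| x.clone())`, pass-by-value for `Copy` ids, `is_empty`/`is_none`/`is_some` over manual comparisons and matches, `match` used as an expression instead of `return` in every arm, `if let` for single-arm matches, and lazily formatted panic messages.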
@@ -15,6 +15,7 @@ use croaring::Treemap;
 
 // TODO: return errors if trying to insert data out of order in an individual series
 
+#[derive(Default)]
 pub struct MemDB {
     default_ring_buffer_size: usize,
     bucket_id_to_series_data: Arc<RwLock<HashMap<u32, Mutex<SeriesData>>>>,
@@ -67,7 +68,7 @@ impl StoreInSeriesData for Point<f64> {
 }
 
 impl SeriesData {
-    fn write_points(&mut self, points: &Vec<PointType>) {
+    fn write_points(&mut self, points: &[PointType]) {
         for p in points {
             p.write(self);
         }
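Note: clippy's `ptr_arg` lint prefers `&[T]` over `&Vec<T>` in parameter position; a `&Vec` only adds an extra indirection, while a slice accepts any contiguous view. A minimal sketch of the difference, using a plain `i64` stand-in for `PointType`:

    // &Vec<i64> would force callers to have a Vec; &[i64] takes any contiguous view.
    fn sum_slice(xs: &[i64]) -> i64 {
        xs.iter().sum()
    }

    fn main() {
        let v = vec![1, 2, 3];
        assert_eq!(sum_slice(&v), 6);      // &Vec<i64> coerces to &[i64]
        assert_eq!(sum_slice(&v[..2]), 3); // a sub-slice works
        assert_eq!(sum_slice(&[4, 5]), 9); // so does a borrowed array
    }

The same signature change recurs below in `write_points_with_series_ids`, `write_points`, and the `SeriesStore` trait.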
@@ -145,9 +146,7 @@ impl<T: Clone> SeriesRingBuffer<T> {
 
     fn oldest_time_and_position(&self) -> (i64, usize) {
         let mut pos = self.next_position;
-        if self.next_position == self.data.len() {
-            pos = 0;
-        } else if self.data[pos].time == std::i64::MAX {
+        if self.next_position == self.data.len() || self.data[pos].time == std::i64::MAX {
             pos = 0;
         }
 
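Note: the two branches assigned `pos = 0` with identical bodies, so the conditions merge with `||` (likely clippy's `if_same_then_else`); the wrap-around check and the sentinel-time check now share one arm. In miniature:

    fn main() {
        let (pos, len, at_sentinel) = (3usize, 3usize, false);
        // Two if/else-if arms with the same body become one condition:
        let wrapped = if pos == len || at_sentinel { 0 } else { pos };
        assert_eq!(wrapped, 0);
    }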
@@ -200,11 +199,11 @@ impl SeriesMap {
         let posting_list = self
             .posting_list
             .entry(list_key)
-            .or_insert(Treemap::create());
+            .or_insert_with(Treemap::create);
         posting_list.add(self.last_id);
 
         // insert the tag key value mapping
-        let tag_values = self.tag_keys.entry(pair.key).or_insert(BTreeMap::new());
+        let tag_values = self.tag_keys.entry(pair.key).or_insert_with(BTreeMap::new);
         tag_values.insert(pair.value, true);
     }
 
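Note: `or_insert(Treemap::create())` evaluates its argument on every call, even when the key already exists; `or_insert_with(Treemap::create)` defers construction to the miss path (clippy's `or_fun_call` lint). The same pattern with a standard-library map:

    use std::collections::HashMap;

    fn main() {
        let mut index: HashMap<&str, Vec<u32>> = HashMap::new();
        // Eager: Vec::new() runs even if "cpu" is already present.
        index.entry("cpu").or_insert(Vec::new()).push(1);
        // Lazy: Vec::new is only called when the entry is vacant.
        index.entry("cpu").or_insert_with(Vec::new).push(2);
        assert_eq!(index["cpu"], vec![1, 2]);
    }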
@@ -307,13 +306,7 @@ impl MemDB {
     ) -> Result<Box<dyn Iterator<Item = String>>, StorageError> {
         match self.bucket_id_to_series_map.read().unwrap().get(&bucket_id) {
             Some(map) => {
-                let keys: Vec<String> = map
-                    .read()
-                    .unwrap()
-                    .tag_keys
-                    .keys()
-                    .map(|k| k.clone())
-                    .collect();
+                let keys: Vec<String> = map.read().unwrap().tag_keys.keys().cloned().collect();
                 Ok(Box::new(keys.into_iter()))
             }
             None => Err(StorageError {
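Note: `.map(|k| k.clone())` over an iterator of references is exactly what `Iterator::cloned` does, so clippy's `map_clone` lint suggests the named adapter; the seven-line method chain also collapses onto one line. For example:

    fn main() {
        let keys = vec!["host".to_string(), "region".to_string()];
        let a: Vec<String> = keys.iter().map(|k| k.clone()).collect();
        let b: Vec<String> = keys.iter().cloned().collect();
        assert_eq!(a, b);
    }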
@@ -331,7 +324,7 @@ impl MemDB {
         match self.bucket_id_to_series_map.read().unwrap().get(&bucket_id) {
             Some(map) => match map.read().unwrap().tag_keys.get(tag_key) {
                 Some(values) => {
-                    let values: Vec<String> = values.keys().map(|v| v.clone()).collect();
+                    let values: Vec<String> = values.keys().cloned().collect();
                     Ok(Box::new(values.into_iter()))
                 }
                 None => Ok(Box::new(vec![].into_iter())),
@@ -394,7 +387,7 @@ impl MemDB {
     fn write_points_with_series_ids(
         &self,
         bucket_id: u32,
-        points: &Vec<PointType>,
+        points: &[PointType],
     ) -> Result<(), StorageError> {
         let bucket_data = self.bucket_id_to_series_data.read().unwrap();
 
@@ -438,7 +431,7 @@ impl MemDB {
         };
 
         let data = data.lock().unwrap();
-        let buff = match FromSeries::from_series(&data, &series_id) {
+        let buff = match FromSeries::from_series(&data, series_id) {
             Some(b) => b,
             None => {
                 return Err(StorageError {
@@ -456,19 +449,18 @@ impl MemDB {
 }
 
 trait FromSeries: Clone {
-    fn from_series<'a>(data: &'a SeriesData, series_id: &u64)
-        -> Option<&'a SeriesRingBuffer<Self>>;
+    fn from_series<'a>(data: &'a SeriesData, series_id: u64) -> Option<&'a SeriesRingBuffer<Self>>;
 }
 
 impl FromSeries for i64 {
-    fn from_series<'a>(data: &'a SeriesData, series_id: &u64) -> Option<&'a SeriesRingBuffer<i64>> {
-        data.i64_series.get(series_id)
+    fn from_series<'a>(data: &'a SeriesData, series_id: u64) -> Option<&'a SeriesRingBuffer<i64>> {
+        data.i64_series.get(&series_id)
     }
 }
 
 impl FromSeries for f64 {
-    fn from_series<'a>(data: &'a SeriesData, series_id: &u64) -> Option<&'a SeriesRingBuffer<f64>> {
-        data.f64_series.get(series_id)
+    fn from_series<'a>(data: &'a SeriesData, series_id: u64) -> Option<&'a SeriesRingBuffer<f64>> {
+        data.f64_series.get(&series_id)
     }
 }
 
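Note: `u64` is `Copy` and register-sized, so clippy's `trivially_copy_pass_by_ref` lint has it passed by value rather than through `&u64`; only the map lookup still needs a borrow, hence `get(&series_id)` inside the bodies. A stand-alone sketch:

    use std::collections::HashMap;

    fn lookup(map: &HashMap<u64, String>, series_id: u64) -> Option<&String> {
        // The id is copied into the call; HashMap::get borrows it again.
        map.get(&series_id)
    }

    fn main() {
        let mut m = HashMap::new();
        m.insert(1u64, "cpu,host=a".to_string());
        assert!(lookup(&m, 1).is_some());
        assert!(lookup(&m, 2).is_none());
    }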
@@ -488,7 +480,7 @@ impl<T: Clone> Iterator for PointsIterator<T> {
 
         let remaining = values.split_off(self.batch_size);
 
-        if remaining.len() != 0 {
+        if !remaining.is_empty() {
             self.values = Some(remaining);
         }
 
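Note: `remaining.len() != 0` becomes `!remaining.is_empty()` (clippy's `len_zero`), which states the intent directly and also works for types whose `len()` is not cheap:

    fn main() {
        let remaining: Vec<u8> = vec![];
        // The two tests are equivalent; is_empty() names the question being asked.
        assert_eq!(remaining.len() != 0, !remaining.is_empty());
    }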
@@ -575,12 +567,10 @@ fn evaluate_comparison(
     };
 
     match op {
-        Comparison::Equal => return Ok(series_map.posting_list_for_key_value(&left, &right)),
-        comp => {
-            return Err(StorageError {
-                description: format!("unable to handle comparison {:?}", comp),
-            })
-        }
+        Comparison::Equal => Ok(series_map.posting_list_for_key_value(&left, &right)),
+        comp => Err(StorageError {
+            description: format!("unable to handle comparison {:?}", comp),
+        }),
     }
 }
 
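Note: a `match` is an expression, so when every arm ends in `return`, the arms can simply be the match's value (clippy's `needless_return`). A reduced sketch with a stand-in `Comparison` enum:

    #[derive(Debug)]
    enum Comparison {
        Equal,
        Greater,
    }

    // The match result is the function result; no `return` in the arms.
    fn evaluate(op: Comparison) -> Result<u32, String> {
        match op {
            Comparison::Equal => Ok(1),
            comp => Err(format!("unable to handle comparison {:?}", comp)),
        }
    }

    fn main() {
        assert_eq!(evaluate(Comparison::Equal), Ok(1));
        assert!(evaluate(Comparison::Greater).is_err());
    }

The corresponding `match op` hunk in the RocksDB code below gets the identical treatment.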
@@ -623,9 +613,9 @@ impl SeriesStore for MemDB {
     fn write_points_with_series_ids(
         &self,
         bucket_id: u32,
-        points: &Vec<PointType>,
+        points: &[PointType],
     ) -> Result<(), StorageError> {
-        self.write_points_with_series_ids(bucket_id, points)
+        self.write_points_with_series_ids(bucket_id, &points.to_vec())
     }
 
     fn read_i64_range(
 
@@ -57,7 +57,7 @@ impl RocksDB {
             Ok(names) => names
                 .into_iter()
                 .map(|name| {
-                    if &name == BUCKET_CF {
+                    if name == BUCKET_CF {
                         bucket_cf_descriptor()
                     } else {
                         ColumnFamilyDescriptor::new(&name, index_cf_options())
@@ -87,11 +87,7 @@ impl RocksDB {
     /// # Arguments
     /// * bucket_id - the globally unique bucket id
     /// * points - individual values with their timestamps, series keys, and series IDs
-    pub fn write_points(
-        &self,
-        bucket_id: u32,
-        points: &Vec<PointType>,
-    ) -> Result<(), StorageError> {
+    pub fn write_points(&self, bucket_id: u32, points: &[PointType]) -> Result<(), StorageError> {
        // TODO: validate bucket exists?
 
        let mut batch = WriteBatch::default();
@@ -399,14 +395,10 @@ impl RocksDB {
         };
 
         match op {
-            Comparison::Equal => {
-                return self.get_posting_list_for_tag_key_value(bucket_id, &left, &right);
-            }
-            comp => {
-                return Err(StorageError {
-                    description: format!("unable to handle comparison {:?}", comp),
-                })
-            }
+            Comparison::Equal => self.get_posting_list_for_tag_key_value(bucket_id, &left, &right),
+            comp => Err(StorageError {
+                description: format!("unable to handle comparison {:?}", comp),
+            }),
         }
     }
 
@@ -447,25 +439,22 @@ impl RocksDB {
 
         let db = self.db.read().unwrap();
 
-        match db.cf_handle(&cf_name) {
-            Some(index) => {
-                let prefix = index_tag_key_prefix(bucket_id);
-                let mode = IteratorMode::From(&prefix, Direction::Forward);
-                let iter = db
-                    .iterator_cf(index, mode)
-                    .expect("unexpected rocksdb error getting iterator for index");
-
-                for (key, _) in iter {
-                    if !key.starts_with(&prefix) {
-                        break;
-                    }
-
-                    let k = std::str::from_utf8(&key[prefix.len()..]).unwrap(); // TODO: determine what we want to do with errors
-                    keys.push(k.to_string());
-                }
-            }
-            None => (),
-        }
+        if let Some(index) = db.cf_handle(&cf_name) {
+            let prefix = index_tag_key_prefix(bucket_id);
+            let mode = IteratorMode::From(&prefix, Direction::Forward);
+            let iter = db
+                .iterator_cf(index, mode)
+                .expect("unexpected rocksdb error getting iterator for index");
+
+            for (key, _) in iter {
+                if !key.starts_with(&prefix) {
+                    break;
+                }
+
+                let k = std::str::from_utf8(&key[prefix.len()..]).unwrap(); // TODO: determine what we want to do with errors
+                keys.push(k.to_string());
+            }
+        };
 
         keys
     }
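Note: a `match` whose `None` arm does nothing is clippy's `single_match`; `if let` expresses the one interesting case and drops a level of nesting, which is most of this hunk's churn. In miniature:

    fn main() {
        let handle: Option<u32> = Some(7);

        // Before: an empty None arm just to satisfy the match.
        match handle {
            Some(h) => println!("found index {}", h),
            None => (),
        }

        // After: only the case that matters.
        if let Some(h) = handle {
            println!("found index {}", h);
        }
    }

The tag-values method in the next hunk is rewritten the same way.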
@@ -481,24 +470,21 @@ impl RocksDB {
         let db = self.db.read().unwrap();
         let mut values = vec![];
 
-        match db.cf_handle(&cf_name) {
-            Some(index) => {
-                let prefix = index_tag_key_value_prefix(bucket_id, tag);
-                let mode = IteratorMode::From(&prefix, Direction::Forward);
-                let iter = db
-                    .iterator_cf(index, mode)
-                    .expect("unexpected rocksdb error getting iterator for index");
-
-                for (key, _) in iter {
-                    if !key.starts_with(&prefix) {
-                        break;
-                    }
-
-                    let v = std::str::from_utf8(&key[prefix.len()..]).unwrap(); // TODO: determine what to do with errors
-                    values.push(v.to_string());
-                }
-            }
-            None => (),
-        }
+        if let Some(index) = db.cf_handle(&cf_name) {
+            let prefix = index_tag_key_value_prefix(bucket_id, tag);
+            let mode = IteratorMode::From(&prefix, Direction::Forward);
+            let iter = db
+                .iterator_cf(index, mode)
+                .expect("unexpected rocksdb error getting iterator for index");
+
+            for (key, _) in iter {
+                if !key.starts_with(&prefix) {
+                    break;
+                }
+
+                let v = std::str::from_utf8(&key[prefix.len()..]).unwrap(); // TODO: determine what to do with errors
+                values.push(v.to_string());
+            }
+        }
 
         values
@@ -518,7 +504,7 @@ impl RocksDB {
         let mut map = self.series_insert_lock.write().expect("mutex poisoned");
 
         // now only insert the new mutex if someone else hasn't done it between dropping read and obtaining write
-        if let None = map.get(&bucket_id) {
+        if map.get(&bucket_id).is_none() {
             map.insert(bucket_id, Mutex::new(1));
         }
     }
@@ -540,10 +526,7 @@ impl RocksDB {
 
         // create the column family to store the index if it doesn't exist
         let cf_name = index_cf_name(bucket_id);
-        let index_exists = match self.db.read().unwrap().cf_handle(&cf_name) {
-            Some(_) => true,
-            None => false,
-        };
+        let index_exists = self.db.read().unwrap().cf_handle(&cf_name).is_some();
 
         if !index_exists {
             self.db
@@ -565,7 +548,7 @@ impl RocksDB {
         // now loop through the series and insert the index entries into the map
         for point in points {
             // don't bother with series in the collection that already have IDs
-            if let Some(_) = point.series_id() {
+            if point.series_id().is_some() {
                 continue;
             }
 
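Note: the three hunks above replace pattern matching that is used only as a boolean test with the `Option` query methods (clippy's redundant-pattern-matching family of lints): `if let None = x` becomes `x.is_none()`, and a `match` returning `true`/`false` becomes `x.is_some()`. For instance:

    fn main() {
        let x: Option<u32> = None;

        // Pattern match as a boolean test...
        let a = if let None = x { true } else { false };
        // ...versus the query method.
        assert_eq!(a, x.is_none());

        let b = match x {
            Some(_) => true,
            None => false,
        };
        assert_eq!(b, x.is_some());
    }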
@@ -705,17 +688,15 @@ impl RocksDB {
             BucketEntryType::NextSeriesID => {
                 // read the bucket id from the key
                 let mut c = Cursor::new(&key[1..]);
-                let bucket_id = c.read_u32::<BigEndian>().expect(&format!(
-                    "couldn't read the bucket id from the key {:?}",
-                    key
-                ));
+                let bucket_id = c.read_u32::<BigEndian>().unwrap_or_else(|_| {
+                    panic!("couldn't read the bucket id from the key {:?}", key)
+                });
 
                 // and the next series ID
                 let mut c = Cursor::new(value);
-                let next_id = c.read_u64::<BigEndian>().expect(&format!(
-                    "couldn't read the next series id for bucket {}",
-                    bucket_id
-                ));
+                let next_id = c.read_u64::<BigEndian>().unwrap_or_else(|_| {
+                    panic!("couldn't read the next series id for bucket {}", bucket_id)
+                });
 
                 id_mutex_map.insert(bucket_id, Mutex::new(next_id));
             }
             BucketEntryType::Bucket => {
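Note: `expect(&format!(...))` builds the panic message up front on every call, even on success; clippy's `expect_fun_call` rewrites it as `unwrap_or_else` with a `panic!`, so the formatting only happens on the error path. The same shape with a standard-library parse:

    fn main() {
        let key = "bucket:7";

        // Eager: format! allocates the message even though parsing succeeds.
        let a: u32 = "42".parse().expect(&format!("couldn't parse for key {:?}", key));

        // Lazy: the message is built only if the Result is an Err.
        let b: u32 = "42".parse().unwrap_or_else(|_| panic!("couldn't parse for key {:?}", key));

        assert_eq!(a, b);
    }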
@@ -804,7 +785,7 @@ impl SeriesStore for RocksDB {
     fn write_points_with_series_ids(
         &self,
         bucket_id: u32,
-        points: &Vec<PointType>,
+        points: &[PointType],
     ) -> Result<(), StorageError> {
         self.write_points(bucket_id, &points)
     }
 
@@ -5,7 +5,7 @@ pub trait SeriesStore: Sync + Send {
     fn write_points_with_series_ids(
         &self,
         bucket_id: u32,
-        points: &Vec<PointType>,
+        points: &[PointType],
     ) -> Result<(), StorageError>;
 
     fn read_i64_range(