Merge remote-tracking branch 'origin/master' into hyper

pull/24376/head
Carol (Nichols || Goulding) 2020-02-17 08:54:19 -05:00
commit 23a9a800a6
11 changed files with 174 additions and 211 deletions

View File

@ -58,7 +58,7 @@ jobs:
- install_clang
- run:
name: Clippy
command: cargo clippy --all -j9 || echo "clippy failed, but ignoring this for now..."
command: cargo clippy --all -j9
test:
docker:
- image: circleci/rust:latest

View File

@ -3,13 +3,13 @@ use std::error::Error;
// SENTINEL is used to terminate a float-encoded block. A sentinel marker value
// is useful because blocks do not always end aligned to bytes, and spare empty
// bits can otherwise have undesirable semantic meaning.
const SENTINEL: u64 = 0x7ff80000000000ff; // in the quiet NaN range.
const SENTINEL: u64 = 0x7ff8_0000_0000_00ff; // in the quiet NaN range.
fn is_sentinel_f64(v: f64) -> bool {
return v.to_bits() == SENTINEL;
v.to_bits() == SENTINEL
}
fn is_sentinel_u64(v: u64) -> bool {
return v == SENTINEL;
v == SENTINEL
}
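
The comment above relies on a bit-level fact worth making explicit: 0x7ff8_0000_0000_00ff has every exponent bit set plus the quiet bit, so it decodes to a quiet NaN that no well-formed input value can equal. A minimal sketch verifying this (editor's addition, not part of the commit):

fn main() {
    const SENTINEL: u64 = 0x7ff8_0000_0000_00ff; // same constant as above
    let v = f64::from_bits(SENTINEL);
    assert!(v.is_nan()); // exponent all ones + quiet bit => quiet NaN
    assert_eq!(v.to_bits(), SENTINEL); // the exact bit pattern round-trips
}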
/// encode_all encodes a vector of floats into dst.
@ -19,9 +19,10 @@ fn is_sentinel_u64(v: u64) -> bool {
/// two is determined. Leading and trailing zero bits are then analysed and
/// representations based on those are stored.
#[allow(dead_code)]
#[allow(clippy::many_single_char_names)]
pub fn encode_all(src: &mut Vec<f64>, dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
dst.truncate(0); // reset buffer.
if src.len() == 0 {
if src.is_empty() {
return Ok(());
}
if dst.capacity() < 9 {
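
For orientation on the "leading and trailing zero bits" the doc comment mentions: the core Gorilla-style step is to XOR the current value's bits against the previous value's, so shared prefixes and suffixes become zero runs that need not be stored. A sketch of that step (hypothetical helper, not the commit's code):

fn xor_stats(prev: f64, cur: f64) -> (u32, u32) {
    let delta = prev.to_bits() ^ cur.to_bits();
    (delta.leading_zeros(), delta.trailing_zeros())
}

fn main() {
    // 1.0 and 1.5 share sign and exponent, so only bit 51 differs.
    assert_eq!(xor_stats(1.0, 1.5), (12, 51));
}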
@ -40,7 +41,7 @@ pub fn encode_all(src: &mut Vec<f64>, dst: &mut Vec<u8>) -> Result<(), Box<dyn E
let (mut prev_leading, mut prev_trailing) = (!0u64, 0u64);
// encode remaining values
for i in 1..src.len() + 1 {
for i in 1..=src.len() {
let x;
if i < src.len() {
x = src[i];
@ -114,9 +115,8 @@ pub fn encode_all(src: &mut Vec<f64>, dst: &mut Vec<u8>) -> Result<(), Box<dyn E
// TODO(edd): maybe this can be optimised?
let k = n >> 3;
let vv_bytes = &vv.to_be_bytes();
for i in k..k + 8 {
dst[i] = vv_bytes[i - k];
}
dst[k..k + 8].clone_from_slice(&vv_bytes[0..8]);
n += (l - written) as usize;
} else {
prev_leading = leading;
@ -218,9 +218,7 @@ pub fn encode_all(src: &mut Vec<f64>, dst: &mut Vec<u8>) -> Result<(), Box<dyn E
// TODO(edd): maybe this can be optimised?
let k = n >> 3;
let vv_bytes = &vv.to_be_bytes();
for i in k..k + 8 {
dst[i] = vv_bytes[i - k];
}
dst[k..k + 8].clone_from_slice(&vv_bytes[0..8]);
n += l - written;
}
prev = cur;
@ -249,74 +247,76 @@ pub fn encode_all(src: &mut Vec<f64>, dst: &mut Vec<u8>) -> Result<(), Box<dyn E
//
// TODO(edd): figure out how to generate this.
const BIT_MASK: [u64; 64] = [
0xffffffffffffffff,
0x1,
0x3,
0x7,
0xf,
0x1f,
0x3f,
0x7f,
0xff,
0x1ff,
0x3ff,
0x7ff,
0xfff,
0xffff_ffff_ffff_ffff,
0x0001,
0x0003,
0x0007,
0x000f,
0x001f,
0x003f,
0x007f,
0x00ff,
0x01ff,
0x03ff,
0x07ff,
0x0fff,
0x1fff,
0x3fff,
0x7fff,
0xffff,
0x1ffff,
0x3ffff,
0x7ffff,
0xfffff,
0x1fffff,
0x3fffff,
0x7fffff,
0xffffff,
0x1ffffff,
0x3ffffff,
0x7ffffff,
0xfffffff,
0x1fffffff,
0x3fffffff,
0x7fffffff,
0xffffffff,
0x1ffffffff,
0x3ffffffff,
0x7ffffffff,
0xfffffffff,
0x1fffffffff,
0x3fffffffff,
0x7fffffffff,
0xffffffffff,
0x1ffffffffff,
0x3ffffffffff,
0x7ffffffffff,
0xfffffffffff,
0x1fffffffffff,
0x3fffffffffff,
0x7fffffffffff,
0xffffffffffff,
0x1ffffffffffff,
0x3ffffffffffff,
0x7ffffffffffff,
0xfffffffffffff,
0x1fffffffffffff,
0x3fffffffffffff,
0x7fffffffffffff,
0xffffffffffffff,
0x1ffffffffffffff,
0x3ffffffffffffff,
0x7ffffffffffffff,
0xfffffffffffffff,
0x1fffffffffffffff,
0x3fffffffffffffff,
0x7fffffffffffffff,
0x0001_ffff,
0x0003_ffff,
0x0007_ffff,
0x000f_ffff,
0x001f_ffff,
0x003f_ffff,
0x007f_ffff,
0x00ff_ffff,
0x01ff_ffff,
0x03ff_ffff,
0x07ff_ffff,
0x0fff_ffff,
0x1fff_ffff,
0x3fff_ffff,
0x7fff_ffff,
0xffff_ffff,
0x0001_ffff_ffff,
0x0003_ffff_ffff,
0x0007_ffff_ffff,
0x000f_ffff_ffff,
0x001f_ffff_ffff,
0x003f_ffff_ffff,
0x007f_ffff_ffff,
0x00ff_ffff_ffff,
0x01ff_ffff_ffff,
0x03ff_ffff_ffff,
0x07ff_ffff_ffff,
0x0fff_ffff_ffff,
0x1fff_ffff_ffff,
0x3fff_ffff_ffff,
0x7fff_ffff_ffff,
0xffff_ffff_ffff,
0x0001_ffff_ffff_ffff,
0x0003_ffff_ffff_ffff,
0x0007_ffff_ffff_ffff,
0x000f_ffff_ffff_ffff,
0x001f_ffff_ffff_ffff,
0x003f_ffff_ffff_ffff,
0x007f_ffff_ffff_ffff,
0x00ff_ffff_ffff_ffff,
0x01ff_ffff_ffff_ffff,
0x03ff_ffff_ffff_ffff,
0x07ff_ffff_ffff_ffff,
0x0fff_ffff_ffff_ffff,
0x1fff_ffff_ffff_ffff,
0x3fff_ffff_ffff_ffff,
0x7fff_ffff_ffff_ffff,
];
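
On the TODO above: the table can be generated rather than maintained by hand. A sketch that reproduces it, keeping the all-ones value at index 0 (editor's addition; a const fn array initializer could do the same at compile time on newer toolchains):

fn bit_mask(n: usize) -> u64 {
    // BIT_MASK[n] masks the low n bits; index 0 doubles as all 64 bits
    if n == 0 { !0u64 } else { !0u64 >> (64 - n as u32) }
}

fn main() {
    assert_eq!(bit_mask(0), 0xffff_ffff_ffff_ffff);
    assert_eq!(bit_mask(12), 0x0fff);
    assert_eq!(bit_mask(63), 0x7fff_ffff_ffff_ffff);
}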
/// decode_all decodes a slice of bytes into a vector of floats.
#[allow(dead_code)]
#[allow(clippy::many_single_char_names)]
#[allow(clippy::useless_let_if_seq)]
pub fn decode_all(src: &[u8], dst: &mut Vec<f64>) -> Result<(), Box<dyn Error>> {
if src.len() < 9 {
return Ok(());
@ -373,7 +373,7 @@ pub fn decode_all(src: &[u8], dst: &mut Vec<f64>) -> Result<(), Box<dyn Error>>
let mut meaningful_n = 64u8;
loop {
if br_valid_bits <= 0 {
if br_valid_bits == 0 {
match refill_cache(i) {
Ok(res) => {
br_cached_val = res.0;
@ -387,12 +387,12 @@ pub fn decode_all(src: &[u8], dst: &mut Vec<f64>) -> Result<(), Box<dyn Error>>
// read control bit 0.
br_valid_bits -= 1;
br_cached_val = br_cached_val.rotate_left(1);
if br_cached_val & 1 <= 0 {
if br_cached_val & 1 == 0 {
dst.push(f64::from_bits(val));
continue;
}
if br_valid_bits <= 0 {
if br_valid_bits == 0 {
match refill_cache(i) {
Ok(res) => {
br_cached_val = res.0;
@ -433,7 +433,7 @@ pub fn decode_all(src: &[u8], dst: &mut Vec<f64>) -> Result<(), Box<dyn Error>>
br_cached_val = br_cached_val.rotate_left(bits_01 as u32);
br_valid_bits -= bits_01;
lm_bits = lm_bits & !BIT_MASK[(bits_01 & 0x3f) as usize];
lm_bits &= !BIT_MASK[(bits_01 & 0x3f) as usize];
lm_bits |= br_cached_val & BIT_MASK[(bits_01 & 0x3f) as usize];
}
@ -472,7 +472,7 @@ pub fn decode_all(src: &[u8], dst: &mut Vec<f64>) -> Result<(), Box<dyn Error>>
br_cached_val = br_cached_val.rotate_left(m_bits as u32);
br_valid_bits = br_valid_bits.wrapping_sub(m_bits);
s_bits = s_bits & !BIT_MASK[(m_bits & 0x3f) as usize];
s_bits &= !BIT_MASK[(m_bits & 0x3f) as usize];
s_bits |= br_cached_val & BIT_MASK[(m_bits & 0x3f) as usize];
}
s_bits &= BIT_MASK[(meaningful_n & 0x3f) as usize];

View File

@ -18,7 +18,7 @@ enum Encoding {
#[allow(dead_code)]
pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Box<dyn Error>> {
dst.truncate(0); // reset buffer.
if src.len() == 0 {
if src.is_empty() {
return Ok(());
}
@ -88,7 +88,7 @@ fn zig_zag_decode(v: u64) -> i64 {
// TODO(edd): this is expensive as it copies. There are cheap
// but unsafe alternatives to look into such as std::mem::transmute
fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> {
src.into_iter().map(|x| *x as u64).collect::<Vec<u64>>()
src.iter().map(|x| *x as u64).collect::<Vec<u64>>()
}
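
Since this hunk sits next to zig_zag_decode, a refresher sketch of the standard transform (the crate's own definitions may differ in detail): values of small magnitude, negative or positive, map to small unsigned integers, which then varint-encode compactly.

fn zig_zag_encode(v: i64) -> u64 {
    ((v << 1) ^ (v >> 63)) as u64
}

fn zig_zag_decode(v: u64) -> i64 {
    ((v >> 1) as i64) ^ -((v & 1) as i64)
}

fn main() {
    assert_eq!(zig_zag_encode(-1), 1);
    assert_eq!(zig_zag_encode(1), 2);
    for v in [-3i64, -2, -1, 0, 1, 2, 3].iter() {
        assert_eq!(zig_zag_decode(zig_zag_encode(*v)), *v);
    }
}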
// encode_rle encodes the value v, delta and count into dst.
@ -117,22 +117,22 @@ fn encode_rle(v: u64, delta: u64, count: u64, dst: &mut Vec<u8>) {
/// decode_all decodes a slice of bytes into a vector of signed integers.
#[allow(dead_code)]
pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<dyn Error>> {
if src.len() == 0 {
if src.is_empty() {
return Ok(());
}
let encoding = &src[0] >> 4;
match encoding {
encoding if encoding == Encoding::Uncompressed as u8 => {
return decode_uncompressed(&src[1..], dst); // first byte not used
decode_uncompressed(&src[1..], dst) // first byte not used
}
encoding if encoding == Encoding::Rle as u8 => return decode_rle(&src[1..], dst),
encoding if encoding == Encoding::Simple8b as u8 => return decode_simple8b(&src[1..], dst),
_ => return Err(From::from("invalid block encoding")),
encoding if encoding == Encoding::Rle as u8 => decode_rle(&src[1..], dst),
encoding if encoding == Encoding::Simple8b as u8 => decode_simple8b(&src[1..], dst),
_ => Err(From::from("invalid block encoding")),
}
}
fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
if src.len() == 0 || src.len() & 0x7 != 0 {
if src.is_empty() || src.len() & 0x7 != 0 {
return Err(From::from("invalid uncompressed block length"));
}
@ -161,13 +161,13 @@ fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
let mut i = 8; // Skip first value
let (delta, n) = u64::decode_var(&src[i..]);
if n <= 0 {
if n == 0 {
return Err(From::from("unable to decode delta"));
}
i += n;
let (count, n) = usize::decode_var(&src[i..]);
if n <= 0 {
if n == 0 {
return Err(From::from("unable to decode count"));
}
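
For context, the RLE block being decoded here is an 8-byte first value followed by a varint delta and a varint count. The expansion step then amounts to something like this sketch (editor's assumption about the count semantics; the real decoder is in the surrounding file):

fn expand_rle(first: i64, delta: i64, count: usize, dst: &mut Vec<i64>) {
    let mut cur = first;
    for _ in 0..count {
        dst.push(cur);
        cur += delta; // each run entry advances by the same delta
    }
}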

View File

@ -184,7 +184,7 @@ pub fn decode(v: u64, dst: &mut [u64]) -> usize {
5
}
12 => {
dst[0] = (v >> 0) & 0x7fff;
dst[0] = v & 0x7fff;
dst[1] = (v >> 15) & 0x7fff;
dst[2] = (v >> 30) & 0x7fff;
dst[3] = (v >> 45) & 0x7fff;
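
This arm is one row of the simple8b table: each 64-bit word spends 4 bits on a selector and 60 bits on payload, and selector 12 means four 15-bit integers (the 0x7fff mask keeps the selector bits out of the last value). A generic unpack for any fixed width looks like this sketch:

fn unpack(v: u64, width: u32, count: usize, dst: &mut [u64]) {
    let mask = (1u64 << width) - 1;
    for i in 0..count {
        // value i occupies bits [width * i, width * (i + 1)) of the word
        dst[i] = (v >> (width * i as u32)) & mask;
    }
}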

View File

@ -19,7 +19,7 @@ enum Encoding {
#[allow(dead_code)]
pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Box<dyn Error>> {
dst.truncate(0); // reset buffer.
if src.len() == 0 {
if src.is_empty() {
return Ok(());
}
@ -93,7 +93,7 @@ pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Bo
// TODO(edd): this is expensive as it copies. There are cheap
// but unsafe alternatives to look into such as std::mem::transmute
fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> {
src.into_iter().map(|x| *x as u64).collect::<Vec<u64>>()
src.iter().map(|x| *x as u64).collect::<Vec<u64>>()
}
// encode_rle encodes the value v, delta and count into dst.
@ -146,23 +146,23 @@ fn encode_rle(v: u64, delta: u64, count: u64, dst: &mut Vec<u8>) {
/// vector of signed integers.
#[allow(dead_code)]
pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<dyn Error>> {
if src.len() == 0 {
if src.is_empty() {
return Ok(());
}
let encoding = &src[0] >> 4;
match encoding {
encoding if encoding == Encoding::Uncompressed as u8 => {
return decode_uncompressed(&src[1..], dst); // first byte not used
decode_uncompressed(&src[1..], dst) // first byte not used
}
encoding if encoding == Encoding::Rle as u8 => return decode_rle(&src, dst),
encoding if encoding == Encoding::Simple8b as u8 => return decode_simple8b(&src, dst),
_ => return Err(From::from("invalid block encoding")),
encoding if encoding == Encoding::Rle as u8 => decode_rle(&src, dst),
encoding if encoding == Encoding::Simple8b as u8 => decode_simple8b(&src, dst),
_ => Err(From::from("invalid block encoding")),
}
}
// decode_uncompressed writes the binary encoded values in src into dst.
fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
if src.len() == 0 || src.len() & 0x7 != 0 {
if src.is_empty() || src.len() & 0x7 != 0 {
return Err(From::from("invalid uncompressed block length"));
}
@ -190,7 +190,7 @@ fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
}
// calculate the scaler from the lower 4 bits of the first byte.
let scaler = 10_u64.pow((src[0] & 0b00001111) as u32);
let scaler = 10_u64.pow((src[0] & 0b0000_1111) as u32);
let mut i = 1;
// TODO(edd): this should be possible to do in-place without copy.
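
The scaler line above encodes a small trick: trailing decimal zeros in the timestamp deltas are divided out before storage, and the exponent travels in the low nibble of the block's first byte. A worked instance (editor's sketch):

fn main() {
    let header: u8 = 0b0000_0011; // low nibble holds exponent 3
    let scaler = 10_u64.pow((header & 0b0000_1111) as u32);
    assert_eq!(scaler, 1_000);
    assert_eq!(1_234 * scaler, 1_234_000); // stored delta restored to time units
}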
@ -198,14 +198,14 @@ fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
a.copy_from_slice(&src[i..i + 8]);
i += 8;
let (mut delta, n) = u64::decode_var(&src[i..]);
if n <= 0 {
if n == 0 {
return Err(From::from("unable to decode delta"));
}
i += n;
delta *= scaler;
let (count, n) = usize::decode_var(&src[i..]);
if n <= 0 {
if n == 0 {
return Err(From::from("unable to decode count"));
}
@ -226,7 +226,7 @@ fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>>
return Err(From::from("not enough data to decode packed timestamp"));
}
let scaler = 10_u64.pow((src[0] & 0b00001111) as u32);
let scaler = 10_u64.pow((src[0] & 0b0000_1111) as u32);
// TODO(edd): pre-allocate res by counting bytes in encoded slice?
let mut res = vec![];

View File

@ -102,13 +102,13 @@ impl PointType {
/// cases where this series is already in the database, this parse step can be skipped entirely.
/// The measurement is represented as a _m key and field as _f.
pub fn index_pairs(key: &str) -> Result<Vec<Pair>, ParseError> {
let mut chars = key.chars();
let chars = key.chars();
let mut pairs = vec![];
let mut key = "_m".to_string();
let mut value = String::with_capacity(250);
let mut reading_key = false;
while let Some(ch) = chars.next() {
for ch in chars {
match ch {
',' => {
reading_key = true;
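
Given the "\t" join visible a couple of hunks below (measurement and tags, then a tab, then the field name), the doc comment implies a mapping like this (hypothetical key; sketch assumes the surrounding module is in scope):

// "cpu,host=a,region=west\tusage" is expected to index as the pairs
//   _m = "cpu", host = "a", region = "west", _f = "usage"
let pairs = index_pairs("cpu,host=a,region=west\tusage")?;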
@ -167,12 +167,10 @@ impl error::Error for ParseError {
pub fn parse(input: &str) -> Vec<PointType> {
let mut points: Vec<PointType> = Vec::with_capacity(10000);
let lines = input.lines();
for line in lines {
read_line(line, &mut points)
}
return points;
points
}
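
In use, parse yields one PointType per field, with a trailing i marking an i64 value and everything else parsed as f64, per read_value below. A sketch (assumes the parser module is in scope; timestamp handling elided):

let points = parse("cpu,host=a usage=1i idle=2.5");
// expect an i64 point ("usage" = 1) and an f64 point ("idle" = 2.5)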
fn read_line(line: &str, points: &mut Vec<PointType>) {
@ -229,22 +227,19 @@ fn read_value(
) -> bool {
let mut value = String::new();
while let Some(ch) = chars.next() {
for ch in chars {
match ch {
' ' | ',' => {
let series = measurement_tags.to_string() + "\t" + &field_name;
// if the last character of the value is an i then it's an integer, otherwise it's
// a float (at least until we support the other data types)
let point = match value.ends_with("i") {
true => {
let val = value[..value.len() - 1].parse::<i64>().unwrap();
PointType::new_i64(series, val, 0)
}
false => {
let val = value.parse::<f64>().unwrap();
PointType::new_f64(series, val, 0)
}
let point = if value.ends_with('i') {
let val = value[..value.len() - 1].parse::<i64>().unwrap();
PointType::new_i64(series, val, 0)
} else {
let val = value.parse::<f64>().unwrap();
PointType::new_f64(series, val, 0)
};
points.push(point);

View File

@ -93,7 +93,7 @@ async fn write(req: Request<Body>, app: Arc<App>) -> Result<Body, ApplicationErr
.write_points(write_info.org_id, &bucket, &mut points)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(serde_json::json!({}).to_string().into())
Ok(serde_json::json!(()).to_string().into())
}
#[derive(Deserialize, Debug)]

View File

@ -15,6 +15,7 @@ use croaring::Treemap;
// TODO: return errors if trying to insert data out of order in an individual series
#[derive(Default)]
pub struct MemDB {
default_ring_buffer_size: usize,
bucket_id_to_series_data: Arc<RwLock<HashMap<u32, Mutex<SeriesData>>>>,
@ -67,7 +68,7 @@ impl StoreInSeriesData for Point<f64> {
}
impl SeriesData {
fn write_points(&mut self, points: &Vec<PointType>) {
fn write_points(&mut self, points: &[PointType]) {
for p in points {
p.write(self);
}
@ -145,9 +146,7 @@ impl<T: Clone> SeriesRingBuffer<T> {
fn oldest_time_and_position(&self) -> (i64, usize) {
let mut pos = self.next_position;
if self.next_position == self.data.len() {
pos = 0;
} else if self.data[pos].time == std::i64::MAX {
if self.next_position == self.data.len() || self.data[pos].time == std::i64::MAX {
pos = 0;
}
@ -200,11 +199,11 @@ impl SeriesMap {
let posting_list = self
.posting_list
.entry(list_key)
.or_insert(Treemap::create());
.or_insert_with(Treemap::create);
posting_list.add(self.last_id);
// insert the tag key value mapping
let tag_values = self.tag_keys.entry(pair.key).or_insert(BTreeMap::new());
let tag_values = self.tag_keys.entry(pair.key).or_insert_with(BTreeMap::new);
tag_values.insert(pair.value, true);
}
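
The two or_insert_with changes above are clippy's or_fun_call lint at work: or_insert(Treemap::create()) constructs a fresh Treemap even when the entry already exists, while or_insert_with(Treemap::create) only runs the constructor on a miss. The same shape with std types:

use std::collections::BTreeMap;

fn main() {
    let mut m: BTreeMap<&str, Vec<u32>> = BTreeMap::new();
    m.entry("k").or_insert_with(Vec::new).push(1); // key absent: Vec::new runs
    m.entry("k").or_insert_with(Vec::new).push(2); // key present: no allocation
    assert_eq!(m["k"], vec![1, 2]);
}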
@ -307,13 +306,7 @@ impl MemDB {
) -> Result<Box<dyn Iterator<Item = String>>, StorageError> {
match self.bucket_id_to_series_map.read().unwrap().get(&bucket_id) {
Some(map) => {
let keys: Vec<String> = map
.read()
.unwrap()
.tag_keys
.keys()
.map(|k| k.clone())
.collect();
let keys: Vec<String> = map.read().unwrap().tag_keys.keys().cloned().collect();
Ok(Box::new(keys.into_iter()))
}
None => Err(StorageError {
@ -331,7 +324,7 @@ impl MemDB {
match self.bucket_id_to_series_map.read().unwrap().get(&bucket_id) {
Some(map) => match map.read().unwrap().tag_keys.get(tag_key) {
Some(values) => {
let values: Vec<String> = values.keys().map(|v| v.clone()).collect();
let values: Vec<String> = values.keys().cloned().collect();
Ok(Box::new(values.into_iter()))
}
None => Ok(Box::new(vec![].into_iter())),
@ -394,7 +387,7 @@ impl MemDB {
fn write_points_with_series_ids(
&self,
bucket_id: u32,
points: &Vec<PointType>,
points: &[PointType],
) -> Result<(), StorageError> {
let bucket_data = self.bucket_id_to_series_data.read().unwrap();
@ -438,7 +431,7 @@ impl MemDB {
};
let data = data.lock().unwrap();
let buff = match FromSeries::from_series(&data, &series_id) {
let buff = match FromSeries::from_series(&data, series_id) {
Some(b) => b,
None => {
return Err(StorageError {
@ -456,19 +449,18 @@ impl MemDB {
}
trait FromSeries: Clone {
fn from_series<'a>(data: &'a SeriesData, series_id: &u64)
-> Option<&'a SeriesRingBuffer<Self>>;
fn from_series<'a>(data: &'a SeriesData, series_id: u64) -> Option<&'a SeriesRingBuffer<Self>>;
}
impl FromSeries for i64 {
fn from_series<'a>(data: &'a SeriesData, series_id: &u64) -> Option<&'a SeriesRingBuffer<i64>> {
data.i64_series.get(series_id)
fn from_series<'a>(data: &'a SeriesData, series_id: u64) -> Option<&'a SeriesRingBuffer<i64>> {
data.i64_series.get(&series_id)
}
}
impl FromSeries for f64 {
fn from_series<'a>(data: &'a SeriesData, series_id: &u64) -> Option<&'a SeriesRingBuffer<f64>> {
data.f64_series.get(series_id)
fn from_series<'a>(data: &'a SeriesData, series_id: u64) -> Option<&'a SeriesRingBuffer<f64>> {
data.f64_series.get(&series_id)
}
}
@ -488,7 +480,7 @@ impl<T: Clone> Iterator for PointsIterator<T> {
let remaining = values.split_off(self.batch_size);
if remaining.len() != 0 {
if !remaining.is_empty() {
self.values = Some(remaining);
}
@ -575,12 +567,10 @@ fn evaluate_comparison(
};
match op {
Comparison::Equal => return Ok(series_map.posting_list_for_key_value(&left, &right)),
comp => {
return Err(StorageError {
description: format!("unable to handle comparison {:?}", comp),
})
}
Comparison::Equal => Ok(series_map.posting_list_for_key_value(&left, &right)),
comp => Err(StorageError {
description: format!("unable to handle comparison {:?}", comp),
}),
}
}
@ -623,7 +613,7 @@ impl SeriesStore for MemDB {
fn write_points_with_series_ids(
&self,
bucket_id: u32,
points: &Vec<PointType>,
points: &[PointType],
) -> Result<(), StorageError> {
self.write_points_with_series_ids(bucket_id, points)
}

View File

@ -57,7 +57,7 @@ impl RocksDB {
Ok(names) => names
.into_iter()
.map(|name| {
if &name == BUCKET_CF {
if name == BUCKET_CF {
bucket_cf_descriptor()
} else {
ColumnFamilyDescriptor::new(&name, index_cf_options())
@ -87,11 +87,7 @@ impl RocksDB {
/// # Arguments
/// * bucket_id - the globally unique bucket id
/// * points - individual values with their timestamps, series keys, and series IDs
pub fn write_points(
&self,
bucket_id: u32,
points: &Vec<PointType>,
) -> Result<(), StorageError> {
pub fn write_points(&self, bucket_id: u32, points: &[PointType]) -> Result<(), StorageError> {
// TODO: validate bucket exists?
let mut batch = WriteBatch::default();
@ -399,14 +395,10 @@ impl RocksDB {
};
match op {
Comparison::Equal => {
return self.get_posting_list_for_tag_key_value(bucket_id, &left, &right);
}
comp => {
return Err(StorageError {
description: format!("unable to handle comparison {:?}", comp),
})
}
Comparison::Equal => self.get_posting_list_for_tag_key_value(bucket_id, &left, &right),
comp => Err(StorageError {
description: format!("unable to handle comparison {:?}", comp),
}),
}
}
@ -447,25 +439,22 @@ impl RocksDB {
let db = self.db.read().unwrap();
match db.cf_handle(&cf_name) {
Some(index) => {
let prefix = index_tag_key_prefix(bucket_id);
let mode = IteratorMode::From(&prefix, Direction::Forward);
let iter = db
.iterator_cf(index, mode)
.expect("unexpected rocksdb error getting iterator for index");
if let Some(index) = db.cf_handle(&cf_name) {
let prefix = index_tag_key_prefix(bucket_id);
let mode = IteratorMode::From(&prefix, Direction::Forward);
let iter = db
.iterator_cf(index, mode)
.expect("unexpected rocksdb error getting iterator for index");
for (key, _) in iter {
if !key.starts_with(&prefix) {
break;
}
let k = std::str::from_utf8(&key[prefix.len()..]).unwrap(); // TODO: determine what we want to do with errors
keys.push(k.to_string());
for (key, _) in iter {
if !key.starts_with(&prefix) {
break;
}
let k = std::str::from_utf8(&key[prefix.len()..]).unwrap(); // TODO: determine what we want to do with errors
keys.push(k.to_string());
}
None => (),
}
};
keys
}
@ -481,24 +470,21 @@ impl RocksDB {
let db = self.db.read().unwrap();
let mut values = vec![];
match db.cf_handle(&cf_name) {
Some(index) => {
let prefix = index_tag_key_value_prefix(bucket_id, tag);
let mode = IteratorMode::From(&prefix, Direction::Forward);
let iter = db
.iterator_cf(index, mode)
.expect("unexpected rocksdb error getting iterator for index");
if let Some(index) = db.cf_handle(&cf_name) {
let prefix = index_tag_key_value_prefix(bucket_id, tag);
let mode = IteratorMode::From(&prefix, Direction::Forward);
let iter = db
.iterator_cf(index, mode)
.expect("unexpected rocksdb error getting iterator for index");
for (key, _) in iter {
if !key.starts_with(&prefix) {
break;
}
let v = std::str::from_utf8(&key[prefix.len()..]).unwrap(); // TODO: determine what to do with errors
values.push(v.to_string());
for (key, _) in iter {
if !key.starts_with(&prefix) {
break;
}
let v = std::str::from_utf8(&key[prefix.len()..]).unwrap(); // TODO: determine what to do with errors
values.push(v.to_string());
}
None => (),
}
values
@ -518,7 +504,7 @@ impl RocksDB {
let mut map = self.series_insert_lock.write().expect("mutex poisoned");
// now only insert the new mutex if someone else hasn't done it between dropping read and obtaining write
if let None = map.get(&bucket_id) {
if map.get(&bucket_id).is_none() {
map.insert(bucket_id, Mutex::new(1));
}
}
@ -540,10 +526,7 @@ impl RocksDB {
// create the column family to store the index if it doesn't exist
let cf_name = index_cf_name(bucket_id);
let index_exists = match self.db.read().unwrap().cf_handle(&cf_name) {
Some(_) => true,
None => false,
};
let index_exists = self.db.read().unwrap().cf_handle(&cf_name).is_some();
if !index_exists {
self.db
@ -565,7 +548,7 @@ impl RocksDB {
// now loop through the series and insert the index entries into the map
for point in points {
// don't bother with series in the collection that already have IDs
if let Some(_) = point.series_id() {
if point.series_id().is_some() {
continue;
}
@ -705,17 +688,15 @@ impl RocksDB {
BucketEntryType::NextSeriesID => {
// read the bucket id from the key
let mut c = Cursor::new(&key[1..]);
let bucket_id = c.read_u32::<BigEndian>().expect(&format!(
"couldn't read the bucket id from the key {:?}",
key
));
let bucket_id = c.read_u32::<BigEndian>().unwrap_or_else(|_| {
panic!("couldn't read the bucket id from the key {:?}", key)
});
// and the next series ID
let mut c = Cursor::new(value);
let next_id = c.read_u64::<BigEndian>().expect(&format!(
"couldn't read the next series id for bucket {}",
bucket_id
));
let next_id = c.read_u64::<BigEndian>().unwrap_or_else(|_| {
panic!("couldn't read the next series id for bucket {}", bucket_id)
});
id_mutex_map.insert(bucket_id, Mutex::new(next_id));
}
BucketEntryType::Bucket => {
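
The expect-to-unwrap_or_else rewrites above answer clippy's expect_fun_call lint: expect(&format!(...)) builds the panic message even on the success path, whereas unwrap_or_else(|_| panic!(...)) defers the formatting to the failure path. The minimal shape:

fn main() {
    let read: Result<u64, ()> = Ok(7); // stand-in for the cursor read
    let bucket_id = 42;
    // the format! inside panic! only runs if read is Err
    let next_id = read
        .unwrap_or_else(|_| panic!("couldn't read the next series id for bucket {}", bucket_id));
    assert_eq!(next_id, 7);
}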
@ -804,7 +785,7 @@ impl SeriesStore for RocksDB {
fn write_points_with_series_ids(
&self,
bucket_id: u32,
points: &Vec<PointType>,
points: &[PointType],
) -> Result<(), StorageError> {
self.write_points(bucket_id, &points)
}

View File

@ -5,7 +5,7 @@ pub trait SeriesStore: Sync + Send {
fn write_points_with_series_ids(
&self,
bucket_id: u32,
points: &Vec<PointType>,
points: &[PointType],
) -> Result<(), StorageError>;
fn read_i64_range(

View File

@ -41,10 +41,7 @@ pub fn parse_duration(s: &str) -> Result<RelativeDuration, Error> {
}
let i;
let mut start = 0;
if s.starts_with("-") {
start = 1;
}
let start = if s.starts_with('-') { 1 } else { 0 };
match s[start..].chars().position(|c| !c.is_digit(10)) {
Some(p) => i = p + start,