Merge remote-tracking branch 'origin/master' into er-encoder-bench

pull/24376/head
Carol (Nichols || Goulding) 2020-02-17 08:47:38 -05:00
commit b6184cb778
11 changed files with 174 additions and 211 deletions

View File

@ -58,7 +58,7 @@ jobs:
- install_clang - install_clang
- run: - run:
name: Clippy name: Clippy
command: cargo clippy --all -j9 || echo "clippy failed, but ignoring this for now..." command: cargo clippy --all -j9
test: test:
docker: docker:
- image: circleci/rust:latest - image: circleci/rust:latest

View File

@ -3,13 +3,13 @@ use std::error::Error;
// SENTINEL is used to terminate a float-encoded block. A sentinel marker value // SENTINEL is used to terminate a float-encoded block. A sentinel marker value
// is useful because blocks do not always end aligned to bytes, and spare empty // is useful because blocks do not always end aligned to bytes, and spare empty
// bits can otherwise have undesirable semantic meaning. // bits can otherwise have undesirable semantic meaning.
const SENTINEL: u64 = 0x7ff80000000000ff; // in the quiet NaN range. const SENTINEL: u64 = 0x7ff8_0000_0000_00ff; // in the quiet NaN range.
fn is_sentinel_f64(v: f64) -> bool { fn is_sentinel_f64(v: f64) -> bool {
return v.to_bits() == SENTINEL; v.to_bits() == SENTINEL
} }
fn is_sentinel_u64(v: u64) -> bool { fn is_sentinel_u64(v: u64) -> bool {
return v == SENTINEL; v == SENTINEL
} }
/// encode encodes a vector of floats into dst. /// encode encodes a vector of floats into dst.
@ -19,9 +19,10 @@ fn is_sentinel_u64(v: u64) -> bool {
/// two is determined. Leading and trailing zero bits are then analysed and /// two is determined. Leading and trailing zero bits are then analysed and
/// representations based on those are stored. /// representations based on those are stored.
#[allow(dead_code)] #[allow(dead_code)]
#[allow(clippy::many_single_char_names)]
pub fn encode(src: &[f64], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> { pub fn encode(src: &[f64], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
dst.truncate(0); // reset buffer. dst.truncate(0); // reset buffer.
if src.len() == 0 { if src.is_empty() {
return Ok(()); return Ok(());
} }
if dst.capacity() < 9 { if dst.capacity() < 9 {
@ -40,7 +41,7 @@ pub fn encode(src: &[f64], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
let (mut prev_leading, mut prev_trailing) = (!0u64, 0u64); let (mut prev_leading, mut prev_trailing) = (!0u64, 0u64);
// encode remaining values // encode remaining values
for i in 1..src.len() + 1 { for i in 1..=src.len() {
let x; let x;
if i < src.len() { if i < src.len() {
x = src[i]; x = src[i];
@ -114,9 +115,8 @@ pub fn encode(src: &[f64], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
// TODO(edd): maybe this can be optimised? // TODO(edd): maybe this can be optimised?
let k = n >> 3; let k = n >> 3;
let vv_bytes = &vv.to_be_bytes(); let vv_bytes = &vv.to_be_bytes();
for i in k..k + 8 { dst[k..k + 8].clone_from_slice(&vv_bytes[0..(k + 8 - k)]);
dst[i] = vv_bytes[i - k];
}
n += (l - written) as usize; n += (l - written) as usize;
} else { } else {
prev_leading = leading; prev_leading = leading;
@ -218,9 +218,7 @@ pub fn encode(src: &[f64], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
// TODO(edd): maybe this can be optimised? // TODO(edd): maybe this can be optimised?
let k = n >> 3; let k = n >> 3;
let vv_bytes = &vv.to_be_bytes(); let vv_bytes = &vv.to_be_bytes();
for i in k..k + 8 { dst[k..k + 8].clone_from_slice(&vv_bytes[0..(k + 8 - k)]);
dst[i] = vv_bytes[i - k];
}
n += l - written; n += l - written;
} }
prev = cur; prev = cur;
@ -249,74 +247,76 @@ pub fn encode(src: &[f64], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
// //
// TODO(edd): figure out how to generate this. // TODO(edd): figure out how to generate this.
const BIT_MASK: [u64; 64] = [ const BIT_MASK: [u64; 64] = [
0xffffffffffffffff, 0xffff_ffff_ffff_ffff,
0x1, 0x0001,
0x3, 0x0003,
0x7, 0x0007,
0xf, 0x000f,
0x1f, 0x001f,
0x3f, 0x003f,
0x7f, 0x007f,
0xff, 0x00ff,
0x1ff, 0x01ff,
0x3ff, 0x03ff,
0x7ff, 0x07ff,
0xfff, 0x0fff,
0x1fff, 0x1fff,
0x3fff, 0x3fff,
0x7fff, 0x7fff,
0xffff, 0xffff,
0x1ffff, 0x0001_ffff,
0x3ffff, 0x0003_ffff,
0x7ffff, 0x0007_ffff,
0xfffff, 0x000f_ffff,
0x1fffff, 0x001f_ffff,
0x3fffff, 0x003f_ffff,
0x7fffff, 0x007f_ffff,
0xffffff, 0x00ff_ffff,
0x1ffffff, 0x01ff_ffff,
0x3ffffff, 0x03ff_ffff,
0x7ffffff, 0x07ff_ffff,
0xfffffff, 0x0fff_ffff,
0x1fffffff, 0x1fff_ffff,
0x3fffffff, 0x3fff_ffff,
0x7fffffff, 0x7fff_ffff,
0xffffffff, 0xffff_ffff,
0x1ffffffff, 0x0001_ffff_ffff,
0x3ffffffff, 0x0003_ffff_ffff,
0x7ffffffff, 0x0007_ffff_ffff,
0xfffffffff, 0x000f_ffff_ffff,
0x1fffffffff, 0x001f_ffff_ffff,
0x3fffffffff, 0x003f_ffff_ffff,
0x7fffffffff, 0x007f_ffff_ffff,
0xffffffffff, 0x00ff_ffff_ffff,
0x1ffffffffff, 0x01ff_ffff_ffff,
0x3ffffffffff, 0x03ff_ffff_ffff,
0x7ffffffffff, 0x07ff_ffff_ffff,
0xfffffffffff, 0x0fff_ffff_ffff,
0x1fffffffffff, 0x1fff_ffff_ffff,
0x3fffffffffff, 0x3fff_ffff_ffff,
0x7fffffffffff, 0x7fff_ffff_ffff,
0xffffffffffff, 0xffff_ffff_ffff,
0x1ffffffffffff, 0x0001_ffff_ffff_ffff,
0x3ffffffffffff, 0x0003_ffff_ffff_ffff,
0x7ffffffffffff, 0x0007_ffff_ffff_ffff,
0xfffffffffffff, 0x000f_ffff_ffff_ffff,
0x1fffffffffffff, 0x001f_ffff_ffff_ffff,
0x3fffffffffffff, 0x003f_ffff_ffff_ffff,
0x7fffffffffffff, 0x007f_ffff_ffff_ffff,
0xffffffffffffff, 0x00ff_ffff_ffff_ffff,
0x1ffffffffffffff, 0x01ff_ffff_ffff_ffff,
0x3ffffffffffffff, 0x03ff_ffff_ffff_ffff,
0x7ffffffffffffff, 0x07ff_ffff_ffff_ffff,
0xfffffffffffffff, 0x0fff_ffff_ffff_ffff,
0x1fffffffffffffff, 0x1fff_ffff_ffff_ffff,
0x3fffffffffffffff, 0x3fff_ffff_ffff_ffff,
0x7fffffffffffffff, 0x7fff_ffff_ffff_ffff,
]; ];
/// decode decodes a slice of bytes into a vector of floats. /// decode decodes a slice of bytes into a vector of floats.
#[allow(dead_code)] #[allow(dead_code)]
#[allow(clippy::many_single_char_names)]
#[allow(clippy::useless_let_if_seq)]
pub fn decode(src: &[u8], dst: &mut Vec<f64>) -> Result<(), Box<dyn Error>> { pub fn decode(src: &[u8], dst: &mut Vec<f64>) -> Result<(), Box<dyn Error>> {
if src.len() < 9 { if src.len() < 9 {
return Ok(()); return Ok(());
@ -373,7 +373,7 @@ pub fn decode(src: &[u8], dst: &mut Vec<f64>) -> Result<(), Box<dyn Error>> {
let mut meaningful_n = 64u8; let mut meaningful_n = 64u8;
loop { loop {
if br_valid_bits <= 0 { if br_valid_bits == 0 {
match refill_cache(i) { match refill_cache(i) {
Ok(res) => { Ok(res) => {
br_cached_val = res.0; br_cached_val = res.0;
@ -387,12 +387,12 @@ pub fn decode(src: &[u8], dst: &mut Vec<f64>) -> Result<(), Box<dyn Error>> {
// read control bit 0. // read control bit 0.
br_valid_bits -= 1; br_valid_bits -= 1;
br_cached_val = br_cached_val.rotate_left(1); br_cached_val = br_cached_val.rotate_left(1);
if br_cached_val & 1 <= 0 { if br_cached_val & 1 == 0 {
dst.push(f64::from_bits(val)); dst.push(f64::from_bits(val));
continue; continue;
} }
if br_valid_bits <= 0 { if br_valid_bits == 0 {
match refill_cache(i) { match refill_cache(i) {
Ok(res) => { Ok(res) => {
br_cached_val = res.0; br_cached_val = res.0;
@ -433,7 +433,7 @@ pub fn decode(src: &[u8], dst: &mut Vec<f64>) -> Result<(), Box<dyn Error>> {
br_cached_val = br_cached_val.rotate_left(bits_01 as u32); br_cached_val = br_cached_val.rotate_left(bits_01 as u32);
br_valid_bits -= bits_01; br_valid_bits -= bits_01;
lm_bits = lm_bits & !BIT_MASK[(bits_01 & 0x3f) as usize]; lm_bits &= !BIT_MASK[(bits_01 & 0x3f) as usize];
lm_bits |= br_cached_val & BIT_MASK[(bits_01 & 0x3f) as usize]; lm_bits |= br_cached_val & BIT_MASK[(bits_01 & 0x3f) as usize];
} }
@ -472,7 +472,7 @@ pub fn decode(src: &[u8], dst: &mut Vec<f64>) -> Result<(), Box<dyn Error>> {
br_cached_val = br_cached_val.rotate_left(m_bits as u32); br_cached_val = br_cached_val.rotate_left(m_bits as u32);
br_valid_bits = br_valid_bits.wrapping_sub(m_bits); br_valid_bits = br_valid_bits.wrapping_sub(m_bits);
s_bits = s_bits & !BIT_MASK[(m_bits & 0x3f) as usize]; s_bits &= !BIT_MASK[(m_bits & 0x3f) as usize];
s_bits |= br_cached_val & BIT_MASK[(m_bits & 0x3f) as usize]; s_bits |= br_cached_val & BIT_MASK[(m_bits & 0x3f) as usize];
} }
s_bits &= BIT_MASK[(meaningful_n & 0x3f) as usize]; s_bits &= BIT_MASK[(meaningful_n & 0x3f) as usize];

View File

@ -18,7 +18,7 @@ enum Encoding {
#[allow(dead_code)] #[allow(dead_code)]
pub fn encode<'a>(src: &[i64], dst: &'a mut Vec<u8>) -> Result<(), Box<dyn Error>> { pub fn encode<'a>(src: &[i64], dst: &'a mut Vec<u8>) -> Result<(), Box<dyn Error>> {
dst.truncate(0); // reset buffer. dst.truncate(0); // reset buffer.
if src.len() == 0 { if src.is_empty() {
return Ok(()); return Ok(());
} }
@ -88,7 +88,7 @@ fn zig_zag_decode(v: u64) -> i64 {
// TODO(edd): this is expensive as it copies. There are cheap // TODO(edd): this is expensive as it copies. There are cheap
// but unsafe alternatives to look into such as std::mem::transmute // but unsafe alternatives to look into such as std::mem::transmute
fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> { fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> {
src.into_iter().map(|x| *x as u64).collect::<Vec<u64>>() src.iter().map(|x| *x as u64).collect::<Vec<u64>>()
} }
// encode_rle encodes the value v, delta and count into dst. // encode_rle encodes the value v, delta and count into dst.
@ -117,22 +117,22 @@ fn encode_rle(v: u64, delta: u64, count: u64, dst: &mut Vec<u8>) {
/// decode decodes a slice of bytes into a vector of signed integers. /// decode decodes a slice of bytes into a vector of signed integers.
#[allow(dead_code)] #[allow(dead_code)]
pub fn decode<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<dyn Error>> { pub fn decode<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<dyn Error>> {
if src.len() == 0 { if src.is_empty() {
return Ok(()); return Ok(());
} }
let encoding = &src[0] >> 4; let encoding = &src[0] >> 4;
match encoding { match encoding {
encoding if encoding == Encoding::Uncompressed as u8 => { encoding if encoding == Encoding::Uncompressed as u8 => {
return decode_uncompressed(&src[1..], dst); // first byte not used decode_uncompressed(&src[1..], dst) // first byte not used
} }
encoding if encoding == Encoding::Rle as u8 => return decode_rle(&src[1..], dst), encoding if encoding == Encoding::Rle as u8 => decode_rle(&src[1..], dst),
encoding if encoding == Encoding::Simple8b as u8 => return decode_simple8b(&src[1..], dst), encoding if encoding == Encoding::Simple8b as u8 => decode_simple8b(&src[1..], dst),
_ => return Err(From::from("invalid block encoding")), _ => Err(From::from("invalid block encoding")),
} }
} }
fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> { fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
if src.len() == 0 || src.len() & 0x7 != 0 { if src.is_empty() || src.len() & 0x7 != 0 {
return Err(From::from("invalid uncompressed block length")); return Err(From::from("invalid uncompressed block length"));
} }
@ -161,13 +161,13 @@ fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
let mut i = 8; // Skip first value let mut i = 8; // Skip first value
let (delta, n) = u64::decode_var(&src[i..]); let (delta, n) = u64::decode_var(&src[i..]);
if n <= 0 { if n == 0 {
return Err(From::from("unable to decode delta")); return Err(From::from("unable to decode delta"));
} }
i += n; i += n;
let (count, n) = usize::decode_var(&src[i..]); let (count, n) = usize::decode_var(&src[i..]);
if n <= 0 { if n == 0 {
return Err(From::from("unable to decode count")); return Err(From::from("unable to decode count"));
} }

View File

@ -183,7 +183,7 @@ fn decode_value(v: u64, dst: &mut [u64]) -> usize {
5 5
} }
12 => { 12 => {
dst[0] = (v >> 0) & 0x7fff; dst[0] = v & 0x7fff;
dst[1] = (v >> 15) & 0x7fff; dst[1] = (v >> 15) & 0x7fff;
dst[2] = (v >> 30) & 0x7fff; dst[2] = (v >> 30) & 0x7fff;
dst[3] = (v >> 45) & 0x7fff; dst[3] = (v >> 45) & 0x7fff;

View File

@ -19,7 +19,7 @@ enum Encoding {
#[allow(dead_code)] #[allow(dead_code)]
pub fn encode<'a>(src: &[i64], dst: &'a mut Vec<u8>) -> Result<(), Box<dyn Error>> { pub fn encode<'a>(src: &[i64], dst: &'a mut Vec<u8>) -> Result<(), Box<dyn Error>> {
dst.truncate(0); // reset buffer. dst.truncate(0); // reset buffer.
if src.len() == 0 { if src.is_empty() {
return Ok(()); return Ok(());
} }
@ -93,7 +93,7 @@ pub fn encode<'a>(src: &[i64], dst: &'a mut Vec<u8>) -> Result<(), Box<dyn Error
// TODO(edd): this is expensive as it copies. There are cheap // TODO(edd): this is expensive as it copies. There are cheap
// but unsafe alternatives to look into such as std::mem::transmute // but unsafe alternatives to look into such as std::mem::transmute
fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> { fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> {
src.into_iter().map(|x| *x as u64).collect::<Vec<u64>>() src.iter().map(|x| *x as u64).collect::<Vec<u64>>()
} }
// encode_rle encodes the value v, delta and count into dst. // encode_rle encodes the value v, delta and count into dst.
@ -146,23 +146,23 @@ fn encode_rle(v: u64, delta: u64, count: u64, dst: &mut Vec<u8>) {
/// vector of signed integers. /// vector of signed integers.
#[allow(dead_code)] #[allow(dead_code)]
pub fn decode<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<dyn Error>> { pub fn decode<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<dyn Error>> {
if src.len() == 0 { if src.is_empty() {
return Ok(()); return Ok(());
} }
let encoding = &src[0] >> 4; let encoding = &src[0] >> 4;
match encoding { match encoding {
encoding if encoding == Encoding::Uncompressed as u8 => { encoding if encoding == Encoding::Uncompressed as u8 => {
return decode_uncompressed(&src[1..], dst); // first byte not used decode_uncompressed(&src[1..], dst) // first byte not used
} }
encoding if encoding == Encoding::Rle as u8 => return decode_rle(&src, dst), encoding if encoding == Encoding::Rle as u8 => decode_rle(&src, dst),
encoding if encoding == Encoding::Simple8b as u8 => return decode_simple8b(&src, dst), encoding if encoding == Encoding::Simple8b as u8 => decode_simple8b(&src, dst),
_ => return Err(From::from("invalid block encoding")), _ => Err(From::from("invalid block encoding")),
} }
} }
// decode_uncompressed writes the binary encoded values in src into dst. // decode_uncompressed writes the binary encoded values in src into dst.
fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> { fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
if src.len() == 0 || src.len() & 0x7 != 0 { if src.is_empty() || src.len() & 0x7 != 0 {
return Err(From::from("invalid uncompressed block length")); return Err(From::from("invalid uncompressed block length"));
} }
@ -190,7 +190,7 @@ fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
} }
// calculate the scaler from the lower 4 bits of the first byte. // calculate the scaler from the lower 4 bits of the first byte.
let scaler = 10_u64.pow((src[0] & 0b00001111) as u32); let scaler = 10_u64.pow((src[0] & 0b0000_1111) as u32);
let mut i = 1; let mut i = 1;
// TODO(edd): this should be possible to do in-place without copy. // TODO(edd): this should be possible to do in-place without copy.
@ -198,14 +198,14 @@ fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
a.copy_from_slice(&src[i..i + 8]); a.copy_from_slice(&src[i..i + 8]);
i += 8; i += 8;
let (mut delta, n) = u64::decode_var(&src[i..]); let (mut delta, n) = u64::decode_var(&src[i..]);
if n <= 0 { if n == 0 {
return Err(From::from("unable to decode delta")); return Err(From::from("unable to decode delta"));
} }
i += n; i += n;
delta *= scaler; delta *= scaler;
let (count, n) = usize::decode_var(&src[i..]); let (count, n) = usize::decode_var(&src[i..]);
if n <= 0 { if n == 0 {
return Err(From::from("unable to decode count")); return Err(From::from("unable to decode count"));
} }
@ -226,7 +226,7 @@ fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>>
return Err(From::from("not enough data to decode packed timestamp")); return Err(From::from("not enough data to decode packed timestamp"));
} }
let scaler = 10_u64.pow((src[0] & 0b00001111) as u32); let scaler = 10_u64.pow((src[0] & 0b0000_1111) as u32);
// TODO(edd): pre-allocate res by counting bytes in encoded slice? // TODO(edd): pre-allocate res by counting bytes in encoded slice?
let mut res = vec![]; let mut res = vec![];

View File

@ -104,13 +104,13 @@ impl PointType {
/// cases where this series is already in the database, this parse step can be skipped entirely. /// cases where this series is already in the database, this parse step can be skipped entirely.
/// The measurement is represented as a _m key and field as _f. /// The measurement is represented as a _m key and field as _f.
pub fn index_pairs(key: &str) -> Result<Vec<Pair>, ParseError> { pub fn index_pairs(key: &str) -> Result<Vec<Pair>, ParseError> {
let mut chars = key.chars(); let chars = key.chars();
let mut pairs = vec![]; let mut pairs = vec![];
let mut key = "_m".to_string(); let mut key = "_m".to_string();
let mut value = String::with_capacity(250); let mut value = String::with_capacity(250);
let mut reading_key = false; let mut reading_key = false;
while let Some(ch) = chars.next() { for ch in chars {
match ch { match ch {
',' => { ',' => {
reading_key = true; reading_key = true;
@ -175,12 +175,10 @@ impl ResponseError for ParseError {
pub fn parse(input: &str) -> Vec<PointType> { pub fn parse(input: &str) -> Vec<PointType> {
let mut points: Vec<PointType> = Vec::with_capacity(10000); let mut points: Vec<PointType> = Vec::with_capacity(10000);
let lines = input.lines(); let lines = input.lines();
for line in lines { for line in lines {
read_line(line, &mut points) read_line(line, &mut points)
} }
points
return points;
} }
fn read_line(line: &str, points: &mut Vec<PointType>) { fn read_line(line: &str, points: &mut Vec<PointType>) {
@ -237,22 +235,19 @@ fn read_value(
) -> bool { ) -> bool {
let mut value = String::new(); let mut value = String::new();
while let Some(ch) = chars.next() { for ch in chars {
match ch { match ch {
' ' | ',' => { ' ' | ',' => {
let series = measurement_tags.to_string() + "\t" + &field_name; let series = measurement_tags.to_string() + "\t" + &field_name;
// if the last character of the value is an i then it's an integer, otherwise it's // if the last character of the value is an i then it's an integer, otherwise it's
// a float (at least until we support the other data types // a float (at least until we support the other data types
let point = match value.ends_with("i") { let point = if value.ends_with('i') {
true => { let val = value[..value.len() - 1].parse::<i64>().unwrap();
let val = value[..value.len() - 1].parse::<i64>().unwrap(); PointType::new_i64(series, val, 0)
PointType::new_i64(series, val, 0) } else {
} let val = value.parse::<f64>().unwrap();
false => { PointType::new_f64(series, val, 0)
let val = value.parse::<f64>().unwrap();
PointType::new_f64(series, val, 0)
}
}; };
points.push(point); points.push(point);

View File

@ -78,7 +78,7 @@ async fn write(
.json(serde_json::json!({ "error": format!("{}", err) }))); .json(serde_json::json!({ "error": format!("{}", err) })));
} }
Ok(HttpResponse::Ok().json({})) Ok(HttpResponse::Ok().json(()))
} }
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]

View File

@ -15,6 +15,7 @@ use croaring::Treemap;
// TODO: return errors if trying to insert data out of order in an individual series // TODO: return errors if trying to insert data out of order in an individual series
#[derive(Default)]
pub struct MemDB { pub struct MemDB {
default_ring_buffer_size: usize, default_ring_buffer_size: usize,
bucket_id_to_series_data: Arc<RwLock<HashMap<u32, Mutex<SeriesData>>>>, bucket_id_to_series_data: Arc<RwLock<HashMap<u32, Mutex<SeriesData>>>>,
@ -67,7 +68,7 @@ impl StoreInSeriesData for Point<f64> {
} }
impl SeriesData { impl SeriesData {
fn write_points(&mut self, points: &Vec<PointType>) { fn write_points(&mut self, points: &[PointType]) {
for p in points { for p in points {
p.write(self); p.write(self);
} }
@ -145,9 +146,7 @@ impl<T: Clone> SeriesRingBuffer<T> {
fn oldest_time_and_position(&self) -> (i64, usize) { fn oldest_time_and_position(&self) -> (i64, usize) {
let mut pos = self.next_position; let mut pos = self.next_position;
if self.next_position == self.data.len() { if self.next_position == self.data.len() || self.data[pos].time == std::i64::MAX {
pos = 0;
} else if self.data[pos].time == std::i64::MAX {
pos = 0; pos = 0;
} }
@ -200,11 +199,11 @@ impl SeriesMap {
let posting_list = self let posting_list = self
.posting_list .posting_list
.entry(list_key) .entry(list_key)
.or_insert(Treemap::create()); .or_insert_with(Treemap::create);
posting_list.add(self.last_id); posting_list.add(self.last_id);
// insert the tag key value mapping // insert the tag key value mapping
let tag_values = self.tag_keys.entry(pair.key).or_insert(BTreeMap::new()); let tag_values = self.tag_keys.entry(pair.key).or_insert_with(BTreeMap::new);
tag_values.insert(pair.value, true); tag_values.insert(pair.value, true);
} }
@ -307,13 +306,7 @@ impl MemDB {
) -> Result<Box<dyn Iterator<Item = String>>, StorageError> { ) -> Result<Box<dyn Iterator<Item = String>>, StorageError> {
match self.bucket_id_to_series_map.read().unwrap().get(&bucket_id) { match self.bucket_id_to_series_map.read().unwrap().get(&bucket_id) {
Some(map) => { Some(map) => {
let keys: Vec<String> = map let keys: Vec<String> = map.read().unwrap().tag_keys.keys().cloned().collect();
.read()
.unwrap()
.tag_keys
.keys()
.map(|k| k.clone())
.collect();
Ok(Box::new(keys.into_iter())) Ok(Box::new(keys.into_iter()))
} }
None => Err(StorageError { None => Err(StorageError {
@ -331,7 +324,7 @@ impl MemDB {
match self.bucket_id_to_series_map.read().unwrap().get(&bucket_id) { match self.bucket_id_to_series_map.read().unwrap().get(&bucket_id) {
Some(map) => match map.read().unwrap().tag_keys.get(tag_key) { Some(map) => match map.read().unwrap().tag_keys.get(tag_key) {
Some(values) => { Some(values) => {
let values: Vec<String> = values.keys().map(|v| v.clone()).collect(); let values: Vec<String> = values.keys().cloned().collect();
Ok(Box::new(values.into_iter())) Ok(Box::new(values.into_iter()))
} }
None => Ok(Box::new(vec![].into_iter())), None => Ok(Box::new(vec![].into_iter())),
@ -394,7 +387,7 @@ impl MemDB {
fn write_points_with_series_ids( fn write_points_with_series_ids(
&self, &self,
bucket_id: u32, bucket_id: u32,
points: &Vec<PointType>, points: &[PointType],
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
let bucket_data = self.bucket_id_to_series_data.read().unwrap(); let bucket_data = self.bucket_id_to_series_data.read().unwrap();
@ -438,7 +431,7 @@ impl MemDB {
}; };
let data = data.lock().unwrap(); let data = data.lock().unwrap();
let buff = match FromSeries::from_series(&data, &series_id) { let buff = match FromSeries::from_series(&data, series_id) {
Some(b) => b, Some(b) => b,
None => { None => {
return Err(StorageError { return Err(StorageError {
@ -456,19 +449,18 @@ impl MemDB {
} }
trait FromSeries: Clone { trait FromSeries: Clone {
fn from_series<'a>(data: &'a SeriesData, series_id: &u64) fn from_series<'a>(data: &'a SeriesData, series_id: u64) -> Option<&'a SeriesRingBuffer<Self>>;
-> Option<&'a SeriesRingBuffer<Self>>;
} }
impl FromSeries for i64 { impl FromSeries for i64 {
fn from_series<'a>(data: &'a SeriesData, series_id: &u64) -> Option<&'a SeriesRingBuffer<i64>> { fn from_series<'a>(data: &'a SeriesData, series_id: u64) -> Option<&'a SeriesRingBuffer<i64>> {
data.i64_series.get(series_id) data.i64_series.get(&series_id)
} }
} }
impl FromSeries for f64 { impl FromSeries for f64 {
fn from_series<'a>(data: &'a SeriesData, series_id: &u64) -> Option<&'a SeriesRingBuffer<f64>> { fn from_series<'a>(data: &'a SeriesData, series_id: u64) -> Option<&'a SeriesRingBuffer<f64>> {
data.f64_series.get(series_id) data.f64_series.get(&series_id)
} }
} }
@ -488,7 +480,7 @@ impl<T: Clone> Iterator for PointsIterator<T> {
let remaining = values.split_off(self.batch_size); let remaining = values.split_off(self.batch_size);
if remaining.len() != 0 { if !remaining.is_empty() {
self.values = Some(remaining); self.values = Some(remaining);
} }
@ -575,12 +567,10 @@ fn evaluate_comparison(
}; };
match op { match op {
Comparison::Equal => return Ok(series_map.posting_list_for_key_value(&left, &right)), Comparison::Equal => Ok(series_map.posting_list_for_key_value(&left, &right)),
comp => { comp => Err(StorageError {
return Err(StorageError { description: format!("unable to handle comparison {:?}", comp),
description: format!("unable to handle comparison {:?}", comp), }),
})
}
} }
} }
@ -623,7 +613,7 @@ impl SeriesStore for MemDB {
fn write_points_with_series_ids( fn write_points_with_series_ids(
&self, &self,
bucket_id: u32, bucket_id: u32,
points: &Vec<PointType>, points: &[PointType],
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
self.write_points_with_series_ids(bucket_id, points) self.write_points_with_series_ids(bucket_id, points)
} }

View File

@ -57,7 +57,7 @@ impl RocksDB {
Ok(names) => names Ok(names) => names
.into_iter() .into_iter()
.map(|name| { .map(|name| {
if &name == BUCKET_CF { if name == BUCKET_CF {
bucket_cf_descriptor() bucket_cf_descriptor()
} else { } else {
ColumnFamilyDescriptor::new(&name, index_cf_options()) ColumnFamilyDescriptor::new(&name, index_cf_options())
@ -87,11 +87,7 @@ impl RocksDB {
/// # Arguments /// # Arguments
/// * bucket_id - the globally unique bucket id /// * bucket_id - the globally unique bucket id
/// * points - individual values with their timestamps, series keys, and series IDs /// * points - individual values with their timestamps, series keys, and series IDs
pub fn write_points( pub fn write_points(&self, bucket_id: u32, points: &[PointType]) -> Result<(), StorageError> {
&self,
bucket_id: u32,
points: &Vec<PointType>,
) -> Result<(), StorageError> {
// TODO: validate bucket exists? // TODO: validate bucket exists?
let mut batch = WriteBatch::default(); let mut batch = WriteBatch::default();
@ -399,14 +395,10 @@ impl RocksDB {
}; };
match op { match op {
Comparison::Equal => { Comparison::Equal => self.get_posting_list_for_tag_key_value(bucket_id, &left, &right),
return self.get_posting_list_for_tag_key_value(bucket_id, &left, &right); comp => Err(StorageError {
} description: format!("unable to handle comparison {:?}", comp),
comp => { }),
return Err(StorageError {
description: format!("unable to handle comparison {:?}", comp),
})
}
} }
} }
@ -447,25 +439,22 @@ impl RocksDB {
let db = self.db.read().unwrap(); let db = self.db.read().unwrap();
match db.cf_handle(&cf_name) { if let Some(index) = db.cf_handle(&cf_name) {
Some(index) => { let prefix = index_tag_key_prefix(bucket_id);
let prefix = index_tag_key_prefix(bucket_id); let mode = IteratorMode::From(&prefix, Direction::Forward);
let mode = IteratorMode::From(&prefix, Direction::Forward); let iter = db
let iter = db .iterator_cf(index, mode)
.iterator_cf(index, mode) .expect("unexpected rocksdb error getting iterator for index");
.expect("unexpected rocksdb error getting iterator for index");
for (key, _) in iter { for (key, _) in iter {
if !key.starts_with(&prefix) { if !key.starts_with(&prefix) {
break; break;
}
let k = std::str::from_utf8(&key[prefix.len()..]).unwrap(); // TODO: determine what we want to do with errors
keys.push(k.to_string());
} }
let k = std::str::from_utf8(&key[prefix.len()..]).unwrap(); // TODO: determine what we want to do with errors
keys.push(k.to_string());
} }
None => (), };
}
keys keys
} }
@ -481,24 +470,21 @@ impl RocksDB {
let db = self.db.read().unwrap(); let db = self.db.read().unwrap();
let mut values = vec![]; let mut values = vec![];
match db.cf_handle(&cf_name) { if let Some(index) = db.cf_handle(&cf_name) {
Some(index) => { let prefix = index_tag_key_value_prefix(bucket_id, tag);
let prefix = index_tag_key_value_prefix(bucket_id, tag); let mode = IteratorMode::From(&prefix, Direction::Forward);
let mode = IteratorMode::From(&prefix, Direction::Forward); let iter = db
let iter = db .iterator_cf(index, mode)
.iterator_cf(index, mode) .expect("unexpected rocksdb error getting iterator for index");
.expect("unexpected rocksdb error getting iterator for index");
for (key, _) in iter { for (key, _) in iter {
if !key.starts_with(&prefix) { if !key.starts_with(&prefix) {
break; break;
}
let v = std::str::from_utf8(&key[prefix.len()..]).unwrap(); // TODO: determine what to do with errors
values.push(v.to_string());
} }
let v = std::str::from_utf8(&key[prefix.len()..]).unwrap(); // TODO: determine what to do with errors
values.push(v.to_string());
} }
None => (),
} }
values values
@ -518,7 +504,7 @@ impl RocksDB {
let mut map = self.series_insert_lock.write().expect("mutex poisoned"); let mut map = self.series_insert_lock.write().expect("mutex poisoned");
// now only insert the new mutex if someone else hasn't done it between dropping read and obtaining write // now only insert the new mutex if someone else hasn't done it between dropping read and obtaining write
if let None = map.get(&bucket_id) { if map.get(&bucket_id).is_none() {
map.insert(bucket_id, Mutex::new(1)); map.insert(bucket_id, Mutex::new(1));
} }
} }
@ -540,10 +526,7 @@ impl RocksDB {
// create the column family to store the index if it doesn't exist // create the column family to store the index if it doesn't exist
let cf_name = index_cf_name(bucket_id); let cf_name = index_cf_name(bucket_id);
let index_exists = match self.db.read().unwrap().cf_handle(&cf_name) { let index_exists = self.db.read().unwrap().cf_handle(&cf_name).is_some();
Some(_) => true,
None => false,
};
if !index_exists { if !index_exists {
self.db self.db
@ -565,7 +548,7 @@ impl RocksDB {
// now loop through the series and insert the index entries into the map // now loop through the series and insert the index entries into the map
for point in points { for point in points {
// don't bother with series in the collection that already have IDs // don't bother with series in the collection that already have IDs
if let Some(_) = point.series_id() { if point.series_id().is_some() {
continue; continue;
} }
@ -705,17 +688,15 @@ impl RocksDB {
BucketEntryType::NextSeriesID => { BucketEntryType::NextSeriesID => {
// read the bucket id from the key // read the bucket id from the key
let mut c = Cursor::new(&key[1..]); let mut c = Cursor::new(&key[1..]);
let bucket_id = c.read_u32::<BigEndian>().expect(&format!( let bucket_id = c.read_u32::<BigEndian>().unwrap_or_else(|_| {
"couldn't read the bucket id from the key {:?}", panic!("couldn't read the bucket id from the key {:?}", key)
key });
));
// and the next series ID // and the next series ID
let mut c = Cursor::new(value); let mut c = Cursor::new(value);
let next_id = c.read_u64::<BigEndian>().expect(&format!( let next_id = c.read_u64::<BigEndian>().unwrap_or_else(|_| {
"couldn't read the next series id for bucket {}", panic!("couldn't read the next series id for bucket {}", bucket_id)
bucket_id });
));
id_mutex_map.insert(bucket_id, Mutex::new(next_id)); id_mutex_map.insert(bucket_id, Mutex::new(next_id));
} }
BucketEntryType::Bucket => { BucketEntryType::Bucket => {
@ -804,7 +785,7 @@ impl SeriesStore for RocksDB {
fn write_points_with_series_ids( fn write_points_with_series_ids(
&self, &self,
bucket_id: u32, bucket_id: u32,
points: &Vec<PointType>, points: &[PointType],
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
self.write_points(bucket_id, &points) self.write_points(bucket_id, &points)
} }

View File

@ -5,7 +5,7 @@ pub trait SeriesStore: Sync + Send {
fn write_points_with_series_ids( fn write_points_with_series_ids(
&self, &self,
bucket_id: u32, bucket_id: u32,
points: &Vec<PointType>, points: &[PointType],
) -> Result<(), StorageError>; ) -> Result<(), StorageError>;
fn read_i64_range( fn read_i64_range(

View File

@ -41,10 +41,7 @@ pub fn parse_duration(s: &str) -> Result<RelativeDuration, Error> {
} }
let i; let i;
let mut start = 0; let start = if s.starts_with('-') { 1 } else { 0 };
if s.starts_with("-") {
start = 1;
}
match s[start..].chars().position(|c| !c.is_digit(10)) { match s[start..].chars().position(|c| !c.is_digit(10)) {
Some(p) => i = p + start, Some(p) => i = p + start,