Fix compile warnings from the Rust linter
This commit fixes all the linter warnings. However, there are a number of spots, particularly in the encoders, where I added #[allow(dead_code)] to get past them. We should go through and fix those up at some point; I'll log an issue to track.
parent 1a851f8d0b
commit c7a862dca0
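
Note on the pattern used throughout this diff: #[allow(dead_code)] suppresses rustc's dead_code lint for the item it annotates, which is the stop-gap used here for encoder items that have no callers yet. A minimal sketch of the effect (names are illustrative, not from this commit):

    // Without the attribute, rustc warns that the function is never used.
    #[allow(dead_code)]
    fn unused_helper() -> u64 {
        42
    }

    fn main() {
        // Nothing calls unused_helper(); the attribute keeps the build warning-free.
        println!("compiles cleanly");
    }
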
@@ -3,6 +3,7 @@ use integer_encoding::*;
 use std::error::Error;
 
 /// Encoding describes the type of encoding used by an encoded integer block.
+#[allow(dead_code)]
 enum Encoding {
     Uncompressed = 0,
     Simple8b = 1,
@@ -15,8 +16,8 @@ enum Encoding {
 /// deltas are then zig-zag encoded. The resulting zig-zag encoded deltas are
 /// further compressed if possible, either via bit-packing using simple8b or by
 /// run-length encoding the deltas if they're all the same.
 ///
-pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Box<dyn Error>> {
     dst.truncate(0); // reset buffer.
     if src.len() == 0 {
         return Ok(());
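
The recurring Box<Error> to Box<dyn Error> change in these hunks addresses the bare_trait_objects lint: since the 2018 edition, Rust wants trait objects written with an explicit `dyn`. A minimal sketch of the same signature shape (illustrative function, not from this commit):

    use std::error::Error;

    // `Box<Error>` denotes the same type but now triggers a lint;
    // `dyn` makes the trait-object indirection explicit.
    fn might_fail(flag: bool) -> Result<(), Box<dyn Error>> {
        if flag {
            return Err(From::from("example failure"));
        }
        Ok(())
    }

    fn main() {
        assert!(might_fail(false).is_ok());
    }
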
@@ -74,12 +75,14 @@ pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Bo
 // negative and positive values across even and odd numbers.
 //
 // Eg. [0,-1,1,-2] becomes [0, 1, 2, 3].
+#[allow(dead_code)]
 fn zig_zag_encode(v: i64) -> u64 {
     ((v << 1) ^ (v >> 63)) as u64
 }
 
 // zig_zag_decode converts a zig zag encoded unsigned integer into a signed
 // integer.
+#[allow(dead_code)]
 fn zig_zag_decode(v: u64) -> i64 {
     ((v >> 1) ^ ((((v & 1) as i64) << 63) >> 63) as u64) as i64
 }
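
The comment's example can be checked directly: zig-zag encoding interleaves negative and positive values so that small magnitudes stay small as unsigned integers. A quick check using the encode function above:

    fn zig_zag_encode(v: i64) -> u64 {
        ((v << 1) ^ (v >> 63)) as u64
    }

    fn main() {
        // Matches the comment: [0, -1, 1, -2] encodes to [0, 1, 2, 3].
        let input: Vec<i64> = vec![0, -1, 1, -2];
        let encoded: Vec<u64> = input.iter().map(|&v| zig_zag_encode(v)).collect();
        assert_eq!(encoded, vec![0, 1, 2, 3]);
    }
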
@@ -87,6 +90,7 @@ fn zig_zag_decode(v: u64) -> i64 {
 // i64_to_u64_vector converts a Vec<i64> to Vec<u64>.
 // TODO(edd): this is expensive as it copies. There are cheap
 // but unsafe alternatives to look into such as std::mem::transmute
+#[allow(dead_code)]
 fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> {
     src.into_iter().map(|x| *x as u64).collect::<Vec<u64>>()
 }
@@ -94,6 +98,7 @@ fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> {
 // u64_to_i64_vector converts a Vec<u64> to Vec<i64>.
 // TODO(edd): this is expensive as it copies. There are cheap
 // but unsafe alternatives to look into such as std::mem::transmute
+#[allow(dead_code)]
 fn u64_to_i64_vector(src: &[u64]) -> Vec<i64> {
     src.into_iter().map(|x| *x as i64).collect::<Vec<i64>>()
 }
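
On the TODO above: transmuting a whole Vec is not a sound shortcut, since Vec's memory layout is unspecified. When a borrowed view is enough, a zero-copy reinterpretation of the slice is sound because i64 and u64 share size and alignment and every bit pattern is valid for both. A sketch, not part of this commit:

    fn i64_slice_as_u64(src: &[i64]) -> &[u64] {
        // Same length, same element size and alignment, no invalid bit patterns.
        unsafe { std::slice::from_raw_parts(src.as_ptr() as *const u64, src.len()) }
    }

    fn main() {
        let signed = vec![-1i64, 2, -3];
        let unsigned = i64_slice_as_u64(&signed);
        assert_eq!(unsigned[0], u64::MAX); // -1 reinterpreted bitwise
    }
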
@@ -103,6 +108,7 @@ fn u64_to_i64_vector(src: &[u64]) -> Vec<i64> {
 // v should be the first element of a sequence, delta the difference that each
 // value in the sequence differs by, and count the total number of values in the
 // sequence.
+#[allow(dead_code)]
 fn encode_rle(v: u64, delta: u64, count: u64, dst: &mut Vec<u8>) {
     let max_var_int_size = 10; // max number of bytes needed to store var int
     dst.push(0); // save a byte for encoding type
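
The max_var_int_size = 10 above follows from varint (LEB128) encoding, which the integer_encoding import suggests is in use: 7 payload bits per byte means a u64 needs at most ceil(64 / 7) = 10 bytes. A standalone sketch of that encoding (illustrative, not this crate's implementation):

    fn write_varint(mut v: u64, dst: &mut Vec<u8>) {
        // Emit 7 bits at a time, least-significant group first;
        // the high bit of each byte marks "more bytes follow".
        while v >= 0x80 {
            dst.push((v as u8) | 0x80);
            v >>= 7;
        }
        dst.push(v as u8);
    }

    fn main() {
        let mut buf = Vec::new();
        write_varint(300, &mut buf);
        assert_eq!(buf, vec![0xAC, 0x02]);
        buf.clear();
        write_varint(u64::MAX, &mut buf); // worst case
        assert_eq!(buf.len(), 10);
    }
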
@@ -122,7 +128,8 @@ fn encode_rle(v: u64, delta: u64, count: u64, dst: &mut Vec<u8>) {
 }
 
 /// decode_all decodes a slice of bytes into a vector of signed integers.
-pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() == 0 {
         return Ok(());
     }
@@ -137,7 +144,8 @@ pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<Error
     }
 }
 
-fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() == 0 || src.len() & 0x7 != 0 {
         return Err(From::from("invalid uncompressed block length"));
     }
@@ -160,7 +168,8 @@ fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>>
 
 // decode_rle decodes an RLE encoded slice containing only unsigned integers
 // into the destination vector.
-fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() < 8 {
         return Err(From::from("not enough data to decode using RLE"));
     }
@@ -193,7 +202,8 @@ fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
     Ok(())
 }
 
-fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() < 9 {
         return Err(From::from("not enough data to decode packed timestamp"));
     }
@@ -27,7 +27,7 @@ const NUM_BITS: [[u8; 2]; 14] = [
 
 /// encode_all packs and binary encodes the provided slice of u64 values using
 /// simple8b into the provided vector.
-pub fn encode_all<'a>(src: &[u64], dst: &'a mut Vec<u8>) -> Result<(), Box<Error>> {
+pub fn encode_all<'a>(src: &[u64], dst: &'a mut Vec<u8>) -> Result<(), Box<dyn Error>> {
     let mut i = 0;
     'next_value: while i < src.len() {
         // try to pack a run of 240 or 120 1s
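
For readers unfamiliar with simple8b: each 64-bit word carries a 4-bit selector plus 60 bits of payload, and the selector picks how the payload is split (the 14-entry NUM_BITS table above is consistent with splits such as 30 values of 2 bits or 20 values of 3 bits, with special selectors for the runs of 240 or 120 ones mentioned in the comment above). A sketch of unpacking one payload, assuming least-significant-bit-first packing:

    // Extract `count` fields of `bits` bits each from the low 60 bits of a word.
    fn unpack(payload: u64, bits: u64, count: usize, dst: &mut Vec<u64>) {
        let mask = (1u64 << bits) - 1;
        for i in 0..count {
            dst.push((payload >> (i as u64 * bits)) & mask);
        }
    }

    fn main() {
        // Pack 20 three-bit values (0,1,...,7,0,1,...) into 60 bits, then unpack.
        let mut payload: u64 = 0;
        for i in 0..20u64 {
            payload |= (i % 8) << (i * 3);
        }
        let mut out = Vec::new();
        unpack(payload, 3, 20, &mut out);
        assert_eq!(out[5], 5);
    }
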
@@ -96,6 +96,7 @@ pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<u64>) {
     dst.truncate(j);
 }
 
+#[allow(dead_code)]
 pub fn decode(v: u64, dst: &mut [u64]) -> usize {
     let sel = v >> S8B_BIT_SIZE as u64;
     let mut v = v;
@@ -3,6 +3,7 @@ use integer_encoding::*;
 use std::error::Error;
 
 // Encoding describes the type of encoding used by an encoded timestamp block.
+#[allow(dead_code)]
 enum Encoding {
     Uncompressed = 0,
     Simple8b = 1,
@@ -16,7 +17,8 @@ enum Encoding {
 /// is potentially carried out. If all the deltas are the same the block can be
 /// encoded using RLE. If not, as long as the deltas are not bigger than simple8b::MAX_VALUE
 /// they can be encoded using simple8b.
-pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Box<dyn Error>> {
     dst.truncate(0); // reset buffer.
     if src.len() == 0 {
         return Ok(());
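
The doc comment above describes a three-way choice after delta-encoding the timestamps. A sketch of that dispatch logic, assuming a non-empty delta slice and taking simple8b's largest packable value to be 2^60 - 1 (an assumption based on the 60-bit payload, not a constant quoted from this code):

    const SIMPLE8B_MAX_VALUE: u64 = (1 << 60) - 1;

    // Pick the block encoding the way the doc comment describes.
    fn choose_encoding(deltas: &[u64]) -> &'static str {
        if deltas.iter().all(|&d| d == deltas[0]) {
            "rle" // all deltas identical: run-length encode
        } else if deltas.iter().all(|&d| d <= SIMPLE8B_MAX_VALUE) {
            "simple8b" // all deltas packable: bit-pack
        } else {
            "uncompressed"
        }
    }

    fn main() {
        assert_eq!(choose_encoding(&[10, 10, 10]), "rle");
        assert_eq!(choose_encoding(&[1, 2, 3]), "simple8b");
    }
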
@@ -91,6 +93,7 @@ pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Bo
 // i64_to_u64_vector converts a Vec<i64> to Vec<u64>.
 // TODO(edd): this is expensive as it copies. There are cheap
 // but unsafe alternatives to look into such as std::mem::transmute
+#[allow(dead_code)]
 fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> {
     src.into_iter().map(|x| *x as u64).collect::<Vec<u64>>()
 }
@@ -98,6 +101,7 @@ fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> {
 // u64_to_i64_vector converts a Vec<u64> to Vec<i64>.
 // TODO(edd): this is expensive as it copies. There are cheap
 // but unsafe alternatives to look into such as std::mem::transmute
+#[allow(dead_code)]
 fn u64_to_i64_vector(src: &[u64]) -> Vec<i64> {
     src.into_iter().map(|x| *x as i64).collect::<Vec<i64>>()
 }
@@ -107,6 +111,7 @@ fn u64_to_i64_vector(src: &[u64]) -> Vec<i64> {
 // v should be the first element of a sequence, delta the difference that each
 // value in the sequence differs by, and count the total number of values in the
 // sequence.
+#[allow(dead_code)]
 fn encode_rle(v: u64, delta: u64, count: u64, dst: &mut Vec<u8>) {
     let max_var_int_size = 10; // max number of bytes needed to store var int
 
@@ -150,7 +155,8 @@ fn encode_rle(v: u64, delta: u64, count: u64, dst: &mut Vec<u8>) {
 
 /// decode_all decodes a slice of bytes encoded using encode_all back into a
 /// vector of signed integers.
-pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() == 0 {
         return Ok(());
     }
@@ -166,7 +172,8 @@ pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<Error
 }
 
 // decode_uncompressed writes the binary encoded values in src into dst.
-fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() == 0 || src.len() & 0x7 != 0 {
         return Err(From::from("invalid uncompressed block length"));
     }
@@ -189,7 +196,8 @@ fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>>
 
 // decode_rle decodes an RLE encoded slice containing only unsigned integers
 // into the destination vector.
-fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() < 9 {
         return Err(From::from("not enough data to decode using RLE"));
     }
@@ -226,7 +234,8 @@ fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
     Ok(())
 }
 
-fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() < 9 {
         return Err(From::from("not enough data to decode packed timestamp"));
     }
@@ -1,6 +1,5 @@
 use std::str::Chars;
 use std::{error, fmt};
-use std::fs::read;
 use actix_web::ResponseError;
 use actix_web::http::StatusCode;
 
@@ -92,7 +91,7 @@ impl ResponseError for ParseError {
 // TODO: have parse return an error for invalid inputs
 pub fn parse(input: &str) -> Vec<Point> {
     let mut points = Vec::with_capacity(10000);
-    let mut lines = input.lines();
+    let lines = input.lines();
 
     for line in lines {
         read_line(line, &mut points)
src/main.rs
@@ -1,24 +1,19 @@
 use delorean::storage::rocksdb::Database;
-use delorean::{line_parser, storage};
-use delorean::storage::iterators::SeriesIterator;
-use delorean::storage::rocksdb::{PointsIterator, SeriesFilter, Range};
-use delorean::line_parser::{parse, index_pairs, Pair};
+use delorean::line_parser;
+use delorean::storage::rocksdb::{PointsIterator, Range};
+use delorean::line_parser::index_pairs;
 use delorean::storage::predicate::parse_predicate;
 use delorean::time::{parse_duration, time_as_i64_nanos};
 
 use std::{env, io, str};
 use std::env::VarError;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::time::SystemTime;
 
-use actix_web::{App, middleware, HttpServer, web, HttpResponse, Error as AWError, guard, error, Responder};
-use actix_web::web::Bytes;
+use actix_web::{App, middleware, HttpServer, web, HttpResponse, Error as AWError, guard, error};
 use serde_json;
 use serde::Deserialize;
 use serde::ser::{Serialize, Serializer, SerializeStruct};
 use actix_web::web::{BytesMut};
-use futures::{self, StreamExt, Stream};
+use futures::{self, StreamExt};
 use failure::_core::time::Duration;
 use csv::Writer;
 
@@ -160,7 +155,7 @@ async fn read(read_info: web::Query<ReadInfo>, s: web::Data<Arc<Server>>) -> Res
 
     let range = Range{start, stop};
 
-    let mut series = s.db.read_range(read_info.org_id, &read_info.bucket_name, &range, &predicate, 10)?;
+    let series = s.db.read_range(read_info.org_id, &read_info.bucket_name, &range, &predicate, 10)?;
 
     let bucket_id = series.bucket_id;
     let db = &s.db;
@@ -170,7 +165,7 @@ async fn read(read_info: web::Query<ReadInfo>, s: web::Data<Arc<Server>>) -> Res
     for s in series {
         let mut wtr = Writer::from_writer(vec![]);
 
-        let mut points = PointsIterator::new_from_series_filter(read_info.org_id, bucket_id, &db, &s, &range, 10)?;
+        let points = PointsIterator::new_from_series_filter(read_info.org_id, bucket_id, &db, &s, &range, 10)?;
         let pairs = index_pairs(&s.key)?;
         let mut cols = Vec::with_capacity(pairs.len() + 2);
         let mut vals = Vec::with_capacity(pairs.len() + 2);
@@ -1,7 +1,4 @@
-use crate::delorean::Predicate;
-use crate::storage::rocksdb::{Database, SeriesFilter, StorageError, Range, PointsIterator};
-
-use rocksdb::{DB, IteratorMode, DBIterator};
+use crate::storage::rocksdb::SeriesFilter;
 
 pub struct SeriesIterator {
     pub org_id: u32,
@@ -118,7 +118,7 @@ fn parse_value(chars: &mut Peekable<Chars>) -> Result<Value, StorageError> {
                 val.push(ch);
             }
         },
-        Some(ch) => return Err(StorageError{description: "unable to parse non-string values".to_string()}),
+        Some(ch) => return Err(StorageError{description: format!("unable to parse non-string values like '{}'", ch)}),
         None => (),
     }
 
@@ -1,19 +1,17 @@
-use crate::line_parser::{Point, Pair};
+use crate::line_parser::Point;
 use crate::storage::iterators::{ReadPoint, SeriesIterator};
-use crate::delorean::{Bucket, IndexLevel, Predicate, Node, node};
+use crate::delorean::{Bucket, IndexLevel, Predicate, Node};
 use crate::delorean::node::{Value, Comparison, Logical};
 
 use bytes::BufMut;
 use std::{error, fmt};
-use std::sync::{Arc, RwLock, Mutex, MutexGuard};
+use std::sync::{Arc, RwLock, Mutex};
 use std::collections::HashMap;
 use std::time::SystemTime;
 use std::io::Cursor;
 
-use rocksdb::{DB, IteratorMode, WriteBatch, Options, ColumnFamilyDescriptor, Direction, ColumnFamily, DBIterator};
+use rocksdb::{DB, IteratorMode, WriteBatch, Options, ColumnFamilyDescriptor, Direction, DBIterator};
 use byteorder::{ByteOrder, BigEndian, WriteBytesExt, ReadBytesExt};
 use prost::Message;
-use futures::AsyncWriteExt;
 use croaring::Treemap;
 use croaring::treemap::NativeSerializer;
 use actix_web::ResponseError;
@@ -43,7 +41,6 @@ pub struct Series {
 
 const BUCKET_CF: &str = "buckets";
 const BUCKET_CF_WRITE_BUFFER_SIZE: usize = 1024 * 1024; // 1MB
 const INDEX_CF: &str = "indexes";
-const INDEX_CF_WRITE_BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10MB
 
 impl Database {
@@ -109,7 +106,7 @@ impl Database {
             let key = key_for_series_and_time(bucket.id, s.id.unwrap(), s.point.time);
             let mut value = Vec::with_capacity(8);
             value.write_i64::<BigEndian>(s.point.value).unwrap();
-            batch.put(key, value);
+            batch.put(key, value).unwrap();
         }
 
         self.db.read().unwrap().write(batch).expect("unexpected RocksDB error");
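
The RocksDB hunks silence unused-Result warnings in two different ways: `.unwrap()` here, and `let _ =` further down. The difference matters; a minimal sketch (illustrative function, not this codebase's API):

    fn write_op(ok: bool) -> Result<(), String> {
        if ok { Ok(()) } else { Err("write failed".to_string()) }
    }

    fn main() {
        // `.unwrap()` turns an Err into a panic, so failures are loud.
        write_op(true).unwrap();

        // `let _ =` deliberately discards the Result; the must_use lint is
        // satisfied, but a real error would pass silently.
        let _ = write_op(false);
    }
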
@@ -126,7 +123,7 @@ impl Database {
         self.create_bucket_if_not_exists(org_id, &bucket)
     }
 
-    pub fn read_range<'a>(&self, org_id: u32, bucket_name: &str, range: &'a Range, predicate: &'a Predicate, batch_size: usize) -> Result<SeriesIterator, StorageError> {
+    pub fn read_range<'a>(&self, org_id: u32, bucket_name: &str, range: &'a Range, predicate: &'a Predicate, _batch_size: usize) -> Result<SeriesIterator, StorageError> {
         let bucket = match self.get_bucket_by_name(org_id, bucket_name).unwrap() {
             Some(b) => b,
             None => return Err(StorageError{description: format!("bucket {} not found", bucket_name)}),
@@ -138,9 +135,9 @@ impl Database {
     }
 
     fn get_db_points_iter(&self, _org_id: u32, bucket_id: u32, series_id: u64, range: &Range, batch_size: usize) -> Result<PointsIterator, StorageError> {
-        let mut prefix = prefix_for_series(bucket_id, series_id, range.start);
+        let prefix = prefix_for_series(bucket_id, series_id, range.start);
         let mode = IteratorMode::From(&prefix, Direction::Forward);
-        let mut iter = self.db.read().unwrap().iterator(mode);
+        let iter = self.db.read().unwrap().iterator(mode);
 
         Ok(PointsIterator::new(batch_size, iter, range.stop, prefix[0..12].to_vec()))
     }
@@ -176,7 +173,7 @@ impl Database {
         let mut store = bucket.clone();
 
         // get the next bucket ID
-        let mut next_id = match db.get_cf(buckets, next_bucket_id_key())
+        let next_id = match db.get_cf(buckets, next_bucket_id_key())
             .expect("unexpected rocksdb error while trying to get the next bucket id") {
 
             Some(val) => u32_from_bytes(&val),
@@ -189,7 +186,7 @@ impl Database {
         // write the bucket and the next ID counter atomically
         let mut batch = WriteBatch::default();
         batch.put_cf(&buckets, &key, buf).unwrap();
-        batch.put_cf(&buckets, next_bucket_id_key(), u32_to_bytes(store.id + 1));
+        batch.put_cf(&buckets, next_bucket_id_key(), u32_to_bytes(store.id + 1)).unwrap();
         db.write(batch).expect("unexpected rocksdb error writing to DB");
 
         let id = store.id;
@@ -236,10 +233,10 @@ impl Database {
     ///
     /// # Returns
     /// A vector of series where each point in the passed in vector is contained in a series
-    pub fn get_series_ids(&self, org_id: u32, bucket: &Bucket, mut points: Vec<Point>) -> Vec<Series> {
+    pub fn get_series_ids(&self, _org_id: u32, bucket: &Bucket, points: Vec<Point>) -> Vec<Series> {
         let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
 
-        let mut series = points.into_iter().map(|p| {
+        let series = points.into_iter().map(|p| {
             let mut series = Series{id: None, point: p};
             let level = &bucket.index_levels[0];
             let cf_name = index_cf_name(bucket.id, level.duration_seconds, now);
@@ -259,7 +256,7 @@ impl Database {
     pub fn get_series_filters(&self, bucket: &Bucket, predicate: Option<&Predicate>, range: &Range) -> Result<Vec<SeriesFilter>, StorageError> {
         if let Some(pred) = predicate {
             if let Some(root) = &pred.root {
-                let mut map = self.evaluate_node(bucket, &root, range)?;
+                let map = self.evaluate_node(bucket, &root, range)?;
                 let mut filters = Vec::with_capacity(map.cardinality() as usize);
 
                 for id in map.iter() {
@@ -370,7 +367,7 @@ impl Database {
     }
 
     // TODO: handle predicate
-    pub fn get_tag_keys(&self, bucket: &Bucket, _predicate: Option<&Predicate>, range: &Range) -> Vec<String> {
+    pub fn get_tag_keys(&self, bucket: &Bucket, _predicate: Option<&Predicate>, _range: &Range) -> Vec<String> {
         let index_level = bucket.index_levels.get(0).unwrap(); // TODO: find the right index based on range
         let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
         let cf_name = index_cf_name(bucket.id, index_level.duration_seconds, now);
@@ -383,7 +380,7 @@ impl Database {
             Some(index) => {
                 let prefix = index_tag_key_prefix(bucket.id);
                 let mode = IteratorMode::From(&prefix, Direction::Forward);
-                let mut iter = db.iterator_cf(index, mode)
+                let iter = db.iterator_cf(index, mode)
                     .expect("unexpected rocksdb error getting iterator for index");
 
                 for (key, _) in iter {
@@ -401,7 +398,7 @@ impl Database {
         keys
     }
 
-    pub fn get_tag_values(&self, bucket: &Bucket, tag: &str, _predicate: Option<&Predicate>, range: &Range) -> Vec<String> {
+    pub fn get_tag_values(&self, bucket: &Bucket, tag: &str, _predicate: Option<&Predicate>, _range: &Range) -> Vec<String> {
         let index_level = bucket.index_levels.get(0).unwrap(); // TODO: find the right index based on range
         let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
         let cf_name = index_cf_name(bucket.id, index_level.duration_seconds, now);
@@ -413,7 +410,7 @@ impl Database {
             Some(index) => {
                 let prefix = index_tag_key_value_prefix(bucket.id, tag);
                 let mode = IteratorMode::From(&prefix, Direction::Forward);
-                let mut iter = db.iterator_cf(index, mode)
+                let iter = db.iterator_cf(index, mode)
                     .expect("unexpected rocksdb error getting iterator for index");
 
                 for (key, _) in iter {
@@ -436,7 +433,7 @@ impl Database {
     fn ensure_series_mutex_exists(&self, bucket_id: u32) {
         let map = self.series_insert_lock.read().expect("mutex poisoned");
 
-        if let Some(next_id_mutex) = map.get(&bucket_id) {
+        if let Some(_next_id_mutex) = map.get(&bucket_id) {
             return;
         }
 
@@ -458,7 +455,7 @@ impl Database {
         // We want to get a lock on new series only for this bucket
        self.ensure_series_mutex_exists(bucket.id);
        let map = self.series_insert_lock.read().expect("mutex poisoned");
-        let mut next_id = map.get(&bucket.id).expect("should exist because of call to ensure_series_mutex_exists");
+        let next_id = map.get(&bucket.id).expect("should exist because of call to ensure_series_mutex_exists");
         let mut next_id = next_id.lock().expect("mutex poisoned");
 
         let mut batch = WriteBatch::default();
@@ -504,8 +501,8 @@ impl Database {
                 let id = *next_id;
                 let mut series_id = Vec::with_capacity(8);
                 series_id.write_u64::<BigEndian>(*next_id).unwrap();
-                batch.put_cf(index_cf, index_series_key_id(&series.point.series), series_id.clone());
-                batch.put_cf(index_cf, index_series_id(&series_id), &series.point.series.as_bytes());
+                batch.put_cf(index_cf, index_series_key_id(&series.point.series), series_id.clone()).unwrap();
+                batch.put_cf(index_cf, index_series_id(&series_id), &series.point.series.as_bytes()).unwrap();
                 series_id_map.insert(series.point.series.clone(), *next_id);
                 *next_id += 1;
 
@@ -516,10 +513,10 @@ impl Database {
             let pairs = series.point.index_pairs().unwrap();
             for pair in pairs {
                 // insert the tag key index
-                batch.put_cf(index_cf, index_tag_key(bucket.id, &pair.key), vec![0 as u8]);
+                batch.put_cf(index_cf, index_tag_key(bucket.id, &pair.key), vec![0 as u8]).unwrap();
 
                 // insert the tag value index
-                batch.put_cf(index_cf, index_tag_key_value(bucket.id, &pair.key, &pair.value), vec![0 as u8]);
+                batch.put_cf(index_cf, index_tag_key_value(bucket.id, &pair.key, &pair.value), vec![0 as u8]).unwrap();
 
                 // update the key to id bitmap
                 let index_key_posting_list_key = index_key_posting_list(bucket.id, &pair.key).to_vec();
@@ -564,14 +561,14 @@ impl Database {
 
         // do the index writes from the temporary in-memory map
         for (k, v) in index_map.iter() {
-            batch.put_cf(index_cf, k, v.serialize().unwrap());
+            let _ = batch.put_cf(index_cf, k, v.serialize().unwrap());
         }
 
         // save the next series id
         let bucket_cf = db.cf_handle(BUCKET_CF).unwrap();
         let mut next_series_id_val = Vec::with_capacity(8);
         next_series_id_val.write_u64::<BigEndian>(*next_id).unwrap();
-        batch.put_cf(bucket_cf, next_series_id_key(org_id, bucket.id), next_series_id_val);
+        let _ = batch.put_cf(bucket_cf, next_series_id_key(org_id, bucket.id), next_series_id_val);
 
         db.write(batch).expect("unexpected rocksdb error");
     }
@@ -595,7 +592,7 @@ impl Database {
 
         let buckets = db.cf_handle(BUCKET_CF).unwrap();
         let prefix = &[BucketEntryType::Bucket as u8];
-        let mut iter = db.iterator_cf(&buckets, IteratorMode::From(prefix, Direction::Forward)).unwrap();
+        let iter = db.iterator_cf(&buckets, IteratorMode::From(prefix, Direction::Forward)).unwrap();
 
         let mut id_mutex_map = HashMap::new();
         let mut bucket_map = self.bucket_map.write().unwrap();
@@ -653,14 +650,18 @@ TODO: The index todo list
 8. index levels
 
 TODO: other pieces
-2. HTTP GET endpoint with predicate and time ranges
-3. API endpoint to delete old series data
-4. API endpoint to delete old indexes
-5. API endpoint to run tsm compaction
-6. Write/read other data types
+- API endpoint to delete old series data
+- API endpoint to delete old indexes
+- API endpoint to run tsm compaction
+- Write/read other data types
+- Buckets backed by alternate storage
+- Meta store abstracted from RocksDB
+- Index abstracted to Trait
+- Raw data iterator abstracted to Trait
 
 */
 
+#[allow(dead_code)]
 enum SeriesDataType {
     Int64,
     Float64,
@@ -854,10 +855,6 @@ fn index_key_value_posting_list(bucket_id: u32, key: &str, value: &str) -> Vec<u
     v
 }
 
-fn index_entry_type_from_byte(b: u8) -> IndexEntryType {
-    unsafe { ::std::mem::transmute(b) }
-}
-
 // next_series_id_key gives the key in the buckets CF in rocks that holds the value for the next series ID
 fn next_series_id_key(org_id: u32, bucket_id: u32) -> Vec<u8> {
     let mut v = Vec::with_capacity(9);
@@ -912,12 +909,6 @@ fn u32_to_bytes(val: u32) -> Vec<u8> {
     v
 }
 
-fn u64_to_bytes(val: u64) -> Vec<u8> {
-    let mut v = Vec::with_capacity(8);
-    v.write_u64::<BigEndian>(val).unwrap();
-    v
-}
-
 impl Bucket {
     pub fn new(org_id: u32, name: String) -> Bucket {
         Bucket{
@@ -983,22 +974,16 @@ mod tests {
 
     use dotenv::dotenv;
     use std::env;
-    use serde_json::error::Category::Data;
 
-    use rocksdb;
     use crate::storage::predicate::parse_predicate;
-    use crate::storage::rocksdb::IndexEntryType::SeriesKeyToID;
-    use crate::line_parser::parse;
-    use crate::storage::iterators::ReadSeries;
-    use std::net::Shutdown::Read;
 
     #[test]
     fn create_and_get_buckets() {
-        let mut bucket: Bucket;
+        let bucket: Bucket;
         let org_id = 1;
         let mut bucket2 = Bucket::new(2, "Foo".to_string());
         {
-            let mut db = test_database("create_and_get_buckets", true);
+            let db = test_database("create_and_get_buckets", true);
             let mut b = Bucket::new(org_id, "Foo".to_string());
 
             b.id = db.create_bucket_if_not_exists(org_id, &b).unwrap();
@@ -1029,7 +1014,7 @@ mod tests {
 
         // ensure it persists across database reload
         {
-            let mut db = test_database("create_and_get_buckets", false);
+            let db = test_database("create_and_get_buckets", false);
             let stored_bucket = db.get_bucket_by_name(org_id, &bucket.name).unwrap().unwrap();
             assert_eq!(bucket, stored_bucket);
@@ -1051,7 +1036,7 @@ mod tests {
         let p4 = Point{series: "four".to_string(), value: 234, time: 100};
 
         {
-            let mut db = test_database("series_id_indexing", true);
+            let db = test_database("series_id_indexing", true);
             b.id = db.create_bucket_if_not_exists(org_id, &b).unwrap();
             b2.id = db.create_bucket_if_not_exists(b2.org_id, &b2).unwrap();
 
@@ -1090,7 +1075,7 @@ mod tests {
 
         // now make sure that a new series gets inserted properly after restart
         {
-            let mut db = test_database("series_id_indexing", false);
+            let db = test_database("series_id_indexing", false);
 
             // check the first org
             let mut series = vec![Series{id: None, point: p4.clone()}];
@@ -1124,7 +1109,7 @@ mod tests {
     #[test]
     fn series_metadata_indexing() {
         let mut bucket = Bucket::new(1, "foo".to_string());
-        let mut db = test_database("series_metadata_indexing", true);
+        let db = test_database("series_metadata_indexing", true);
         let p1 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 0};
         let p2 = Point{series: "cpu,host=a,region=west\tusage_system".to_string(), value: 1, time: 0};
         let p3 = Point{series: "cpu,host=a,region=west\tusage_user".to_string(), value: 1, time: 0};
@@ -1178,8 +1163,8 @@ mod tests {
 
     #[test]
     fn write_creates_bucket() {
-        let mut b1 = Bucket::new(1, "bucket1".to_string());
-        let mut db = test_database("write_creates_bucket", true);
+        let b1 = Bucket::new(1, "bucket1".to_string());
+        let db = test_database("write_creates_bucket", true);
 
         let p1 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 1};
         let p2 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 2};
@@ -1191,7 +1176,7 @@ mod tests {
     #[test]
     fn catch_rocksdb_iterator_segfault() {
         let mut b1 = Bucket::new(1, "bucket1".to_string());
-        let mut db = test_database("catch_rocksdb_iterator_segfault", true);
+        let db = test_database("catch_rocksdb_iterator_segfault", true);
 
         let p1 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 1};
 
@@ -1218,7 +1203,7 @@ mod tests {
     fn write_and_read_points() {
         let mut b1 = Bucket::new(1, "bucket1".to_string());
         let mut b2 = Bucket::new(2, "bucket2".to_string());
-        let mut db = test_database("write_and_read_points", true);
+        let db = test_database("write_and_read_points", true);
 
         let p1 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 1};
         let p2 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 2};
@@ -1,7 +1,5 @@
 use crate::Error;
 
-use std::str::Chars;
 use std::iter::Peekable;
 use std::time::{Duration, UNIX_EPOCH};
 use std::time::SystemTime;
 
@@ -30,6 +28,7 @@ impl RelativeDuration {
     }
 }
 
+// TODO: update this so that we're not indexing into the string. Should be iterating with chars
 pub fn parse_duration(s: &str) -> Result<RelativeDuration, Error> {
     if s.len() < 2 {
         return Err(Error{description: "duration must have at least two characters".to_string()})