feat: support org and bucket ID in entries

pull/24376/head
Edd Robinson 2020-05-28 16:11:10 +01:00
parent 97e77a681b
commit 413738a264
5 changed files with 193 additions and 6 deletions

43
Cargo.lock generated
View File

@ -1,5 +1,10 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "adler32"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "aho-corasick"
version = "0.7.10"
@ -200,6 +205,16 @@ name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "chrono"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"num-integer 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
"num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.43 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "clang-sys"
version = "0.28.1"
@ -374,6 +389,7 @@ dependencies = [
"assert_cmd 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
"crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"criterion 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"croaring 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
@ -390,6 +406,7 @@ dependencies = [
"http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.13.5 (registry+https://github.com/rust-lang/crates.io-index)",
"integer-encoding 1.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"libflate 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"num 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -893,6 +910,22 @@ name = "libc"
version = "0.2.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "libflate"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"adler32 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
"crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libflate_lz77 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rle-decode-fast 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "libflate_lz77"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "libloading"
version = "0.5.2"
@ -1537,6 +1570,11 @@ dependencies = [
"winreg 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rle-decode-fast"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "rust-argon2"
version = "0.7.0"
@ -2364,6 +2402,7 @@ dependencies = [
]
[metadata]
"checksum adler32 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2"
"checksum aho-corasick 0.7.10 (registry+https://github.com/rust-lang/crates.io-index)" = "8716408b8bc624ed7f65d223ddb9ac2d044c0547b6fa4b0d554f3a9540496ada"
"checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
"checksum anyhow 1.0.31 (registry+https://github.com/rust-lang/crates.io-index)" = "85bb70cc08ec97ca5450e6eba421deeea5f172c0fc61f78b5357b2a8e8be195f"
@ -2390,6 +2429,7 @@ dependencies = [
"checksum cc 1.0.54 (registry+https://github.com/rust-lang/crates.io-index)" = "7bbb73db36c1246e9034e307d0fba23f9a2e251faa47ade70c1bd252220c8311"
"checksum cexpr 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "fce5b5fb86b0c57c20c834c1b412fd09c77c8a59b9473f86272709e78874cd1d"
"checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
"checksum chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "80094f509cf8b5ae86a4966a39b3ff66cd7e2a3e594accec3743ff3fabeab5b2"
"checksum clang-sys 0.28.1 (registry+https://github.com/rust-lang/crates.io-index)" = "81de550971c976f176130da4b2978d3b524eaa0fd9ac31f3ceb5ae1231fb4853"
"checksum clap 2.33.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129"
"checksum constant_time_eq 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
@ -2456,6 +2496,8 @@ dependencies = [
"checksum lazycell 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b294d6fa9ee409a054354afc4352b0b9ef7ca222c69b8812cbea9e7d2bf3783f"
"checksum lexical-core 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d7043aa5c05dd34fb73b47acb8c3708eac428de4545ea3682ed2f11293ebd890"
"checksum libc 0.2.71 (registry+https://github.com/rust-lang/crates.io-index)" = "9457b06509d27052635f90d6466700c65095fdf75409b3fbdd903e988b886f49"
"checksum libflate 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a1fbe6b967a94346446d37ace319ae85be7eca261bb8149325811ac435d35d64"
"checksum libflate_lz77 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3286f09f7d4926fc486334f28d8d2e6ebe4f7f9994494b6dab27ddfad2c9b11b"
"checksum libloading 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f2b111a074963af1d37a139918ac6d49ad1d0d5e47f72fd55388619691a7d753"
"checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7"
"checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
@ -2527,6 +2569,7 @@ dependencies = [
"checksum regex-syntax 0.6.18 (registry+https://github.com/rust-lang/crates.io-index)" = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8"
"checksum remove_dir_all 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4a83fa3702a688b9359eccba92d153ac33fd2e8462f9e0e3fdf155239ea7792e"
"checksum reqwest 0.10.5 (registry+https://github.com/rust-lang/crates.io-index)" = "4be79e8610ce0c2d646aa03bff67653f76d394c4e0a5e7bc255007008fcc6ba8"
"checksum rle-decode-fast 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cabe4fa914dec5870285fa7f71f602645da47c486e68486d2b4ceb4a343e90ac"
"checksum rust-argon2 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2bc8af4bda8e1ff4932523b94d3dd20ee30a87232323eda55903ffd71d2fb017"
"checksum rustc-hash 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
"checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a"

View File

@ -62,12 +62,15 @@ either = "1.5.3"
flatbuffers = "0.6.1"
[dev-dependencies]
delorean_test_helpers = { path = "delorean_test_helpers" }
criterion = "0.3"
assert_cmd = "1.0.0"
tempfile = "3.1.0"
chrono = "0.4"
criterion = "0.3"
delorean_test_helpers = { path = "delorean_test_helpers" }
libflate = "1.0.0"
rand = "0.7.2"
reqwest = { version = "0.10.1", features = ["blocking"] }
predicates = "1.0.4"
tempfile = "3.1.0"
[[bench]]
name = "encoders"

View File

@ -50,3 +50,49 @@ impl Encoder for Vec<bool> {
})
}
}
/// Types implementing `Decoder` are able to decode themselves from compressed
/// blocks of vectors of values.
pub trait Decoder<T> {
fn decode(dst: &mut Vec<T>, src: Vec<u8>) -> Result<(), StorageError>;
}
impl Decoder<f64> for Vec<f64> {
fn decode(dst: &mut Vec<f64>, src: Vec<u8>) -> Result<(), StorageError> {
float::decode(&src, dst).map_err(|e| StorageError {
description: e.to_string(),
})
}
}
impl Decoder<i64> for Vec<i64> {
fn decode(dst: &mut Vec<i64>, src: Vec<u8>) -> Result<(), StorageError> {
integer::decode(&src, dst).map_err(|e| StorageError {
description: e.to_string(),
})
}
}
impl Decoder<u64> for Vec<u64> {
fn decode(_: &mut Vec<u64>, _: Vec<u8>) -> Result<(), StorageError> {
Err(StorageError {
description: String::from("not yet implemented"),
})
}
}
impl Decoder<&str> for Vec<&str> {
fn decode(_: &mut Vec<&str>, _: Vec<u8>) -> Result<(), StorageError> {
Err(StorageError {
description: String::from("not yet implemented"),
})
}
}
impl Decoder<bool> for Vec<bool> {
fn decode(_: &mut Vec<bool>, _: Vec<u8>) -> Result<(), StorageError> {
Err(StorageError {
description: String::from("not yet implemented"),
})
}
}

View File

@ -2,6 +2,7 @@ use crate::storage::StorageError;
use std::io::BufRead;
use std::io::{Read, Seek, SeekFrom};
use std::u64;
pub struct TSMFile<R>
where
@ -74,8 +75,18 @@ impl<T: Read> Index<T> {
self.curr_offset += 2;
let count = u16::from_be_bytes(buf);
let mut buf2: [u8; 8] = [0; 8];
buf2.copy_from_slice(&key_bytes[..8]);
let org_id = InfluxID::from_be_bytes(buf2);
buf2.copy_from_slice(&key_bytes[8..16]);
let bucket_id = InfluxID::from_be_bytes(buf2);
Ok(IndexEntry {
key: key_bytes,
org_id,
bucket_id,
block_type,
count,
curr_block: 1,
@ -157,9 +168,42 @@ impl<T: Read> Iterator for Index<T> {
}
}
#[derive(Clone, Debug)]
/// InfluxID represents an InfluxDB ID used in InfluxDB 2.x to represent
/// organization and bucket identifiers.
pub struct InfluxID(u64);
#[allow(dead_code)]
impl InfluxID {
fn new_str(s: &str) -> Result<InfluxID, StorageError> {
let v = u64::from_str_radix(s, 16).map_err(|e| StorageError {
description: e.to_string(),
})?;
Ok(InfluxID(v))
}
fn from_be_bytes(bytes: [u8; 8]) -> InfluxID {
InfluxID(u64::from_be_bytes(bytes))
}
}
impl std::fmt::Display for InfluxID {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
write!(f, "{:016x}", self.0)
}
}
impl std::cmp::PartialEq for InfluxID {
fn eq(&self, r: &InfluxID) -> bool {
self.0 == r.0
}
}
#[derive(Clone)]
pub struct IndexEntry {
key: Vec<u8>,
org_id: InfluxID,
bucket_id: InfluxID,
block_type: u8,
count: u16,
@ -176,17 +220,17 @@ pub struct Block {
}
#[cfg(test)]
mod test {
mod tests {
use super::*;
use libflate::gzip::Decoder;
use std::fs::File;
use std::i64;
use std::io::BufReader;
use std::io::Cursor;
#[test]
fn read_tsm_index() {
let file =
File::open("/Users/edd/rust/delorean/src/storage/000000000000005-000000002.tsm.gz");
let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz");
let mut decoder = Decoder::new(file.unwrap()).unwrap();
let mut buf = Vec::new();
decoder.read_to_end(&mut buf).unwrap();
@ -197,4 +241,55 @@ mod test {
assert_eq!(index.curr_offset, 3_893_272);
assert_eq!(index.count(), 2159)
}
#[test]
fn read_tsm_block() {
let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz");
let mut decoder = Decoder::new(file.unwrap()).unwrap();
let mut buf = Vec::new();
decoder.read_to_end(&mut buf).unwrap();
let mut reader = TSMFile::new(BufReader::new(Cursor::new(buf)), 4_222_248);
let index = reader.index().unwrap();
let mut got_blocks: u64 = 0;
let mut got_min_time = i64::MAX;
let mut got_max_time = i64::MIN;
// every block in the fixture file is for the 05c19117091a1000 org and
// 05c19117091a1001 bucket.
let org_id = InfluxID::new_str("05c19117091a1000").unwrap();
let bucket_id = InfluxID::new_str("05c19117091a1001").unwrap();
for index_entry in index {
match index_entry {
Ok(entry) => {
got_blocks += entry.count as u64;
if entry.block.min_time < got_min_time {
got_min_time = entry.block.min_time;
}
if entry.block.max_time > got_max_time {
got_max_time = entry.block.max_time;
}
assert_eq!(entry.org_id, org_id);
assert_eq!(entry.bucket_id, bucket_id);
}
Err(e) => panic!("{:?} {:?}", e, got_blocks),
}
}
assert_eq!(got_blocks, 2159); // 2,159 blocks in the file
assert_eq!(got_min_time, 1590585404546128000); // earliest time is 2020-05-27T13:16:44.546128Z
assert_eq!(got_max_time, 1590597378379824000); // latest time is 2020-05-27T16:36:18.379824Z
}
#[test]
fn influx_id() {
let id = InfluxID::new_str("20aa9b0").unwrap();
assert_eq!(id, InfluxID(34253232));
assert_eq!(format!("{}", id), "00000000020aa9b0");
}
}