diff --git a/Cargo.lock b/Cargo.lock index a5881d04f2..3ce7484d1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,10 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "adler32" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "aho-corasick" version = "0.7.10" @@ -200,6 +205,16 @@ name = "cfg-if" version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "chrono" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num-integer 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", + "num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.43 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "clang-sys" version = "0.28.1" @@ -374,6 +389,7 @@ dependencies = [ "assert_cmd 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", + "chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "criterion 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "croaring 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -390,6 +406,7 @@ dependencies = [ "http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.13.5 (registry+https://github.com/rust-lang/crates.io-index)", "integer-encoding 1.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "libflate 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "num 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -893,6 +910,22 @@ name = "libc" version = "0.2.71" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "libflate" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "adler32 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libflate_lz77 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rle-decode-fast 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "libflate_lz77" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "libloading" version = "0.5.2" @@ -1537,6 +1570,11 @@ dependencies = [ "winreg 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rle-decode-fast" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "rust-argon2" version = "0.7.0" @@ -2364,6 +2402,7 @@ dependencies = [ ] [metadata] +"checksum adler32 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2" "checksum aho-corasick 0.7.10 (registry+https://github.com/rust-lang/crates.io-index)" = "8716408b8bc624ed7f65d223ddb9ac2d044c0547b6fa4b0d554f3a9540496ada" "checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" "checksum anyhow 1.0.31 (registry+https://github.com/rust-lang/crates.io-index)" = "85bb70cc08ec97ca5450e6eba421deeea5f172c0fc61f78b5357b2a8e8be195f" @@ -2390,6 +2429,7 @@ dependencies = [ "checksum cc 1.0.54 (registry+https://github.com/rust-lang/crates.io-index)" = "7bbb73db36c1246e9034e307d0fba23f9a2e251faa47ade70c1bd252220c8311" "checksum cexpr 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "fce5b5fb86b0c57c20c834c1b412fd09c77c8a59b9473f86272709e78874cd1d" "checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" +"checksum chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "80094f509cf8b5ae86a4966a39b3ff66cd7e2a3e594accec3743ff3fabeab5b2" "checksum clang-sys 0.28.1 (registry+https://github.com/rust-lang/crates.io-index)" = "81de550971c976f176130da4b2978d3b524eaa0fd9ac31f3ceb5ae1231fb4853" "checksum clap 2.33.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129" "checksum constant_time_eq 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" @@ -2456,6 +2496,8 @@ dependencies = [ "checksum lazycell 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b294d6fa9ee409a054354afc4352b0b9ef7ca222c69b8812cbea9e7d2bf3783f" "checksum lexical-core 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d7043aa5c05dd34fb73b47acb8c3708eac428de4545ea3682ed2f11293ebd890" "checksum libc 0.2.71 (registry+https://github.com/rust-lang/crates.io-index)" = "9457b06509d27052635f90d6466700c65095fdf75409b3fbdd903e988b886f49" +"checksum libflate 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a1fbe6b967a94346446d37ace319ae85be7eca261bb8149325811ac435d35d64" +"checksum libflate_lz77 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3286f09f7d4926fc486334f28d8d2e6ebe4f7f9994494b6dab27ddfad2c9b11b" "checksum libloading 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f2b111a074963af1d37a139918ac6d49ad1d0d5e47f72fd55388619691a7d753" "checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" "checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" @@ -2527,6 +2569,7 @@ dependencies = [ "checksum regex-syntax 0.6.18 (registry+https://github.com/rust-lang/crates.io-index)" = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8" "checksum remove_dir_all 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4a83fa3702a688b9359eccba92d153ac33fd2e8462f9e0e3fdf155239ea7792e" "checksum reqwest 0.10.5 (registry+https://github.com/rust-lang/crates.io-index)" = "4be79e8610ce0c2d646aa03bff67653f76d394c4e0a5e7bc255007008fcc6ba8" +"checksum rle-decode-fast 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cabe4fa914dec5870285fa7f71f602645da47c486e68486d2b4ceb4a343e90ac" "checksum rust-argon2 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2bc8af4bda8e1ff4932523b94d3dd20ee30a87232323eda55903ffd71d2fb017" "checksum rustc-hash 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" "checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" diff --git a/Cargo.toml b/Cargo.toml index 9317823b62..8d1622a551 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,12 +62,15 @@ either = "1.5.3" flatbuffers = "0.6.1" [dev-dependencies] -delorean_test_helpers = { path = "delorean_test_helpers" } -criterion = "0.3" assert_cmd = "1.0.0" -tempfile = "3.1.0" +chrono = "0.4" +criterion = "0.3" +delorean_test_helpers = { path = "delorean_test_helpers" } +libflate = "1.0.0" rand = "0.7.2" reqwest = { version = "0.10.1", features = ["blocking"] } +predicates = "1.0.4" +tempfile = "3.1.0" [[bench]] name = "encoders" diff --git a/src/encoders.rs b/src/encoders.rs index 7cf40f8271..86433a6b14 100644 --- a/src/encoders.rs +++ b/src/encoders.rs @@ -50,3 +50,49 @@ impl Encoder for Vec { }) } } + +/// Types implementing `Decoder` are able to decode themselves from compressed +/// blocks of vectors of values. +pub trait Decoder { + fn decode(dst: &mut Vec, src: Vec) -> Result<(), StorageError>; +} + +impl Decoder for Vec { + fn decode(dst: &mut Vec, src: Vec) -> Result<(), StorageError> { + float::decode(&src, dst).map_err(|e| StorageError { + description: e.to_string(), + }) + } +} + +impl Decoder for Vec { + fn decode(dst: &mut Vec, src: Vec) -> Result<(), StorageError> { + integer::decode(&src, dst).map_err(|e| StorageError { + description: e.to_string(), + }) + } +} + +impl Decoder for Vec { + fn decode(_: &mut Vec, _: Vec) -> Result<(), StorageError> { + Err(StorageError { + description: String::from("not yet implemented"), + }) + } +} + +impl Decoder<&str> for Vec<&str> { + fn decode(_: &mut Vec<&str>, _: Vec) -> Result<(), StorageError> { + Err(StorageError { + description: String::from("not yet implemented"), + }) + } +} + +impl Decoder for Vec { + fn decode(_: &mut Vec, _: Vec) -> Result<(), StorageError> { + Err(StorageError { + description: String::from("not yet implemented"), + }) + } +} diff --git a/src/storage/tsm.rs b/src/storage/tsm.rs index c3b9708b83..0cb7ffb46c 100644 --- a/src/storage/tsm.rs +++ b/src/storage/tsm.rs @@ -2,6 +2,7 @@ use crate::storage::StorageError; use std::io::BufRead; use std::io::{Read, Seek, SeekFrom}; +use std::u64; pub struct TSMFile where @@ -74,8 +75,18 @@ impl Index { self.curr_offset += 2; let count = u16::from_be_bytes(buf); + let mut buf2: [u8; 8] = [0; 8]; + + buf2.copy_from_slice(&key_bytes[..8]); + let org_id = InfluxID::from_be_bytes(buf2); + + buf2.copy_from_slice(&key_bytes[8..16]); + let bucket_id = InfluxID::from_be_bytes(buf2); + Ok(IndexEntry { key: key_bytes, + org_id, + bucket_id, block_type, count, curr_block: 1, @@ -157,9 +168,42 @@ impl Iterator for Index { } } +#[derive(Clone, Debug)] +/// InfluxID represents an InfluxDB ID used in InfluxDB 2.x to represent +/// organization and bucket identifiers. +pub struct InfluxID(u64); + +#[allow(dead_code)] +impl InfluxID { + fn new_str(s: &str) -> Result { + let v = u64::from_str_radix(s, 16).map_err(|e| StorageError { + description: e.to_string(), + })?; + Ok(InfluxID(v)) + } + + fn from_be_bytes(bytes: [u8; 8]) -> InfluxID { + InfluxID(u64::from_be_bytes(bytes)) + } +} + +impl std::fmt::Display for InfluxID { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { + write!(f, "{:016x}", self.0) + } +} + +impl std::cmp::PartialEq for InfluxID { + fn eq(&self, r: &InfluxID) -> bool { + self.0 == r.0 + } +} + #[derive(Clone)] pub struct IndexEntry { key: Vec, + org_id: InfluxID, + bucket_id: InfluxID, block_type: u8, count: u16, @@ -176,17 +220,17 @@ pub struct Block { } #[cfg(test)] -mod test { +mod tests { use super::*; use libflate::gzip::Decoder; use std::fs::File; + use std::i64; use std::io::BufReader; use std::io::Cursor; #[test] fn read_tsm_index() { - let file = - File::open("/Users/edd/rust/delorean/src/storage/000000000000005-000000002.tsm.gz"); + let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz"); let mut decoder = Decoder::new(file.unwrap()).unwrap(); let mut buf = Vec::new(); decoder.read_to_end(&mut buf).unwrap(); @@ -197,4 +241,55 @@ mod test { assert_eq!(index.curr_offset, 3_893_272); assert_eq!(index.count(), 2159) } + + #[test] + fn read_tsm_block() { + let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz"); + let mut decoder = Decoder::new(file.unwrap()).unwrap(); + let mut buf = Vec::new(); + decoder.read_to_end(&mut buf).unwrap(); + + let mut reader = TSMFile::new(BufReader::new(Cursor::new(buf)), 4_222_248); + let index = reader.index().unwrap(); + + let mut got_blocks: u64 = 0; + let mut got_min_time = i64::MAX; + let mut got_max_time = i64::MIN; + + // every block in the fixture file is for the 05c19117091a1000 org and + // 05c19117091a1001 bucket. + let org_id = InfluxID::new_str("05c19117091a1000").unwrap(); + let bucket_id = InfluxID::new_str("05c19117091a1001").unwrap(); + + for index_entry in index { + match index_entry { + Ok(entry) => { + got_blocks += entry.count as u64; + + if entry.block.min_time < got_min_time { + got_min_time = entry.block.min_time; + } + + if entry.block.max_time > got_max_time { + got_max_time = entry.block.max_time; + } + + assert_eq!(entry.org_id, org_id); + assert_eq!(entry.bucket_id, bucket_id); + } + Err(e) => panic!("{:?} {:?}", e, got_blocks), + } + } + + assert_eq!(got_blocks, 2159); // 2,159 blocks in the file + assert_eq!(got_min_time, 1590585404546128000); // earliest time is 2020-05-27T13:16:44.546128Z + assert_eq!(got_max_time, 1590597378379824000); // latest time is 2020-05-27T16:36:18.379824Z + } + + #[test] + fn influx_id() { + let id = InfluxID::new_str("20aa9b0").unwrap(); + assert_eq!(id, InfluxID(34253232)); + assert_eq!(format!("{}", id), "00000000020aa9b0"); + } } diff --git a/src/storage/000000000000005-000000002.tsm.gz b/tests/fixtures/000000000000005-000000002.tsm.gz similarity index 100% rename from src/storage/000000000000005-000000002.tsm.gz rename to tests/fixtures/000000000000005-000000002.tsm.gz