refactor: rename all crates that start with `delorean_` in preparation for rename (#415)

* refactor: rename delorean_cluster --> cluster

* refactor: rename delorean_generated_types --> generated_types

* refactor: rename delorean_write_buffer --> write_buffer

* refactor: rename delorean_ingest --> ingest

* refactor: rename delorean_storage --> storage

* refactor: rename delorean_tsm --> tsm

* refactor: rename delorean_test_helpers --> test_helpers

* refactor: rename delorean_arrow --> arrow_deps

* refactor: rename delorean_line_parser --> influxdb_line_protocol
Andrew Lamb 2020-11-05 13:44:36 -05:00 committed by GitHub
parent 9df6c24493
commit a52e0001c5
104 changed files with 481 additions and 487 deletions
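
For code that consumes these crates, the change is purely one of import paths: crate contents are untouched, and only the names in Cargo.toml and in `use` statements change. A minimal before/after sketch (a hypothetical caller inside this workspace; the encode and parse calls mirror the benchmark code further down in this diff):

// Before this commit:
// use delorean_line_parser::parse_lines;
// use delorean_tsm::encoders::float::encode;

// After this commit, the same code imports the renamed crates:
use influxdb_line_protocol::parse_lines;
use tsm::encoders::float::encode;

fn main() {
    // Encode two float values with the TSM float encoder.
    let mut encoded = vec![];
    encode(&[1.0_f64, 2.0], &mut encoded).unwrap();

    // Parse one line of InfluxDB line protocol.
    let parsed = parse_lines("cpu,host=A,region=west usage_system=64i 1590488773254420000")
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(parsed.len(), 1);
}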

Cargo.lock (generated, 296 changed lines)

@ -96,6 +96,15 @@ dependencies = [
"serde_json",
]
[[package]]
name = "arrow_deps"
version = "0.1.0"
dependencies = [
"arrow",
"datafusion",
"parquet",
]
[[package]]
name = "assert-json-diff"
version = "1.1.0"
@ -404,6 +413,26 @@ dependencies = [
"serde_json",
]
[[package]]
name = "cluster"
version = "0.1.0"
dependencies = [
"arrow_deps",
"async-trait",
"bytes",
"data_types",
"futures",
"generated_types",
"influxdb_line_protocol",
"object_store",
"serde",
"serde_json",
"snafu",
"storage",
"tokio",
"write_buffer",
]
[[package]]
name = "colored"
version = "1.9.3"
@ -667,9 +696,9 @@ version = "0.1.0"
dependencies = [
"chrono",
"crc32fast",
"delorean_generated_types",
"delorean_line_parser",
"flatbuffers",
"generated_types",
"influxdb_line_protocol",
"serde",
"snafu",
"tracing",
@ -699,6 +728,7 @@ dependencies = [
name = "delorean"
version = "0.1.0"
dependencies = [
"arrow_deps",
"assert_cmd",
"byteorder",
"bytes",
@ -706,23 +736,18 @@ dependencies = [
"criterion",
"csv",
"data_types",
"delorean_arrow",
"delorean_generated_types",
"delorean_ingest",
"delorean_line_parser",
"delorean_segment_store",
"delorean_storage",
"delorean_test_helpers",
"delorean_tsm",
"delorean_write_buffer",
"dirs 3.0.1",
"dotenv",
"env_logger",
"futures",
"generated_types",
"hex",
"http",
"hyper",
"influxdb2_client",
"influxdb_line_protocol",
"ingest",
"libflate",
"mem_qe",
"object_store",
@ -736,157 +761,29 @@ dependencies = [
"serde_json",
"serde_urlencoded 0.7.0",
"snafu",
"storage",
"tempfile",
"test_helpers",
"tokio",
"tonic",
"tracing",
"tracing-futures",
"tsm",
"wal",
]
[[package]]
name = "delorean_arrow"
version = "0.1.0"
dependencies = [
"arrow",
"datafusion",
"parquet",
]
[[package]]
name = "delorean_cluster"
version = "0.1.0"
dependencies = [
"async-trait",
"bytes",
"data_types",
"delorean_arrow",
"delorean_generated_types",
"delorean_line_parser",
"delorean_storage",
"delorean_write_buffer",
"futures",
"object_store",
"serde",
"serde_json",
"snafu",
"tokio",
]
[[package]]
name = "delorean_generated_types"
version = "0.1.0"
dependencies = [
"flatbuffers",
"futures",
"prost",
"prost-types",
"tonic",
"tonic-build",
]
[[package]]
name = "delorean_ingest"
version = "0.1.0"
dependencies = [
"data_types",
"delorean_arrow",
"delorean_line_parser",
"delorean_test_helpers",
"delorean_tsm",
"env_logger",
"libflate",
"packers",
"snafu",
"tracing",
]
[[package]]
name = "delorean_line_parser"
version = "0.1.0"
dependencies = [
"delorean_test_helpers",
"influxdb2_client",
"nom",
"smallvec",
"snafu",
"tracing",
"write_buffer",
]
[[package]]
name = "delorean_segment_store"
version = "0.1.0"
dependencies = [
"arrow_deps",
"criterion",
"croaring",
"delorean_arrow",
"packers",
"rand",
]
[[package]]
name = "delorean_storage"
version = "0.1.0"
dependencies = [
"async-trait",
"croaring",
"data_types",
"delorean_arrow",
"delorean_line_parser",
"delorean_test_helpers",
"serde",
"serde_urlencoded 0.6.1",
"snafu",
"tokio",
"tracing",
]
[[package]]
name = "delorean_test_helpers"
version = "0.1.0"
dependencies = [
"dotenv",
"env_logger",
"tempfile",
"tracing",
]
[[package]]
name = "delorean_tsm"
version = "0.1.0"
dependencies = [
"delorean_test_helpers",
"hex",
"integer-encoding",
"libflate",
"rand",
"snafu",
"snap",
"tracing",
]
[[package]]
name = "delorean_write_buffer"
version = "0.1.0"
dependencies = [
"async-trait",
"chrono",
"criterion",
"data_types",
"delorean_arrow",
"delorean_generated_types",
"delorean_line_parser",
"delorean_storage",
"delorean_test_helpers",
"flatbuffers",
"snafu",
"sqlparser",
"string-interner",
"tokio",
"tracing",
"wal",
]
[[package]]
name = "difference"
version = "2.0.0"
@ -1218,6 +1115,18 @@ dependencies = [
"slab",
]
[[package]]
name = "generated_types"
version = "0.1.0"
dependencies = [
"flatbuffers",
"futures",
"prost",
"prost-types",
"tonic",
"tonic-build",
]
[[package]]
name = "generic-array"
version = "0.12.3"
@ -1440,6 +1349,34 @@ dependencies = [
"tokio",
]
[[package]]
name = "influxdb_line_protocol"
version = "0.1.0"
dependencies = [
"influxdb2_client",
"nom",
"smallvec",
"snafu",
"test_helpers",
"tracing",
]
[[package]]
name = "ingest"
version = "0.1.0"
dependencies = [
"arrow_deps",
"data_types",
"env_logger",
"influxdb_line_protocol",
"libflate",
"packers",
"snafu",
"test_helpers",
"tracing",
"tsm",
]
[[package]]
name = "integer-encoding"
version = "1.1.5"
@ -1643,11 +1580,11 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
name = "mem_qe"
version = "0.1.0"
dependencies = [
"arrow_deps",
"chrono",
"criterion",
"croaring",
"crossbeam",
"delorean_arrow",
"env_logger",
"human_format",
"packers",
@ -2030,14 +1967,14 @@ dependencies = [
name = "packers"
version = "0.1.0"
dependencies = [
"arrow_deps",
"data_types",
"delorean_arrow",
"delorean_test_helpers",
"delorean_tsm",
"human_format",
"rand",
"snafu",
"test_helpers",
"tracing",
"tsm",
]
[[package]]
@ -2981,6 +2918,23 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0"
[[package]]
name = "storage"
version = "0.1.0"
dependencies = [
"arrow_deps",
"async-trait",
"croaring",
"data_types",
"influxdb_line_protocol",
"serde",
"serde_urlencoded 0.6.1",
"snafu",
"test_helpers",
"tokio",
"tracing",
]
[[package]]
name = "string-interner"
version = "0.12.0"
@ -3049,6 +3003,16 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "test_helpers"
version = "0.1.0"
dependencies = [
"dotenv",
"env_logger",
"tempfile",
"tracing",
]
[[package]]
name = "textwrap"
version = "0.11.0"
@ -3488,6 +3452,20 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "tsm"
version = "0.1.0"
dependencies = [
"hex",
"integer-encoding",
"libflate",
"rand",
"snafu",
"snap",
"test_helpers",
"tracing",
]
[[package]]
name = "typenum"
version = "1.12.0"
@ -3595,7 +3573,6 @@ version = "0.1.0"
dependencies = [
"byteorder",
"crc32fast",
"delorean_test_helpers",
"futures",
"itertools 0.9.0",
"once_cell",
@ -3604,6 +3581,7 @@ dependencies = [
"serde_json",
"snafu",
"snap",
"test_helpers",
"tokio",
"tracing",
]
@ -3780,6 +3758,28 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "write_buffer"
version = "0.1.0"
dependencies = [
"arrow_deps",
"async-trait",
"chrono",
"criterion",
"data_types",
"flatbuffers",
"generated_types",
"influxdb_line_protocol",
"snafu",
"sqlparser",
"storage",
"string-interner",
"test_helpers",
"tokio",
"tracing",
"wal",
]
[[package]]
name = "ws2_32-sys"
version = "0.2.1"


@ -7,21 +7,21 @@ default-run = "delorean"
[workspace]
members = [
"delorean_arrow",
"delorean_cluster",
"arrow_deps",
"cluster",
"data_types",
"delorean_generated_types",
"delorean_ingest",
"delorean_line_parser",
"generated_types",
"ingest",
"influxdb_line_protocol",
"object_store",
"mem_qe",
"delorean_segment_store",
"packers",
"delorean_test_helpers",
"delorean_tsm",
"delorean_storage",
"test_helpers",
"tsm",
"storage",
"wal",
"delorean_write_buffer",
"write_buffer",
"influxdb2_client",
]
@ -30,17 +30,17 @@ debug = true
[dependencies]
data_types = { path = "data_types" }
delorean_arrow = { path = "delorean_arrow" }
delorean_generated_types = { path = "delorean_generated_types" }
delorean_ingest = { path = "delorean_ingest" }
delorean_line_parser = { path = "delorean_line_parser" }
arrow_deps = { path = "arrow_deps" }
generated_types = { path = "generated_types" }
ingest = { path = "ingest" }
influxdb_line_protocol = { path = "influxdb_line_protocol" }
mem_qe = { path = "mem_qe" }
delorean_segment_store = { path = "delorean_segment_store" }
packers = { path = "packers" }
delorean_write_buffer = { path = "delorean_write_buffer" }
write_buffer = { path = "write_buffer" }
object_store = { path = "object_store" }
delorean_storage = { path = "delorean_storage" }
delorean_tsm = { path = "delorean_tsm" }
storage = { path = "storage" }
tsm = { path = "tsm" }
wal = { path = "wal" }
bytes = "0.5.4"
@ -72,7 +72,7 @@ libflate = "1.0.0"
[dev-dependencies]
assert_cmd = "1.0.0"
criterion = "0.3"
delorean_test_helpers = { path = "delorean_test_helpers" }
test_helpers = { path = "test_helpers" }
hex = "0.4.2"
influxdb2_client = { path = "influxdb2_client" }
libflate = "1.0.0"


@ -1,5 +1,5 @@
[package]
name = "delorean_arrow"
name = "arrow_deps"
version = "0.1.0"
authors = ["alamb <andrew@nerdnetworks.org>"]
edition = "2018"


@ -109,7 +109,7 @@ fn float_encode_sequential(c: &mut Criterion) {
c,
"float_encode_sequential",
&LARGER_BATCH_SIZES,
delorean_tsm::encoders::float::encode,
tsm::encoders::float::encode,
);
}
@ -136,7 +136,7 @@ fn integer_encode_sequential(c: &mut Criterion) {
c,
"integer_encode_sequential",
&LARGER_BATCH_SIZES,
delorean_tsm::encoders::integer::encode,
tsm::encoders::integer::encode,
);
}
@ -145,7 +145,7 @@ fn timestamp_encode_sequential(c: &mut Criterion) {
c,
"timestamp_encode_sequential",
&LARGER_BATCH_SIZES,
delorean_tsm::encoders::timestamp::encode,
tsm::encoders::timestamp::encode,
);
}
@ -177,7 +177,7 @@ fn float_encode_random(c: &mut Criterion) {
.take(batch_size)
.collect()
},
delorean_tsm::encoders::float::encode,
tsm::encoders::float::encode,
)
}
@ -207,7 +207,7 @@ fn integer_encode_random(c: &mut Criterion) {
.map(|_| rand::thread_rng().gen_range(0, 100))
.collect()
},
delorean_tsm::encoders::integer::encode,
tsm::encoders::integer::encode,
)
}
@ -232,7 +232,7 @@ fn float_encode_cpu(c: &mut Criterion) {
"float_encode_cpu",
&SMALLER_BATCH_SIZES,
|batch_size| fixtures::CPU_F64_EXAMPLE_VALUES[..batch_size].to_vec(),
delorean_tsm::encoders::float::encode,
tsm::encoders::float::encode,
)
}
@ -244,10 +244,10 @@ fn float_decode_cpu(c: &mut Criterion) {
|batch_size| {
let decoded: Vec<f64> = fixtures::CPU_F64_EXAMPLE_VALUES[..batch_size].to_vec();
let mut encoded = vec![];
delorean_tsm::encoders::float::encode(&decoded, &mut encoded).unwrap();
tsm::encoders::float::encode(&decoded, &mut encoded).unwrap();
(decoded.len(), encoded)
},
delorean_tsm::encoders::float::decode,
tsm::encoders::float::decode,
)
}
@ -259,10 +259,10 @@ fn float_decode_sequential(c: &mut Criterion) {
|batch_size| {
let decoded: Vec<f64> = (1..batch_size).map(convert_from_usize).collect();
let mut encoded = vec![];
delorean_tsm::encoders::float::encode(&decoded, &mut encoded).unwrap();
tsm::encoders::float::encode(&decoded, &mut encoded).unwrap();
(decoded.len(), encoded)
},
delorean_tsm::encoders::float::decode,
tsm::encoders::float::decode,
)
}
@ -274,10 +274,10 @@ fn integer_decode_sequential(c: &mut Criterion) {
|batch_size| {
let decoded: Vec<i64> = (1..batch_size).map(convert_from_usize).collect();
let mut encoded = vec![];
delorean_tsm::encoders::integer::encode(&decoded, &mut encoded).unwrap();
tsm::encoders::integer::encode(&decoded, &mut encoded).unwrap();
(decoded.len(), encoded)
},
delorean_tsm::encoders::integer::decode,
tsm::encoders::integer::decode,
)
}
@ -289,10 +289,10 @@ fn timestamp_decode_sequential(c: &mut Criterion) {
|batch_size| {
let decoded: Vec<i64> = (1..batch_size).map(convert_from_usize).collect();
let mut encoded = vec![];
delorean_tsm::encoders::timestamp::encode(&decoded, &mut encoded).unwrap();
tsm::encoders::timestamp::encode(&decoded, &mut encoded).unwrap();
(decoded.len(), encoded)
},
delorean_tsm::encoders::timestamp::decode,
tsm::encoders::timestamp::decode,
)
}
@ -309,10 +309,10 @@ fn float_decode_random(c: &mut Criterion) {
.collect();
let mut encoded = vec![];
delorean_tsm::encoders::float::encode(&decoded, &mut encoded).unwrap();
tsm::encoders::float::encode(&decoded, &mut encoded).unwrap();
(decoded.len(), encoded)
},
delorean_tsm::encoders::float::decode,
tsm::encoders::float::decode,
)
}
@ -326,10 +326,10 @@ fn integer_decode_random(c: &mut Criterion) {
.map(|_| rand::thread_rng().gen_range(0, 100))
.collect();
let mut encoded = vec![];
delorean_tsm::encoders::integer::encode(&decoded, &mut encoded).unwrap();
tsm::encoders::integer::encode(&decoded, &mut encoded).unwrap();
(decoded.len(), encoded)
},
delorean_tsm::encoders::integer::decode,
tsm::encoders::integer::decode,
)
}


@ -12,7 +12,7 @@ fn line_parser(c: &mut Criterion) {
group.bench_function("all lines", |b| {
b.iter(|| {
let lines = delorean_line_parser::parse_lines(LINES)
let lines = influxdb_line_protocol::parse_lines(LINES)
.collect::<Result<Vec<_>, _>>()
.unwrap();


@ -1,11 +1,13 @@
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use data_types::table_schema::Schema;
use delorean_ingest::parquet::{
writer::{CompressionLevel, DeloreanParquetTableWriter},
TryClone,
use influxdb_line_protocol::parse_lines;
use ingest::{
parquet::{
writer::{CompressionLevel, DeloreanParquetTableWriter},
TryClone,
},
ConversionSettings, LineProtocolConverter,
};
use delorean_ingest::{ConversionSettings, LineProtocolConverter};
use delorean_line_parser::parse_lines;
use packers::{DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError};
use std::time::Duration;


@ -1,8 +1,8 @@
use criterion::{criterion_group, criterion_main, Criterion};
use delorean_tsm::mapper::*;
use delorean_tsm::reader::*;
use delorean_tsm::*;
use std::collections::BTreeMap;
use tsm::mapper::*;
use tsm::reader::*;
use tsm::*;
fn map_field_columns(c: &mut Criterion) {
let mut group = c.benchmark_group("mapper");
@ -18,7 +18,7 @@ fn map_field_columns(c: &mut Criterion) {
max_time: 0,
offset: 0,
size: 0,
typ: delorean_tsm::BlockType::Float,
typ: tsm::BlockType::Float,
reader_idx: 0,
},
)
@ -33,7 +33,7 @@ fn map_field_columns(c: &mut Criterion) {
max_time: 0,
offset: 0,
size: 0,
typ: delorean_tsm::BlockType::Float,
typ: tsm::BlockType::Float,
reader_idx: 0,
},
)
@ -48,7 +48,7 @@ fn map_field_columns(c: &mut Criterion) {
max_time: 0,
offset: 0,
size: 0,
typ: delorean_tsm::BlockType::Integer,
typ: tsm::BlockType::Integer,
reader_idx: 0,
},
)


@ -1,5 +1,5 @@
[package]
name = "delorean_cluster"
name = "cluster"
version = "0.1.0"
authors = ["pauldix <paul@pauldix.net>"]
edition = "2018"
@ -12,12 +12,12 @@ serde = "1.0"
serde_json = "1.0"
async-trait = "0.1"
data_types = { path = "../data_types" }
delorean_generated_types = { path = "../delorean_generated_types" }
delorean_line_parser = { path = "../delorean_line_parser" }
delorean_storage = { path = "../delorean_storage" }
delorean_write_buffer = { path = "../delorean_write_buffer" }
generated_types = { path = "../generated_types" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
storage = { path = "../storage" }
write_buffer = { path = "../write_buffer" }
object_store = { path = "../object_store" }
tokio = { version = "0.2", features = ["full"] }
delorean_arrow = { path = "../delorean_arrow" }
arrow_deps = { path = "../arrow_deps" }
futures = "0.3.7"
bytes = "0.5"


@ -71,15 +71,15 @@ use std::{
},
};
use arrow_deps::arrow::record_batch::RecordBatch;
use data_types::{
data::{lines_to_replicated_write, ReplicatedWrite},
database_rules::{DatabaseRules, HostGroup, HostGroupId, MatchTables},
};
use delorean_arrow::arrow::record_batch::RecordBatch;
use delorean_line_parser::ParsedLine;
use delorean_storage::Database;
use delorean_write_buffer::Db as WriteBufferDb;
use influxdb_line_protocol::ParsedLine;
use object_store::ObjectStore;
use storage::Database;
use write_buffer::Db as WriteBufferDb;
use async_trait::async_trait;
use bytes::Bytes;
@ -402,11 +402,11 @@ fn config_location(id: u32) -> String {
#[cfg(test)]
mod tests {
use super::*;
use arrow_deps::arrow::{csv, util::string_writer::StringWriter};
use async_trait::async_trait;
use data_types::database_rules::{MatchTables, Matcher, Subscription};
use delorean_arrow::arrow::{csv, util::string_writer::StringWriter};
use delorean_line_parser::parse_lines;
use futures::TryStreamExt;
use influxdb_line_protocol::parse_lines;
use object_store::{InMemory, ObjectStoreIntegration};
use snafu::Snafu;
use std::sync::Mutex;


@ -9,8 +9,8 @@ edition = "2018"
[dependencies]
snafu = "0.6"
serde = "1.0"
delorean_generated_types = { path = "../delorean_generated_types" }
delorean_line_parser = { path = "../delorean_line_parser" }
generated_types = { path = "../generated_types" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
chrono = "0.4"
flatbuffers = "0.6"
crc32fast = "1.2.0"

View File

@ -3,8 +3,8 @@
use crate::database_rules::DatabaseRules;
use crate::TIME_COLUMN_NAME;
use delorean_generated_types::wal as wb;
use delorean_line_parser::{FieldValue, ParsedLine};
use generated_types::wal as wb;
use influxdb_line_protocol::{FieldValue, ParsedLine};
use std::{collections::BTreeMap, fmt};


@ -1,4 +1,4 @@
use delorean_line_parser::ParsedLine;
use influxdb_line_protocol::ParsedLine;
use chrono::{DateTime, TimeZone, Utc};
use serde::{Deserialize, Serialize};
@ -175,7 +175,7 @@ pub struct Subscription {
pub struct Matcher {
#[serde(flatten)]
pub tables: MatchTables,
// TODO: make this work with delorean_storage::Predicate
// TODO: make this work with storage::Predicate
#[serde(skip_serializing_if = "Option::is_none")]
pub predicate: Option<String>,
}
@ -203,7 +203,7 @@ pub struct HostGroup {
#[cfg(test)]
mod tests {
use super::*;
use delorean_line_parser::parse_lines;
use influxdb_line_protocol::parse_lines;
#[allow(dead_code)]
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;


@ -8,7 +8,7 @@ edition = "2018"
[dependencies]
delorean_arrow = { path = "../delorean_arrow" }
arrow_deps = { path = "../arrow_deps" }
packers = { path = "../packers" }
croaring = "0.4.5"


@ -3,7 +3,7 @@ use std::mem::size_of;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use rand::prelude::*;
use delorean_arrow::arrow::datatypes::*;
use arrow_deps::arrow::datatypes::*;
use delorean_segment_store::column::fixed::Fixed;
use delorean_segment_store::column::fixed_null::FixedNull;


@ -3,7 +3,7 @@ use std::mem::size_of;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use rand::prelude::*;
use delorean_arrow::arrow::datatypes::*;
use arrow_deps::arrow::datatypes::*;
use delorean_segment_store::column::fixed::Fixed;
use delorean_segment_store::column::fixed_null::FixedNull;


@ -8,11 +8,11 @@ use std::convert::TryFrom;
use croaring::Bitmap;
use delorean_arrow::arrow::array::{
use arrow_deps::arrow::array::{
Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, StringArray,
UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use delorean_arrow::{arrow, arrow::array::Array};
use arrow_deps::{arrow, arrow::array::Array};
/// The possible logical types that column values can have. All values in a
/// column have the same physical type.
@ -2275,7 +2275,7 @@ impl RowIDs {
#[cfg(test)]
mod test {
use super::*;
use delorean_arrow::arrow::array::{
use arrow_deps::arrow::array::{
Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, StringArray,
UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};


@ -4,7 +4,7 @@ use std::iter;
use croaring::Bitmap;
use delorean_arrow::arrow::array::{Array, StringArray};
use arrow_deps::arrow::array::{Array, StringArray};
use crate::column::{cmp, RowIDs};


@ -15,9 +15,9 @@
use std::cmp::Ordering;
use std::fmt::Debug;
use delorean_arrow::arrow;
use delorean_arrow::arrow::array::{Array, PrimitiveArray};
use delorean_arrow::arrow::datatypes::ArrowNumericType;
use arrow_deps::arrow;
use arrow_deps::arrow::array::{Array, PrimitiveArray};
use arrow_deps::arrow::datatypes::ArrowNumericType;
use crate::column::{cmp, RowIDs};
@ -479,7 +479,7 @@ where
//
// Here is an example implementation:
//
// impl From<&[i64]> for FixedNull<delorean_arrow::arrow::datatypes::Int64Type> {
// impl From<&[i64]> for FixedNull<arrow_deps::arrow::datatypes::Int64Type> {
// fn from(v: &[i64]) -> Self {
// Self{
// arr: PrimitiveArray::from(v.to_vec()),
@ -487,7 +487,7 @@ where
// }
// }
//
// impl From<&[Option<i64>]> for FixedNull<delorean_arrow::arrow::datatypes::Int64Type> {
// impl From<&[Option<i64>]> for FixedNull<arrow_deps::arrow::datatypes::Int64Type> {
// fn from(v: &[i64]) -> Self {
// Self{
// arr: PrimitiveArray::from(v.to_vec()),
@ -522,33 +522,33 @@ macro_rules! fixed_from_slice_impls {
//
// Need to look at possibility of initialising smaller datatypes...
fixed_from_slice_impls! {
(i64, delorean_arrow::arrow::datatypes::Int64Type),
// (i64, delorean_arrow::arrow::datatypes::Int32Type),
// (i64, delorean_arrow::arrow::datatypes::Int16Type),
// (i64, delorean_arrow::arrow::datatypes::Int8Type),
// (i64, delorean_arrow::arrow::datatypes::UInt32Type),
// (i64, delorean_arrow::arrow::datatypes::UInt16Type),
// (i64, delorean_arrow::arrow::datatypes::UInt8Type),
(i32, delorean_arrow::arrow::datatypes::Int32Type),
// (i32, delorean_arrow::arrow::datatypes::Int16Type),
// (i32, delorean_arrow::arrow::datatypes::Int8Type),
// (i32, delorean_arrow::arrow::datatypes::UInt16Type),
// (i32, delorean_arrow::arrow::datatypes::UInt8Type),
(i16, delorean_arrow::arrow::datatypes::Int16Type),
// (i16, delorean_arrow::arrow::datatypes::Int8Type),
// (i16, delorean_arrow::arrow::datatypes::UInt8Type),
(i8, delorean_arrow::arrow::datatypes::Int8Type),
(u64, delorean_arrow::arrow::datatypes::UInt64Type),
// (u64, delorean_arrow::arrow::datatypes::UInt32Type),
// (u64, delorean_arrow::arrow::datatypes::UInt16Type),
// (u64, delorean_arrow::arrow::datatypes::UInt8Type),
(u32, delorean_arrow::arrow::datatypes::UInt32Type),
// (u32, delorean_arrow::arrow::datatypes::UInt16Type),
// (u32, delorean_arrow::arrow::datatypes::UInt8Type),
(u16, delorean_arrow::arrow::datatypes::UInt16Type),
// (u16, delorean_arrow::arrow::datatypes::UInt8Type),
(u8, delorean_arrow::arrow::datatypes::UInt8Type),
(f64, delorean_arrow::arrow::datatypes::Float64Type),
(i64, arrow_deps::arrow::datatypes::Int64Type),
// (i64, arrow_deps::arrow::datatypes::Int32Type),
// (i64, arrow_deps::arrow::datatypes::Int16Type),
// (i64, arrow_deps::arrow::datatypes::Int8Type),
// (i64, arrow_deps::arrow::datatypes::UInt32Type),
// (i64, arrow_deps::arrow::datatypes::UInt16Type),
// (i64, arrow_deps::arrow::datatypes::UInt8Type),
(i32, arrow_deps::arrow::datatypes::Int32Type),
// (i32, arrow_deps::arrow::datatypes::Int16Type),
// (i32, arrow_deps::arrow::datatypes::Int8Type),
// (i32, arrow_deps::arrow::datatypes::UInt16Type),
// (i32, arrow_deps::arrow::datatypes::UInt8Type),
(i16, arrow_deps::arrow::datatypes::Int16Type),
// (i16, arrow_deps::arrow::datatypes::Int8Type),
// (i16, arrow_deps::arrow::datatypes::UInt8Type),
(i8, arrow_deps::arrow::datatypes::Int8Type),
(u64, arrow_deps::arrow::datatypes::UInt64Type),
// (u64, arrow_deps::arrow::datatypes::UInt32Type),
// (u64, arrow_deps::arrow::datatypes::UInt16Type),
// (u64, arrow_deps::arrow::datatypes::UInt8Type),
(u32, arrow_deps::arrow::datatypes::UInt32Type),
// (u32, arrow_deps::arrow::datatypes::UInt16Type),
// (u32, arrow_deps::arrow::datatypes::UInt8Type),
(u16, arrow_deps::arrow::datatypes::UInt16Type),
// (u16, arrow_deps::arrow::datatypes::UInt8Type),
(u8, arrow_deps::arrow::datatypes::UInt8Type),
(f64, arrow_deps::arrow::datatypes::Float64Type),
}
macro_rules! fixed_from_arrow_impls {
@ -567,7 +567,7 @@ macro_rules! fixed_from_arrow_impls {
//
// Need to look at possibility of initialising smaller datatypes...
fixed_from_arrow_impls! {
(arrow::array::Int64Array, delorean_arrow::arrow::datatypes::Int64Type),
(arrow::array::Int64Array, arrow_deps::arrow::datatypes::Int64Type),
// TODO(edd): add more datatypes
}
@ -575,7 +575,7 @@ fixed_from_arrow_impls! {
mod test {
use super::cmp::Operator;
use super::*;
use delorean_arrow::arrow::datatypes::*;
use arrow_deps::arrow::datatypes::*;
fn some_vec<T: Copy>(v: Vec<T>) -> Vec<Option<T>> {
v.iter().map(|x| Some(*x)).collect()


@ -9,7 +9,7 @@ pub(crate) mod table;
use std::collections::BTreeMap;
use delorean_arrow::arrow::record_batch::RecordBatch;
use arrow_deps::arrow::record_batch::RecordBatch;
use column::AggregateType;
use partition::Partition;


@ -1,7 +1,7 @@
use std::collections::{BTreeMap, BTreeSet};
use std::slice::Iter;
use delorean_arrow::arrow::record_batch::RecordBatch;
use arrow_deps::arrow::record_batch::RecordBatch;
use crate::column::{AggregateResult, AggregateType, Scalar, Value, Values};
use crate::segment::{ColumnName, GroupKey, Segment};


@ -1,5 +1,5 @@
[package]
name = "delorean_generated_types"
name = "generated_types"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"


@ -1,5 +1,7 @@
//! Compiles Protocol Buffers and FlatBuffers schema definitions into
//! native Rust types.
//!
//! Source files are found in the crate root.
use std::{
path::{Path, PathBuf},
@ -10,8 +12,7 @@ type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;
fn main() -> Result<()> {
let mut root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
root.push("..");
let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
generate_grpc_types(&root)?;
generate_wal_types(&root)?;
@ -23,7 +24,7 @@ fn main() -> Result<()> {
///
/// Creates `influxdata.platform.storage.rs`
fn generate_grpc_types(root: &Path) -> Result<()> {
let proto_file = root.join("proto/delorean/delorean.proto");
let proto_file = root.join("delorean.proto");
println!("cargo:rerun-if-changed={}", proto_file.display());
tonic_build::compile_protos(proto_file)?;
@ -35,7 +36,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
///
/// Creates `wal_generated.rs`
fn generate_wal_types(root: &Path) -> Result<()> {
let wal_file = root.join("proto/delorean/wal.fbs");
let wal_file = root.join("wal.fbs");
println!("cargo:rerun-if-changed={}", wal_file.display());
let out_dir: PathBuf = std::env::var_os("OUT_DIR")
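
Piecing the hunks above together, the post-commit build.rs reduces to roughly the following sketch (partly hypothetical: the FlatBuffers codegen body and the OUT_DIR handling are cut off in this diff, so they are elided here):

use std::path::{Path, PathBuf};

type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;

fn main() -> Result<()> {
    // The schema files now live in the crate root, so CARGO_MANIFEST_DIR is
    // used directly instead of hopping up to the workspace root.
    let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    generate_grpc_types(&root)?;
    generate_wal_types(&root)?;
    Ok(())
}

/// Compiles the Protocol Buffers definitions into Rust types via tonic.
fn generate_grpc_types(root: &Path) -> Result<()> {
    let proto_file = root.join("delorean.proto");
    println!("cargo:rerun-if-changed={}", proto_file.display());
    tonic_build::compile_protos(proto_file)?;
    Ok(())
}

/// Emits the rerun hint for the FlatBuffers schema; the actual code
/// generation into OUT_DIR is not visible in this diff.
fn generate_wal_types(root: &Path) -> Result<()> {
    let wal_file = root.join("wal.fbs");
    println!("cargo:rerun-if-changed={}", wal_file.display());
    // ... FlatBuffers code generation elided ...
    Ok(())
}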


@ -1,5 +1,5 @@
[package]
name = "delorean_line_parser"
name = "influxdb_line_protocol"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"
@ -12,4 +12,4 @@ snafu = "0.6.2"
influxdb2_client = { path = "../influxdb2_client" }
[dev-dependencies]
delorean_test_helpers = { path = "../delorean_test_helpers" }
test_helpers = { path = "../test_helpers" }


@ -109,10 +109,10 @@ impl nom::error::ParseError<&str> for Error {
/// into a `ParsedLine`:
///
/// ```
/// use delorean_line_parser::{ParsedLine, FieldValue};
/// use influxdb_line_protocol::{ParsedLine, FieldValue};
///
/// let mut parsed_lines =
/// delorean_line_parser::parse_lines(
/// influxdb_line_protocol::parse_lines(
/// "cpu,host=A,region=west usage_system=64i 1590488773254420000"
/// );
/// let parsed_line = parsed_lines
@ -152,10 +152,10 @@ impl<'a> ParsedLine<'a> {
/// always present).
///
/// ```
/// use delorean_line_parser::{ParsedLine, FieldValue};
/// use influxdb_line_protocol::{ParsedLine, FieldValue};
///
/// let mut parsed_lines =
/// delorean_line_parser::parse_lines(
/// influxdb_line_protocol::parse_lines(
/// "cpu,host=A,region=west usage_system=64i 1590488773254420000"
/// );
/// let parsed_line = parsed_lines
@ -1020,8 +1020,8 @@ fn escape_and_write_value(
#[cfg(test)]
mod test {
use super::*;
use delorean_test_helpers::approximately_equal;
use smallvec::smallvec;
use test_helpers::approximately_equal;
type Error = Box<dyn std::error::Error>;
type Result<T = (), E = Error> = std::result::Result<T, E>;


@ -1,5 +1,5 @@
[package]
name = "delorean_ingest"
name = "ingest"
version = "0.1.0"
authors = ["Andrew Lamb <alamb@influxdata.com>"]
edition = "2018"
@ -11,12 +11,12 @@ snafu = "0.6.2"
env_logger = "0.7.1"
tracing = "0.1"
delorean_line_parser = { path = "../delorean_line_parser" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
packers = { path = "../packers" }
data_types = { path = "../data_types" }
delorean_tsm = { path = "../delorean_tsm" }
delorean_arrow = { path = "../delorean_arrow" }
tsm = { path = "../tsm" }
arrow_deps = { path = "../arrow_deps" }
[dev-dependencies]
delorean_test_helpers ={ path = "../delorean_test_helpers" }
test_helpers ={ path = "../test_helpers" }
libflate = "1.0.0"


@ -10,12 +10,7 @@
)]
use data_types::table_schema::{DataType, Schema, SchemaBuilder};
use delorean_line_parser::{FieldValue, ParsedLine};
use delorean_tsm::{
mapper::{ColumnData, MeasurementTable, TSMMeasurementMapper},
reader::{BlockDecoder, TSMBlockReader, TSMIndexReader},
BlockType, TSMError,
};
use influxdb_line_protocol::{FieldValue, ParsedLine};
use packers::{
ByteArray, DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError, Packer, Packers,
};
@ -25,6 +20,11 @@ use std::{
io::{Read, Seek},
};
use tracing::debug;
use tsm::{
mapper::{ColumnData, MeasurementTable, TSMMeasurementMapper},
reader::{BlockDecoder, TSMBlockReader, TSMIndexReader},
BlockType, TSMError,
};
pub mod parquet;
@ -856,7 +856,7 @@ impl TSMFileConverter {
//
m.process(
&mut block_reader,
|section: delorean_tsm::mapper::TableSection| -> Result<(), TSMError> {
|section: tsm::mapper::TableSection| -> Result<(), TSMError> {
// number of rows in each column in this table section.
let col_len = section.len();
@ -1086,15 +1086,15 @@ impl std::fmt::Debug for TSMFileConverter {
}
#[cfg(test)]
mod delorean_ingest_tests {
mod tests {
use super::*;
use data_types::table_schema::ColumnDefinition;
use delorean_test_helpers::approximately_equal;
use delorean_tsm::{
use packers::{DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError, Packers};
use test_helpers::approximately_equal;
use tsm::{
reader::{BlockData, MockBlockDecoder},
Block,
};
use packers::{DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError, Packers};
use libflate::gzip;
use std::fs::File;
@ -1215,7 +1215,7 @@ mod delorean_ingest_tests {
}
fn only_good_lines(data: &str) -> Vec<ParsedLine<'_>> {
delorean_line_parser::parse_lines(data)
influxdb_line_protocol::parse_lines(data)
.filter_map(|r| {
assert!(r.is_ok());
r.ok()


@ -8,7 +8,7 @@
)]
// Export the parts of the parquet crate that are needed to interact with code in this crate
pub use delorean_arrow::parquet::{
pub use arrow_deps::parquet::{
errors::ParquetError,
file::reader::{ChunkReader, Length},
file::writer::TryClone,


@ -1,6 +1,6 @@
use snafu::Snafu;
use delorean_arrow::parquet::errors::ParquetError;
use arrow_deps::parquet::errors::ParquetError;
#[derive(Debug, Snafu)]
pub enum Error {


@ -1,10 +1,10 @@
//! Provide storage statistics for parquet files
use data_types::table_schema::DataType;
use delorean_arrow::parquet::{
use arrow_deps::parquet::{
self,
file::reader::{FileReader, SerializedFileReader},
schema,
};
use data_types::table_schema::DataType;
use snafu::ResultExt;
use super::{


@ -1,5 +1,5 @@
//! Provide storage statistics for parquet files
use delorean_arrow::parquet::{
use arrow_deps::parquet::{
basic::{Compression, Encoding},
file::reader::{FileReader, SerializedFileReader},
};


@ -1,5 +1,5 @@
//! This module contains the code to write delorean table data to parquet
use delorean_arrow::parquet::{
use arrow_deps::parquet::{
self,
basic::{Compression, Encoding, LogicalType, Repetition, Type as PhysicalType},
errors::ParquetError,
@ -97,8 +97,8 @@ where
/// # use data_types::table_schema::DataType;
/// # use packers::DeloreanTableWriter;
/// # use packers::{Packer, Packers};
/// # use delorean_ingest::parquet::writer::{DeloreanParquetTableWriter, CompressionLevel};
/// # use delorean_arrow::parquet::data_type::ByteArray;
/// # use ingest::parquet::writer::{DeloreanParquetTableWriter, CompressionLevel};
/// # use arrow_deps::parquet::data_type::ByteArray;
///
/// let schema = table_schema::SchemaBuilder::new("measurement_name")
/// .tag("tag1")


@ -1,7 +1,7 @@
use delorean_ingest::parquet::writer::{CompressionLevel, DeloreanParquetTableWriter};
use ingest::parquet::writer::{CompressionLevel, DeloreanParquetTableWriter};
use packers::{DeloreanTableWriter, Packer, Packers};
use delorean_arrow::parquet::data_type::ByteArray;
use arrow_deps::parquet::data_type::ByteArray;
use std::fs;
#[test]
@ -59,7 +59,7 @@ fn test_write_parquet_data() {
packers[5].i64_packer_mut().push(910000000000);
// write the data out to the parquet file
let output_path = delorean_test_helpers::tempfile::Builder::new()
let output_path = test_helpers::tempfile::Builder::new()
.prefix("delorean_parquet_e2e")
.suffix(".parquet")
.tempfile()


@ -8,7 +8,7 @@ edition = "2018"
[dependencies]
delorean_arrow = { path = "../delorean_arrow" }
arrow_deps = { path = "../arrow_deps" }
packers = { path = "../packers" }
snafu = "0.6.8"
croaring = "0.4.5"


@ -1,13 +1,13 @@
//! Code for interfacing and running queries in DataFusion
// use crate::Store;
// use delorean_arrow::arrow::{
// use arrow_deps::arrow::{
// datatypes::{Schema, SchemaRef},
// record_batch::{RecordBatch, RecordBatchReader},
// util::pretty,
// };
// use delorean_arrow::datafusion::prelude::*;
// use delorean_arrow::datafusion::{
// use arrow_deps::datafusion::prelude::*;
// use arrow_deps::datafusion::{
// datasource::TableProvider,
// execution::{
// context::ExecutionContextState,
@ -48,7 +48,7 @@
// &self,
// _projection: &Option<Vec<usize>>,
// _batch_size: usize,
// ) -> delorean_arrow::datafusion::error::Result<Vec<Arc<dyn Partition>>> {
// ) -> arrow_deps::datafusion::error::Result<Vec<Arc<dyn Partition>>> {
// unimplemented!("scan not yet implemented");
// }
// }
@ -220,7 +220,7 @@
// &self,
// input_physical_plans: Vec<Arc<dyn ExecutionPlan>>,
// _ctx_state: Arc<Mutex<ExecutionContextState>>,
// ) -> delorean_arrow::datafusion::error::Result<Arc<dyn ExecutionPlan>> {
// ) -> arrow_deps::datafusion::error::Result<Arc<dyn ExecutionPlan>> {
// assert_eq!(input_physical_plans.len(), 0, "Can not have inputs");
// // If this were real code, we would now programmatically
@ -277,7 +277,7 @@
// self.store.schema()
// }
// fn partitions(&self) -> delorean_arrow::datafusion::error::Result<Vec<Arc<dyn Partitioning>>> {
// fn partitions(&self) -> arrow_deps::datafusion::error::Result<Vec<Arc<dyn Partitioning>>> {
// let store = self.store.clone();
// Ok(vec![Arc::new(SegmentPartition {
// store,
@ -297,7 +297,7 @@
// impl Partition for SegmentPartition {
// fn execute(
// &self,
// ) -> delorean_arrow::datafusion::error::Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>>
// ) -> arrow_deps::datafusion::error::Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>>
// {
// let combined_results: Vec<Arc<RecordBatch>> = vec![];


@ -12,9 +12,9 @@ use datatypes::TimeUnit;
use snafu::Snafu;
use tracing::debug;
use delorean_arrow::arrow::record_batch::{RecordBatch, RecordBatchReader};
use delorean_arrow::arrow::{array, array::Array, datatypes, ipc};
use delorean_arrow::parquet::arrow::arrow_reader::ArrowReader;
use arrow_deps::arrow::record_batch::{RecordBatch, RecordBatchReader};
use arrow_deps::arrow::{array, array::Array, datatypes, ipc};
use arrow_deps::parquet::arrow::arrow_reader::ArrowReader;
use mem_qe::column;
use mem_qe::column::{AggregateType, Column};
use mem_qe::segment::{ColumnType, GroupingStrategy, Schema, Segment};
@ -79,9 +79,8 @@ fn build_parquet_store(path: &str, store: &mut Store, sort_order: Vec<&str>) ->
path
);
let parquet_reader =
delorean_arrow::parquet::file::reader::SerializedFileReader::new(r).unwrap();
let mut reader = delorean_arrow::parquet::arrow::arrow_reader::ParquetFileArrowReader::new(
let parquet_reader = arrow_deps::parquet::file::reader::SerializedFileReader::new(r).unwrap();
let mut reader = arrow_deps::parquet::arrow::arrow_reader::ParquetFileArrowReader::new(
Rc::new(parquet_reader),
);
let batch_size = 60000;


@ -1,7 +1,7 @@
use std::collections::BTreeSet;
use std::convert::From;
use delorean_arrow::arrow;
use arrow_deps::arrow;
use super::encoding;
@ -1708,7 +1708,7 @@ where
}
}
use delorean_arrow::arrow::array::Array;
use arrow_deps::arrow::array::Array;
impl From<arrow::array::Float64Array> for NumericColumn<f64> {
fn from(arr: arrow::array::Float64Array) -> Self {


@ -2,8 +2,8 @@ use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::iter;
use std::mem::size_of;
use delorean_arrow::arrow::array::{Array, PrimitiveArray};
use delorean_arrow::arrow::datatypes::ArrowNumericType;
use arrow_deps::arrow::array::{Array, PrimitiveArray};
use arrow_deps::arrow::datatypes::ArrowNumericType;
pub trait NumericEncoding: Send + Sync + std::fmt::Display + std::fmt::Debug {
type Item;


@ -6,7 +6,7 @@ pub mod encoding;
pub mod segment;
pub mod sorter;
use delorean_arrow::arrow::datatypes::SchemaRef;
use arrow_deps::arrow::datatypes::SchemaRef;
use segment::{Segment, Segments};
#[derive(Debug, Default)]


@ -4,7 +4,7 @@ use tracing::{debug, error, info};
use super::column;
use super::column::{AggregateType, Column};
use delorean_arrow::arrow::datatypes::SchemaRef;
use arrow_deps::arrow::datatypes::SchemaRef;
// Only used in a couple of specific places for experimentation.
const THREADS: usize = 16;
@ -1750,12 +1750,12 @@ pub struct GroupedAggregates {
#[cfg(test)]
mod test {
use delorean_arrow::arrow::datatypes::*;
use arrow_deps::arrow::datatypes::*;
#[test]
fn segment_group_key_sorted() {
let schema = super::Schema::with_sort_order(
delorean_arrow::arrow::datatypes::SchemaRef::new(Schema::new(vec![
arrow_deps::arrow::datatypes::SchemaRef::new(Schema::new(vec![
Field::new("env", DataType::Utf8, false),
Field::new("role", DataType::Utf8, false),
Field::new("path", DataType::Utf8, false),


@ -11,9 +11,9 @@ tracing = "0.1"
rand = "0.7.3"
human_format = "1.0.3"
delorean_arrow = { path = "../delorean_arrow" }
arrow_deps = { path = "../arrow_deps" }
data_types = { path = "../data_types" }
delorean_tsm = { path = "../delorean_tsm" }
tsm = { path = "../tsm" }
[dev-dependencies]
delorean_test_helpers ={ path = "../delorean_test_helpers" }
test_helpers ={ path = "../test_helpers" }


@ -13,8 +13,8 @@ pub mod stats;
use snafu::Snafu;
pub use crate::packers::{Packer, Packers};
pub use arrow_deps::parquet::data_type::ByteArray;
use data_types::table_schema::Schema;
pub use delorean_arrow::parquet::data_type::ByteArray;
use std::borrow::Cow;


@ -9,7 +9,7 @@ use core::iter::Iterator;
use std::iter;
use std::slice::Chunks;
use delorean_arrow::parquet::data_type::ByteArray;
use arrow_deps::parquet::data_type::ByteArray;
use std::default::Default;
// NOTE: See https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
@ -189,14 +189,14 @@ impl std::convert::From<data_types::table_schema::DataType> for Packers {
}
}
impl std::convert::From<delorean_tsm::BlockType> for Packers {
fn from(t: delorean_tsm::BlockType) -> Self {
impl std::convert::From<tsm::BlockType> for Packers {
fn from(t: tsm::BlockType) -> Self {
match t {
delorean_tsm::BlockType::Float => Self::Float(Packer::<f64>::new()),
delorean_tsm::BlockType::Integer => Self::Integer(Packer::<i64>::new()),
delorean_tsm::BlockType::Str => Self::String(Packer::<ByteArray>::new()),
delorean_tsm::BlockType::Bool => Self::Boolean(Packer::<bool>::new()),
delorean_tsm::BlockType::Unsigned => Self::Integer(Packer::<i64>::new()),
tsm::BlockType::Float => Self::Float(Packer::<f64>::new()),
tsm::BlockType::Integer => Self::Integer(Packer::<i64>::new()),
tsm::BlockType::Str => Self::String(Packer::<ByteArray>::new()),
tsm::BlockType::Bool => Self::Boolean(Packer::<bool>::new()),
tsm::BlockType::Unsigned => Self::Integer(Packer::<i64>::new()),
}
}
}
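
A small hypothetical check of the conversion above, illustrating that each TSM block type yields the matching empty Packers variant and that Unsigned deliberately folds into the i64-backed Integer packer:

use packers::Packers;

fn main() {
    let f = Packers::from(tsm::BlockType::Float);
    let u = Packers::from(tsm::BlockType::Unsigned);
    // Float maps to the f64 packer; Unsigned shares the i64 packer.
    assert!(matches!(f, Packers::Float(_)));
    assert!(matches!(u, Packers::Integer(_)));
}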


@ -1,9 +1,9 @@
use data_types::table_schema::Schema;
use delorean_ingest::{
use influxdb_line_protocol::parse_lines;
use ingest::{
parquet::writer::{CompressionLevel, DeloreanParquetTableWriter, Error as ParquetWriterError},
ConversionSettings, Error as IngestError, LineProtocolConverter, TSMFileConverter,
};
use delorean_line_parser::parse_lines;
use packers::{DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError};
use snafu::{ResultExt, Snafu};
use std::{


@ -7,7 +7,7 @@ use delorean_ingest::{
writer::Error as ParquetWriterError
}
};
use delorean_tsm::TSMError;
use tsm::TSMError;
#[derive(Debug, Snafu)]
pub enum Error {
@ -46,7 +46,7 @@ pub enum Error {
#[snafu(context(false))]
#[snafu(display("Error converting data {}", source))]
Conversion { source: delorean_ingest::Error },
Conversion { source: ingest::Error },
#[snafu(display("Error creating a parquet table writer {}", source))]
UnableToCreateParquetTableWriter { source: ParquetWriterError },
@ -67,7 +67,7 @@ pub enum Error {
TSM { source: TSMError },
#[snafu(display(r#"Error parsing data: {}"#, source))]
Parsing { source: delorean_line_parser::Error },
Parsing { source: influxdb_line_protocol::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;


@ -1,11 +1,11 @@
use delorean_ingest::parquet::metadata::print_parquet_metadata;
use delorean_tsm::{reader::IndexEntry, reader::TSMIndexReader, InfluxID, TSMError};
use ingest::parquet::metadata::print_parquet_metadata;
use snafu::{ResultExt, Snafu};
use std::{
collections::{BTreeMap, BTreeSet},
convert::TryInto,
};
use tracing::{debug, info};
use tsm::{reader::IndexEntry, reader::TSMIndexReader, InfluxID, TSMError};
use crate::commands::input::{FileType, InputReader};
@ -151,7 +151,7 @@ pub enum Error {
#[snafu(display("Unable to dump parquet file metadata: {}", source))]
UnableDumpToParquetMetadata {
source: delorean_ingest::parquet::error::Error,
source: ingest::parquet::error::Error,
},
#[snafu(display(r#"Unable to create TSM reader: {}"#, source))]


@ -1,5 +1,5 @@
use delorean_arrow::parquet::file::serialized_reader::{FileSource, SliceableCursor};
use delorean_ingest::parquet::ChunkReader;
use arrow_deps::parquet::file::serialized_reader::{FileSource, SliceableCursor};
use ingest::parquet::ChunkReader;
/// Module to handle input files (and maybe urls?)
use libflate::gzip;
use packers::Name;
@ -144,7 +144,7 @@ impl Read for InputReader {
}
}
impl delorean_ingest::parquet::Length for InputReader {
impl ingest::parquet::Length for InputReader {
fn len(&self) -> u64 {
match self {
Self::FileInputType(file_input_reader) => file_input_reader.file_size,
@ -155,11 +155,7 @@ impl delorean_ingest::parquet::Length for InputReader {
impl ChunkReader for InputReader {
type T = InputSlice;
fn get_read(
&self,
start: u64,
length: usize,
) -> delorean_arrow::parquet::errors::Result<Self::T> {
fn get_read(&self, start: u64, length: usize) -> arrow_deps::parquet::errors::Result<Self::T> {
match self {
Self::FileInputType(file_input_reader) => Ok(InputSlice::FileSlice(FileSource::new(
file_input_reader.reader.get_ref(),
@ -173,7 +169,7 @@ impl ChunkReader for InputReader {
}
}
impl delorean_ingest::parquet::TryClone for InputReader {
impl ingest::parquet::TryClone for InputReader {
fn try_clone(&self) -> std::result::Result<Self, std::io::Error> {
Err(io::Error::new(
io::ErrorKind::Other,


@ -1,6 +1,6 @@
//! This module contains code to report compression statistics for storage files
use delorean_ingest::parquet::{error::Error as DeloreanParquetError, stats as parquet_stats};
use ingest::parquet::{error::Error as DeloreanParquetError, stats as parquet_stats};
use packers::{
stats::{FileSetStatsBuilder, FileStats},
Name,


@ -9,10 +9,10 @@ use std::sync::Arc;
use crate::server::http_routes;
use crate::server::rpc::storage;
use delorean_storage::exec::Executor as StorageExecutor;
use delorean_write_buffer::{Db, WriteBufferDatabases};
use ::storage::exec::Executor as StorageExecutor;
use hyper::service::{make_service_fn, service_fn};
use hyper::Server;
use write_buffer::{Db, WriteBufferDatabases};
pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
dotenv::dotenv().ok();


@ -8,7 +8,7 @@
)]
use clap::{crate_authors, crate_version, value_t, App, Arg, ArgMatches, SubCommand};
use delorean_ingest::parquet::writer::CompressionLevel;
use ingest::parquet::writer::CompressionLevel;
use tokio::runtime::Runtime;
use tracing::{debug, error, info, warn};


@ -17,9 +17,9 @@
use http::header::CONTENT_ENCODING;
use tracing::{debug, error, info};
use delorean_arrow::arrow;
use delorean_line_parser::parse_lines;
use delorean_storage::{org_and_bucket_to_database, Database, DatabaseStore};
use arrow_deps::arrow;
use influxdb_line_protocol::parse_lines;
use storage::{org_and_bucket_to_database, Database, DatabaseStore};
use bytes::{Bytes, BytesMut};
use futures::{self, StreamExt};
@ -109,7 +109,9 @@ pub enum ApplicationError {
ReadingBodyAsUtf8 { source: std::str::Utf8Error },
#[snafu(display("Error parsing line protocol: {}", source))]
ParsingLineProtocol { source: delorean_line_parser::Error },
ParsingLineProtocol {
source: influxdb_line_protocol::Error,
},
#[snafu(display("Error decompressing body as gzip: {}", source))]
ReadingBodyAsGzip { source: std::io::Error },
@ -233,7 +235,7 @@ async fn write<T: DatabaseStore>(
let body = str::from_utf8(&body).context(ReadingBodyAsUtf8)?;
let lines = parse_lines(body)
.collect::<Result<Vec<_>, delorean_line_parser::Error>>()
.collect::<Result<Vec<_>, influxdb_line_protocol::Error>>()
.context(ParsingLineProtocol)?;
debug!(
@ -356,8 +358,7 @@ mod tests {
use hyper::service::{make_service_fn, service_fn};
use hyper::Server;
use delorean_storage::test::TestDatabaseStore;
use delorean_storage::DatabaseStore;
use storage::{test::TestDatabaseStore, DatabaseStore};
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;


@ -2,17 +2,17 @@
use std::{collections::BTreeSet, sync::Arc};
use delorean_arrow::arrow::{
use arrow_deps::arrow::{
array::{ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray},
datatypes::DataType as ArrowDataType,
};
use delorean_storage::exec::{
use storage::exec::{
fieldlist::FieldList,
seriesset::{GroupDescription, GroupedSeriesSetItem, SeriesSet},
};
use delorean_generated_types::{
use generated_types::{
measurement_fields_response::{FieldType, MessageField},
read_response::{
frame::Data, BooleanPointsFrame, DataType, FloatPointsFrame, Frame, GroupFrame,
@ -316,11 +316,11 @@ fn datatype_to_measurement_field_enum(data_type: &ArrowDataType) -> Result<Field
#[cfg(test)]
mod tests {
use delorean_arrow::arrow::{
use arrow_deps::arrow::{
datatypes::{DataType as ArrowDataType, Field as ArrowField, Schema},
record_batch::RecordBatch,
};
use delorean_storage::exec::fieldlist::Field;
use storage::exec::fieldlist::Field;
use super::*;


@ -1,18 +1,18 @@
//! This module has logic to translate gRPC `Predicate` nodes into
//! delorean_storage_interface::Predicates
//! This module has logic to translate gRPC `Predicate` nodes into the
//! native storage system predicate form, `storage::Predicates`
use std::convert::TryFrom;
use delorean_arrow::datafusion::{
use arrow_deps::datafusion::{
logical_plan::{Expr, Operator},
scalar::ScalarValue,
};
use delorean_generated_types::{
use generated_types::{
node::Comparison as RPCComparison, node::Logical as RPCLogical, node::Value as RPCValue,
Node as RPCNode, Predicate as RPCPredicate,
};
use delorean_storage::predicate::PredicateBuilder;
use snafu::{ResultExt, Snafu};
use storage::predicate::PredicateBuilder;
#[derive(Debug, Snafu)]
pub enum Error {


@ -1,11 +1,11 @@
use tonic::Status;
use delorean_generated_types::{
use generated_types::{
MeasurementFieldsRequest, MeasurementNamesRequest, MeasurementTagKeysRequest,
MeasurementTagValuesRequest, ReadFilterRequest, ReadGroupRequest, ReadSource, TagKeysRequest,
TagValuesRequest,
};
use delorean_storage::id::Id;
use storage::id::Id;
use std::convert::TryInto;


@ -1,10 +1,10 @@
//! This module contains implementations for the storage gRPC service
//! implemented in terms of the `delorean_storage::Database` and
//! `delorean_storage::DatabaseStore`
//! implemented in terms of the `storage::Database` and
//! `storage::DatabaseStore`
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
use delorean_generated_types::{
use generated_types::{
delorean_server::{Delorean, DeloreanServer},
storage_server::{Storage, StorageServer},
CapabilitiesResponse, CreateBucketRequest, CreateBucketResponse, DeleteBucketRequest,
@ -17,12 +17,12 @@ use delorean_generated_types::{
// For some reason rust thinks these imports are unused, but then
// complains of unresolved imports if they are not imported.
#[allow(unused_imports)]
use delorean_generated_types::{node, Node};
use generated_types::{node, Node};
use crate::server::rpc::expr::{AddRPCNode, SpecialTagKeys};
use crate::server::rpc::input::GrpcInputs;
use delorean_storage::{
use storage::{
exec::{
seriesset::{Error as SeriesSetError, GroupedSeriesSetItem, SeriesSet},
Executor as StorageExecutor,
@ -992,8 +992,13 @@ where
mod tests {
use super::*;
use crate::panic::SendPanicsToTracing;
use delorean_arrow::arrow::datatypes::DataType;
use delorean_storage::{
use arrow_deps::arrow::datatypes::DataType;
use std::{
convert::TryFrom,
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
};
use storage::{
exec::fieldlist::{Field, FieldList},
exec::FieldListPlan,
exec::GroupedSeriesSetPlans,
@ -1005,19 +1010,12 @@ mod tests {
test::TestDatabaseStore,
test::{ColumnValuesRequest, QuerySeriesRequest},
};
use delorean_test_helpers::tracing::TracingCapture;
use std::{
convert::TryFrom,
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
};
use test_helpers::tracing::TracingCapture;
use tonic::Code;
use futures::prelude::*;
use delorean_generated_types::{
delorean_client, read_response::frame, storage_client, ReadSource,
};
use generated_types::{delorean_client, read_response::frame, storage_client, ReadSource};
use prost::Message;
type DeloreanClient = delorean_client::DeloreanClient<tonic::transport::Channel>;
@ -1656,7 +1654,7 @@ mod tests {
partition_id,
));
let group = delorean_generated_types::read_group_request::Group::None as i32;
let group = generated_types::read_group_request::Group::None as i32;
let request = ReadGroupRequest {
read_source: source.clone(),


@ -1,5 +1,5 @@
[package]
name = "delorean_storage"
name = "storage"
version = "0.1.0"
authors = ["alamb <andrew@nerdnetworks.org>"]
edition = "2018"
@ -15,7 +15,7 @@ serde_urlencoded = "0.6.1"
tracing = "0.1"
croaring = "0.4.5"
delorean_arrow = { path = "../delorean_arrow" }
delorean_line_parser = { path = "../delorean_line_parser" }
arrow_deps = { path = "../arrow_deps" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
data_types = { path = "../data_types" }
delorean_test_helpers = { path = "../delorean_test_helpers" }
test_helpers = { path = "../test_helpers" }


@ -10,11 +10,11 @@ pub mod stringset;
use std::sync::Arc;
use counters::ExecutionCounters;
use delorean_arrow::{
use arrow_deps::{
arrow::record_batch::RecordBatch,
datafusion::{self, logical_plan::LogicalPlan},
};
use counters::ExecutionCounters;
use planning::DeloreanExecutionContext;
use schema_pivot::SchemaPivotNode;
@ -509,7 +509,7 @@ async fn run_logical_plans(
#[cfg(test)]
mod tests {
use delorean_arrow::arrow::{
use arrow_deps::arrow::{
array::Int64Array,
array::StringArray,
array::StringBuilder,


@ -3,13 +3,13 @@
//! pull them from RecordBatches
use std::collections::BTreeMap;
use data_types::TIME_COLUMN_NAME;
use delorean_arrow::arrow::{
use arrow_deps::arrow::{
self,
array::Int64Array,
datatypes::{DataType, SchemaRef},
record_batch::RecordBatch,
};
use data_types::TIME_COLUMN_NAME;
use snafu::{ensure, ResultExt, Snafu};
@ -187,7 +187,7 @@ mod tests {
use std::sync::Arc;
use arrow::array::ArrayRef;
use delorean_arrow::arrow::{
use arrow_deps::arrow::{
array::Int64Array,
array::StringArray,
datatypes::{DataType as ArrowDataType, Field as ArrowField, Schema},


@ -2,7 +2,7 @@
use std::sync::Arc;
use delorean_arrow::{
use arrow_deps::{
arrow::record_batch::RecordBatch,
datafusion::physical_plan::merge::MergeExec,
datafusion::physical_plan::SendableRecordBatchStream,
@ -25,7 +25,7 @@ use crate::util::dump_plan;
use tracing::debug;
// Reuse DataFusion error and Result types for this module
pub use delorean_arrow::datafusion::error::{DataFusionError as Error, Result};
pub use arrow_deps::datafusion::error::{DataFusionError as Error, Result};
use super::counters::ExecutionCounters;


@ -27,7 +27,7 @@ use std::{
use async_trait::async_trait;
use delorean_arrow::{
use arrow_deps::{
arrow::array::StringBuilder,
arrow::datatypes::{DataType, Field, Schema, SchemaRef},
arrow::record_batch::RecordBatch,
@ -42,7 +42,7 @@ use delorean_arrow::{
use tokio::stream::StreamExt;
pub use delorean_arrow::datafusion::error::{DataFusionError as Error, Result};
pub use arrow_deps::datafusion::error::{DataFusionError as Error, Result};
/// Implements the SchemaPivot operation described in make_schema_pivot,
pub struct SchemaPivotNode {
@ -281,7 +281,7 @@ mod tests {
use crate::exec::stringset::{IntoStringSet, StringSetRef};
use super::*;
use delorean_arrow::{
use arrow_deps::{
arrow::array::StringArray,
arrow::{
array::Int64Array,


@ -22,11 +22,11 @@ use std::sync::Arc;
use arrow::{
array::StringArray, datatypes::DataType, datatypes::SchemaRef, record_batch::RecordBatch,
};
use data_types::TIME_COLUMN_NAME;
use delorean_arrow::{
use arrow_deps::{
arrow::{self},
datafusion::physical_plan::SendableRecordBatchStream,
};
use data_types::TIME_COLUMN_NAME;
use snafu::{ResultExt, Snafu};
use tokio::stream::StreamExt;
use tokio::sync::mpsc::{self, error::SendError};
@ -486,8 +486,8 @@ mod tests {
record_batch::RecordBatch,
util::pretty::pretty_format_batches,
};
use delorean_arrow::datafusion::physical_plan::common::SizedRecordBatchStream;
use delorean_test_helpers::{str_pair_vec_to_vec, str_vec_to_arc_vec};
use arrow_deps::datafusion::physical_plan::common::SizedRecordBatchStream;
use test_helpers::{str_pair_vec_to_vec, str_vec_to_arc_vec};
use super::*;


@ -4,7 +4,7 @@
use std::{collections::BTreeSet, sync::Arc};
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use delorean_arrow::{
use arrow_deps::{
arrow,
arrow::array::{Array, StringArray},
arrow::datatypes::DataType,


@ -6,11 +6,11 @@
clippy::use_self
)]
use arrow_deps::arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use data_types::data::ReplicatedWrite;
use delorean_arrow::arrow::record_batch::RecordBatch;
use delorean_line_parser::ParsedLine;
use exec::{FieldListPlan, GroupedSeriesSetPlans, SeriesSetPlans, StringSetPlan};
use influxdb_line_protocol::ParsedLine;
use std::{fmt::Debug, sync::Arc};
@ -137,8 +137,8 @@ pub fn org_and_bucket_to_database(org: impl Into<String>, bucket: &str) -> Strin
// error[E0433]: failed to resolve: could not find `test` in `delorean`
// --> src/server/write_buffer_routes.rs:353:19
// |
// 353 | use delorean_storage::test::TestDatabaseStore;
// | ^^^^ could not find `test` in `delorean_storage`
// 353 | use storage::test::TestDatabaseStore;
// | ^^^^ could not find `test` in `delorean_storage`
//
//#[cfg(test)]


@ -1,6 +1,6 @@
use std::collections::BTreeSet;
use delorean_arrow::datafusion::logical_plan::Expr;
use arrow_deps::datafusion::logical_plan::Expr;
/// Specifies a continuous range of nanosecond timestamps. Timestamp
/// predicates are so common and critical to performance of timeseries


@ -1,7 +1,7 @@
//! This module provides a reference implementation of `storage::DatabaseSource` and
//! `storage::Database` for use in testing.
use delorean_arrow::arrow::record_batch::RecordBatch;
use arrow_deps::arrow::record_batch::RecordBatch;
use crate::{
exec::FieldListPlan,
@ -13,7 +13,7 @@ use crate::{
};
use data_types::data::ReplicatedWrite;
use delorean_line_parser::{parse_lines, ParsedLine};
use influxdb_line_protocol::{parse_lines, ParsedLine};
use async_trait::async_trait;
use snafu::{OptionExt, Snafu};


@ -1,5 +1,5 @@
//! This module contains DataFusion utility functions and helpers
use delorean_arrow::datafusion::{
use arrow_deps::datafusion::{
logical_plan::Expr, logical_plan::LogicalPlan, logical_plan::Operator, optimizer::utils::inputs,
};
use std::io::Write;

View File

@ -1,5 +1,5 @@
[package]
name = "delorean_test_helpers"
name = "test_helpers"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"

View File

@ -68,7 +68,7 @@ fn convert_bad_compression_level() {
fn convert_line_protocol_good_input_filename() {
let mut cmd = Command::cargo_bin("delorean").unwrap();
let parquet_path = delorean_test_helpers::tempfile::Builder::new()
let parquet_path = test_helpers::tempfile::Builder::new()
.prefix("convert_e2e")
.suffix(".parquet")
.tempfile()
@ -109,7 +109,7 @@ fn convert_tsm_good_input_filename() {
// let mut cmd = Command::cargo_bin("delorean").unwrap();
// let tmp_dir = delorean_test_helpers::tmp_dir();
// let tmp_dir = test_helpers::tmp_dir();
// let parquet_path = tmp_dir.unwrap().into_path().to_str().unwrap();
// // ::Builder::new()
@ -158,7 +158,7 @@ fn convert_multiple_measurements() {
let mut cmd = Command::cargo_bin("delorean").unwrap();
// Create a directory
let parquet_output_path = delorean_test_helpers::tempfile::Builder::new()
let parquet_output_path = test_helpers::tempfile::Builder::new()
.prefix("convert_multiple_e2e")
.tempdir()
.expect("error creating temp directory");
@ -250,13 +250,10 @@ fn meta_bad_input_filename_gz() {
}
// gunzips the contents of the file at input_path into a temporary path
fn uncompress_gz(
input_path: &str,
output_extension: &str,
) -> delorean_test_helpers::tempfile::TempPath {
fn uncompress_gz(input_path: &str, output_extension: &str) -> test_helpers::tempfile::TempPath {
let gz_file = File::open(input_path).expect("Error opening input");
let output_path = delorean_test_helpers::tempfile::Builder::new()
let output_path = test_helpers::tempfile::Builder::new()
.prefix("decompressed_e2e")
.suffix(output_extension)
.tempfile()
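The diff cuts this helper off mid-body. A hedged completion under the same dependencies; `libflate` appears in this workspace's manifests, but the decode step below is an assumption rather than the file's actual code:

```rust
use std::fs::File;
use std::io::{self, BufReader};

// Sketch of the full helper: gunzip input_path into a fresh temp file.
fn uncompress_gz(input_path: &str, output_extension: &str) -> test_helpers::tempfile::TempPath {
    let gz_file = File::open(input_path).expect("Error opening input");
    let output_path = test_helpers::tempfile::Builder::new()
        .prefix("decompressed_e2e")
        .suffix(output_extension)
        .tempfile()
        .expect("error creating temp file")
        .into_temp_path();

    let mut decoder = libflate::gzip::Decoder::new(BufReader::new(gz_file))
        .expect("error creating gzip decoder");
    let mut output = File::create(&output_path).expect("error creating output file");
    io::copy(&mut decoder, &mut output).expect("error decompressing");
    output_path
}
```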

View File

@ -16,7 +16,8 @@
// - Stopping the server after all relevant tests are run
use assert_cmd::prelude::*;
use delorean_generated_types::{
use futures::prelude::*;
use generated_types::{
node::{Comparison, Value},
read_group_request::Group,
read_response::{frame::Data, *},
@ -25,8 +26,6 @@ use delorean_generated_types::{
MeasurementTagValuesRequest, Node, Predicate, ReadFilterRequest, ReadGroupRequest, ReadSource,
Tag, TagKeysRequest, TagValuesRequest, TimestampRange,
};
use delorean_test_helpers::*;
use futures::prelude::*;
use prost::Message;
use std::convert::TryInto;
use std::fs;
@ -35,6 +34,7 @@ use std::str;
use std::time::{Duration, SystemTime};
use std::u32;
use tempfile::TempDir;
use test_helpers::*;
const HTTP_BASE: &str = "http://localhost:8080";
const API_BASE: &str = "http://localhost:8080/api/v2";
@ -505,7 +505,7 @@ impl TestServer {
fn new() -> Result<Self> {
let _ = dotenv::dotenv(); // load .env file if present
let dir = delorean_test_helpers::tmp_dir()?;
let dir = test_helpers::tmp_dir()?;
let server_process = Command::cargo_bin("delorean")?
// Can enable for debugging

View File

@ -1,5 +1,5 @@
[package]
name = "delorean_tsm"
name = "tsm"
version = "0.1.0"
authors = ["Edd Robinson <me@edd.io>"]
edition = "2018"
@ -16,4 +16,4 @@ snafu = "0.6.2"
hex = "0.4.2"
libflate = "1.0.0"
rand = "0.7.2"
delorean_test_helpers = { path = "../delorean_test_helpers" }
test_helpers = { path = "../test_helpers" }

View File

@ -510,7 +510,7 @@ fn decode_with_sentinel(
#[allow(clippy::unreadable_literal)]
#[allow(clippy::excessive_precision)] // TODO: Audit test values for truncation
mod tests {
use delorean_test_helpers::approximately_equal;
use test_helpers::approximately_equal;
#[test]
fn encode_no_values() {

View File

@ -13,7 +13,7 @@ use std::u64;
/// Iterating over the TSM index.
///
/// ```
/// # use delorean_tsm::reader::*;
/// # use tsm::reader::*;
/// # use libflate::gzip;
/// # use std::fs::File;
/// # use std::io::BufReader;
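The doc example is cut off by the hunk; a hedged reconstruction of its shape follows. `TSMIndexReader::try_new`, the iterator-of-`Result` behavior, and the fixture path are all assumptions about this crate's API, not confirmed by the diff:

```rust
use libflate::gzip;
use std::fs::File;
use std::io::{BufReader, Cursor, Read};
use tsm::reader::*;

fn main() {
    // Fixture path is illustrative only.
    let file = File::open("tests/fixtures/example.tsm.gz").unwrap();
    let mut decoder = gzip::Decoder::new(file).unwrap();
    let mut buf = Vec::new();
    decoder.read_to_end(&mut buf).unwrap();

    let len = buf.len();
    // Assumed constructor: an index reader over the in-memory TSM bytes.
    let reader = TSMIndexReader::try_new(BufReader::new(Cursor::new(buf)), len).unwrap();
    println!("{} index entries", reader.count());
}
```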

View File

@ -20,4 +20,4 @@ tokio = { version = "0.2", features = ["full"] }
[dev-dependencies]
delorean_test_helpers = { path = "../delorean_test_helpers" }
test_helpers = { path = "../test_helpers" }

View File

@ -682,7 +682,7 @@ mod tests {
#[test]
fn sequence_numbers_are_persisted() -> Result {
let dir = delorean_test_helpers::tmp_dir()?;
let dir = test_helpers::tmp_dir()?;
let builder = WalBuilder::new(dir.as_ref());
let mut wal;
@ -709,7 +709,7 @@ mod tests {
#[test]
fn sequence_numbers_increase_by_number_of_pending_entries() -> Result {
let dir = delorean_test_helpers::tmp_dir()?;
let dir = test_helpers::tmp_dir()?;
let builder = WalBuilder::new(dir.as_ref());
let mut wal = builder.wal()?;
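Every WAL test in this diff shares the same setup shape; condensed below using only calls that appear in the hunks (`tmp_dir`, `WalBuilder::new`, `file_rollover_size`, `wal`). The error-boxing signature is an assumption matching the tests' `Result` alias:

```rust
use wal::WalBuilder;

fn wal_setup() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Fresh per-test directory from the renamed helper crate.
    let dir = test_helpers::tmp_dir()?;
    // A small rollover size makes tests hit file rollover quickly.
    let builder = WalBuilder::new(dir.as_ref()).file_rollover_size(100);
    let _wal = builder.wal()?;
    Ok(())
}
```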

View File

@ -7,7 +7,7 @@ use crate::helpers::*;
#[test]
#[allow(clippy::cognitive_complexity)]
fn delete_up_to() -> Result {
let dir = delorean_test_helpers::tmp_dir()?;
let dir = test_helpers::tmp_dir()?;
// Set the file rollover size limit low to test interaction with file rollover
let builder = WalBuilder::new(dir.as_ref()).file_rollover_size(100);

View File

@ -8,7 +8,7 @@ use crate::helpers::*;
#[test]
#[allow(clippy::cognitive_complexity)]
fn file_rollover() -> Result {
let dir = delorean_test_helpers::tmp_dir()?;
let dir = test_helpers::tmp_dir()?;
// Set the file rollover size limit low to test rollover
let builder = WalBuilder::new(dir.as_ref()).file_rollover_size(100);

View File

@ -5,7 +5,7 @@ type Result<T = (), E = TestError> = std::result::Result<T, E>;
#[test]
fn no_concurrency() -> Result {
let dir = delorean_test_helpers::tmp_dir()?;
let dir = test_helpers::tmp_dir()?;
let builder = WalBuilder::new(dir.as_ref());
let mut wal = builder.clone().wal()?;

View File

@ -8,7 +8,7 @@ use helpers::Result;
#[test]
#[allow(clippy::cognitive_complexity)]
fn total_size() -> Result {
let dir = delorean_test_helpers::tmp_dir()?;
let dir = test_helpers::tmp_dir()?;
// Set the file rollover size limit low to test how rollover interacts with total size
let builder = WalBuilder::new(dir.as_ref()).file_rollover_size(100);

View File

@ -1,18 +1,18 @@
[package]
name = "delorean_write_buffer"
name = "write_buffer"
version = "0.1.0"
authors = ["alamb <andrew@nerdnetworks.org>"]
edition = "2018"
[dependencies]
delorean_arrow = { path = "../delorean_arrow" }
arrow_deps = { path = "../arrow_deps" }
data_types = { path = "../data_types" }
delorean_generated_types = { path = "../delorean_generated_types" }
delorean_line_parser = { path = "../delorean_line_parser" }
delorean_storage = { path = "../delorean_storage" }
generated_types = { path = "../generated_types" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
storage = { path = "../storage" }
wal = { path = "../wal" }
delorean_test_helpers = { path = "../delorean_test_helpers" }
test_helpers = { path = "../test_helpers" }
async-trait = "0.1"
chrono = "0.4"
@ -24,7 +24,7 @@ tokio = { version = "0.2", features = ["full"] }
tracing = "0.1"
[dev-dependencies]
delorean_test_helpers = { path = "../delorean_test_helpers" }
test_helpers = { path = "../test_helpers" }
criterion = "0.3"
[[bench]]

View File

@ -1,8 +1,8 @@
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use delorean_line_parser as line_parser;
use delorean_storage::Database;
use delorean_write_buffer::{restore_partitions_from_wal, Db};
use influxdb_line_protocol as line_parser;
use storage::Database;
use wal::{Entry, WalBuilder};
use write_buffer::{restore_partitions_from_wal, Db};
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;
@ -71,7 +71,7 @@ async fn generate_multiple_entry_multiple_partition() -> Result<(Vec<Entry>, usi
async fn common_create_entries(
mut f: impl FnMut(&mut dyn FnMut(String)),
) -> Result<(Vec<Entry>, usize)> {
let tmp_dir = delorean_test_helpers::tmp_dir()?;
let tmp_dir = test_helpers::tmp_dir()?;
let mut wal_dir = tmp_dir.as_ref().to_owned();
let db = Db::try_with_wal("mydb", &mut wal_dir).await?;
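For context on where these renamed imports land, a minimal criterion bench skeleton looks like the following; the group and function names are illustrative, not this file's actual benches:

```rust
use criterion::{criterion_group, criterion_main, Criterion};

fn wal_restore(c: &mut Criterion) {
    c.bench_function("restore_partitions_from_wal", |b| {
        // The closure passed to iter() is the measured body.
        b.iter(|| {
            // benchmark body goes here
        })
    });
}

criterion_group!(benches, wal_restore);
criterion_main!(benches);
```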

View File

@ -1,4 +1,4 @@
use delorean_generated_types::wal as wb;
use generated_types::wal as wb;
use snafu::Snafu;
use crate::dictionary::Dictionary;

View File

@ -1,6 +1,6 @@
use delorean_generated_types::wal as wb;
use delorean_line_parser::ParsedLine;
use delorean_storage::{
use generated_types::wal as wb;
use influxdb_line_protocol::ParsedLine;
use storage::{
exec::{
stringset::StringSet, FieldListPlan, GroupedSeriesSetPlan, GroupedSeriesSetPlans,
SeriesSetPlan, SeriesSetPlans, StringSetPlan,
@ -22,8 +22,7 @@ use std::io::ErrorKind;
use std::path::PathBuf;
use std::sync::Arc;
use data_types::data::{split_lines_into_write_entry_partitions, ReplicatedWrite};
use delorean_arrow::{
use arrow_deps::{
arrow,
arrow::{datatypes::Schema as ArrowSchema, record_batch::RecordBatch},
datafusion::logical_plan::LogicalPlan,
@ -32,6 +31,7 @@ use delorean_arrow::{
datasource::MemTable, error::DataFusionError, execution::context::ExecutionContext,
},
};
use data_types::data::{split_lines_into_write_entry_partitions, ReplicatedWrite};
use crate::dictionary::Error as DictionaryError;
use crate::partition::restore_partitions_from_wal;
@ -1086,11 +1086,12 @@ struct ArrowTable {
#[cfg(test)]
mod tests {
use super::*;
use delorean_arrow::datafusion::{
use arrow_deps::datafusion::{
logical_plan::{self, Literal},
scalar::ScalarValue,
};
use delorean_storage::{
use logical_plan::{Expr, Operator};
use storage::{
exec::fieldlist::{Field, FieldList},
exec::{
seriesset::{Error as SeriesSetError, SeriesSet},
@ -1099,15 +1100,14 @@ mod tests {
predicate::PredicateBuilder,
Database,
};
use logical_plan::{Expr, Operator};
use arrow::{
array::{Array, StringArray},
datatypes::DataType,
util::pretty::pretty_format_batches,
};
use delorean_line_parser::parse_lines;
use delorean_test_helpers::str_pair_vec_to_vec;
use influxdb_line_protocol::parse_lines;
use test_helpers::str_pair_vec_to_vec;
use tokio::sync::mpsc;
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
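Many of the tests below compare query output as bordered ASCII tables; the `pretty_format_batches` import above is what renders them. A minimal hedged use against the Arrow API of this era (schema and values are made up for illustration):

```rust
use arrow_deps::arrow::{
    array::{ArrayRef, Int64Array},
    datatypes::{DataType, Field, Schema},
    record_batch::RecordBatch,
    util::pretty::pretty_format_batches,
};
use std::sync::Arc;

fn render_example() -> String {
    let schema = Arc::new(Schema::new(vec![Field::new("time", DataType::Int64, false)]));
    let columns = vec![Arc::new(Int64Array::from(vec![100_i64, 200])) as ArrayRef];
    let batch = RecordBatch::try_new(schema, columns).expect("valid batch");
    // Produces the same +----+ bordered layout the expected strings use.
    pretty_format_batches(&[batch]).expect("formats")
}
```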
@ -1138,7 +1138,7 @@ mod tests {
#[tokio::test]
async fn list_table_names() -> Result {
let mut dir = delorean_test_helpers::tmp_dir()?.into_path();
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("mydb", &mut dir).await?;
@ -1166,7 +1166,7 @@ mod tests {
#[tokio::test]
async fn list_table_names_timestamps() -> Result {
let mut dir = delorean_test_helpers::tmp_dir()?.into_path();
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("mydb", &mut dir).await?;
@ -1202,7 +1202,7 @@ mod tests {
#[tokio::test]
async fn missing_tags_are_null() -> Result {
let mut dir = delorean_test_helpers::tmp_dir()?.into_path();
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("mydb", &mut dir).await?;
@ -1256,7 +1256,7 @@ mod tests {
#[tokio::test]
async fn write_data_and_recover() -> Result {
let mut dir = delorean_test_helpers::tmp_dir()?.into_path();
let mut dir = test_helpers::tmp_dir()?.into_path();
let expected_cpu_table = r#"+--------+------+------+-------+-------------+------+------+---------+-----------+
| region | host | user | other | str | b | time | new_tag | new_field |
@ -1362,7 +1362,7 @@ mod tests {
#[tokio::test]
async fn recover_partial_entries() -> Result {
let mut dir = delorean_test_helpers::tmp_dir()?.into_path();
let mut dir = test_helpers::tmp_dir()?.into_path();
let expected_cpu_table = r#"+--------+------+------+-------+-------------+------+------+---------+-----------+
| region | host | user | other | str | b | time | new_tag | new_field |
@ -1495,7 +1495,7 @@ disk bytes=23432323i 1600136510000000000",
#[tokio::test]
async fn list_column_names() -> Result {
let mut dir = delorean_test_helpers::tmp_dir()?.into_path();
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await?;
let lp_data = "h2o,state=CA,city=LA,county=LA temp=70.4 100\n\
@ -1653,7 +1653,7 @@ disk bytes=23432323i 1600136510000000000",
async fn list_column_names_predicate() -> Result {
// Demonstration test to show column names working with a predicate
let mut dir = delorean_test_helpers::tmp_dir()?.into_path();
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await?;
let lp_data = "h2o,state=CA,city=LA,county=LA temp=70.4 100\n\
@ -1687,7 +1687,7 @@ disk bytes=23432323i 1600136510000000000",
#[tokio::test]
async fn list_column_values() -> Result {
let mut dir = delorean_test_helpers::tmp_dir()?.into_path();
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await?;
let lp_data = "h2o,state=CA,city=LA temp=70.4 100\n\
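These tests feed raw line protocol through `parse_lines` from the renamed `influxdb_line_protocol` crate. A small sketch of that call; the iterator-of-`Result` shape is an assumption based on how the tests consume it:

```rust
use influxdb_line_protocol::parse_lines;

// Count the lines that parse cleanly; panics on the first invalid line.
fn count_valid(lp_data: &str) -> usize {
    parse_lines(lp_data)
        .map(|line| line.expect("invalid line protocol"))
        .count()
}
```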
@ -1843,7 +1843,7 @@ disk bytes=23432323i 1600136510000000000",
// This test checks that everything is wired together
// correctly. There are more detailed tests in table.rs that
// test the generated queries.
let mut dir = delorean_test_helpers::tmp_dir()?.into_path();
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await?;
let mut lp_lines = vec![
@ -1914,7 +1914,7 @@ disk bytes=23432323i 1600136510000000000",
#[tokio::test]
async fn test_query_series_filter() -> Result {
// check the appropriate filters are applied in the datafusion plans
let mut dir = delorean_test_helpers::tmp_dir()?.into_path();
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await?;
let lp_lines = vec![
@ -1962,7 +1962,7 @@ disk bytes=23432323i 1600136510000000000",
#[tokio::test]
async fn test_query_series_pred_refers_to_column_not_in_table() -> Result {
let mut dir = delorean_test_helpers::tmp_dir()?.into_path();
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await?;
let lp_lines = vec![
@ -2022,7 +2022,7 @@ disk bytes=23432323i 1600136510000000000",
expected = "Unsupported binary operator in expression: #state NotEq Utf8(\"MA\")"
)]
async fn test_query_series_pred_neq() {
let mut dir = delorean_test_helpers::tmp_dir().unwrap().into_path();
let mut dir = test_helpers::tmp_dir().unwrap().into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await.unwrap();
let lp_lines = vec![
@ -2047,7 +2047,7 @@ disk bytes=23432323i 1600136510000000000",
async fn test_field_columns() -> Result {
// Ensure that the database queries are hooked up correctly
let mut dir = delorean_test_helpers::tmp_dir()?.into_path();
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await?;
let lp_data = vec![
@ -2143,7 +2143,7 @@ disk bytes=23432323i 1600136510000000000",
#[tokio::test]
async fn test_field_columns_timestamp_predicate() -> Result {
// check the appropriate filters are applied in the datafusion plans
let mut dir = delorean_test_helpers::tmp_dir()?.into_path();
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await?;
let lp_data = vec![

Some files were not shown because too many files have changed in this diff