Merge pull request #1120 from influxdata/cn/no-more-test-results
fix: Don't return Result from test functionspull/24376/head
commit
21f3775016
|
@ -974,35 +974,28 @@ mod tests {
|
|||
|
||||
use super::*;
|
||||
|
||||
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
type Result<T = (), E = TestError> = std::result::Result<T, E>;
|
||||
|
||||
#[test]
|
||||
fn partition_key_with_table() -> Result {
|
||||
fn partition_key_with_table() {
|
||||
let template = PartitionTemplate {
|
||||
parts: vec![TemplatePart::Table],
|
||||
};
|
||||
|
||||
let line = parse_line("cpu foo=1 10");
|
||||
assert_eq!("cpu", template.partition_key(&line, &Utc::now()).unwrap());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn partition_key_with_int_field() -> Result {
|
||||
fn partition_key_with_int_field() {
|
||||
let template = PartitionTemplate {
|
||||
parts: vec![TemplatePart::Column("foo".to_string())],
|
||||
};
|
||||
|
||||
let line = parse_line("cpu foo=1 10");
|
||||
assert_eq!("foo_1", template.partition_key(&line, &Utc::now()).unwrap());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn partition_key_with_float_field() -> Result {
|
||||
fn partition_key_with_float_field() {
|
||||
let template = PartitionTemplate {
|
||||
parts: vec![TemplatePart::Column("foo".to_string())],
|
||||
};
|
||||
|
@ -1012,12 +1005,10 @@ mod tests {
|
|||
"foo_1.1",
|
||||
template.partition_key(&line, &Utc::now()).unwrap()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn partition_key_with_string_field() -> Result {
|
||||
fn partition_key_with_string_field() {
|
||||
let template = PartitionTemplate {
|
||||
parts: vec![TemplatePart::Column("foo".to_string())],
|
||||
};
|
||||
|
@ -1027,12 +1018,10 @@ mod tests {
|
|||
"foo_asdf",
|
||||
template.partition_key(&line, &Utc::now()).unwrap()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn partition_key_with_bool_field() -> Result {
|
||||
fn partition_key_with_bool_field() {
|
||||
let template = PartitionTemplate {
|
||||
parts: vec![TemplatePart::Column("bar".to_string())],
|
||||
};
|
||||
|
@ -1042,12 +1031,10 @@ mod tests {
|
|||
"bar_true",
|
||||
template.partition_key(&line, &Utc::now()).unwrap()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn partition_key_with_tag_column() -> Result {
|
||||
fn partition_key_with_tag_column() {
|
||||
let template = PartitionTemplate {
|
||||
parts: vec![TemplatePart::Column("region".to_string())],
|
||||
};
|
||||
|
@ -1057,24 +1044,20 @@ mod tests {
|
|||
"region_west",
|
||||
template.partition_key(&line, &Utc::now()).unwrap()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn partition_key_with_missing_column() -> Result {
|
||||
fn partition_key_with_missing_column() {
|
||||
let template = PartitionTemplate {
|
||||
parts: vec![TemplatePart::Column("not_here".to_string())],
|
||||
};
|
||||
|
||||
let line = parse_line("cpu,foo=asdf bar=true 10");
|
||||
assert_eq!("", template.partition_key(&line, &Utc::now()).unwrap());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn partition_key_with_time() -> Result {
|
||||
fn partition_key_with_time() {
|
||||
let template = PartitionTemplate {
|
||||
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d %H:%M:%S".to_string())],
|
||||
};
|
||||
|
@ -1084,12 +1067,10 @@ mod tests {
|
|||
"2020-10-10 13:54:57",
|
||||
template.partition_key(&line, &Utc::now()).unwrap()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn partition_key_with_default_time() -> Result {
|
||||
fn partition_key_with_default_time() {
|
||||
let format_string = "%Y-%m-%d %H:%M:%S";
|
||||
let template = PartitionTemplate {
|
||||
parts: vec![TemplatePart::TimeFormat(format_string.to_string())],
|
||||
|
@ -1101,12 +1082,10 @@ mod tests {
|
|||
default_time.format(format_string).to_string(),
|
||||
template.partition_key(&line, &default_time).unwrap()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn partition_key_with_many_parts() -> Result {
|
||||
fn partition_key_with_many_parts() {
|
||||
let template = PartitionTemplate {
|
||||
parts: vec![
|
||||
TemplatePart::Table,
|
||||
|
@ -1123,8 +1102,6 @@ mod tests {
|
|||
"cpu-region_west-usage_system_53.1-2020-10-10 13:54:57",
|
||||
template.partition_key(&line, &Utc::now()).unwrap()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn parsed_lines(lp: &str) -> Vec<ParsedLine<'_>> {
|
||||
|
|
|
@ -33,11 +33,8 @@ mod tests {
|
|||
use super::*;
|
||||
use mockito::mock;
|
||||
|
||||
type Error = Box<dyn std::error::Error>;
|
||||
type Result<T = (), E = Error> = std::result::Result<T, E>;
|
||||
|
||||
#[tokio::test]
|
||||
async fn ready() -> Result {
|
||||
async fn ready() {
|
||||
let token = "some-token";
|
||||
|
||||
let mock_server = mock("GET", "/ready")
|
||||
|
@ -49,7 +46,5 @@ mod tests {
|
|||
let _result = client.ready().await;
|
||||
|
||||
mock_server.assert();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -117,11 +117,8 @@ mod tests {
|
|||
use super::*;
|
||||
use mockito::mock;
|
||||
|
||||
type Error = Box<dyn std::error::Error>;
|
||||
type Result<T = (), E = Error> = std::result::Result<T, E>;
|
||||
|
||||
#[tokio::test]
|
||||
async fn is_onboarding_allowed() -> Result {
|
||||
async fn is_onboarding_allowed() {
|
||||
let token = "some-token";
|
||||
|
||||
let mock_server = mock("GET", "/api/v2/setup")
|
||||
|
@ -133,11 +130,10 @@ mod tests {
|
|||
let _result = client.is_onboarding_allowed().await;
|
||||
|
||||
mock_server.assert();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn onboarding() -> Result {
|
||||
async fn onboarding() {
|
||||
let token = "some-token";
|
||||
let username = "some-user";
|
||||
let org = "some-org";
|
||||
|
@ -169,11 +165,10 @@ mod tests {
|
|||
.await;
|
||||
|
||||
mock_server.assert();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn post_setup_user() -> Result {
|
||||
async fn post_setup_user() {
|
||||
let token = "some-token";
|
||||
let username = "some-user";
|
||||
let org = "some-org";
|
||||
|
@ -205,11 +200,10 @@ mod tests {
|
|||
.await;
|
||||
|
||||
mock_server.assert();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn onboarding_opt() -> Result {
|
||||
async fn onboarding_opt() {
|
||||
let token = "some-token";
|
||||
let username = "some-user";
|
||||
let org = "some-org";
|
||||
|
@ -233,11 +227,10 @@ mod tests {
|
|||
.await;
|
||||
|
||||
mock_server.assert();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn post_setup_user_opt() -> Result {
|
||||
async fn post_setup_user_opt() {
|
||||
let token = "some-token";
|
||||
let username = "some-user";
|
||||
let org = "some-org";
|
||||
|
@ -261,6 +254,5 @@ mod tests {
|
|||
.await;
|
||||
|
||||
mock_server.assert();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -335,64 +335,60 @@ mod tests {
|
|||
use super::*;
|
||||
use std::str;
|
||||
|
||||
type Error = Box<dyn std::error::Error>;
|
||||
type Result<T = (), E = Error> = std::result::Result<T, E>;
|
||||
|
||||
fn assert_utf8_strings_eq(left: &[u8], right: &[u8]) -> Result {
|
||||
fn assert_utf8_strings_eq(left: &[u8], right: &[u8]) {
|
||||
assert_eq!(
|
||||
left,
|
||||
right,
|
||||
"\n\nleft string value: `{}`,\nright string value: `{}`",
|
||||
str::from_utf8(left)?,
|
||||
str::from_utf8(right)?
|
||||
str::from_utf8(left).unwrap(),
|
||||
str::from_utf8(right).unwrap(),
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn point_builder_allows_setting_tags_and_fields() -> Result {
|
||||
fn point_builder_allows_setting_tags_and_fields() {
|
||||
let point = DataPoint::builder("swap")
|
||||
.tag("host", "server01")
|
||||
.tag("name", "disk0")
|
||||
.field("in", 3_i64)
|
||||
.field("out", 4_i64)
|
||||
.timestamp(1)
|
||||
.build()?;
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
assert_utf8_strings_eq(
|
||||
&point.data_point_to_vec()?,
|
||||
&point.data_point_to_vec().unwrap(),
|
||||
b"swap,host=server01,name=disk0 in=3i,out=4i 1\n".as_ref(),
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn no_tags_or_timestamp() -> Result {
|
||||
fn no_tags_or_timestamp() {
|
||||
let point = DataPoint::builder("m0")
|
||||
.field("f0", 1.0)
|
||||
.field("f1", 2_i64)
|
||||
.build()?;
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
assert_utf8_strings_eq(&point.data_point_to_vec()?, b"m0 f0=1,f1=2i\n".as_ref())?;
|
||||
|
||||
Ok(())
|
||||
assert_utf8_strings_eq(
|
||||
&point.data_point_to_vec().unwrap(),
|
||||
b"m0 f0=1,f1=2i\n".as_ref(),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn no_timestamp() -> Result {
|
||||
fn no_timestamp() {
|
||||
let point = DataPoint::builder("m0")
|
||||
.tag("t0", "v0")
|
||||
.tag("t1", "v1")
|
||||
.field("f1", 2_i64)
|
||||
.build()?;
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
assert_utf8_strings_eq(
|
||||
&point.data_point_to_vec()?,
|
||||
&point.data_point_to_vec().unwrap(),
|
||||
b"m0,t0=v0,t1=v1 f1=2i\n".as_ref(),
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -405,80 +401,72 @@ mod tests {
|
|||
const ALL_THE_DELIMITERS: &str = r#"alpha,beta=delta gamma"epsilon"#;
|
||||
|
||||
#[test]
|
||||
fn special_characters_are_escaped_in_measurements() -> Result {
|
||||
fn special_characters_are_escaped_in_measurements() {
|
||||
assert_utf8_strings_eq(
|
||||
&ALL_THE_DELIMITERS.measurement_to_vec()?,
|
||||
&ALL_THE_DELIMITERS.measurement_to_vec().unwrap(),
|
||||
br#"alpha\,beta=delta\ gamma"epsilon"#.as_ref(),
|
||||
)?;
|
||||
Ok(())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn special_characters_are_escaped_in_tag_keys() -> Result {
|
||||
fn special_characters_are_escaped_in_tag_keys() {
|
||||
assert_utf8_strings_eq(
|
||||
&ALL_THE_DELIMITERS.tag_key_to_vec()?,
|
||||
&ALL_THE_DELIMITERS.tag_key_to_vec().unwrap(),
|
||||
br#"alpha\,beta\=delta\ gamma"epsilon"#.as_ref(),
|
||||
)?;
|
||||
Ok(())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn special_characters_are_escaped_in_tag_values() -> Result {
|
||||
fn special_characters_are_escaped_in_tag_values() {
|
||||
assert_utf8_strings_eq(
|
||||
&ALL_THE_DELIMITERS.tag_value_to_vec()?,
|
||||
&ALL_THE_DELIMITERS.tag_value_to_vec().unwrap(),
|
||||
br#"alpha\,beta\=delta\ gamma"epsilon"#.as_ref(),
|
||||
)?;
|
||||
Ok(())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn special_characters_are_escaped_in_field_keys() -> Result {
|
||||
fn special_characters_are_escaped_in_field_keys() {
|
||||
assert_utf8_strings_eq(
|
||||
&ALL_THE_DELIMITERS.field_key_to_vec()?,
|
||||
&ALL_THE_DELIMITERS.field_key_to_vec().unwrap(),
|
||||
br#"alpha\,beta\=delta\ gamma"epsilon"#.as_ref(),
|
||||
)?;
|
||||
Ok(())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn special_characters_are_escaped_in_field_values_of_strings() -> Result {
|
||||
fn special_characters_are_escaped_in_field_values_of_strings() {
|
||||
assert_utf8_strings_eq(
|
||||
&FieldValue::from(ALL_THE_DELIMITERS).field_value_to_vec()?,
|
||||
&FieldValue::from(ALL_THE_DELIMITERS)
|
||||
.field_value_to_vec()
|
||||
.unwrap(),
|
||||
br#""alpha,beta=delta gamma\"epsilon""#.as_ref(),
|
||||
)?;
|
||||
Ok(())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn field_value_of_bool() -> Result {
|
||||
fn field_value_of_bool() {
|
||||
let e = FieldValue::from(true);
|
||||
assert_utf8_strings_eq(&e.field_value_to_vec()?, b"t")?;
|
||||
assert_utf8_strings_eq(&e.field_value_to_vec().unwrap(), b"t");
|
||||
|
||||
let e = FieldValue::from(false);
|
||||
assert_utf8_strings_eq(&e.field_value_to_vec()?, b"f")?;
|
||||
|
||||
Ok(())
|
||||
assert_utf8_strings_eq(&e.field_value_to_vec().unwrap(), b"f");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn field_value_of_float() -> Result {
|
||||
fn field_value_of_float() {
|
||||
let e = FieldValue::from(42_f64);
|
||||
assert_utf8_strings_eq(&e.field_value_to_vec()?, b"42")?;
|
||||
Ok(())
|
||||
assert_utf8_strings_eq(&e.field_value_to_vec().unwrap(), b"42");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn field_value_of_integer() -> Result {
|
||||
fn field_value_of_integer() {
|
||||
let e = FieldValue::from(42_i64);
|
||||
assert_utf8_strings_eq(&e.field_value_to_vec()?, b"42i")?;
|
||||
Ok(())
|
||||
assert_utf8_strings_eq(&e.field_value_to_vec().unwrap(), b"42i");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn field_value_of_string() -> Result {
|
||||
fn field_value_of_string() {
|
||||
let e = FieldValue::from("hello");
|
||||
assert_utf8_strings_eq(&e.field_value_to_vec()?, br#""hello""#)?;
|
||||
Ok(())
|
||||
assert_utf8_strings_eq(&e.field_value_to_vec().unwrap(), br#""hello""#);
|
||||
}
|
||||
|
||||
// Clears up the boilerplate of writing to a vector from the tests
|
||||
|
|
|
@ -230,11 +230,8 @@ mod tests {
|
|||
use futures::stream;
|
||||
use mockito::mock;
|
||||
|
||||
type Error = Box<dyn std::error::Error>;
|
||||
type Result<T = (), E = Error> = std::result::Result<T, E>;
|
||||
|
||||
#[tokio::test]
|
||||
async fn writing_points() -> Result {
|
||||
async fn writing_points() {
|
||||
let org = "some-org";
|
||||
let bucket = "some-bucket";
|
||||
let token = "some-token";
|
||||
|
@ -258,12 +255,14 @@ cpu,host=server01,region=us-west usage=0.87
|
|||
DataPoint::builder("cpu")
|
||||
.tag("host", "server01")
|
||||
.field("usage", 0.5)
|
||||
.build()?,
|
||||
.build()
|
||||
.unwrap(),
|
||||
DataPoint::builder("cpu")
|
||||
.tag("host", "server01")
|
||||
.tag("region", "us-west")
|
||||
.field("usage", 0.87)
|
||||
.build()?,
|
||||
.build()
|
||||
.unwrap(),
|
||||
];
|
||||
|
||||
// If the requests made are incorrect, Mockito returns status 501 and `write`
|
||||
|
@ -274,11 +273,10 @@ cpu,host=server01,region=us-west usage=0.87
|
|||
let _result = client.write(org, bucket, stream::iter(points)).await;
|
||||
|
||||
mock_server.assert();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn create_bucket() -> Result {
|
||||
async fn create_bucket() {
|
||||
let org_id = "0000111100001111";
|
||||
let bucket = "some-bucket";
|
||||
let token = "some-token";
|
||||
|
@ -299,7 +297,6 @@ cpu,host=server01,region=us-west usage=0.87
|
|||
let _result = client.create_bucket(org_id, bucket).await;
|
||||
|
||||
mock_server.assert();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -747,33 +747,37 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn conflicting_field_types() -> Result<(), TSMError> {
|
||||
fn conflicting_field_types() {
|
||||
let mut table = MeasurementTable::new("cpu".to_string(), 0);
|
||||
table.add_series_data(
|
||||
vec![("region".to_string(), "west".to_string())],
|
||||
"value".to_string(),
|
||||
Block {
|
||||
max_time: 0,
|
||||
min_time: 0,
|
||||
offset: 0,
|
||||
size: 0,
|
||||
typ: BlockType::Float,
|
||||
reader_idx: 0,
|
||||
},
|
||||
)?;
|
||||
table
|
||||
.add_series_data(
|
||||
vec![("region".to_string(), "west".to_string())],
|
||||
"value".to_string(),
|
||||
Block {
|
||||
max_time: 0,
|
||||
min_time: 0,
|
||||
offset: 0,
|
||||
size: 0,
|
||||
typ: BlockType::Float,
|
||||
reader_idx: 0,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
table.add_series_data(
|
||||
vec![],
|
||||
"value".to_string(),
|
||||
Block {
|
||||
max_time: 0,
|
||||
min_time: 0,
|
||||
offset: 0,
|
||||
size: 0,
|
||||
typ: BlockType::Integer,
|
||||
reader_idx: 0,
|
||||
},
|
||||
)?;
|
||||
table
|
||||
.add_series_data(
|
||||
vec![],
|
||||
"value".to_string(),
|
||||
Block {
|
||||
max_time: 0,
|
||||
min_time: 0,
|
||||
offset: 0,
|
||||
size: 0,
|
||||
typ: BlockType::Integer,
|
||||
reader_idx: 0,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// The block type for the value field should be Float because the
|
||||
// conflicting integer field should be ignored.
|
||||
|
@ -781,50 +785,55 @@ mod tests {
|
|||
*table.field_columns().get("value").unwrap(),
|
||||
BlockType::Float,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn merge_measurement_table() -> Result<(), TSMError> {
|
||||
fn merge_measurement_table() {
|
||||
let mut table1 = MeasurementTable::new("cpu".to_string(), 0);
|
||||
table1.add_series_data(
|
||||
vec![("region".to_string(), "west".to_string())],
|
||||
"value".to_string(),
|
||||
Block {
|
||||
min_time: 101,
|
||||
max_time: 150,
|
||||
offset: 0,
|
||||
size: 0,
|
||||
typ: BlockType::Float,
|
||||
reader_idx: 0,
|
||||
},
|
||||
)?;
|
||||
table1
|
||||
.add_series_data(
|
||||
vec![("region".to_string(), "west".to_string())],
|
||||
"value".to_string(),
|
||||
Block {
|
||||
min_time: 101,
|
||||
max_time: 150,
|
||||
offset: 0,
|
||||
size: 0,
|
||||
typ: BlockType::Float,
|
||||
reader_idx: 0,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mut table2 = MeasurementTable::new("cpu".to_string(), 1);
|
||||
table2.add_series_data(
|
||||
vec![("region".to_string(), "west".to_string())],
|
||||
"value".to_string(),
|
||||
Block {
|
||||
min_time: 0,
|
||||
max_time: 100,
|
||||
offset: 0,
|
||||
size: 0,
|
||||
typ: BlockType::Float,
|
||||
reader_idx: 0,
|
||||
},
|
||||
)?;
|
||||
table2.add_series_data(
|
||||
vec![("server".to_string(), "a".to_string())],
|
||||
"temp".to_string(),
|
||||
Block {
|
||||
min_time: 0,
|
||||
max_time: 50,
|
||||
offset: 0,
|
||||
size: 0,
|
||||
typ: BlockType::Str,
|
||||
reader_idx: 0,
|
||||
},
|
||||
)?;
|
||||
table2
|
||||
.add_series_data(
|
||||
vec![("region".to_string(), "west".to_string())],
|
||||
"value".to_string(),
|
||||
Block {
|
||||
min_time: 0,
|
||||
max_time: 100,
|
||||
offset: 0,
|
||||
size: 0,
|
||||
typ: BlockType::Float,
|
||||
reader_idx: 0,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
table2
|
||||
.add_series_data(
|
||||
vec![("server".to_string(), "a".to_string())],
|
||||
"temp".to_string(),
|
||||
Block {
|
||||
min_time: 0,
|
||||
max_time: 50,
|
||||
offset: 0,
|
||||
size: 0,
|
||||
typ: BlockType::Str,
|
||||
reader_idx: 0,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
table1.merge(&mut table2).unwrap();
|
||||
assert_eq!(table1.name, "cpu");
|
||||
|
@ -882,8 +891,6 @@ mod tests {
|
|||
field_blocks_temp,
|
||||
);
|
||||
assert_eq!(table1.tag_set_fields_blocks, exp_tag_set_field_blocks);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -1439,7 +1439,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn measurement_writer_buffering() -> Result<(), Error> {
|
||||
fn measurement_writer_buffering() {
|
||||
let log = Arc::new(Mutex::new(WriterLog::new()));
|
||||
let table_writer = Box::new(NoOpWriter::new(Arc::clone(&log), String::from("cpu")));
|
||||
|
||||
|
@ -1465,20 +1465,26 @@ mod tests {
|
|||
assert_eq!(get_events(&log).len(), 0);
|
||||
|
||||
// buffer size is 2 we don't expect any writes until three rows are pushed
|
||||
writer.buffer_line(parsed_lines.next().expect("parse success"))?;
|
||||
writer
|
||||
.buffer_line(parsed_lines.next().expect("parse success"))
|
||||
.unwrap();
|
||||
assert_eq!(get_events(&log).len(), 0);
|
||||
writer.buffer_line(parsed_lines.next().expect("parse success"))?;
|
||||
writer
|
||||
.buffer_line(parsed_lines.next().expect("parse success"))
|
||||
.unwrap();
|
||||
assert_eq!(get_events(&log).len(), 0);
|
||||
|
||||
// this should cause a flush and write
|
||||
writer.buffer_line(parsed_lines.next().expect("parse success"))?;
|
||||
writer
|
||||
.buffer_line(parsed_lines.next().expect("parse success"))
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
get_events(&log),
|
||||
vec!["[cpu] Wrote batch of 2 cols, 2 rows"]
|
||||
);
|
||||
|
||||
// finalize should write out the last line
|
||||
writer.finalize()?;
|
||||
writer.finalize().unwrap();
|
||||
assert_eq!(
|
||||
get_events(&log),
|
||||
vec![
|
||||
|
@ -1487,8 +1493,6 @@ mod tests {
|
|||
"[cpu] Closed",
|
||||
]
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ----- Tests for pack_data -----
|
||||
|
@ -1521,8 +1525,11 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn pack_data_schema() -> Result<(), Error> {
|
||||
let schema = parse_data_into_sampler()?.deduce_schema_from_sample()?;
|
||||
fn pack_data_schema() {
|
||||
let schema = parse_data_into_sampler()
|
||||
.unwrap()
|
||||
.deduce_schema_from_sample()
|
||||
.unwrap();
|
||||
|
||||
// Then the correct schema is extracted
|
||||
println!("Converted to {:#?}", schema);
|
||||
|
@ -1553,14 +1560,12 @@ mod tests {
|
|||
"bool_field"
|
||||
);
|
||||
assert_column_eq!(schema, 5, InfluxColumnType::Timestamp, "time");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pack_data_value() -> Result<(), Error> {
|
||||
let mut sampler = parse_data_into_sampler()?;
|
||||
let schema = sampler.deduce_schema_from_sample()?;
|
||||
fn pack_data_value() {
|
||||
let mut sampler = parse_data_into_sampler().unwrap();
|
||||
let schema = sampler.deduce_schema_from_sample().unwrap();
|
||||
|
||||
let packers = pack_lines(&schema, &sampler.schema_sample);
|
||||
|
||||
|
@ -1667,8 +1672,6 @@ mod tests {
|
|||
assert_eq!(timestamp_packer.get(6).unwrap(), &1_590_488_773_254_480_000);
|
||||
assert!(timestamp_packer.is_null(7));
|
||||
assert_eq!(timestamp_packer.get(8).unwrap(), &1_590_488_773_254_490_000);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ----- Tests for LineProtocolConverter -----
|
||||
|
@ -1692,7 +1695,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn conversion_with_multiple_measurements() -> Result<(), Error> {
|
||||
fn conversion_with_multiple_measurements() {
|
||||
// These lines have interleaved measurements to force the
|
||||
// state machine in LineProtocolConverter::convert through all
|
||||
// the branches
|
||||
|
@ -1737,14 +1740,12 @@ mod tests {
|
|||
"[h2o_temperature] Closed",
|
||||
]
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ----- Tests for TSM Data -----
|
||||
|
||||
#[test]
|
||||
fn process_measurement_table() -> Result<(), Box<dyn std::error::Error>> {
|
||||
fn process_measurement_table() {
|
||||
// Input data - in line protocol format
|
||||
//
|
||||
// cpu,region=east temp=1.2 0
|
||||
|
@ -1782,63 +1783,71 @@ mod tests {
|
|||
|
||||
let mut table = MeasurementTable::new("cpu".to_string(), 0);
|
||||
// cpu region=east temp=<all the block data for this key>
|
||||
table.add_series_data(
|
||||
vec![("region".to_string(), "east".to_string())],
|
||||
"temp".to_string(),
|
||||
Block {
|
||||
min_time: 0,
|
||||
max_time: 0,
|
||||
offset: 0,
|
||||
size: 0,
|
||||
typ: BlockType::Float,
|
||||
reader_idx: 0,
|
||||
},
|
||||
)?;
|
||||
table
|
||||
.add_series_data(
|
||||
vec![("region".to_string(), "east".to_string())],
|
||||
"temp".to_string(),
|
||||
Block {
|
||||
min_time: 0,
|
||||
max_time: 0,
|
||||
offset: 0,
|
||||
size: 0,
|
||||
typ: BlockType::Float,
|
||||
reader_idx: 0,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// cpu region=east voltage=<all the block data for this key>
|
||||
table.add_series_data(
|
||||
vec![("region".to_string(), "east".to_string())],
|
||||
"voltage".to_string(),
|
||||
Block {
|
||||
min_time: 1,
|
||||
max_time: 0,
|
||||
offset: 0,
|
||||
size: 0,
|
||||
typ: BlockType::Float,
|
||||
reader_idx: 0,
|
||||
},
|
||||
)?;
|
||||
table
|
||||
.add_series_data(
|
||||
vec![("region".to_string(), "east".to_string())],
|
||||
"voltage".to_string(),
|
||||
Block {
|
||||
min_time: 1,
|
||||
max_time: 0,
|
||||
offset: 0,
|
||||
size: 0,
|
||||
typ: BlockType::Float,
|
||||
reader_idx: 0,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// cpu region=west,server=a temp=<all the block data for this key>
|
||||
table.add_series_data(
|
||||
vec![
|
||||
("region".to_string(), "west".to_string()),
|
||||
("server".to_string(), "a".to_string()),
|
||||
],
|
||||
"temp".to_string(),
|
||||
Block {
|
||||
min_time: 2,
|
||||
max_time: 0,
|
||||
offset: 0,
|
||||
size: 0,
|
||||
typ: BlockType::Float,
|
||||
reader_idx: 0,
|
||||
},
|
||||
)?;
|
||||
table
|
||||
.add_series_data(
|
||||
vec![
|
||||
("region".to_string(), "west".to_string()),
|
||||
("server".to_string(), "a".to_string()),
|
||||
],
|
||||
"temp".to_string(),
|
||||
Block {
|
||||
min_time: 2,
|
||||
max_time: 0,
|
||||
offset: 0,
|
||||
size: 0,
|
||||
typ: BlockType::Float,
|
||||
reader_idx: 0,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// cpu az=b watts=<all the block data for this key>
|
||||
table.add_series_data(
|
||||
vec![("az".to_string(), "b".to_string())],
|
||||
"watts".to_string(),
|
||||
Block {
|
||||
min_time: 3,
|
||||
max_time: 0,
|
||||
offset: 0,
|
||||
size: 0,
|
||||
typ: BlockType::Unsigned,
|
||||
reader_idx: 0,
|
||||
},
|
||||
)?;
|
||||
table
|
||||
.add_series_data(
|
||||
vec![("az".to_string(), "b".to_string())],
|
||||
"watts".to_string(),
|
||||
Block {
|
||||
min_time: 3,
|
||||
max_time: 0,
|
||||
offset: 0,
|
||||
size: 0,
|
||||
typ: BlockType::Unsigned,
|
||||
reader_idx: 0,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mut block_map = BTreeMap::new();
|
||||
block_map.insert(
|
||||
|
@ -1875,7 +1884,8 @@ mod tests {
|
|||
);
|
||||
|
||||
let decoder = MockBlockDecoder::new(block_map);
|
||||
let (schema, packers) = TSMFileConverter::process_measurement_table(decoder, &mut table)?;
|
||||
let (schema, packers) =
|
||||
TSMFileConverter::process_measurement_table(decoder, &mut table).unwrap();
|
||||
|
||||
assert_column_eq!(schema, 0, InfluxColumnType::Tag, "az");
|
||||
assert_column_eq!(schema, 1, InfluxColumnType::Tag, "region");
|
||||
|
@ -2005,8 +2015,6 @@ mod tests {
|
|||
Some(4000),
|
||||
]))
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn empty_block() -> Block {
|
||||
|
@ -2021,33 +2029,39 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn merge_input_tables() -> Result<(), Box<dyn std::error::Error>> {
|
||||
fn merge_input_tables() {
|
||||
let mut inputs = vec![];
|
||||
let mut table = MeasurementTable::new("cpu".to_string(), 0);
|
||||
table.add_series_data(
|
||||
vec![("region".to_string(), "east".to_string())],
|
||||
"temp".to_string(),
|
||||
empty_block(),
|
||||
)?;
|
||||
table
|
||||
.add_series_data(
|
||||
vec![("region".to_string(), "east".to_string())],
|
||||
"temp".to_string(),
|
||||
empty_block(),
|
||||
)
|
||||
.unwrap();
|
||||
inputs.push(Some(table.clone()));
|
||||
|
||||
table = MeasurementTable::new("cpu".to_string(), 1);
|
||||
table.add_series_data(
|
||||
vec![("server".to_string(), "a".to_string())],
|
||||
"temp".to_string(),
|
||||
empty_block(),
|
||||
)?;
|
||||
table
|
||||
.add_series_data(
|
||||
vec![("server".to_string(), "a".to_string())],
|
||||
"temp".to_string(),
|
||||
empty_block(),
|
||||
)
|
||||
.unwrap();
|
||||
inputs.push(Some(table.clone()));
|
||||
|
||||
table = MeasurementTable::new("disk".to_string(), 2);
|
||||
table.add_series_data(
|
||||
vec![("region".to_string(), "west".to_string())],
|
||||
"temp".to_string(),
|
||||
empty_block(),
|
||||
)?;
|
||||
table
|
||||
.add_series_data(
|
||||
vec![("region".to_string(), "west".to_string())],
|
||||
"temp".to_string(),
|
||||
empty_block(),
|
||||
)
|
||||
.unwrap();
|
||||
inputs.push(Some(table));
|
||||
|
||||
let mut res = TSMFileConverter::merge_input_tables(inputs)?;
|
||||
let mut res = TSMFileConverter::merge_input_tables(inputs).unwrap();
|
||||
let mut merged = res.0.unwrap();
|
||||
inputs = res.1;
|
||||
assert_eq!(merged.name, "cpu".to_string());
|
||||
|
@ -2055,15 +2069,14 @@ mod tests {
|
|||
assert_eq!(inputs[0], None);
|
||||
assert_eq!(inputs[1], None);
|
||||
|
||||
res = TSMFileConverter::merge_input_tables(inputs)?;
|
||||
res = TSMFileConverter::merge_input_tables(inputs).unwrap();
|
||||
merged = res.0.unwrap();
|
||||
assert_eq!(merged.name, "disk".to_string());
|
||||
assert_eq!(res.1, vec![None, None, None]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn conversion_tsm_file_single() -> Result<(), Error> {
|
||||
fn conversion_tsm_file_single() {
|
||||
let file = File::open("../tests/fixtures/merge-tsm/merge_a.tsm.gz");
|
||||
let mut decoder = GzDecoder::new(file.unwrap());
|
||||
let mut buf = Vec::new();
|
||||
|
@ -2099,12 +2112,10 @@ mod tests {
|
|||
"[disk] Closed"
|
||||
],
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn conversion_tsm_files_none_overlapping() -> Result<(), Error> {
|
||||
fn conversion_tsm_files_none_overlapping() {
|
||||
let mut index_streams = Vec::new();
|
||||
let mut block_streams = Vec::new();
|
||||
|
||||
|
@ -2154,7 +2165,5 @@ mod tests {
|
|||
"[disk] Closed"
|
||||
],
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -339,32 +339,27 @@ impl Column {
|
|||
mod tests {
|
||||
use super::*;
|
||||
|
||||
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
type Result<T = (), E = TestError> = std::result::Result<T, E>;
|
||||
|
||||
#[test]
|
||||
fn test_has_i64_range() -> Result {
|
||||
fn test_has_i64_range() {
|
||||
let mut stats = StatValues::new(1);
|
||||
stats.update(2);
|
||||
let col = Column::I64(vec![Some(1), None, Some(2)], stats.clone());
|
||||
assert!(!col.has_i64_range(-1, 0)?);
|
||||
assert!(!col.has_i64_range(0, 1)?);
|
||||
assert!(col.has_i64_range(1, 2)?);
|
||||
assert!(col.has_i64_range(2, 3)?);
|
||||
assert!(!col.has_i64_range(3, 4)?);
|
||||
assert!(!col.has_i64_range(-1, 0).unwrap());
|
||||
assert!(!col.has_i64_range(0, 1).unwrap());
|
||||
assert!(col.has_i64_range(1, 2).unwrap());
|
||||
assert!(col.has_i64_range(2, 3).unwrap());
|
||||
assert!(!col.has_i64_range(3, 4).unwrap());
|
||||
|
||||
let col = Column::I64(vec![Some(2), None, Some(1)], stats);
|
||||
assert!(!col.has_i64_range(-1, 0)?);
|
||||
assert!(!col.has_i64_range(0, 1)?);
|
||||
assert!(col.has_i64_range(1, 2)?);
|
||||
assert!(col.has_i64_range(2, 3)?);
|
||||
assert!(!col.has_i64_range(3, 4)?);
|
||||
|
||||
Ok(())
|
||||
assert!(!col.has_i64_range(-1, 0).unwrap());
|
||||
assert!(!col.has_i64_range(0, 1).unwrap());
|
||||
assert!(col.has_i64_range(1, 2).unwrap());
|
||||
assert!(col.has_i64_range(2, 3).unwrap());
|
||||
assert!(!col.has_i64_range(3, 4).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_has_i64_range_does_not_panic() -> Result {
|
||||
fn test_has_i64_range_does_not_panic() {
|
||||
// providing the wrong column type should get an internal error, not a panic
|
||||
let col = Column::F64(vec![Some(1.2)], StatValues::new(1.2));
|
||||
let res = col.has_i64_range(-1, 0);
|
||||
|
@ -377,11 +372,10 @@ mod tests {
|
|||
expected,
|
||||
res_string
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_has_non_null_i64_range_() -> Result {
|
||||
fn test_has_non_null_i64_range_() {
|
||||
let none_col: Vec<Option<u32>> = vec![None, None, None];
|
||||
let some_col: Vec<Option<u32>> = vec![Some(0), Some(0), Some(0)];
|
||||
|
||||
|
@ -389,19 +383,17 @@ mod tests {
|
|||
stats.update(2);
|
||||
let col = Column::I64(vec![Some(1), None, Some(2)], stats);
|
||||
|
||||
assert!(!col.has_non_null_i64_range(&some_col, -1, 0)?);
|
||||
assert!(!col.has_non_null_i64_range(&some_col, 0, 1)?);
|
||||
assert!(col.has_non_null_i64_range(&some_col, 1, 2)?);
|
||||
assert!(col.has_non_null_i64_range(&some_col, 2, 3)?);
|
||||
assert!(!col.has_non_null_i64_range(&some_col, 3, 4)?);
|
||||
assert!(!col.has_non_null_i64_range(&some_col, -1, 0).unwrap());
|
||||
assert!(!col.has_non_null_i64_range(&some_col, 0, 1).unwrap());
|
||||
assert!(col.has_non_null_i64_range(&some_col, 1, 2).unwrap());
|
||||
assert!(col.has_non_null_i64_range(&some_col, 2, 3).unwrap());
|
||||
assert!(!col.has_non_null_i64_range(&some_col, 3, 4).unwrap());
|
||||
|
||||
assert!(!col.has_non_null_i64_range(&none_col, -1, 0)?);
|
||||
assert!(!col.has_non_null_i64_range(&none_col, 0, 1)?);
|
||||
assert!(!col.has_non_null_i64_range(&none_col, 1, 2)?);
|
||||
assert!(!col.has_non_null_i64_range(&none_col, 2, 3)?);
|
||||
assert!(!col.has_non_null_i64_range(&none_col, 3, 4)?);
|
||||
|
||||
Ok(())
|
||||
assert!(!col.has_non_null_i64_range(&none_col, -1, 0).unwrap());
|
||||
assert!(!col.has_non_null_i64_range(&none_col, 0, 1).unwrap());
|
||||
assert!(!col.has_non_null_i64_range(&none_col, 1, 2).unwrap());
|
||||
assert!(!col.has_non_null_i64_range(&none_col, 2, 3).unwrap());
|
||||
assert!(!col.has_non_null_i64_range(&none_col, 3, 4).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -482,7 +482,7 @@ mod tests {
|
|||
{} to run",
|
||||
unset_var_names
|
||||
);
|
||||
return Ok(());
|
||||
return;
|
||||
} else {
|
||||
AwsConfig {
|
||||
access_key_id: env::var("AWS_ACCESS_KEY_ID")
|
||||
|
@ -515,7 +515,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn s3_test() -> Result<()> {
|
||||
async fn s3_test() {
|
||||
let config = maybe_skip_integration!();
|
||||
let integration = ObjectStore::new_amazon_s3(
|
||||
AmazonS3::new(
|
||||
|
@ -527,14 +527,12 @@ mod tests {
|
|||
.expect("Valid S3 config"),
|
||||
);
|
||||
|
||||
check_credentials(put_get_delete_list(&integration).await)?;
|
||||
check_credentials(put_get_delete_list(&integration).await).unwrap();
|
||||
check_credentials(list_with_delimiter(&integration).await).unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn s3_test_get_nonexistent_region() -> Result<()> {
|
||||
async fn s3_test_get_nonexistent_region() {
|
||||
let mut config = maybe_skip_integration!();
|
||||
// Assumes environment variables do not provide credentials to AWS US West 1
|
||||
config.region = "us-west-1".into();
|
||||
|
@ -564,12 +562,10 @@ mod tests {
|
|||
} else {
|
||||
panic!("unexpected error type: {:?}", err);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn s3_test_get_nonexistent_location() -> Result<()> {
|
||||
async fn s3_test_get_nonexistent_location() {
|
||||
let config = maybe_skip_integration!();
|
||||
let integration = ObjectStore::new_amazon_s3(
|
||||
AmazonS3::new(
|
||||
|
@ -605,12 +601,10 @@ mod tests {
|
|||
} else {
|
||||
panic!("unexpected error type: {:?}", err);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn s3_test_get_nonexistent_bucket() -> Result<()> {
|
||||
async fn s3_test_get_nonexistent_bucket() {
|
||||
let mut config = maybe_skip_integration!();
|
||||
config.bucket = NON_EXISTENT_NAME.into();
|
||||
|
||||
|
@ -642,12 +636,10 @@ mod tests {
|
|||
} else {
|
||||
panic!("unexpected error type: {:?}", err);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn s3_test_put_nonexistent_region() -> Result<()> {
|
||||
async fn s3_test_put_nonexistent_region() {
|
||||
let mut config = maybe_skip_integration!();
|
||||
// Assumes environment variables do not provide credentials to AWS US West 1
|
||||
config.region = "us-west-1".into();
|
||||
|
@ -691,12 +683,10 @@ mod tests {
|
|||
} else {
|
||||
panic!("unexpected error type: {:?}", err);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn s3_test_put_nonexistent_bucket() -> Result<()> {
|
||||
async fn s3_test_put_nonexistent_bucket() {
|
||||
let mut config = maybe_skip_integration!();
|
||||
config.bucket = NON_EXISTENT_NAME.into();
|
||||
|
||||
|
@ -739,12 +729,10 @@ mod tests {
|
|||
} else {
|
||||
panic!("unexpected error type: {:?}", err);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn s3_test_delete_nonexistent_location() -> Result<()> {
|
||||
async fn s3_test_delete_nonexistent_location() {
|
||||
let config = maybe_skip_integration!();
|
||||
let integration = ObjectStore::new_amazon_s3(
|
||||
AmazonS3::new(
|
||||
|
@ -762,12 +750,10 @@ mod tests {
|
|||
let result = integration.delete(&location).await;
|
||||
|
||||
assert!(result.is_ok());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn s3_test_delete_nonexistent_region() -> Result<()> {
|
||||
async fn s3_test_delete_nonexistent_region() {
|
||||
let mut config = maybe_skip_integration!();
|
||||
// Assumes environment variables do not provide credentials to AWS US West 1
|
||||
config.region = "us-west-1".into();
|
||||
|
@ -801,12 +787,10 @@ mod tests {
|
|||
} else {
|
||||
panic!("unexpected error type: {:?}", err);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn s3_test_delete_nonexistent_bucket() -> Result<()> {
|
||||
async fn s3_test_delete_nonexistent_bucket() {
|
||||
let mut config = maybe_skip_integration!();
|
||||
config.bucket = NON_EXISTENT_NAME.into();
|
||||
|
||||
|
@ -839,7 +823,5 @@ mod tests {
|
|||
} else {
|
||||
panic!("unexpected error type: {:?}", err);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -280,9 +280,6 @@ mod tests {
|
|||
use crate::ObjectStore;
|
||||
use std::env;
|
||||
|
||||
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct AzureConfig {
|
||||
storage_account: String,
|
||||
|
@ -324,7 +321,7 @@ mod tests {
|
|||
{} to run",
|
||||
unset_var_names
|
||||
);
|
||||
return Ok(());
|
||||
return;
|
||||
} else {
|
||||
AzureConfig {
|
||||
storage_account: env::var("AZURE_STORAGE_ACCOUNT")
|
||||
|
@ -339,7 +336,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn azure_blob_test() -> Result<()> {
|
||||
async fn azure_blob_test() {
|
||||
let config = maybe_skip_integration!();
|
||||
let integration = ObjectStore::new_microsoft_azure(MicrosoftAzure::new(
|
||||
config.storage_account,
|
||||
|
@ -347,9 +344,7 @@ mod tests {
|
|||
config.bucket,
|
||||
));
|
||||
|
||||
put_get_delete_list(&integration).await?;
|
||||
list_with_delimiter(&integration).await?;
|
||||
|
||||
Ok(())
|
||||
put_get_delete_list(&integration).await.unwrap();
|
||||
list_with_delimiter(&integration).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -96,7 +96,7 @@ mod tests {
|
|||
.interleave_pending()
|
||||
}
|
||||
|
||||
async fn check_stream<R>(buf_stream: BufferedStream<R>) -> Result<()>
|
||||
async fn check_stream<R>(buf_stream: BufferedStream<R>)
|
||||
where
|
||||
R: AsyncRead + AsyncWrite + AsyncSeek + Unpin,
|
||||
{
|
||||
|
@ -106,25 +106,30 @@ mod tests {
|
|||
let content = buf_stream
|
||||
.map_ok(|b| bytes::BytesMut::from(&b[..]))
|
||||
.try_concat()
|
||||
.await?;
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(content, "foobarbaz");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_buffered_stream() -> Result<()> {
|
||||
async fn test_buffered_stream() {
|
||||
let backing_store = std::io::Cursor::new(Vec::new()); // in-memory buffer
|
||||
check_stream(BufferedStream::new(backing_store, test_data()).await?).await
|
||||
check_stream(
|
||||
BufferedStream::new(backing_store, test_data())
|
||||
.await
|
||||
.unwrap(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_slurp_stream_tempfile() -> Result<()> {
|
||||
check_stream(slurp_stream_tempfile(test_data()).await?).await
|
||||
async fn test_slurp_stream_tempfile() {
|
||||
check_stream(slurp_stream_tempfile(test_data()).await.unwrap()).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_slurp_stream_memory() -> Result<()> {
|
||||
check_stream(slurp_stream_memory(test_data()).await?).await
|
||||
async fn test_slurp_stream_memory() {
|
||||
check_stream(slurp_stream_memory(test_data()).await.unwrap()).await;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -263,9 +263,6 @@ impl File {
|
|||
mod tests {
|
||||
use super::*;
|
||||
|
||||
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
type Result<T, E = TestError> = std::result::Result<T, E>;
|
||||
|
||||
use crate::{
|
||||
tests::{list_with_delimiter, put_get_delete_list},
|
||||
Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath,
|
||||
|
@ -274,19 +271,17 @@ mod tests {
|
|||
use tempfile::TempDir;
|
||||
|
||||
#[tokio::test]
|
||||
async fn file_test() -> Result<()> {
|
||||
let root = TempDir::new()?;
|
||||
async fn file_test() {
|
||||
let root = TempDir::new().unwrap();
|
||||
let integration = ObjectStore::new_file(File::new(root.path()));
|
||||
|
||||
put_get_delete_list(&integration).await?;
|
||||
list_with_delimiter(&integration).await?;
|
||||
|
||||
Ok(())
|
||||
put_get_delete_list(&integration).await.unwrap();
|
||||
list_with_delimiter(&integration).await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn length_mismatch_is_an_error() -> Result<()> {
|
||||
let root = TempDir::new()?;
|
||||
async fn length_mismatch_is_an_error() {
|
||||
let root = TempDir::new().unwrap();
|
||||
let integration = ObjectStore::new_file(File::new(root.path()));
|
||||
|
||||
let bytes = stream::once(async { Ok(Bytes::from("hello world")) });
|
||||
|
@ -303,13 +298,11 @@ mod tests {
|
|||
}
|
||||
}
|
||||
));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn creates_dir_if_not_present() -> Result<()> {
|
||||
let root = TempDir::new()?;
|
||||
async fn creates_dir_if_not_present() {
|
||||
let root = TempDir::new().unwrap();
|
||||
let integration = ObjectStore::new_file(File::new(root.path()));
|
||||
|
||||
let data = Bytes::from("arbitrary data");
|
||||
|
@ -323,22 +316,23 @@ mod tests {
|
|||
futures::stream::once(async move { stream_data }),
|
||||
Some(data.len()),
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let read_data = integration
|
||||
.get(&location)
|
||||
.await?
|
||||
.await
|
||||
.unwrap()
|
||||
.map_ok(|b| bytes::BytesMut::from(&b[..]))
|
||||
.try_concat()
|
||||
.await?;
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(&*read_data, data);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unknown_length() -> Result<()> {
|
||||
let root = TempDir::new()?;
|
||||
async fn unknown_length() {
|
||||
let root = TempDir::new().unwrap();
|
||||
let integration = ObjectStore::new_file(File::new(root.path()));
|
||||
|
||||
let data = Bytes::from("arbitrary data");
|
||||
|
@ -352,16 +346,17 @@ mod tests {
|
|||
futures::stream::once(async move { stream_data }),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let read_data = integration
|
||||
.get(&location)
|
||||
.await?
|
||||
.await
|
||||
.unwrap()
|
||||
.map_ok(|b| bytes::BytesMut::from(&b[..]))
|
||||
.try_concat()
|
||||
.await?;
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(&*read_data, data);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -264,9 +264,6 @@ mod test {
|
|||
use bytes::Bytes;
|
||||
use std::env;
|
||||
|
||||
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
type Result<T, E = TestError> = std::result::Result<T, E>;
|
||||
|
||||
const NON_EXISTENT_NAME: &str = "nonexistentname";
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -305,7 +302,7 @@ mod test {
|
|||
{} to run",
|
||||
unset_var_names
|
||||
);
|
||||
return Ok(());
|
||||
return;
|
||||
} else {
|
||||
GoogleCloudConfig {
|
||||
bucket: env::var("INFLUXDB_IOX_BUCKET")
|
||||
|
@ -318,20 +315,19 @@ mod test {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn gcs_test() -> Result<()> {
|
||||
async fn gcs_test() {
|
||||
let config = maybe_skip_integration!();
|
||||
let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
|
||||
config.service_account,
|
||||
config.bucket,
|
||||
));
|
||||
|
||||
put_get_delete_list(&integration).await?;
|
||||
list_with_delimiter(&integration).await?;
|
||||
Ok(())
|
||||
put_get_delete_list(&integration).await.unwrap();
|
||||
list_with_delimiter(&integration).await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn gcs_test_get_nonexistent_location() -> Result<()> {
|
||||
async fn gcs_test_get_nonexistent_location() {
|
||||
let config = maybe_skip_integration!();
|
||||
let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
|
||||
config.service_account,
|
||||
|
@ -360,12 +356,10 @@ mod test {
|
|||
} else {
|
||||
panic!("unexpected error type: {:?}", err)
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn gcs_test_get_nonexistent_bucket() -> Result<()> {
|
||||
async fn gcs_test_get_nonexistent_bucket() {
|
||||
let mut config = maybe_skip_integration!();
|
||||
config.bucket = NON_EXISTENT_NAME.into();
|
||||
let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
|
||||
|
@ -389,12 +383,10 @@ mod test {
|
|||
} else {
|
||||
panic!("unexpected error type: {:?}", err);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn gcs_test_delete_nonexistent_location() -> Result<()> {
|
||||
async fn gcs_test_delete_nonexistent_location() {
|
||||
let config = maybe_skip_integration!();
|
||||
let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
|
||||
config.service_account,
|
||||
|
@ -421,12 +413,10 @@ mod test {
|
|||
} else {
|
||||
panic!("unexpected error type: {:?}", err)
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn gcs_test_delete_nonexistent_bucket() -> Result<()> {
|
||||
async fn gcs_test_delete_nonexistent_bucket() {
|
||||
let mut config = maybe_skip_integration!();
|
||||
config.bucket = NON_EXISTENT_NAME.into();
|
||||
let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
|
||||
|
@ -454,12 +444,10 @@ mod test {
|
|||
} else {
|
||||
panic!("unexpected error type: {:?}", err)
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn gcs_test_put_nonexistent_bucket() -> Result<()> {
|
||||
async fn gcs_test_put_nonexistent_bucket() {
|
||||
let mut config = maybe_skip_integration!();
|
||||
config.bucket = NON_EXISTENT_NAME.into();
|
||||
let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
|
||||
|
@ -497,7 +485,5 @@ mod test {
|
|||
} else {
|
||||
panic!("unexpected error type: {:?}", err);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -176,9 +176,6 @@ impl InMemory {
|
|||
mod tests {
|
||||
use super::*;
|
||||
|
||||
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
type Result<T, E = TestError> = std::result::Result<T, E>;
|
||||
|
||||
use crate::{
|
||||
tests::{list_with_delimiter, put_get_delete_list},
|
||||
Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath,
|
||||
|
@ -186,18 +183,15 @@ mod tests {
|
|||
use futures::stream;
|
||||
|
||||
#[tokio::test]
|
||||
async fn in_memory_test() -> Result<()> {
|
||||
async fn in_memory_test() {
|
||||
let integration = ObjectStore::new_in_memory(InMemory::new());
|
||||
|
||||
put_get_delete_list(&integration).await?;
|
||||
|
||||
put_get_delete_list(&integration).await.unwrap();
|
||||
list_with_delimiter(&integration).await.unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn length_mismatch_is_an_error() -> Result<()> {
|
||||
async fn length_mismatch_is_an_error() {
|
||||
let integration = ObjectStore::new_in_memory(InMemory::new());
|
||||
|
||||
let bytes = stream::once(async { Ok(Bytes::from("hello world")) });
|
||||
|
@ -214,12 +208,10 @@ mod tests {
|
|||
}
|
||||
}
|
||||
));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unknown_length() -> Result<()> {
|
||||
async fn unknown_length() {
|
||||
let integration = ObjectStore::new_in_memory(InMemory::new());
|
||||
|
||||
let data = Bytes::from("arbitrary data");
|
||||
|
@ -233,16 +225,17 @@ mod tests {
|
|||
futures::stream::once(async move { stream_data }),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let read_data = integration
|
||||
.get(&location)
|
||||
.await?
|
||||
.await
|
||||
.unwrap()
|
||||
.map_ok(|b| bytes::BytesMut::from(&b[..]))
|
||||
.try_concat()
|
||||
.await?;
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(&*read_data, data);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -323,33 +323,30 @@ mod tests {
|
|||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn executor_known_string_set_plan_ok() -> Result<()> {
|
||||
async fn executor_known_string_set_plan_ok() {
|
||||
let expected_strings = to_set(&["Foo", "Bar"]);
|
||||
let plan = StringSetPlan::Known(Arc::clone(&expected_strings));
|
||||
|
||||
let executor = Executor::default();
|
||||
let result_strings = executor.to_string_set(plan).await?;
|
||||
let result_strings = executor.to_string_set(plan).await.unwrap();
|
||||
assert_eq!(result_strings, expected_strings);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn executor_datafusion_string_set_single_plan_no_batches() -> Result<()> {
|
||||
async fn executor_datafusion_string_set_single_plan_no_batches() {
|
||||
// Test with a single plan that produces no batches
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
|
||||
let scan = make_plan(schema, vec![]);
|
||||
let plan: StringSetPlan = vec![scan].into();
|
||||
|
||||
let executor = Executor::new();
|
||||
let results = executor.to_string_set(plan).await?;
|
||||
let results = executor.to_string_set(plan).await.unwrap();
|
||||
|
||||
assert_eq!(results, StringSetRef::new(StringSet::new()));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn executor_datafusion_string_set_single_plan_one_batch() -> Result<()> {
|
||||
async fn executor_datafusion_string_set_single_plan_one_batch() {
|
||||
// Test with a single plan that produces one record batch
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
|
||||
let data = to_string_array(&["foo", "bar", "baz", "foo"]);
|
||||
|
@ -359,15 +356,13 @@ mod tests {
|
|||
let plan: StringSetPlan = vec![scan].into();
|
||||
|
||||
let executor = Executor::new();
|
||||
let results = executor.to_string_set(plan).await?;
|
||||
let results = executor.to_string_set(plan).await.unwrap();
|
||||
|
||||
assert_eq!(results, to_set(&["foo", "bar", "baz"]));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn executor_datafusion_string_set_single_plan_two_batch() -> Result<()> {
|
||||
async fn executor_datafusion_string_set_single_plan_two_batch() {
|
||||
// Test with a single plan that produces multiple record batches
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
|
||||
let data1 = to_string_array(&["foo", "bar"]);
|
||||
|
@ -380,15 +375,13 @@ mod tests {
|
|||
let plan: StringSetPlan = vec![scan].into();
|
||||
|
||||
let executor = Executor::new();
|
||||
let results = executor.to_string_set(plan).await?;
|
||||
let results = executor.to_string_set(plan).await.unwrap();
|
||||
|
||||
assert_eq!(results, to_set(&["foo", "bar", "baz"]));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn executor_datafusion_string_set_multi_plan() -> Result<()> {
|
||||
async fn executor_datafusion_string_set_multi_plan() {
|
||||
// Test with multiple datafusion logical plans
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
|
||||
|
||||
|
@ -405,15 +398,13 @@ mod tests {
|
|||
let plan: StringSetPlan = vec![scan1, scan2].into();
|
||||
|
||||
let executor = Executor::new();
|
||||
let results = executor.to_string_set(plan).await?;
|
||||
let results = executor.to_string_set(plan).await.unwrap();
|
||||
|
||||
assert_eq!(results, to_set(&["foo", "bar", "baz"]));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn executor_datafusion_string_set_nulls() -> Result<()> {
|
||||
async fn executor_datafusion_string_set_nulls() {
|
||||
// Ensure that nulls in the output set are handled reasonably
|
||||
// (error, rather than silently ignored)
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
|
||||
|
@ -440,12 +431,10 @@ mod tests {
|
|||
expected_error,
|
||||
actual_error,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn executor_datafusion_string_set_bad_schema() -> Result<()> {
|
||||
async fn executor_datafusion_string_set_bad_schema() {
|
||||
// Ensure that an incorect schema (an int) gives a reasonable error
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
|
||||
let data = Arc::new(Int64Array::from(vec![1]));
|
||||
|
@ -469,12 +458,10 @@ mod tests {
|
|||
expected_error,
|
||||
actual_error
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn make_schema_pivot_is_planned() -> Result<()> {
|
||||
async fn make_schema_pivot_is_planned() {
|
||||
// Test that all the planning logic is wired up and that we
|
||||
// can make a plan using a SchemaPivot node
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
|
@ -498,8 +485,6 @@ mod tests {
|
|||
let results = executor.to_string_set(plan).await.expect("Executed plan");
|
||||
|
||||
assert_eq!(results, to_set(&["f1", "f2"]));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// return a set for testing
|
||||
|
|
|
@ -301,7 +301,7 @@ mod tests {
|
|||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn schema_pivot_exec_all_null() -> Result<()> {
|
||||
async fn schema_pivot_exec_all_null() {
|
||||
let case = SchemaTestCase {
|
||||
input_batches: &[TestBatch {
|
||||
a: &[None, None],
|
||||
|
@ -315,12 +315,10 @@ mod tests {
|
|||
"TestCase: {:?}",
|
||||
case
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn schema_pivot_exec_both_non_null() -> Result<()> {
|
||||
async fn schema_pivot_exec_both_non_null() {
|
||||
let case = SchemaTestCase {
|
||||
input_batches: &[TestBatch {
|
||||
a: &[Some(1), None],
|
||||
|
@ -334,12 +332,10 @@ mod tests {
|
|||
"TestCase: {:?}",
|
||||
case
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn schema_pivot_exec_one_non_null() -> Result<()> {
|
||||
async fn schema_pivot_exec_one_non_null() {
|
||||
let case = SchemaTestCase {
|
||||
input_batches: &[TestBatch {
|
||||
a: &[Some(1), None],
|
||||
|
@ -353,12 +349,10 @@ mod tests {
|
|||
"TestCase: {:?}",
|
||||
case
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn schema_pivot_exec_both_non_null_two_record_batches() -> Result<()> {
|
||||
async fn schema_pivot_exec_both_non_null_two_record_batches() {
|
||||
let case = SchemaTestCase {
|
||||
input_batches: &[
|
||||
TestBatch {
|
||||
|
@ -378,12 +372,10 @@ mod tests {
|
|||
"TestCase: {:?}",
|
||||
case
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn schema_pivot_exec_one_non_null_in_second_record_batch() -> Result<()> {
|
||||
async fn schema_pivot_exec_one_non_null_in_second_record_batch() {
|
||||
let case = SchemaTestCase {
|
||||
input_batches: &[
|
||||
TestBatch {
|
||||
|
@ -403,12 +395,10 @@ mod tests {
|
|||
"TestCase: {:?}",
|
||||
case
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn schema_pivot_exec_bad_partition() -> Result<()> {
|
||||
async fn schema_pivot_exec_bad_partition() {
|
||||
// ensure passing in a bad partition generates a reasonable error
|
||||
|
||||
let pivot = make_schema_pivot(SchemaTestCase::input_schema(), vec![]);
|
||||
|
@ -427,8 +417,6 @@ mod tests {
|
|||
expected_error,
|
||||
actual_error
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return a StringSet extracted from the record batch
|
||||
|
|
|
@ -402,7 +402,7 @@ mod tests {
|
|||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_convert_empty() -> Result<()> {
|
||||
async fn test_convert_empty() {
|
||||
let schema = Arc::new(Schema::new(vec![]));
|
||||
let empty_iterator = Box::pin(SizedRecordBatchStream::new(schema, vec![]));
|
||||
|
||||
|
@ -412,12 +412,10 @@ mod tests {
|
|||
|
||||
let results = convert(table_name, &tag_columns, &field_columns, empty_iterator).await;
|
||||
assert_eq!(results.len(), 0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_convert_single_series_no_tags() -> Result<()> {
|
||||
async fn test_convert_single_series_no_tags() {
|
||||
// single series
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("tag_a", DataType::Utf8, true),
|
||||
|
@ -467,11 +465,10 @@ mod tests {
|
|||
.collect::<Vec<String>>();
|
||||
|
||||
assert_eq!(expected_data, actual_data);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_convert_single_series_no_tags_nulls() -> Result<()> {
|
||||
async fn test_convert_single_series_no_tags_nulls() {
|
||||
// single series
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("tag_a", DataType::Utf8, true),
|
||||
|
@ -522,11 +519,10 @@ mod tests {
|
|||
.collect::<Vec<String>>();
|
||||
|
||||
assert_eq!(expected_data, actual_data);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_convert_single_series_one_tag() -> Result<()> {
|
||||
async fn test_convert_single_series_one_tag() {
|
||||
// single series
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("tag_a", DataType::Utf8, true),
|
||||
|
@ -558,12 +554,10 @@ mod tests {
|
|||
);
|
||||
assert_eq!(series_set.start_row, 0);
|
||||
assert_eq!(series_set.num_rows, 2);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_convert_one_tag_multi_series() -> Result<()> {
|
||||
async fn test_convert_one_tag_multi_series() {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("tag_a", DataType::Utf8, true),
|
||||
Field::new("tag_b", DataType::Utf8, true),
|
||||
|
@ -608,13 +602,11 @@ mod tests {
|
|||
);
|
||||
assert_eq!(series_set2.start_row, 3);
|
||||
assert_eq!(series_set2.num_rows, 2);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// two tag columns, three series
|
||||
#[tokio::test]
|
||||
async fn test_convert_two_tag_multi_series() -> Result<()> {
|
||||
async fn test_convert_two_tag_multi_series() {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("tag_a", DataType::Utf8, true),
|
||||
Field::new("tag_b", DataType::Utf8, true),
|
||||
|
@ -667,12 +659,10 @@ mod tests {
|
|||
);
|
||||
assert_eq!(series_set3.start_row, 3);
|
||||
assert_eq!(series_set3.num_rows, 2);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_convert_groups() -> Result<()> {
|
||||
async fn test_convert_groups() {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("tag_a", DataType::Utf8, true),
|
||||
Field::new("tag_b", DataType::Utf8, true),
|
||||
|
@ -742,13 +732,11 @@ mod tests {
|
|||
);
|
||||
assert_eq!(series_set3.start_row, 2);
|
||||
assert_eq!(series_set3.num_rows, 1);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// test with no group tags specified
|
||||
#[tokio::test]
|
||||
async fn test_convert_groups_no_tags() -> Result<()> {
|
||||
async fn test_convert_groups_no_tags() {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("tag_a", DataType::Utf8, true),
|
||||
Field::new("tag_b", DataType::Utf8, true),
|
||||
|
@ -795,8 +783,6 @@ mod tests {
|
|||
);
|
||||
assert_eq!(series_set2.start_row, 1);
|
||||
assert_eq!(series_set2.num_rows, 1);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn extract_group(item: &SeriesSetItem) -> &GroupDescription {
|
||||
|
|
|
@ -649,11 +649,8 @@ mod tests {
|
|||
|
||||
use super::*;
|
||||
|
||||
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
type Result<T = (), E = TestError> = std::result::Result<T, E>;
|
||||
|
||||
#[tokio::test]
|
||||
async fn server_api_calls_return_error_with_no_id_set() -> Result {
|
||||
async fn server_api_calls_return_error_with_no_id_set() {
|
||||
let manager = TestConnectionManager::new();
|
||||
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
|
||||
let server = Server::new(manager, store);
|
||||
|
@ -665,8 +662,6 @@ mod tests {
|
|||
let lines = parsed_lines("cpu foo=1 10");
|
||||
let resp = server.write_lines("foo", &lines).await.unwrap_err();
|
||||
assert!(matches!(resp, Error::IdNotSet));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -731,7 +726,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn duplicate_database_name_rejected() -> Result {
|
||||
async fn duplicate_database_name_rejected() {
|
||||
// Covers #643
|
||||
|
||||
let manager = TestConnectionManager::new();
|
||||
|
@ -756,12 +751,10 @@ mod tests {
|
|||
if !matches!(got, Error::DatabaseAlreadyExists {..}) {
|
||||
panic!("expected already exists error");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn db_names_sorted() -> Result {
|
||||
async fn db_names_sorted() {
|
||||
let manager = TestConnectionManager::new();
|
||||
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
|
||||
let server = Server::new(manager, store);
|
||||
|
@ -779,19 +772,20 @@ mod tests {
|
|||
|
||||
let db_names_sorted = server.db_names_sorted();
|
||||
assert_eq!(names, db_names_sorted);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn writes_local() -> Result {
|
||||
async fn writes_local() {
|
||||
let manager = TestConnectionManager::new();
|
||||
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
|
||||
let server = Server::new(manager, store);
|
||||
server.set_id(NonZeroU32::new(1).unwrap()).unwrap();
|
||||
|
||||
let name = DatabaseName::new("foo".to_string()).unwrap();
|
||||
server
|
||||
.create_database(DatabaseRules::new(DatabaseName::new("foo").unwrap()))
|
||||
.await?;
|
||||
.create_database(DatabaseRules::new(name))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let line = "cpu bar=1 10";
|
||||
let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect();
|
||||
|
@ -816,12 +810,10 @@ mod tests {
|
|||
"+-----+------+",
|
||||
];
|
||||
assert_table_eq!(expected, &batches);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn close_chunk() -> Result {
|
||||
async fn close_chunk() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let manager = TestConnectionManager::new();
|
||||
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
|
||||
|
@ -835,7 +827,8 @@ mod tests {
|
|||
let db_name = DatabaseName::new("foo").unwrap();
|
||||
server
|
||||
.create_database(DatabaseRules::new(db_name.clone()))
|
||||
.await?;
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let line = "cpu bar=1 10";
|
||||
let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect();
|
||||
|
@ -880,8 +873,6 @@ mod tests {
|
|||
// ensure that we don't leave the server instance hanging around
|
||||
cancel_token.cancel();
|
||||
let _ = background_handle.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -938,7 +929,7 @@ partition_key:
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn background_task_cleans_jobs() -> Result {
|
||||
async fn background_task_cleans_jobs() {
|
||||
let manager = TestConnectionManager::new();
|
||||
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
|
||||
let server = Arc::new(Server::new(manager, store));
|
||||
|
@ -957,8 +948,6 @@ partition_key:
|
|||
// ensure that we don't leave the server instance hanging around
|
||||
cancel_token.cancel();
|
||||
let _ = background_handle.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Snafu, Debug, Clone)]
|
||||
|
|
|
@ -304,18 +304,18 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn test_socketaddr() -> Result<(), clap::Error> {
|
||||
let c = Config::from_iter_safe(
|
||||
to_vec(&["server", "--api-bind", "127.0.0.1:1234"]).into_iter(),
|
||||
)?;
|
||||
fn test_socketaddr() {
|
||||
let c =
|
||||
Config::from_iter_safe(to_vec(&["server", "--api-bind", "127.0.0.1:1234"]).into_iter())
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
c.http_bind_address,
|
||||
SocketAddr::from(([127, 0, 0, 1], 1234))
|
||||
);
|
||||
|
||||
let c = Config::from_iter_safe(
|
||||
to_vec(&["server", "--api-bind", "localhost:1234"]).into_iter(),
|
||||
)?;
|
||||
let c =
|
||||
Config::from_iter_safe(to_vec(&["server", "--api-bind", "localhost:1234"]).into_iter())
|
||||
.unwrap();
|
||||
// depending on where the test runs, localhost will either resolve to a ipv4 or
|
||||
// an ipv6 addr.
|
||||
match c.http_bind_address {
|
||||
|
@ -336,7 +336,5 @@ mod tests {
|
|||
.expect_err("must fail"),
|
||||
clap::ErrorKind::ValueValidation
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -729,11 +729,8 @@ mod tests {
|
|||
use server::{db::Db, ConnectionManagerImpl};
|
||||
use std::num::NonZeroU32;
|
||||
|
||||
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_health() -> Result<()> {
|
||||
async fn test_health() {
|
||||
let test_storage = Arc::new(AppServer::new(
|
||||
ConnectionManagerImpl {},
|
||||
Arc::new(ObjectStore::new_in_memory(InMemory::new())),
|
||||
|
@ -745,11 +742,10 @@ mod tests {
|
|||
|
||||
// Print the response so if the test fails, we have a log of what went wrong
|
||||
check_response("health", response, StatusCode::OK, "OK").await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write() -> Result<()> {
|
||||
async fn test_write() {
|
||||
let test_storage = Arc::new(AppServer::new(
|
||||
ConnectionManagerImpl {},
|
||||
Arc::new(ObjectStore::new_in_memory(InMemory::new())),
|
||||
|
@ -795,8 +791,6 @@ mod tests {
|
|||
"+----------------+--------------+-------+-----------------+------------+",
|
||||
];
|
||||
assert_table_eq!(expected, &batches);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Sets up a test database with some data for testing the query endpoint
|
||||
|
@ -837,7 +831,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_query_pretty() -> Result<()> {
|
||||
async fn test_query_pretty() {
|
||||
let (client, server_url) = setup_test_data().await;
|
||||
|
||||
// send query data
|
||||
|
@ -869,12 +863,10 @@ mod tests {
|
|||
assert_eq!(get_content_type(&response), "text/plain");
|
||||
|
||||
check_response("query", response, StatusCode::OK, res).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_query_csv() -> Result<()> {
|
||||
async fn test_query_csv() {
|
||||
let (client, server_url) = setup_test_data().await;
|
||||
|
||||
// send query data
|
||||
|
@ -891,12 +883,10 @@ mod tests {
|
|||
let res = "bottom_degrees,location,state,surface_degrees,time\n\
|
||||
50.4,santa_monica,CA,65.2,1568756160\n";
|
||||
check_response("query", response, StatusCode::OK, res).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_query_json() -> Result<()> {
|
||||
async fn test_query_json() {
|
||||
let (client, server_url) = setup_test_data().await;
|
||||
|
||||
// send a second line of data to demontrate how that works
|
||||
|
@ -930,8 +920,6 @@ mod tests {
|
|||
// Note two json records: one record on each line
|
||||
let res = r#"[{"bottom_degrees":50.4,"location":"santa_monica","state":"CA","surface_degrees":65.2,"time":1568756160},{"location":"Boston","state":"MA","surface_degrees":50.2,"time":1568756160}]"#;
|
||||
check_response("query", response, StatusCode::OK, res).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn gzip_str(s: &str) -> Vec<u8> {
|
||||
|
@ -943,7 +931,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_gzip_write() -> Result<()> {
|
||||
async fn test_gzip_write() {
|
||||
let test_storage = Arc::new(AppServer::new(
|
||||
ConnectionManagerImpl {},
|
||||
Arc::new(ObjectStore::new_in_memory(InMemory::new())),
|
||||
|
@ -990,8 +978,6 @@ mod tests {
|
|||
"+----------------+--------------+-------+-----------------+------------+",
|
||||
];
|
||||
assert_table_eq!(expected, &batches);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
@ -1091,7 +1091,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_storage_rpc_capabilities() -> Result<(), tonic::Status> {
|
||||
async fn test_storage_rpc_capabilities() {
|
||||
// Start a test gRPC server on a randomally allocated port
|
||||
let mut fixture = Fixture::new().await.expect("Connecting to test server");
|
||||
|
||||
|
@ -1106,10 +1106,8 @@ mod tests {
|
|||
|
||||
assert_eq!(
|
||||
expected_capabilities,
|
||||
fixture.storage_client.capabilities().await?
|
||||
fixture.storage_client.capabilities().await.unwrap()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -1196,7 +1194,7 @@ mod tests {
|
|||
/// the right parameters are passed into the Database interface
|
||||
/// and that the returned values are sent back via gRPC.
|
||||
#[tokio::test]
|
||||
async fn test_storage_rpc_tag_keys() -> Result<(), tonic::Status> {
|
||||
async fn test_storage_rpc_tag_keys() {
|
||||
// Start a test gRPC server on a randomally allocated port
|
||||
let mut fixture = Fixture::new().await.expect("Connecting to test server");
|
||||
|
||||
|
@ -1229,7 +1227,7 @@ mod tests {
|
|||
predicate: make_state_ma_predicate(),
|
||||
};
|
||||
|
||||
let actual_tag_keys = fixture.storage_client.tag_keys(request).await?;
|
||||
let actual_tag_keys = fixture.storage_client.tag_keys(request).await.unwrap();
|
||||
let expected_tag_keys = vec!["_f(0xff)", "_m(0x00)", "k1", "k2", "k3", "k4"];
|
||||
|
||||
assert_eq!(actual_tag_keys, expected_tag_keys,);
|
||||
|
@ -1256,11 +1254,10 @@ mod tests {
|
|||
"\nActual: {:?}\nExpected: {:?}",
|
||||
actual_predicate, expected_predicate
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_storage_rpc_tag_keys_error() -> Result<(), tonic::Status> {
|
||||
async fn test_storage_rpc_tag_keys_error() {
|
||||
// Start a test gRPC server on a randomally allocated port
|
||||
let mut fixture = Fixture::new().await.expect("Connecting to test server");
|
||||
|
||||
|
@ -1293,15 +1290,13 @@ mod tests {
|
|||
|
||||
let response = fixture.storage_client.tag_keys(request).await;
|
||||
assert_contains!(response.unwrap_err().to_string(), "Sugar we are going down");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// test the plumbing of the RPC layer for measurement_tag_keys--
|
||||
/// specifically that the right parameters are passed into the Database
|
||||
/// interface and that the returned values are sent back via gRPC.
|
||||
#[tokio::test]
|
||||
async fn test_storage_rpc_measurement_tag_keys() -> Result<(), tonic::Status> {
|
||||
async fn test_storage_rpc_measurement_tag_keys() {
|
||||
test_helpers::maybe_start_logging();
|
||||
// Start a test gRPC server on a randomally allocated port
|
||||
let mut fixture = Fixture::new().await.expect("Connecting to test server");
|
||||
|
@ -1340,7 +1335,11 @@ mod tests {
|
|||
predicate: make_state_ma_predicate(),
|
||||
};
|
||||
|
||||
let actual_tag_keys = fixture.storage_client.measurement_tag_keys(request).await?;
|
||||
let actual_tag_keys = fixture
|
||||
.storage_client
|
||||
.measurement_tag_keys(request)
|
||||
.await
|
||||
.unwrap();
|
||||
let expected_tag_keys = vec!["_f(0xff)", "_m(0x00)", "k1", "k2", "k3", "k4"];
|
||||
|
||||
assert_eq!(
|
||||
|
@ -1371,11 +1370,10 @@ mod tests {
|
|||
"\nActual: {:?}\nExpected: {:?}",
|
||||
actual_predicate, expected_predicate
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_storage_rpc_measurement_tag_keys_error() -> Result<(), tonic::Status> {
|
||||
async fn test_storage_rpc_measurement_tag_keys_error() {
|
||||
test_helpers::maybe_start_logging();
|
||||
// Start a test gRPC server on a randomally allocated port
|
||||
let mut fixture = Fixture::new().await.expect("Connecting to test server");
|
||||
|
@ -1412,8 +1410,6 @@ mod tests {
|
|||
|
||||
let response = fixture.storage_client.measurement_tag_keys(request).await;
|
||||
assert_contains!(response.unwrap_err().to_string(), "This is an error");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// test the plumbing of the RPC layer for tag_keys -- specifically that
|
||||
|
@ -1710,7 +1706,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_log_on_panic() -> Result<(), tonic::Status> {
|
||||
async fn test_log_on_panic() {
|
||||
// Send a message to a route that causes a panic and ensure:
|
||||
// 1. We don't use up all executors 2. The panic message
|
||||
// message ends up in the log system
|
||||
|
@ -1759,10 +1755,8 @@ mod tests {
|
|||
}
|
||||
|
||||
// Ensure there are still threads to answer actual client queries
|
||||
let caps = fixture.storage_client.capabilities().await?;
|
||||
let caps = fixture.storage_client.capabilities().await.unwrap();
|
||||
assert!(!caps.is_empty(), "Caps: {:?}", caps);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
@ -686,47 +686,42 @@ impl WritePayload {
|
|||
mod tests {
|
||||
use super::*;
|
||||
|
||||
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
type Result<T = (), E = TestError> = std::result::Result<T, E>;
|
||||
|
||||
#[test]
|
||||
fn sequence_numbers_are_persisted() -> Result {
|
||||
let dir = test_helpers::tmp_dir()?;
|
||||
fn sequence_numbers_are_persisted() {
|
||||
let dir = test_helpers::tmp_dir().unwrap();
|
||||
let builder = WalBuilder::new(dir.as_ref());
|
||||
let mut wal;
|
||||
|
||||
// Create one in-memory WAL and sync it
|
||||
{
|
||||
wal = builder.clone().wal()?;
|
||||
wal = builder.clone().wal().unwrap();
|
||||
|
||||
let data = Vec::from("somedata");
|
||||
let data = WritePayload::new(data)?;
|
||||
let seq = wal.append(data)?;
|
||||
let data = WritePayload::new(data).unwrap();
|
||||
let seq = wal.append(data).unwrap();
|
||||
assert_eq!(0, seq);
|
||||
wal.sync_all()?;
|
||||
wal.sync_all().unwrap();
|
||||
}
|
||||
|
||||
// Pretend the process restarts
|
||||
{
|
||||
wal = builder.wal()?;
|
||||
wal = builder.wal().unwrap();
|
||||
|
||||
assert_eq!(1, wal.sequence_number);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sequence_numbers_increase_by_number_of_pending_entries() -> Result {
|
||||
let dir = test_helpers::tmp_dir()?;
|
||||
fn sequence_numbers_increase_by_number_of_pending_entries() {
|
||||
let dir = test_helpers::tmp_dir().unwrap();
|
||||
let builder = WalBuilder::new(dir.as_ref());
|
||||
let mut wal = builder.wal()?;
|
||||
let mut wal = builder.wal().unwrap();
|
||||
|
||||
// Write 1 entry then sync
|
||||
let data = Vec::from("some");
|
||||
let data = WritePayload::new(data)?;
|
||||
let seq = wal.append(data)?;
|
||||
wal.sync_all()?;
|
||||
let data = WritePayload::new(data).unwrap();
|
||||
let seq = wal.append(data).unwrap();
|
||||
wal.sync_all().unwrap();
|
||||
assert_eq!(0, seq);
|
||||
|
||||
// Sequence number should increase by 1
|
||||
|
@ -734,19 +729,17 @@ mod tests {
|
|||
|
||||
// Write 2 entries then sync
|
||||
let data = Vec::from("other");
|
||||
let data = WritePayload::new(data)?;
|
||||
let seq = wal.append(data)?;
|
||||
let data = WritePayload::new(data).unwrap();
|
||||
let seq = wal.append(data).unwrap();
|
||||
assert_eq!(1, seq);
|
||||
|
||||
let data = Vec::from("again");
|
||||
let data = WritePayload::new(data)?;
|
||||
let seq = wal.append(data)?;
|
||||
let data = WritePayload::new(data).unwrap();
|
||||
let seq = wal.append(data).unwrap();
|
||||
assert_eq!(2, seq);
|
||||
wal.sync_all()?;
|
||||
wal.sync_all().unwrap();
|
||||
|
||||
// Sequence number should increase by 2
|
||||
assert_eq!(3, wal.sequence_number);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,26 +1,21 @@
|
|||
use wal::{WalBuilder, WritePayload};
|
||||
|
||||
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
type Result<T = (), E = TestError> = std::result::Result<T, E>;
|
||||
|
||||
#[test]
|
||||
fn no_concurrency() -> Result {
|
||||
let dir = test_helpers::tmp_dir()?;
|
||||
fn no_concurrency() {
|
||||
let dir = test_helpers::tmp_dir().unwrap();
|
||||
let builder = WalBuilder::new(dir.as_ref());
|
||||
let mut wal = builder.clone().wal()?;
|
||||
let mut wal = builder.clone().wal().unwrap();
|
||||
|
||||
let data = Vec::from("somedata");
|
||||
let payload = WritePayload::new(data)?;
|
||||
let sequence_number = wal.append(payload)?;
|
||||
wal.sync_all()?;
|
||||
let payload = WritePayload::new(data).unwrap();
|
||||
let sequence_number = wal.append(payload).unwrap();
|
||||
wal.sync_all().unwrap();
|
||||
|
||||
assert_eq!(0, sequence_number);
|
||||
|
||||
let wal_entries: Result<Vec<_>, _> = builder.entries()?.collect();
|
||||
let wal_entries = wal_entries?;
|
||||
let wal_entries: Result<Vec<_>, _> = builder.entries().unwrap().collect();
|
||||
let wal_entries = wal_entries.unwrap();
|
||||
assert_eq!(1, wal_entries.len());
|
||||
assert_eq!(b"somedata".as_ref(), wal_entries[0].as_data());
|
||||
assert_eq!(0, wal_entries[0].sequence_number());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue