From ff8e0379713a0ac93679976ecd9bd02c36b31ba0 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Mon, 4 Oct 2021 20:37:53 -0400 Subject: [PATCH 1/8] feat: handle delete response --- .../protos/influxdata/iox/management/v1/service.proto | 1 - influxdb_iox_client/src/client/management.rs | 1 - predicate/src/delete_predicate.rs | 1 - src/influxdb_ioxd/rpc/error.rs | 6 ++++++ src/influxdb_ioxd/rpc/management.rs | 4 ++-- tests/end_to_end_cases/management_api.rs | 6 ++++-- 6 files changed, 12 insertions(+), 7 deletions(-) diff --git a/generated_types/protos/influxdata/iox/management/v1/service.proto b/generated_types/protos/influxdata/iox/management/v1/service.proto index 5793bcd6e0..84fb5bed3e 100644 --- a/generated_types/protos/influxdata/iox/management/v1/service.proto +++ b/generated_types/protos/influxdata/iox/management/v1/service.proto @@ -494,5 +494,4 @@ message DeleteRequest { } message DeleteResponse { - // NGA todo: define an appropriate response } diff --git a/influxdb_iox_client/src/client/management.rs b/influxdb_iox_client/src/client/management.rs index b47e9e2c37..67cc361c5a 100644 --- a/influxdb_iox_client/src/client/management.rs +++ b/influxdb_iox_client/src/client/management.rs @@ -1057,7 +1057,6 @@ impl Client { _ => DeleteError::ServerError(status), })?; - // NGA todo: return a handle to the delete? Ok(()) } diff --git a/predicate/src/delete_predicate.rs b/predicate/src/delete_predicate.rs index 62224f4097..e8da83dd39 100644 --- a/predicate/src/delete_predicate.rs +++ b/predicate/src/delete_predicate.rs @@ -705,7 +705,6 @@ mod tests { assert_eq!(expected, result); } - // NGA todo: check content of error messages #[test] fn test_parse_delete_negative() { // invalid key diff --git a/src/influxdb_ioxd/rpc/error.rs b/src/influxdb_ioxd/rpc/error.rs index 442c2885fb..4fb78f0e6b 100644 --- a/src/influxdb_ioxd/rpc/error.rs +++ b/src/influxdb_ioxd/rpc/error.rs @@ -177,6 +177,12 @@ pub fn default_db_error_handler(error: server::db::Error) -> tonic::Status { ), } .into(), + Error::DeleteFromTable { source, table_name } => PreconditionViolation { + category: "database".to_string(), + subject: "influxdata.com/iox".to_string(), + description: format!("Cannot delete data from table: {} : {}", table_name, source), + } + .into(), Error::CatalogError { source } => default_catalog_error_handler(source), error => { error!(?error, "Unexpected error"); diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index 6a5fa707dd..a60b791286 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -623,6 +623,7 @@ where .db(&db_name) .map_err(default_server_error_handler)?; + // Parse input strings and build delete predicate let del_predicate_result = ParseDeletePredicate::build_delete_predicate( start_time.clone(), stop_time.clone(), @@ -637,14 +638,13 @@ where })) } Ok(del_predicate) => { - //execute delete + // execute delete db.delete(&table_name, Arc::new(del_predicate)) .await .map_err(default_db_error_handler)?; } } - // NGA todo: return a delete handle with the response? 
Ok(Response::new(DeleteResponse {})) } } diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index 2f31a2f62d..eaeb6ff923 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -1587,8 +1587,10 @@ async fn test_delete() { let pred = "region = west"; let del = management_client .delete(db_name.clone(), table, start, stop, pred) - .await; - assert!(del.is_err()); + .await + .unwrap_err() + .to_string(); + assert!(del.contains("Cannot delete data from table")); // Verify both existing tables still have the same data // query to verify data deleted From 80c4ba959dff4533e8f32bde6853fb41121accea Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Mon, 4 Oct 2021 21:19:10 -0400 Subject: [PATCH 2/8] test: add negative tests for delete hhtp endpoints --- src/influxdb_ioxd/http.rs | 42 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs index d56fdc463e..c0d532e60d 100644 --- a/src/influxdb_ioxd/http.rs +++ b/src/influxdb_ioxd/http.rs @@ -282,7 +282,7 @@ impl ApplicationError { Self::ParsingLineProtocol { .. } => self.bad_request(), Self::ParsingDelete { .. } => self.bad_request(), Self::BuildingDeletePredicate { .. } => self.bad_request(), - Self::ExecutingDelete { .. } => self.internal_error(), + Self::ExecutingDelete { .. } => self.bad_request(), Self::ReadingBodyAsGzip { .. } => self.bad_request(), Self::ClientHangup { .. } => self.bad_request(), Self::RouteNotFound { .. } => self.not_found(), @@ -1150,6 +1150,44 @@ mod tests { "+----------------+--------------+-------+-----------------+----------------------+", ]; assert_batches_eq!(expected, &batches); + + // ------------------- + // negative tests + // Not able to parse _measurement="not_a_table" (it must be _measurement=\"not_a_table\" to work) + let delete_line = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement="not_a_table" and location=Boston"}"#; + let response = client + .post(&format!( + "{}/api/v2/delete?bucket={}&org={}", + server_url, bucket_name, org_name + )) + .body(delete_line) + .send() + .await; + check_response( + "delete", + response, + StatusCode::BAD_REQUEST, + Some("Unable to parse delete string"), + ) + .await; + + // delete from non-existing table + let delete_line = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=not_a_table and location=Boston"}"#; + let response = client + .post(&format!( + "{}/api/v2/delete?bucket={}&org={}", + server_url, bucket_name, org_name + )) + .body(delete_line) + .send() + .await; + check_response( + "delete", + response, + StatusCode::BAD_REQUEST, + Some("Cannot delete data from non-existing table"), + ) + .await; } #[tokio::test] @@ -1607,7 +1645,7 @@ mod tests { assert_eq!(status, expected_status); if let Some(expected_body) = expected_body { - assert_eq!(body, expected_body); + assert!(body.contains(expected_body)); } } else { panic!("Unexpected error response: {:?}", response); From b8aa4c33ce979647986b0f13c388bafd7155dd06 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 5 Oct 2021 12:27:48 +0200 Subject: [PATCH 3/8] refactor: use protobuf bytes for transaction UUIDs --- generated_types/build.rs | 6 ++- .../influxdata/iox/catalog/v1/catalog.proto | 11 ++++- parquet_file/src/catalog/core.rs | 43 ++++++++++--------- parquet_file/src/catalog/dump.rs | 24 +++++------ .../src/catalog/internals/proto_parse.rs | 
16 +++---- 5 files changed, 57 insertions(+), 43 deletions(-) diff --git a/generated_types/build.rs b/generated_types/build.rs index 29222d89a7..32562becc3 100644 --- a/generated_types/build.rs +++ b/generated_types/build.rs @@ -65,7 +65,11 @@ fn generate_grpc_types(root: &Path) -> Result<()> { .compile_well_known_types() .disable_comments(&[".google"]) .extern_path(".google.protobuf", "::pbjson_types") - .bytes(&[".influxdata.iox.catalog.v1.AddParquet.metadata"]) + .bytes(&[ + ".influxdata.iox.catalog.v1.AddParquet.metadata", + ".influxdata.iox.catalog.v1.Transaction.previous_uuid", + ".influxdata.iox.catalog.v1.Transaction.uuid", + ]) .btree_map(&[ ".influxdata.iox.catalog.v1.DatabaseCheckpoint.sequencer_numbers", ".influxdata.iox.catalog.v1.PartitionCheckpoint.sequencer_numbers", diff --git a/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto b/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto index 8b6bbc62c0..4543313078 100644 --- a/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto +++ b/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto @@ -93,12 +93,19 @@ message Transaction { // Revision counter, must by "previous revision" + 1 or 0 for the first transaction. uint64 revision_counter = 3; + // Was string-formatted UUID and previous UUID. + reserved 4, 5; + // UUID unique to this transaction. Used to detect concurrent transactions. For the first transaction this field is // empty. - string uuid = 4; + // + // UUID is stored as 16 bytes in big-endian order. + bytes uuid = 8; // UUID of last commit. - string previous_uuid = 5; + // + // UUID is stored as 16 bytes in big-endian order. + bytes previous_uuid = 9; // Start timestamp. // diff --git a/parquet_file/src/catalog/core.rs b/parquet_file/src/catalog/core.rs index 4e1bd71eaa..eb78e62108 100644 --- a/parquet_file/src/catalog/core.rs +++ b/parquet_file/src/catalog/core.rs @@ -39,7 +39,7 @@ pub use crate::catalog::internals::proto_parse::Error as ProtoParseError; /// Current version for serialized transactions. /// /// For breaking changes, this will change. 
-pub const TRANSACTION_VERSION: u32 = 16; +pub const TRANSACTION_VERSION: u32 = 17; #[derive(Debug, Snafu)] pub enum Error { @@ -492,15 +492,18 @@ impl OpenTransaction { start_timestamp: DateTime, ) -> Self { let (revision_counter, previous_uuid) = match previous_tkey { - Some(tkey) => (tkey.revision_counter + 1, tkey.uuid.to_string()), - None => (0, String::new()), + Some(tkey) => ( + tkey.revision_counter + 1, + tkey.uuid.as_bytes().to_vec().into(), + ), + None => (0, Bytes::new()), }; Self { proto: proto::Transaction { actions: vec![], version: TRANSACTION_VERSION, - uuid: uuid.to_string(), + uuid: uuid.as_bytes().to_vec().into(), revision_counter, previous_uuid, start_timestamp: Some(start_timestamp.into()), @@ -512,7 +515,7 @@ impl OpenTransaction { fn tkey(&self) -> TransactionKey { TransactionKey { revision_counter: self.proto.revision_counter, - uuid: Uuid::parse_str(&self.proto.uuid).expect("UUID was checked before"), + uuid: Uuid::from_slice(&self.proto.uuid).expect("UUID was checked before"), } } @@ -935,11 +938,11 @@ impl<'c> CheckpointHandle<'c> { let proto = proto::Transaction { actions, version: TRANSACTION_VERSION, - uuid: self.tkey.uuid.to_string(), + uuid: self.tkey.uuid.as_bytes().to_vec().into(), revision_counter: self.tkey.revision_counter, previous_uuid: self .previous_tkey - .map_or_else(String::new, |tkey| tkey.uuid.to_string()), + .map_or_else(Bytes::new, |tkey| tkey.uuid.as_bytes().to_vec().into()), start_timestamp: Some(Utc::now().into()), encoding: proto::transaction::Encoding::Full.into(), }; @@ -1161,9 +1164,9 @@ mod tests { let mut proto = load_transaction_proto(&iox_object_store, &path) .await .unwrap(); - let uuid_expected = Uuid::parse_str(&proto.uuid).unwrap(); + let uuid_expected = Uuid::from_slice(&proto.uuid).unwrap(); let uuid_actual = Uuid::nil(); - proto.uuid = uuid_actual.to_string(); + proto.uuid = uuid_actual.as_bytes().to_vec().into(); store_transaction_proto(&iox_object_store, &path, &proto) .await .unwrap(); @@ -1192,7 +1195,7 @@ mod tests { let mut proto = load_transaction_proto(&iox_object_store, &path) .await .unwrap(); - proto.uuid = String::new(); + proto.uuid = Bytes::new(); store_transaction_proto(&iox_object_store, &path, &proto) .await .unwrap(); @@ -1218,7 +1221,7 @@ mod tests { let mut proto = load_transaction_proto(&iox_object_store, &path) .await .unwrap(); - proto.uuid = "foo".to_string(); + proto.uuid = Bytes::from("foo"); store_transaction_proto(&iox_object_store, &path, &proto) .await .unwrap(); @@ -1228,7 +1231,7 @@ mod tests { PreservedCatalog::load::(Arc::clone(&iox_object_store), ()).await; assert_eq!( res.unwrap_err().to_string(), - "Internal: Error while parsing protobuf: Cannot parse UUID: invalid length: expected one of [36, 32], found 3" + "Internal: Error while parsing protobuf: Cannot parse UUID: invalid bytes length: expected 16, found 3" ); } @@ -1244,7 +1247,7 @@ mod tests { let mut proto = load_transaction_proto(&iox_object_store, &path) .await .unwrap(); - proto.previous_uuid = Uuid::nil().to_string(); + proto.previous_uuid = Uuid::nil().as_bytes().to_vec().into(); store_transaction_proto(&iox_object_store, &path, &proto) .await .unwrap(); @@ -1267,7 +1270,7 @@ mod tests { let mut proto = load_transaction_proto(&iox_object_store, &path) .await .unwrap(); - proto.previous_uuid = Uuid::nil().to_string(); + proto.previous_uuid = Uuid::nil().as_bytes().to_vec().into(); store_transaction_proto(&iox_object_store, &path, &proto) .await .unwrap(); @@ -1290,7 +1293,7 @@ mod tests { let mut proto = 
load_transaction_proto(&iox_object_store, &path) .await .unwrap(); - proto.previous_uuid = "foo".to_string(); + proto.previous_uuid = Bytes::from("foo"); store_transaction_proto(&iox_object_store, &path, &proto) .await .unwrap(); @@ -1300,7 +1303,7 @@ mod tests { PreservedCatalog::load::(Arc::clone(&iox_object_store), ()).await; assert_eq!( res.unwrap_err().to_string(), - "Internal: Error while parsing protobuf: Cannot parse UUID: invalid length: expected one of [36, 32], found 3" + "Internal: Error while parsing protobuf: Cannot parse UUID: invalid bytes length: expected 16, found 3" ); } @@ -1339,7 +1342,7 @@ mod tests { let mut t = catalog.open_transaction().await; // open transaction - t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().to_string(); + t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().as_bytes().to_vec().into(); assert_eq!( format!("{:?}", t), "TransactionHandle(open, 1.00000000-0000-0000-0000-000000000000)" @@ -1367,7 +1370,7 @@ mod tests { let new_uuid = Uuid::new_v4(); tkey.uuid = new_uuid; let path = TransactionFilePath::new_transaction(tkey.revision_counter, tkey.uuid); - proto.uuid = new_uuid.to_string(); + proto.uuid = new_uuid.as_bytes().to_vec().into(); store_transaction_proto(&iox_object_store, &path, &proto) .await .unwrap(); @@ -1400,7 +1403,7 @@ mod tests { let new_uuid = Uuid::new_v4(); tkey.uuid = new_uuid; let path = TransactionFilePath::new_checkpoint(tkey.revision_counter, tkey.uuid); - proto.uuid = new_uuid.to_string(); + proto.uuid = new_uuid.as_bytes().to_vec().into(); proto.encoding = proto::transaction::Encoding::Full.into(); store_transaction_proto(&iox_object_store, &path, &proto) .await @@ -2075,7 +2078,7 @@ mod tests { .unwrap(); let mut t = catalog.open_transaction().await; - t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().to_string(); + t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().as_bytes().to_vec().into(); assert_eq!(t.uuid(), Uuid::nil()); } diff --git a/parquet_file/src/catalog/dump.rs b/parquet_file/src/catalog/dump.rs index d405a55467..a93bef4b77 100644 --- a/parquet_file/src/catalog/dump.rs +++ b/parquet_file/src/catalog/dump.rs @@ -272,11 +272,11 @@ File { is_checkpoint: false, proto: Ok( Transaction { - version: 16, + version: 17, actions: [], revision_counter: 0, - uuid: "00000000-0000-0000-0000-000000000000", - previous_uuid: "", + uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", + previous_uuid: b"", start_timestamp: Some( Timestamp { seconds: 10, @@ -297,7 +297,7 @@ File { is_checkpoint: false, proto: Ok( Transaction { - version: 16, + version: 17, actions: [ Action { action: Some( @@ -320,8 +320,8 @@ File { }, ], revision_counter: 1, - uuid: "00000000-0000-0000-0000-000000000000", - previous_uuid: "00000000-0000-0000-0000-000000000000", + uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", + previous_uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", start_timestamp: Some( Timestamp { seconds: 10, @@ -396,11 +396,11 @@ File { is_checkpoint: false, proto: Ok( Transaction { - version: 16, + version: 17, actions: [], revision_counter: 0, - uuid: "00000000-0000-0000-0000-000000000000", - previous_uuid: "", + uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", + previous_uuid: b"", start_timestamp: Some( Timestamp { seconds: 10, @@ -421,7 +421,7 @@ File { is_checkpoint: false, proto: Ok( Transaction { - version: 16, + version: 17, actions: [ Action { action: Some( @@ -444,8 +444,8 @@ File { }, ], revision_counter: 1, - uuid: "00000000-0000-0000-0000-000000000000", - previous_uuid: "00000000-0000-0000-0000-000000000000", + uuid: 
b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", + previous_uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", start_timestamp: Some( Timestamp { seconds: 10, diff --git a/parquet_file/src/catalog/internals/proto_parse.rs b/parquet_file/src/catalog/internals/proto_parse.rs index 13928931bf..5a7cfa38bb 100644 --- a/parquet_file/src/catalog/internals/proto_parse.rs +++ b/parquet_file/src/catalog/internals/proto_parse.rs @@ -1,4 +1,4 @@ -use std::{convert::TryInto, num::TryFromIntError, str::FromStr}; +use std::{convert::TryInto, num::TryFromIntError}; use chrono::{DateTime, Utc}; use generated_types::influxdata::iox::catalog::v1 as proto; @@ -33,19 +33,19 @@ pub enum Error { pub type Result = std::result::Result; -/// Parse UUID from protobuf. -pub fn parse_uuid(s: &str) -> Result> { - if s.is_empty() { +/// Parse big-endian UUID from protobuf. +pub fn parse_uuid(bytes: &[u8]) -> Result> { + if bytes.is_empty() { Ok(None) } else { - let uuid = Uuid::from_str(s).context(UuidParse {})?; + let uuid = Uuid::from_slice(bytes).context(UuidParse {})?; Ok(Some(uuid)) } } -/// Parse UUID from protobuf and fail if protobuf did not provide data. -pub fn parse_uuid_required(s: &str) -> Result { - parse_uuid(s)?.context(UuidRequired {}) +/// Parse big-endian UUID from protobuf and fail if protobuf did not provide data. +pub fn parse_uuid_required(bytes: &[u8]) -> Result { + parse_uuid(bytes)?.context(UuidRequired {}) } /// Parse [`ParquetFilePath`](iox_object_store::ParquetFilePath) from protobuf. From 53e49156d6da3d85cd12e64d5477dc5b6207bd5e Mon Sep 17 00:00:00 2001 From: Dom Date: Tue, 5 Oct 2021 15:42:44 +0200 Subject: [PATCH 4/8] refactor: derive Clone for IOx client builder Allows an IOx client builder (with its associated configuration) to be cloned. --- client_util/src/connection.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/client_util/src/connection.rs b/client_util/src/connection.rs index 1729ee5e7f..15a91741f9 100644 --- a/client_util/src/connection.rs +++ b/client_util/src/connection.rs @@ -68,7 +68,7 @@ pub type Result = std::result::Result; /// .expect("connection must succeed"); /// # } /// ``` -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Builder { user_agent: String, headers: Vec<(HeaderName, HeaderValue)>, @@ -171,3 +171,15 @@ impl Builder { Self { timeout, ..self } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_builder_cloneable() { + // Clone is used by Conductor. + fn assert_clone(_t: T) {} + assert_clone(Builder::default()) + } +} From 5abc55406dde7e60cda125dd7035c4ce9cd80042 Mon Sep 17 00:00:00 2001 From: Dom Date: Tue, 5 Oct 2021 16:27:00 +0200 Subject: [PATCH 5/8] build: pin smallvec Pin the smallvec version until the rest of the ecosystem catches up. Fixes https://github.com/influxdata/influxdb_iox/issues/2735. 
--- .github/dependabot.yml | 2 ++ influxdb_line_protocol/Cargo.toml | 5 +++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/dependabot.yml b/.github/dependabot.yml index b8214a7edd..b4b2c489be 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -13,3 +13,5 @@ updates: # Additionally the thrift-compiler version available in standard repos tends to lag # the latest release significantly, and so updating to the latest version adds friction - dependency-name: "thrift" + # https://github.com/influxdata/influxdb_iox/issues/2735 + - dependency-name: "smallvec" diff --git a/influxdb_line_protocol/Cargo.toml b/influxdb_line_protocol/Cargo.toml index b2a194c86b..a7aa8f38a2 100644 --- a/influxdb_line_protocol/Cargo.toml +++ b/influxdb_line_protocol/Cargo.toml @@ -6,9 +6,10 @@ edition = "2018" [dependencies] # In alphabetical order nom = "7" -smallvec = "1.7.0" +# See https://github.com/influxdata/influxdb_iox/issues/2735 before bumping smallvec +smallvec = "1.6.1" snafu = "0.6.2" observability_deps = { path = "../observability_deps" } [dev-dependencies] # In alphabetical order -test_helpers = { path = "../test_helpers" } \ No newline at end of file +test_helpers = { path = "../test_helpers" } From bf73bba1dc7607dc003972f8e03a62cbfcce185c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Oct 2021 10:56:41 -0400 Subject: [PATCH 6/8] fix: Return `_field` and `_measurement` tags correctly for read_group (#2736) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- src/influxdb_ioxd/rpc/storage/data.rs | 19 ++++++++--- tests/end_to_end_cases/storage_api.rs | 48 +++++++++++---------------- 2 files changed, 35 insertions(+), 32 deletions(-) diff --git a/src/influxdb_ioxd/rpc/storage/data.rs b/src/influxdb_ioxd/rpc/storage/data.rs index 66ea50c68e..e585e298cf 100644 --- a/src/influxdb_ioxd/rpc/storage/data.rs +++ b/src/influxdb_ioxd/rpc/storage/data.rs @@ -112,6 +112,16 @@ fn group_description_to_frames(group_description: GroupDescription) -> Vec>(); + let group_frame = GroupFrame { tag_keys, partition_key_vals, @@ -232,8 +242,8 @@ fn field_to_data( Ok(()) } -// Convert the tag=value pairs from the series set to the correct gRPC -// format, and add the _f and _m tags for the field name and measurement +/// Convert the tag=value pairs from the series set to the correct gRPC +/// format, and add the _f and _m tags for the field name and measurement fn convert_tags(table_name: &str, field_name: &str, tags: &[(Arc, Arc)]) -> Vec { // Special case "measurement" name which is modeled as a tag of // "_measurement" and "field" which is modeled as a tag of "_field" @@ -548,8 +558,9 @@ mod tests { .map(|f| dump_frame(f)) .collect::>(); - let expected_frames = - vec!["GroupFrame, tag_keys: tag1,tag2, partition_key_vals: val1,val2"]; + let expected_frames = vec![ + "GroupFrame, tag_keys: _field,_measurement,tag1,tag2, partition_key_vals: val1,val2", + ]; assert_eq!( dumped_frames, expected_frames, diff --git a/tests/end_to_end_cases/storage_api.rs b/tests/end_to_end_cases/storage_api.rs index 91793dee12..9177391cf4 100644 --- a/tests/end_to_end_cases/storage_api.rs +++ b/tests/end_to_end_cases/storage_api.rs @@ -401,7 +401,7 @@ async fn test_read_group_none_agg( }; let expected_group_frames = vec![ - "GroupFrame, tag_keys: cpu, partition_key_vals: cpu1", + "GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu1", "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0", "FloatPointsFrame, timestamps: 
[1000, 2000], values: \"20,21\"", "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0", @@ -410,7 +410,7 @@ async fn test_read_group_none_agg( "FloatPointsFrame, timestamps: [1000, 2000], values: \"10,11\"", "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 0", "FloatPointsFrame, timestamps: [1000, 2000], values: \"71,72\"", - "GroupFrame, tag_keys: cpu, partition_key_vals: cpu2", + "GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu2", "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0", "FloatPointsFrame, timestamps: [1000, 2000], values: \"40,41\"", "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0", @@ -424,11 +424,9 @@ async fn test_read_group_none_agg( let actual_group_frames = do_read_group_request(storage_client, read_group_request).await; assert_eq!( - expected_group_frames, - actual_group_frames, - "Expected:\n{}\nActual:\n{}", - expected_group_frames.join("\n"), - actual_group_frames.join("\n") + expected_group_frames, actual_group_frames, + "Expected:\n{:#?}\nActual:\n{:#?}", + expected_group_frames, actual_group_frames ); } @@ -453,12 +451,12 @@ async fn test_read_group_none_agg_with_predicate( }; let expected_group_frames = vec![ - "GroupFrame, tag_keys: cpu, partition_key_vals: cpu1", + "GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu1", "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0", "FloatPointsFrame, timestamps: [1000], values: \"20\"", "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=foo, type: 0", "FloatPointsFrame, timestamps: [1000], values: \"10\"", - "GroupFrame, tag_keys: cpu, partition_key_vals: cpu2", + "GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu2", "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0", "FloatPointsFrame, timestamps: [1000], values: \"40\"", "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=foo, type: 0", @@ -468,11 +466,9 @@ async fn test_read_group_none_agg_with_predicate( let actual_group_frames = do_read_group_request(storage_client, read_group_request).await; assert_eq!( - expected_group_frames, - actual_group_frames, - "Expected:\n{}\nActual:\n{}", - expected_group_frames.join("\n"), - actual_group_frames.join("\n") + expected_group_frames, actual_group_frames, + "Expected:\n{:#?}\nActual:\n{:#?}", + expected_group_frames, actual_group_frames ); } @@ -500,7 +496,7 @@ async fn test_read_group_sum_agg( }; let expected_group_frames = vec![ - "GroupFrame, tag_keys: cpu, partition_key_vals: cpu1", + "GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu1", "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0", "FloatPointsFrame, timestamps: [2000], values: \"41\"", "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0", @@ -509,7 +505,7 @@ async fn test_read_group_sum_agg( "FloatPointsFrame, timestamps: [2000], values: \"21\"", "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 0", "FloatPointsFrame, timestamps: [2000], values: \"143\"", - "GroupFrame, tag_keys: cpu, partition_key_vals: cpu2", + "GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu2", "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0", "FloatPointsFrame, timestamps: [2000], values: \"81\"", "SeriesFrame, tags: 
_field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0", @@ -523,11 +519,9 @@ async fn test_read_group_sum_agg( let actual_group_frames = do_read_group_request(storage_client, read_group_request).await; assert_eq!( - expected_group_frames, - actual_group_frames, - "Expected:\n{}\nActual:\n{}", - expected_group_frames.join("\n"), - actual_group_frames.join("\n") + expected_group_frames, actual_group_frames, + "Expected:\n{:#?}\nActual:\n{:#?}", + expected_group_frames, actual_group_frames, ); } @@ -555,7 +549,7 @@ async fn test_read_group_last_agg( }; let expected_group_frames = vec![ - "GroupFrame, tag_keys: cpu, partition_key_vals: cpu1", + "GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu1", "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0", "FloatPointsFrame, timestamps: [2000], values: \"21\"", "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0", @@ -564,7 +558,7 @@ async fn test_read_group_last_agg( "FloatPointsFrame, timestamps: [2000], values: \"11\"", "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 0", "FloatPointsFrame, timestamps: [2000], values: \"72\"", - "GroupFrame, tag_keys: cpu, partition_key_vals: cpu2", + "GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu2", "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0", "FloatPointsFrame, timestamps: [2000], values: \"41\"", "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0", @@ -578,11 +572,9 @@ async fn test_read_group_last_agg( let actual_group_frames = do_read_group_request(storage_client, read_group_request).await; assert_eq!( - expected_group_frames, - actual_group_frames, - "Expected:\n{}\nActual:\n{}", - expected_group_frames.join("\n"), - actual_group_frames.join("\n") + expected_group_frames, actual_group_frames, + "Expected:\n{:#?}\nActual:\n{:#?}", + expected_group_frames, actual_group_frames, ); } From 2a584420b3dcb095ec21cfec46d7984694396b68 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Tue, 5 Oct 2021 16:07:45 +0100 Subject: [PATCH 7/8] refactor: make data_types optional dependency (#2739) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- generated_types/Cargo.toml | 6 +++++- generated_types/src/lib.rs | 14 ++++++++++---- server/Cargo.toml | 2 +- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/generated_types/Cargo.toml b/generated_types/Cargo.toml index 6606198b8b..5d51cbbfe4 100644 --- a/generated_types/Cargo.toml +++ b/generated_types/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] # In alphabetical order bytes = "1.0" -data_types = { path = "../data_types" } +data_types = { path = "../data_types", optional = true } observability_deps = { path = "../observability_deps" } pbjson = "0.1" pbjson-types = "0.1" @@ -26,3 +26,7 @@ proc-macro2 = "=1.0.27" tonic-build = "0.5" prost-build = "0.8" pbjson-build = "0.1" + +[features] +default = [] +data_types_conversions = ["data_types"] \ No newline at end of file diff --git a/generated_types/src/lib.rs b/generated_types/src/lib.rs index 84b38f56e9..cffa81ac87 100644 --- a/generated_types/src/lib.rs +++ b/generated_types/src/lib.rs @@ -140,11 +140,17 @@ pub fn protobuf_type_url_eq(url: &str, protobuf_type: &str) -> bool { pub use com::github::influxdata::idpe::storage::read::*; pub use influxdata::platform::storage::*; -pub mod chunk; -pub mod 
database_rules; -pub mod database_state; -pub mod detailed_database; pub mod google; + +#[cfg(feature = "data_types_conversions")] +pub mod chunk; +#[cfg(feature = "data_types_conversions")] +pub mod database_rules; +#[cfg(feature = "data_types_conversions")] +pub mod database_state; +#[cfg(feature = "data_types_conversions")] +pub mod detailed_database; +#[cfg(feature = "data_types_conversions")] pub mod job; #[cfg(test)] diff --git a/server/Cargo.toml b/server/Cargo.toml index adfc5826cc..777b0cf71e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -21,7 +21,7 @@ entry = { path = "../entry" } flatbuffers = "2" futures = "0.3" futures-util = { version = "0.3.1" } -generated_types = { path = "../generated_types" } +generated_types = { path = "../generated_types", features = ["data_types_conversions"] } hashbrown = "0.11" influxdb_iox_client = { path = "../influxdb_iox_client" } influxdb_line_protocol = { path = "../influxdb_line_protocol" } From d0929e3a34a5907eea24d2f6438c98446e0ee6a0 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Tue, 5 Oct 2021 16:18:35 +0100 Subject: [PATCH 8/8] feat: persist no chunks (#2712) (#2718) * feat: persist no chunks (#2712) * fix: persist partition * fix: chunk ordering test * chore: fix logical conflict Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- lifecycle/Cargo.toml | 1 + lifecycle/src/policy.rs | 39 +++++++--- .../src/persistence_windows.rs | 2 +- query_tests/src/scenarios.rs | 56 +++++++-------- server/src/db.rs | 27 +++---- server/src/db/lifecycle/persist.rs | 72 ++++++++++++++++--- tracker/src/task.rs | 5 ++ 7 files changed, 136 insertions(+), 66 deletions(-) diff --git a/lifecycle/Cargo.toml b/lifecycle/Cargo.toml index 970ca79adc..919031a441 100644 --- a/lifecycle/Cargo.toml +++ b/lifecycle/Cargo.toml @@ -16,3 +16,4 @@ tokio = { version = "1.11", features = ["macros", "time"] } tracker = { path = "../tracker" } [dev-dependencies] +tokio = { version = "1.11", features = ["macros", "time", "rt"] } diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs index a3fdeffd7d..e96d789028 100644 --- a/lifecycle/src/policy.rs +++ b/lifecycle/src/policy.rs @@ -438,11 +438,6 @@ where to_persist.push(chunk); } - if to_persist.is_empty() { - info!(%db_name, %partition, "expected to persist but found no eligible chunks"); - return false; - } - let chunks = to_persist .into_iter() .map(|chunk| chunk.upgrade()) @@ -1314,8 +1309,8 @@ mod tests { .expect("expect early return due to task completion"); } - #[tokio::test] - async fn test_buffer_size_soft_drop_non_persisted() { + #[test] + fn test_buffer_size_soft_drop_non_persisted() { // test that chunk mover can drop non persisted chunks // if limit has been exceeded @@ -1379,8 +1374,8 @@ mod tests { ); } - #[tokio::test] - async fn test_buffer_size_soft_dont_drop_non_persisted() { + #[test] + fn test_buffer_size_soft_dont_drop_non_persisted() { // test that chunk mover unloads written chunks and can't drop // unpersisted chunks when the persist flag is true let rules = LifecycleRules { @@ -1711,6 +1706,32 @@ mod tests { ); } + #[test] + fn test_persist_empty() { + let rules = LifecycleRules { + persist: true, + late_arrive_window_seconds: NonZeroU32::new(10).unwrap(), + persist_age_threshold_seconds: NonZeroU32::new(20).unwrap(), + ..Default::default() + }; + let now = Instant::now(); + + // This could occur if the in-memory contents of a partition are deleted, and + // compaction causes the chunks to be 
removed. In such a scenario the persistence + // windows will still think there are rows to be persisted + let partitions = vec![TestPartition::new(vec![]).with_persistence( + 10, + now - Duration::from_secs(20), + from_secs(20), + )]; + + let db = TestDb::from_partitions(rules, partitions); + let mut lifecycle = LifecyclePolicy::new(&db); + + lifecycle.check_for_work(from_secs(0), now); + assert_eq!(*db.events.read(), vec![MoverEvents::Persist(vec![]),]); + } + #[test] fn test_suppress_persistence() { let rules = LifecycleRules { diff --git a/persistence_windows/src/persistence_windows.rs b/persistence_windows/src/persistence_windows.rs index 7a4f07b0b5..46a4c7d1bc 100644 --- a/persistence_windows/src/persistence_windows.rs +++ b/persistence_windows/src/persistence_windows.rs @@ -308,7 +308,7 @@ impl PersistenceWindows { /// Acquire a handle that prevents mutation of the persistable window until dropped /// - /// Returns `None` if there is an outstanding handle + /// Returns `None` if there is an outstanding handle or nothing to persist pub fn flush_handle(&mut self, now: Instant) -> Option { // Verify no active flush handles before closing open window self.persistable.get_mut()?; diff --git a/query_tests/src/scenarios.rs b/query_tests/src/scenarios.rs index 71aaa3b9ed..025fff6cb6 100644 --- a/query_tests/src/scenarios.rs +++ b/query_tests/src/scenarios.rs @@ -12,7 +12,7 @@ use query::QueryChunk; use async_trait::async_trait; -use server::db::LockablePartition; +use server::db::{LockableChunk, LockablePartition}; use server::utils::{ count_mutable_buffer_chunks, count_object_store_chunks, count_read_buffer_chunks, make_db, }; @@ -1085,47 +1085,43 @@ impl DbSetup for ChunkOrder { assert_eq!(count_read_buffer_chunks(&db), 1); assert_eq!(count_object_store_chunks(&db), 0); + // We prepare a persist, then drop the locks, perform another write, re-acquire locks + // and start a persist operation. In practice the lifecycle doesn't drop the locks + // before starting the persist operation, but this allows us to deterministically + // interleave a persist with a write + let partition = db.lockable_partition(table_name, partition_key).unwrap(); + let (chunks, flush_handle) = { + let partition = partition.read(); + let chunks = LockablePartition::chunks(&partition); + let mut partition = partition.upgrade(); + let flush_handle = LockablePartition::prepare_persist( + &mut partition, + Instant::now() + Duration::from_secs(1), + ) + .unwrap(); + + (chunks, flush_handle) + }; + // create second chunk: data->MUB write_lp(&db, "cpu,region=west user=2 100").await; assert_eq!(count_mutable_buffer_chunks(&db), 1); assert_eq!(count_read_buffer_chunks(&db), 1); assert_eq!(count_object_store_chunks(&db), 0); - // prevent the second chunk from being part of the persistence - // NOTE: In "real life" that could happen when writes happen while a persistence is in progress, but it's easier - // to trigger w/ this tiny locking trick. 
- let lockable_chunk = { - let partition = db.lockable_partition(table_name, partition_key).unwrap(); - let partition = partition.read(); - let mut chunks = LockablePartition::chunks(&partition); - assert_eq!(chunks.len(), 2); - chunks.remove(1) + let tracker = { + let partition = partition.write(); + let chunks = chunks.iter().map(|chunk| chunk.write()).collect(); + LockablePartition::persist_chunks(partition, chunks, flush_handle).unwrap() }; - lockable_chunk - .chunk - .write() - .set_dropping(&Default::default()) - .unwrap(); - // transform chunk 0 into chunk 2 by persisting - db.persist_partition( - "cpu", - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); + tracker.join().await; + assert!(tracker.get_status().result().unwrap().success()); + assert_eq!(count_mutable_buffer_chunks(&db), 1); assert_eq!(count_read_buffer_chunks(&db), 1); assert_eq!(count_object_store_chunks(&db), 1); - // unlock chunk again - lockable_chunk - .chunk - .write() - .clear_lifecycle_action() - .unwrap(); - // Now we have the the following chunks (same partition and table): // // | ID | order | tag: region | field: user | time | diff --git a/server/src/db.rs b/server/src/db.rs index 260f9e1163..0fa45afbac 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -84,7 +84,7 @@ pub enum Error { #[snafu(display("Lifecycle error: {}", source))] LifecycleError { source: lifecycle::Error }, - #[snafu(display("Error freeinzing chunk while rolling over partition: {}", source))] + #[snafu(display("Error freezing chunk while rolling over partition: {}", source))] FreezingChunk { source: catalog::chunk::Error }, #[snafu(display("Error sending entry to write buffer"))] @@ -708,7 +708,7 @@ impl Db { })?; // get chunks for persistence, break after first chunk that cannot be persisted due to lifecycle reasons - let chunks: Vec<_> = chunks + let chunks = chunks .iter() .filter_map(|chunk| { let chunk = chunk.read(); @@ -720,24 +720,15 @@ impl Db { Some(chunk) } }) - .map(|chunk| { - if chunk.lifecycle_action().is_some() { - None - } else { - Some(chunk.upgrade()) + .map(|chunk| match chunk.lifecycle_action() { + Some(_) => CannotFlushPartition { + table_name, + partition_key, } + .fail(), + None => Ok(chunk.upgrade()), }) - .fuse() - .flatten() - .collect(); - - ensure!( - !chunks.is_empty(), - CannotFlushPartition { - table_name, - partition_key - } - ); + .collect::, _>>()?; let (_, fut) = lifecycle::persist_chunks( partition, diff --git a/server/src/db/lifecycle/persist.rs b/server/src/db/lifecycle/persist.rs index 17fcac8ce7..333f37f9a1 100644 --- a/server/src/db/lifecycle/persist.rs +++ b/server/src/db/lifecycle/persist.rs @@ -31,11 +31,6 @@ pub fn persist_chunks( where F: Fn() -> DateTime + Send, { - assert!( - !chunks.is_empty(), - "must provide at least 1 chunk to persist" - ); - let now = std::time::Instant::now(); // time persist duration. 
let db = Arc::clone(&partition.data().db); let addr = partition.addr().clone(); @@ -87,13 +82,26 @@ where // drop partition lock guard let partition = partition.into_data().partition; - let time_of_first_write = time_of_first_write.expect("Should have had a first write somewhere"); - let time_of_last_write = time_of_last_write.expect("Should have had a last write somewhere"); - let metric_registry = Arc::clone(&db.metric_registry); let ctx = db.exec.new_context(ExecutorType::Reorg); let fut = async move { + if query_chunks.is_empty() { + partition + .write() + .persistence_windows_mut() + .unwrap() + .flush(flush_handle); + + return Ok(None); + } + + let time_of_first_write = + time_of_first_write.expect("Should have had a first write somewhere"); + + let time_of_last_write = + time_of_last_write.expect("Should have had a last write somewhere"); + let key = compute_sort_key(query_chunks.iter().map(|x| x.summary())); let key_str = format!("\"{}\"", key); // for logging @@ -391,4 +399,52 @@ mod tests { assert!(partition.read().persistence_windows().unwrap().is_empty()); } + + #[tokio::test] + async fn persist_compacted_deletes() { + let db = test_db().await; + + let late_arrival = Duration::from_secs(1); + let t0 = Instant::now(); + + *db.background_worker_now_override.lock() = Some(t0); + write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await; + + let partition_keys = db.partition_keys().unwrap(); + assert_eq!(partition_keys.len(), 1); + let partition_key = partition_keys.into_iter().next().unwrap(); + + let partition = db.partition("cpu", partition_key.as_str()).unwrap(); + + // Cannot simply use empty predicate (#2687) + let predicate = Arc::new(DeletePredicate { + range: TimestampRange { + start: 0, + end: 1_000, + }, + exprs: vec![], + }); + + // Delete everything + db.delete("cpu", predicate).await.unwrap(); + + // Compact deletes away + let chunk = db + .compact_partition("cpu", partition_key.as_str()) + .await + .unwrap(); + + assert!(chunk.is_none()); + + // Persistence windows unaware rows have been deleted + assert!(!partition.read().persistence_windows().unwrap().is_empty()); + + let maybe_chunk = db + .persist_partition("cpu", partition_key.as_str(), t0 + late_arrival * 2) + .await + .unwrap(); + + assert!(maybe_chunk.is_none()); + assert!(partition.read().persistence_windows().unwrap().is_empty()); + } } diff --git a/tracker/src/task.rs b/tracker/src/task.rs index bfd68ea05b..29a0ebd160 100644 --- a/tracker/src/task.rs +++ b/tracker/src/task.rs @@ -141,6 +141,11 @@ impl TaskResult { TaskResult::Error => "Error", } } + + /// Returns true if `self == TaskResult::Success` + pub fn success(&self) -> bool { + matches!(self, TaskResult::Success) + } } /// The status of the tracked task
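As a standalone illustration of the byte-oriented UUID handling introduced in PATCH 3/8, the round-trip between `uuid::Uuid` and the protobuf `bytes` field reduces to the sketch below. It uses only the `uuid` and `bytes` crates and is an illustration of the technique, not code taken from this series:

```rust
use bytes::Bytes;
use uuid::Uuid;

fn main() {
    // Encode: a transaction UUID is stored in the protobuf `bytes` field as
    // its 16 raw bytes in big-endian order, which is exactly what
    // `Uuid::as_bytes` yields.
    let uuid = Uuid::new_v4();
    let encoded: Bytes = uuid.as_bytes().to_vec().into();
    assert_eq!(encoded.len(), 16);

    // Decode: `Uuid::from_slice` only accepts exactly 16 bytes, which is why
    // the updated tests expect an "invalid bytes length" error for the
    // 3-byte input b"foo".
    let decoded = Uuid::from_slice(&encoded).expect("exactly 16 bytes");
    assert_eq!(decoded, uuid);
    assert!(Uuid::from_slice(b"foo").is_err());

    // An empty `bytes` field still means "no previous UUID", mirroring
    // `parse_uuid` returning `Ok(None)` for empty input.
    assert!(Bytes::new().is_empty());
}
```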