Merge branch 'main' into crepererum/in_mem_expr_part5

pull/24376/head
kodiakhq[bot] 2021-10-05 16:20:24 +00:00 committed by GitHub
commit d72a494198
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 313 additions and 159 deletions

View File

@ -13,3 +13,5 @@ updates:
# Additionally the thrift-compiler version available in standard repos tends to lag
# the latest release significantly, and so updating to the latest version adds friction
- dependency-name: "thrift"
# https://github.com/influxdata/influxdb_iox/issues/2735
- dependency-name: "smallvec"

View File

@ -68,7 +68,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// .expect("connection must succeed");
/// # }
/// ```
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Builder {
user_agent: String,
headers: Vec<(HeaderName, HeaderValue)>,
@ -171,3 +171,15 @@ impl Builder {
Self { timeout, ..self }
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_builder_cloneable() {
// Clone is used by Conductor.
fn assert_clone<T: Clone>(_t: T) {}
assert_clone(Builder::default())
}
}

View File

@ -6,7 +6,7 @@ edition = "2018"
[dependencies] # In alphabetical order
bytes = "1.0"
data_types = { path = "../data_types" }
data_types = { path = "../data_types", optional = true }
observability_deps = { path = "../observability_deps" }
pbjson = "0.1"
pbjson-types = "0.1"
@ -26,3 +26,7 @@ proc-macro2 = "=1.0.27"
tonic-build = "0.5"
prost-build = "0.8"
pbjson-build = "0.1"
[features]
default = []
data_types_conversions = ["data_types"]

View File

@ -65,7 +65,11 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
.compile_well_known_types()
.disable_comments(&[".google"])
.extern_path(".google.protobuf", "::pbjson_types")
.bytes(&[".influxdata.iox.catalog.v1.AddParquet.metadata"])
.bytes(&[
".influxdata.iox.catalog.v1.AddParquet.metadata",
".influxdata.iox.catalog.v1.Transaction.previous_uuid",
".influxdata.iox.catalog.v1.Transaction.uuid",
])
.btree_map(&[
".influxdata.iox.catalog.v1.DatabaseCheckpoint.sequencer_numbers",
".influxdata.iox.catalog.v1.PartitionCheckpoint.sequencer_numbers",

View File

@ -93,12 +93,19 @@ message Transaction {
// Revision counter, must by "previous revision" + 1 or 0 for the first transaction.
uint64 revision_counter = 3;
// Was string-formatted UUID and previous UUID.
reserved 4, 5;
// UUID unique to this transaction. Used to detect concurrent transactions. For the first transaction this field is
// empty.
string uuid = 4;
//
// UUID is stored as 16 bytes in big-endian order.
bytes uuid = 8;
// UUID of last commit.
string previous_uuid = 5;
//
// UUID is stored as 16 bytes in big-endian order.
bytes previous_uuid = 9;
// Start timestamp.
//

View File

@ -494,5 +494,4 @@ message DeleteRequest {
}
message DeleteResponse {
// NGA todo: define an appropriate response
}

View File

@ -140,11 +140,17 @@ pub fn protobuf_type_url_eq(url: &str, protobuf_type: &str) -> bool {
pub use com::github::influxdata::idpe::storage::read::*;
pub use influxdata::platform::storage::*;
pub mod chunk;
pub mod database_rules;
pub mod database_state;
pub mod detailed_database;
pub mod google;
#[cfg(feature = "data_types_conversions")]
pub mod chunk;
#[cfg(feature = "data_types_conversions")]
pub mod database_rules;
#[cfg(feature = "data_types_conversions")]
pub mod database_state;
#[cfg(feature = "data_types_conversions")]
pub mod detailed_database;
#[cfg(feature = "data_types_conversions")]
pub mod job;
#[cfg(test)]

View File

@ -1057,7 +1057,6 @@ impl Client {
_ => DeleteError::ServerError(status),
})?;
// NGA todo: return a handle to the delete?
Ok(())
}

View File

@ -6,9 +6,10 @@ edition = "2018"
[dependencies] # In alphabetical order
nom = "7"
smallvec = "1.7.0"
# See https://github.com/influxdata/influxdb_iox/issues/2735 before bumping smallvec
smallvec = "1.6.1"
snafu = "0.6.2"
observability_deps = { path = "../observability_deps" }
[dev-dependencies] # In alphabetical order
test_helpers = { path = "../test_helpers" }
test_helpers = { path = "../test_helpers" }

View File

@ -16,3 +16,4 @@ tokio = { version = "1.11", features = ["macros", "time"] }
tracker = { path = "../tracker" }
[dev-dependencies]
tokio = { version = "1.11", features = ["macros", "time", "rt"] }

View File

@ -438,11 +438,6 @@ where
to_persist.push(chunk);
}
if to_persist.is_empty() {
info!(%db_name, %partition, "expected to persist but found no eligible chunks");
return false;
}
let chunks = to_persist
.into_iter()
.map(|chunk| chunk.upgrade())
@ -1314,8 +1309,8 @@ mod tests {
.expect("expect early return due to task completion");
}
#[tokio::test]
async fn test_buffer_size_soft_drop_non_persisted() {
#[test]
fn test_buffer_size_soft_drop_non_persisted() {
// test that chunk mover can drop non persisted chunks
// if limit has been exceeded
@ -1379,8 +1374,8 @@ mod tests {
);
}
#[tokio::test]
async fn test_buffer_size_soft_dont_drop_non_persisted() {
#[test]
fn test_buffer_size_soft_dont_drop_non_persisted() {
// test that chunk mover unloads written chunks and can't drop
// unpersisted chunks when the persist flag is true
let rules = LifecycleRules {
@ -1711,6 +1706,32 @@ mod tests {
);
}
#[test]
fn test_persist_empty() {
let rules = LifecycleRules {
persist: true,
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
persist_age_threshold_seconds: NonZeroU32::new(20).unwrap(),
..Default::default()
};
let now = Instant::now();
// This could occur if the in-memory contents of a partition are deleted, and
// compaction causes the chunks to be removed. In such a scenario the persistence
// windows will still think there are rows to be persisted
let partitions = vec![TestPartition::new(vec![]).with_persistence(
10,
now - Duration::from_secs(20),
from_secs(20),
)];
let db = TestDb::from_partitions(rules, partitions);
let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(0), now);
assert_eq!(*db.events.read(), vec![MoverEvents::Persist(vec![]),]);
}
#[test]
fn test_suppress_persistence() {
let rules = LifecycleRules {

View File

@ -39,7 +39,7 @@ pub use crate::catalog::internals::proto_parse::Error as ProtoParseError;
/// Current version for serialized transactions.
///
/// For breaking changes, this will change.
pub const TRANSACTION_VERSION: u32 = 16;
pub const TRANSACTION_VERSION: u32 = 17;
#[derive(Debug, Snafu)]
pub enum Error {
@ -492,15 +492,18 @@ impl OpenTransaction {
start_timestamp: DateTime<Utc>,
) -> Self {
let (revision_counter, previous_uuid) = match previous_tkey {
Some(tkey) => (tkey.revision_counter + 1, tkey.uuid.to_string()),
None => (0, String::new()),
Some(tkey) => (
tkey.revision_counter + 1,
tkey.uuid.as_bytes().to_vec().into(),
),
None => (0, Bytes::new()),
};
Self {
proto: proto::Transaction {
actions: vec![],
version: TRANSACTION_VERSION,
uuid: uuid.to_string(),
uuid: uuid.as_bytes().to_vec().into(),
revision_counter,
previous_uuid,
start_timestamp: Some(start_timestamp.into()),
@ -512,7 +515,7 @@ impl OpenTransaction {
fn tkey(&self) -> TransactionKey {
TransactionKey {
revision_counter: self.proto.revision_counter,
uuid: Uuid::parse_str(&self.proto.uuid).expect("UUID was checked before"),
uuid: Uuid::from_slice(&self.proto.uuid).expect("UUID was checked before"),
}
}
@ -933,11 +936,11 @@ impl<'c> CheckpointHandle<'c> {
let proto = proto::Transaction {
actions,
version: TRANSACTION_VERSION,
uuid: self.tkey.uuid.to_string(),
uuid: self.tkey.uuid.as_bytes().to_vec().into(),
revision_counter: self.tkey.revision_counter,
previous_uuid: self
.previous_tkey
.map_or_else(String::new, |tkey| tkey.uuid.to_string()),
.map_or_else(Bytes::new, |tkey| tkey.uuid.as_bytes().to_vec().into()),
start_timestamp: Some(Utc::now().into()),
encoding: proto::transaction::Encoding::Full.into(),
};
@ -1173,9 +1176,9 @@ mod tests {
let mut proto = load_transaction_proto(&iox_object_store, &path)
.await
.unwrap();
let uuid_expected = Uuid::parse_str(&proto.uuid).unwrap();
let uuid_expected = Uuid::from_slice(&proto.uuid).unwrap();
let uuid_actual = Uuid::nil();
proto.uuid = uuid_actual.to_string();
proto.uuid = uuid_actual.as_bytes().to_vec().into();
store_transaction_proto(&iox_object_store, &path, &proto)
.await
.unwrap();
@ -1204,7 +1207,7 @@ mod tests {
let mut proto = load_transaction_proto(&iox_object_store, &path)
.await
.unwrap();
proto.uuid = String::new();
proto.uuid = Bytes::new();
store_transaction_proto(&iox_object_store, &path, &proto)
.await
.unwrap();
@ -1230,7 +1233,7 @@ mod tests {
let mut proto = load_transaction_proto(&iox_object_store, &path)
.await
.unwrap();
proto.uuid = "foo".to_string();
proto.uuid = Bytes::from("foo");
store_transaction_proto(&iox_object_store, &path, &proto)
.await
.unwrap();
@ -1240,7 +1243,7 @@ mod tests {
PreservedCatalog::load::<TestCatalogState>(Arc::clone(&iox_object_store), ()).await;
assert_eq!(
res.unwrap_err().to_string(),
"Internal: Error while parsing protobuf: Cannot parse UUID: invalid length: expected one of [36, 32], found 3"
"Internal: Error while parsing protobuf: Cannot parse UUID: invalid bytes length: expected 16, found 3"
);
}
@ -1256,7 +1259,7 @@ mod tests {
let mut proto = load_transaction_proto(&iox_object_store, &path)
.await
.unwrap();
proto.previous_uuid = Uuid::nil().to_string();
proto.previous_uuid = Uuid::nil().as_bytes().to_vec().into();
store_transaction_proto(&iox_object_store, &path, &proto)
.await
.unwrap();
@ -1279,7 +1282,7 @@ mod tests {
let mut proto = load_transaction_proto(&iox_object_store, &path)
.await
.unwrap();
proto.previous_uuid = Uuid::nil().to_string();
proto.previous_uuid = Uuid::nil().as_bytes().to_vec().into();
store_transaction_proto(&iox_object_store, &path, &proto)
.await
.unwrap();
@ -1302,7 +1305,7 @@ mod tests {
let mut proto = load_transaction_proto(&iox_object_store, &path)
.await
.unwrap();
proto.previous_uuid = "foo".to_string();
proto.previous_uuid = Bytes::from("foo");
store_transaction_proto(&iox_object_store, &path, &proto)
.await
.unwrap();
@ -1312,7 +1315,7 @@ mod tests {
PreservedCatalog::load::<TestCatalogState>(Arc::clone(&iox_object_store), ()).await;
assert_eq!(
res.unwrap_err().to_string(),
"Internal: Error while parsing protobuf: Cannot parse UUID: invalid length: expected one of [36, 32], found 3"
"Internal: Error while parsing protobuf: Cannot parse UUID: invalid bytes length: expected 16, found 3"
);
}
@ -1351,7 +1354,7 @@ mod tests {
let mut t = catalog.open_transaction().await;
// open transaction
t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().to_string();
t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().as_bytes().to_vec().into();
assert_eq!(
format!("{:?}", t),
"TransactionHandle(open, 1.00000000-0000-0000-0000-000000000000)"
@ -1379,7 +1382,7 @@ mod tests {
let new_uuid = Uuid::new_v4();
tkey.uuid = new_uuid;
let path = TransactionFilePath::new_transaction(tkey.revision_counter, tkey.uuid);
proto.uuid = new_uuid.to_string();
proto.uuid = new_uuid.as_bytes().to_vec().into();
store_transaction_proto(&iox_object_store, &path, &proto)
.await
.unwrap();
@ -1412,7 +1415,7 @@ mod tests {
let new_uuid = Uuid::new_v4();
tkey.uuid = new_uuid;
let path = TransactionFilePath::new_checkpoint(tkey.revision_counter, tkey.uuid);
proto.uuid = new_uuid.to_string();
proto.uuid = new_uuid.as_bytes().to_vec().into();
proto.encoding = proto::transaction::Encoding::Full.into();
store_transaction_proto(&iox_object_store, &path, &proto)
.await
@ -2087,7 +2090,7 @@ mod tests {
.unwrap();
let mut t = catalog.open_transaction().await;
t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().to_string();
t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().as_bytes().to_vec().into();
assert_eq!(t.uuid(), Uuid::nil());
}

View File

@ -272,11 +272,11 @@ File {
is_checkpoint: false,
proto: Ok(
Transaction {
version: 16,
version: 17,
actions: [],
revision_counter: 0,
uuid: "00000000-0000-0000-0000-000000000000",
previous_uuid: "",
uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
previous_uuid: b"",
start_timestamp: Some(
Timestamp {
seconds: 10,
@ -297,7 +297,7 @@ File {
is_checkpoint: false,
proto: Ok(
Transaction {
version: 16,
version: 17,
actions: [
Action {
action: Some(
@ -320,8 +320,8 @@ File {
},
],
revision_counter: 1,
uuid: "00000000-0000-0000-0000-000000000000",
previous_uuid: "00000000-0000-0000-0000-000000000000",
uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
previous_uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
start_timestamp: Some(
Timestamp {
seconds: 10,
@ -396,11 +396,11 @@ File {
is_checkpoint: false,
proto: Ok(
Transaction {
version: 16,
version: 17,
actions: [],
revision_counter: 0,
uuid: "00000000-0000-0000-0000-000000000000",
previous_uuid: "",
uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
previous_uuid: b"",
start_timestamp: Some(
Timestamp {
seconds: 10,
@ -421,7 +421,7 @@ File {
is_checkpoint: false,
proto: Ok(
Transaction {
version: 16,
version: 17,
actions: [
Action {
action: Some(
@ -444,8 +444,8 @@ File {
},
],
revision_counter: 1,
uuid: "00000000-0000-0000-0000-000000000000",
previous_uuid: "00000000-0000-0000-0000-000000000000",
uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
previous_uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
start_timestamp: Some(
Timestamp {
seconds: 10,

View File

@ -1,4 +1,4 @@
use std::{convert::TryInto, num::TryFromIntError, str::FromStr};
use std::{convert::TryInto, num::TryFromIntError};
use chrono::{DateTime, Utc};
use generated_types::influxdata::iox::catalog::v1 as proto;
@ -33,19 +33,19 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Parse UUID from protobuf.
pub fn parse_uuid(s: &str) -> Result<Option<Uuid>> {
if s.is_empty() {
/// Parse big-endian UUID from protobuf.
pub fn parse_uuid(bytes: &[u8]) -> Result<Option<Uuid>> {
if bytes.is_empty() {
Ok(None)
} else {
let uuid = Uuid::from_str(s).context(UuidParse {})?;
let uuid = Uuid::from_slice(bytes).context(UuidParse {})?;
Ok(Some(uuid))
}
}
/// Parse UUID from protobuf and fail if protobuf did not provide data.
pub fn parse_uuid_required(s: &str) -> Result<Uuid> {
parse_uuid(s)?.context(UuidRequired {})
/// Parse big-endian UUID from protobuf and fail if protobuf did not provide data.
pub fn parse_uuid_required(bytes: &[u8]) -> Result<Uuid> {
parse_uuid(bytes)?.context(UuidRequired {})
}
/// Parse [`ParquetFilePath`](iox_object_store::ParquetFilePath) from protobuf.

View File

@ -308,7 +308,7 @@ impl PersistenceWindows {
/// Acquire a handle that prevents mutation of the persistable window until dropped
///
/// Returns `None` if there is an outstanding handle
/// Returns `None` if there is an outstanding handle or nothing to persist
pub fn flush_handle(&mut self, now: Instant) -> Option<FlushHandle> {
// Verify no active flush handles before closing open window
self.persistable.get_mut()?;

View File

@ -649,7 +649,6 @@ mod tests {
assert_eq!(expected, result);
}
// NGA todo: check content of error messages
#[test]
fn test_parse_delete_negative() {
// invalid key

View File

@ -12,7 +12,7 @@ use query::QueryChunk;
use async_trait::async_trait;
use server::db::LockablePartition;
use server::db::{LockableChunk, LockablePartition};
use server::utils::{
count_mutable_buffer_chunks, count_object_store_chunks, count_read_buffer_chunks, make_db,
};
@ -1085,47 +1085,43 @@ impl DbSetup for ChunkOrder {
assert_eq!(count_read_buffer_chunks(&db), 1);
assert_eq!(count_object_store_chunks(&db), 0);
// We prepare a persist, then drop the locks, perform another write, re-acquire locks
// and start a persist operation. In practice the lifecycle doesn't drop the locks
// before starting the persist operation, but this allows us to deterministically
// interleave a persist with a write
let partition = db.lockable_partition(table_name, partition_key).unwrap();
let (chunks, flush_handle) = {
let partition = partition.read();
let chunks = LockablePartition::chunks(&partition);
let mut partition = partition.upgrade();
let flush_handle = LockablePartition::prepare_persist(
&mut partition,
Instant::now() + Duration::from_secs(1),
)
.unwrap();
(chunks, flush_handle)
};
// create second chunk: data->MUB
write_lp(&db, "cpu,region=west user=2 100").await;
assert_eq!(count_mutable_buffer_chunks(&db), 1);
assert_eq!(count_read_buffer_chunks(&db), 1);
assert_eq!(count_object_store_chunks(&db), 0);
// prevent the second chunk from being part of the persistence
// NOTE: In "real life" that could happen when writes happen while a persistence is in progress, but it's easier
// to trigger w/ this tiny locking trick.
let lockable_chunk = {
let partition = db.lockable_partition(table_name, partition_key).unwrap();
let partition = partition.read();
let mut chunks = LockablePartition::chunks(&partition);
assert_eq!(chunks.len(), 2);
chunks.remove(1)
let tracker = {
let partition = partition.write();
let chunks = chunks.iter().map(|chunk| chunk.write()).collect();
LockablePartition::persist_chunks(partition, chunks, flush_handle).unwrap()
};
lockable_chunk
.chunk
.write()
.set_dropping(&Default::default())
.unwrap();
// transform chunk 0 into chunk 2 by persisting
db.persist_partition(
"cpu",
partition_key,
Instant::now() + Duration::from_secs(1),
)
.await
.unwrap();
tracker.join().await;
assert!(tracker.get_status().result().unwrap().success());
assert_eq!(count_mutable_buffer_chunks(&db), 1);
assert_eq!(count_read_buffer_chunks(&db), 1);
assert_eq!(count_object_store_chunks(&db), 1);
// unlock chunk again
lockable_chunk
.chunk
.write()
.clear_lifecycle_action()
.unwrap();
// Now we have the the following chunks (same partition and table):
//
// | ID | order | tag: region | field: user | time |

View File

@ -21,7 +21,7 @@ entry = { path = "../entry" }
flatbuffers = "2"
futures = "0.3"
futures-util = { version = "0.3.1" }
generated_types = { path = "../generated_types" }
generated_types = { path = "../generated_types", features = ["data_types_conversions"] }
hashbrown = "0.11"
influxdb_iox_client = { path = "../influxdb_iox_client" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }

View File

@ -84,7 +84,7 @@ pub enum Error {
#[snafu(display("Lifecycle error: {}", source))]
LifecycleError { source: lifecycle::Error },
#[snafu(display("Error freeinzing chunk while rolling over partition: {}", source))]
#[snafu(display("Error freezing chunk while rolling over partition: {}", source))]
FreezingChunk { source: catalog::chunk::Error },
#[snafu(display("Error sending entry to write buffer"))]
@ -706,7 +706,7 @@ impl Db {
})?;
// get chunks for persistence, break after first chunk that cannot be persisted due to lifecycle reasons
let chunks: Vec<_> = chunks
let chunks = chunks
.iter()
.filter_map(|chunk| {
let chunk = chunk.read();
@ -718,24 +718,15 @@ impl Db {
Some(chunk)
}
})
.map(|chunk| {
if chunk.lifecycle_action().is_some() {
None
} else {
Some(chunk.upgrade())
.map(|chunk| match chunk.lifecycle_action() {
Some(_) => CannotFlushPartition {
table_name,
partition_key,
}
.fail(),
None => Ok(chunk.upgrade()),
})
.fuse()
.flatten()
.collect();
ensure!(
!chunks.is_empty(),
CannotFlushPartition {
table_name,
partition_key
}
);
.collect::<Result<Vec<_>, _>>()?;
let (_, fut) = lifecycle::persist_chunks(
partition,

View File

@ -31,11 +31,6 @@ pub fn persist_chunks<F>(
where
F: Fn() -> DateTime<Utc> + Send,
{
assert!(
!chunks.is_empty(),
"must provide at least 1 chunk to persist"
);
let now = std::time::Instant::now(); // time persist duration.
let db = Arc::clone(&partition.data().db);
let addr = partition.addr().clone();
@ -87,13 +82,26 @@ where
// drop partition lock guard
let partition = partition.into_data().partition;
let time_of_first_write = time_of_first_write.expect("Should have had a first write somewhere");
let time_of_last_write = time_of_last_write.expect("Should have had a last write somewhere");
let metric_registry = Arc::clone(&db.metric_registry);
let ctx = db.exec.new_context(ExecutorType::Reorg);
let fut = async move {
if query_chunks.is_empty() {
partition
.write()
.persistence_windows_mut()
.unwrap()
.flush(flush_handle);
return Ok(None);
}
let time_of_first_write =
time_of_first_write.expect("Should have had a first write somewhere");
let time_of_last_write =
time_of_last_write.expect("Should have had a last write somewhere");
let key = compute_sort_key(query_chunks.iter().map(|x| x.summary()));
let key_str = format!("\"{}\"", key); // for logging
@ -391,4 +399,52 @@ mod tests {
assert!(partition.read().persistence_windows().unwrap().is_empty());
}
#[tokio::test]
async fn persist_compacted_deletes() {
let db = test_db().await;
let late_arrival = Duration::from_secs(1);
let t0 = Instant::now();
*db.background_worker_now_override.lock() = Some(t0);
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await;
let partition_keys = db.partition_keys().unwrap();
assert_eq!(partition_keys.len(), 1);
let partition_key = partition_keys.into_iter().next().unwrap();
let partition = db.partition("cpu", partition_key.as_str()).unwrap();
// Cannot simply use empty predicate (#2687)
let predicate = Arc::new(DeletePredicate {
range: TimestampRange {
start: 0,
end: 1_000,
},
exprs: vec![],
});
// Delete everything
db.delete("cpu", predicate).await.unwrap();
// Compact deletes away
let chunk = db
.compact_partition("cpu", partition_key.as_str())
.await
.unwrap();
assert!(chunk.is_none());
// Persistence windows unaware rows have been deleted
assert!(!partition.read().persistence_windows().unwrap().is_empty());
let maybe_chunk = db
.persist_partition("cpu", partition_key.as_str(), t0 + late_arrival * 2)
.await
.unwrap();
assert!(maybe_chunk.is_none());
assert!(partition.read().persistence_windows().unwrap().is_empty());
}
}

View File

@ -282,7 +282,7 @@ impl ApplicationError {
Self::ParsingLineProtocol { .. } => self.bad_request(),
Self::ParsingDelete { .. } => self.bad_request(),
Self::BuildingDeletePredicate { .. } => self.bad_request(),
Self::ExecutingDelete { .. } => self.internal_error(),
Self::ExecutingDelete { .. } => self.bad_request(),
Self::ReadingBodyAsGzip { .. } => self.bad_request(),
Self::ClientHangup { .. } => self.bad_request(),
Self::RouteNotFound { .. } => self.not_found(),
@ -1150,6 +1150,44 @@ mod tests {
"+----------------+--------------+-------+-----------------+----------------------+",
];
assert_batches_eq!(expected, &batches);
// -------------------
// negative tests
// Not able to parse _measurement="not_a_table" (it must be _measurement=\"not_a_table\" to work)
let delete_line = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement="not_a_table" and location=Boston"}"#;
let response = client
.post(&format!(
"{}/api/v2/delete?bucket={}&org={}",
server_url, bucket_name, org_name
))
.body(delete_line)
.send()
.await;
check_response(
"delete",
response,
StatusCode::BAD_REQUEST,
Some("Unable to parse delete string"),
)
.await;
// delete from non-existing table
let delete_line = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=not_a_table and location=Boston"}"#;
let response = client
.post(&format!(
"{}/api/v2/delete?bucket={}&org={}",
server_url, bucket_name, org_name
))
.body(delete_line)
.send()
.await;
check_response(
"delete",
response,
StatusCode::BAD_REQUEST,
Some("Cannot delete data from non-existing table"),
)
.await;
}
#[tokio::test]
@ -1607,7 +1645,7 @@ mod tests {
assert_eq!(status, expected_status);
if let Some(expected_body) = expected_body {
assert_eq!(body, expected_body);
assert!(body.contains(expected_body));
}
} else {
panic!("Unexpected error response: {:?}", response);

View File

@ -177,6 +177,12 @@ pub fn default_db_error_handler(error: server::db::Error) -> tonic::Status {
),
}
.into(),
Error::DeleteFromTable { source, table_name } => PreconditionViolation {
category: "database".to_string(),
subject: "influxdata.com/iox".to_string(),
description: format!("Cannot delete data from table: {} : {}", table_name, source),
}
.into(),
Error::CatalogError { source } => default_catalog_error_handler(source),
error => {
error!(?error, "Unexpected error");

View File

@ -633,14 +633,13 @@ where
}))
}
Ok(del_predicate) => {
//execute delete
// execute delete
db.delete(&table_name, Arc::new(del_predicate))
.await
.map_err(default_db_error_handler)?;
}
}
// NGA todo: return a delete handle with the response?
Ok(Response::new(DeleteResponse {}))
}
}

View File

@ -112,6 +112,16 @@ fn group_description_to_frames(group_description: GroupDescription) -> Vec<Frame
.map(|(k, v)| (k.bytes().collect(), v.bytes().collect()))
.unzip();
// Flux expects there to be `_field` and `_measurement` as the
// first two "tags". Note this means the lengths of tag_keys and
// partition_key_values is different.
//
// See https://github.com/influxdata/influxdb_iox/issues/2690 for gory details
let tag_keys = vec![b"_field".to_vec(), b"_measurement".to_vec()]
.into_iter()
.chain(tag_keys.into_iter())
.collect::<Vec<_>>();
let group_frame = GroupFrame {
tag_keys,
partition_key_vals,
@ -232,8 +242,8 @@ fn field_to_data(
Ok(())
}
// Convert the tag=value pairs from the series set to the correct gRPC
// format, and add the _f and _m tags for the field name and measurement
/// Convert the tag=value pairs from the series set to the correct gRPC
/// format, and add the _f and _m tags for the field name and measurement
fn convert_tags(table_name: &str, field_name: &str, tags: &[(Arc<str>, Arc<str>)]) -> Vec<Tag> {
// Special case "measurement" name which is modeled as a tag of
// "_measurement" and "field" which is modeled as a tag of "_field"
@ -548,8 +558,9 @@ mod tests {
.map(|f| dump_frame(f))
.collect::<Vec<_>>();
let expected_frames =
vec!["GroupFrame, tag_keys: tag1,tag2, partition_key_vals: val1,val2"];
let expected_frames = vec![
"GroupFrame, tag_keys: _field,_measurement,tag1,tag2, partition_key_vals: val1,val2",
];
assert_eq!(
dumped_frames, expected_frames,

View File

@ -1587,8 +1587,10 @@ async fn test_delete() {
let pred = "region = west";
let del = management_client
.delete(db_name.clone(), table, start, stop, pred)
.await;
assert!(del.is_err());
.await
.unwrap_err()
.to_string();
assert!(del.contains("Cannot delete data from table"));
// Verify both existing tables still have the same data
// query to verify data deleted

View File

@ -401,7 +401,7 @@ async fn test_read_group_none_agg(
};
let expected_group_frames = vec![
"GroupFrame, tag_keys: cpu, partition_key_vals: cpu1",
"GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu1",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"20,21\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
@ -410,7 +410,7 @@ async fn test_read_group_none_agg(
"FloatPointsFrame, timestamps: [1000, 2000], values: \"10,11\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"71,72\"",
"GroupFrame, tag_keys: cpu, partition_key_vals: cpu2",
"GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu2",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"40,41\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
@ -424,11 +424,9 @@ async fn test_read_group_none_agg(
let actual_group_frames = do_read_group_request(storage_client, read_group_request).await;
assert_eq!(
expected_group_frames,
actual_group_frames,
"Expected:\n{}\nActual:\n{}",
expected_group_frames.join("\n"),
actual_group_frames.join("\n")
expected_group_frames, actual_group_frames,
"Expected:\n{:#?}\nActual:\n{:#?}",
expected_group_frames, actual_group_frames
);
}
@ -453,12 +451,12 @@ async fn test_read_group_none_agg_with_predicate(
};
let expected_group_frames = vec![
"GroupFrame, tag_keys: cpu, partition_key_vals: cpu1",
"GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu1",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000], values: \"20\"",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
"FloatPointsFrame, timestamps: [1000], values: \"10\"",
"GroupFrame, tag_keys: cpu, partition_key_vals: cpu2",
"GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu2",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000], values: \"40\"",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=foo, type: 0",
@ -468,11 +466,9 @@ async fn test_read_group_none_agg_with_predicate(
let actual_group_frames = do_read_group_request(storage_client, read_group_request).await;
assert_eq!(
expected_group_frames,
actual_group_frames,
"Expected:\n{}\nActual:\n{}",
expected_group_frames.join("\n"),
actual_group_frames.join("\n")
expected_group_frames, actual_group_frames,
"Expected:\n{:#?}\nActual:\n{:#?}",
expected_group_frames, actual_group_frames
);
}
@ -500,7 +496,7 @@ async fn test_read_group_sum_agg(
};
let expected_group_frames = vec![
"GroupFrame, tag_keys: cpu, partition_key_vals: cpu1",
"GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu1",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"41\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
@ -509,7 +505,7 @@ async fn test_read_group_sum_agg(
"FloatPointsFrame, timestamps: [2000], values: \"21\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"143\"",
"GroupFrame, tag_keys: cpu, partition_key_vals: cpu2",
"GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu2",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"81\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
@ -523,11 +519,9 @@ async fn test_read_group_sum_agg(
let actual_group_frames = do_read_group_request(storage_client, read_group_request).await;
assert_eq!(
expected_group_frames,
actual_group_frames,
"Expected:\n{}\nActual:\n{}",
expected_group_frames.join("\n"),
actual_group_frames.join("\n")
expected_group_frames, actual_group_frames,
"Expected:\n{:#?}\nActual:\n{:#?}",
expected_group_frames, actual_group_frames,
);
}
@ -555,7 +549,7 @@ async fn test_read_group_last_agg(
};
let expected_group_frames = vec![
"GroupFrame, tag_keys: cpu, partition_key_vals: cpu1",
"GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu1",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"21\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
@ -564,7 +558,7 @@ async fn test_read_group_last_agg(
"FloatPointsFrame, timestamps: [2000], values: \"11\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"72\"",
"GroupFrame, tag_keys: cpu, partition_key_vals: cpu2",
"GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu2",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"41\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
@ -578,11 +572,9 @@ async fn test_read_group_last_agg(
let actual_group_frames = do_read_group_request(storage_client, read_group_request).await;
assert_eq!(
expected_group_frames,
actual_group_frames,
"Expected:\n{}\nActual:\n{}",
expected_group_frames.join("\n"),
actual_group_frames.join("\n")
expected_group_frames, actual_group_frames,
"Expected:\n{:#?}\nActual:\n{:#?}",
expected_group_frames, actual_group_frames,
);
}

View File

@ -141,6 +141,11 @@ impl TaskResult {
TaskResult::Error => "Error",
}
}
/// Returns true if `self == TaskResult::Success`
pub fn success(&self) -> bool {
matches!(self, TaskResult::Success)
}
}
/// The status of the tracked task