refactor: remove write buffer producer logic from `Db`
As a side effect, writing to database no longer requires `async`. Closes #2243.pull/24376/head
parent
00143315ac
commit
ac20fecf6f
|
@ -185,8 +185,7 @@ impl HttpDrivenDml for DatabaseServerType {
|
|||
})?;
|
||||
|
||||
database
|
||||
.route_operation(&DmlOperation::Write(write))
|
||||
.await
|
||||
.store_operation(&DmlOperation::Write(write))
|
||||
.map_err(|e| InnerDmlError::InternalError {
|
||||
db_name: db_name.to_string(),
|
||||
source: Box::new(e),
|
||||
|
@ -200,8 +199,7 @@ impl HttpDrivenDml for DatabaseServerType {
|
|||
})?;
|
||||
|
||||
database
|
||||
.route_operation(&DmlOperation::Delete(delete))
|
||||
.await
|
||||
.store_operation(&DmlOperation::Delete(delete))
|
||||
.map_err(|e| match e {
|
||||
WriteError::DbError { source } => match source {
|
||||
server::db::Error::DeleteFromTable { table_name, .. } => {
|
||||
|
|
|
@ -41,8 +41,7 @@ impl delete_service_server::DeleteService for DeleteService {
|
|||
.map_err(default_server_error_handler)?;
|
||||
|
||||
database
|
||||
.route_operation(&DmlOperation::Delete(delete))
|
||||
.await
|
||||
.store_operation(&DmlOperation::Delete(delete))
|
||||
.map_err(default_database_write_error_handler)?;
|
||||
|
||||
Ok(Response::new(DeleteResponse {}))
|
||||
|
|
|
@ -41,8 +41,7 @@ impl write_service_server::WriteService for PBWriteService {
|
|||
.map_err(default_server_error_handler)?;
|
||||
|
||||
database
|
||||
.route_operation(&DmlOperation::Write(write))
|
||||
.await
|
||||
.store_operation(&DmlOperation::Write(write))
|
||||
.map_err(default_database_write_error_handler)?;
|
||||
|
||||
Ok(tonic::Response::new(WriteResponse {}))
|
||||
|
|
|
@ -31,7 +31,7 @@ async fn test_query_cancellation_slow_store() {
|
|||
|
||||
// create persisted chunk
|
||||
let data = "cpu,region=west user=23.2 100";
|
||||
write_lp(&db, data).await;
|
||||
write_lp(&db, data);
|
||||
db.rollover_partition(table_name, partition_key)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -51,7 +51,7 @@ async fn test_query_cancellation_slow_store() {
|
|||
|
||||
// create in-memory chunk
|
||||
let data = "cpu,region=east user=0.1 42";
|
||||
write_lp(&db, data).await;
|
||||
write_lp(&db, data);
|
||||
|
||||
// make store access really slow
|
||||
if let ObjectStoreIntegration::InMemoryThrottled(inner) = &object_store.integration {
|
||||
|
|
|
@ -208,7 +208,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
|
|||
|
||||
let db = make_db().await.db;
|
||||
let data = lp_lines.join("\n");
|
||||
write_lp(&db, &data).await;
|
||||
write_lp(&db, &data);
|
||||
let scenario1 = DbScenario {
|
||||
scenario_name: "Data in 4 partitions, open chunks of mutable buffer".into(),
|
||||
db,
|
||||
|
@ -216,7 +216,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
|
|||
|
||||
let db = make_db().await.db;
|
||||
let data = lp_lines.join("\n");
|
||||
write_lp(&db, &data).await;
|
||||
write_lp(&db, &data);
|
||||
db.rollover_partition("h2o", "2020-03-01T00").await.unwrap();
|
||||
db.rollover_partition("h2o", "2020-03-02T00").await.unwrap();
|
||||
let scenario2 = DbScenario {
|
||||
|
@ -228,7 +228,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
|
|||
|
||||
let db = make_db().await.db;
|
||||
let data = lp_lines.join("\n");
|
||||
write_lp(&db, &data).await;
|
||||
write_lp(&db, &data);
|
||||
// roll over and load chunks into both RUB and OS
|
||||
rollover_and_load(&db, "2020-03-01T00", "h2o").await;
|
||||
rollover_and_load(&db, "2020-03-02T00", "h2o").await;
|
||||
|
|
|
@ -20,16 +20,16 @@ async fn setup() -> TestDb {
|
|||
let db = &test_db.db;
|
||||
|
||||
// Chunk 0 has bar:[1-2]
|
||||
write_lp(db, "cpu bar=1 10").await;
|
||||
write_lp(db, "cpu bar=2 20").await;
|
||||
write_lp(db, "cpu bar=1 10");
|
||||
write_lp(db, "cpu bar=2 20");
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
db.compact_open_chunk("cpu", partition_key).await.unwrap();
|
||||
|
||||
// Chunk 1 has bar:[3-3] (going to get pruned)
|
||||
write_lp(db, "cpu bar=3 10").await;
|
||||
write_lp(db, "cpu bar=3 100").await;
|
||||
write_lp(db, "cpu bar=3 1000").await;
|
||||
write_lp(db, "cpu bar=3 10");
|
||||
write_lp(db, "cpu bar=3 100");
|
||||
write_lp(db, "cpu bar=3 1000");
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
db.compact_open_chunk("cpu", partition_key).await.unwrap();
|
||||
|
|
|
@ -119,7 +119,7 @@ impl DbSetup for NoData {
|
|||
//
|
||||
let db = make_db().await.db;
|
||||
let data = "cpu,region=west user=23.2 100";
|
||||
write_lp(&db, data).await;
|
||||
write_lp(&db, data);
|
||||
// move data out of open chunk
|
||||
db.rollover_partition(table_name, partition_key)
|
||||
.await
|
||||
|
@ -157,7 +157,7 @@ impl DbSetup for NoData {
|
|||
//
|
||||
let db = make_db().await.db;
|
||||
let data = "cpu,region=west user=23.2 100";
|
||||
write_lp(&db, data).await;
|
||||
write_lp(&db, data);
|
||||
// move data out of open chunk
|
||||
db.rollover_partition(table_name, partition_key)
|
||||
.await
|
||||
|
@ -621,7 +621,7 @@ impl DbSetup for TwoMeasurementsManyFieldsOneChunk {
|
|||
"o2,state=CA temp=79.0 300",
|
||||
];
|
||||
|
||||
write_lp(&db, &lp_lines.join("\n")).await;
|
||||
write_lp(&db, &lp_lines.join("\n"));
|
||||
vec![DbScenario {
|
||||
scenario_name: "Data in open chunk of mutable buffer".into(),
|
||||
db,
|
||||
|
@ -646,7 +646,7 @@ impl DbSetup for TwoMeasurementsManyFieldsOneRubChunk {
|
|||
"o2,state=CA temp=79.0 300",
|
||||
];
|
||||
|
||||
write_lp(&db, &lp_lines.join("\n")).await;
|
||||
write_lp(&db, &lp_lines.join("\n"));
|
||||
|
||||
// move all data to RUB
|
||||
db.compact_open_chunk("h2o", partition_key).await.unwrap();
|
||||
|
@ -672,7 +672,7 @@ impl DbSetup for TwoMeasurementsManyFieldsTwoChunks {
|
|||
"h2o,state=MA,city=Boston temp=70.4 50",
|
||||
"h2o,state=MA,city=Boston other_temp=70.4 250",
|
||||
];
|
||||
write_lp(&db, &lp_lines.join("\n")).await;
|
||||
write_lp(&db, &lp_lines.join("\n"));
|
||||
db.compact_partition("h2o", partition_key).await.unwrap();
|
||||
|
||||
let lp_lines = vec![
|
||||
|
@ -680,7 +680,7 @@ impl DbSetup for TwoMeasurementsManyFieldsTwoChunks {
|
|||
"o2,state=MA,city=Boston temp=53.4,reading=51 50",
|
||||
"o2,state=CA temp=79.0 300",
|
||||
];
|
||||
write_lp(&db, &lp_lines.join("\n")).await;
|
||||
write_lp(&db, &lp_lines.join("\n"));
|
||||
|
||||
assert_eq!(count_mutable_buffer_chunks(&db), 2);
|
||||
assert_eq!(count_read_buffer_chunks(&db), 1);
|
||||
|
@ -708,7 +708,7 @@ impl DbSetup for OneMeasurementTwoChunksDifferentTagSet {
|
|||
"h2o,state=MA temp=70.4 50",
|
||||
"h2o,state=MA other_temp=70.4 250",
|
||||
];
|
||||
write_lp(&db, &lp_lines.join("\n")).await;
|
||||
write_lp(&db, &lp_lines.join("\n"));
|
||||
db.compact_partition("h2o", partition_key).await.unwrap();
|
||||
|
||||
// tag: city
|
||||
|
@ -716,7 +716,7 @@ impl DbSetup for OneMeasurementTwoChunksDifferentTagSet {
|
|||
"h2o,city=Boston other_temp=72.4 350",
|
||||
"h2o,city=Boston temp=53.4,reading=51 50",
|
||||
];
|
||||
write_lp(&db, &lp_lines.join("\n")).await;
|
||||
write_lp(&db, &lp_lines.join("\n"));
|
||||
db.compact_open_chunk("h2o", partition_key).await.unwrap();
|
||||
|
||||
assert_eq!(count_mutable_buffer_chunks(&db), 0);
|
||||
|
@ -749,7 +749,7 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates {
|
|||
"h2o,state=MA,city=Boston max_temp=75.4 250",
|
||||
"h2o,state=MA,city=Andover max_temp=69.2, 250",
|
||||
];
|
||||
write_lp(&db, &lp_lines.join("\n")).await;
|
||||
write_lp(&db, &lp_lines.join("\n"));
|
||||
db.compact_open_chunk("h2o", partition_key).await.unwrap();
|
||||
|
||||
// Chunk 2: overlaps with chunk 1
|
||||
|
@ -763,7 +763,7 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates {
|
|||
"h2o,state=CA,city=SJ min_temp=78.5,max_temp=88.0 300",
|
||||
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 350",
|
||||
];
|
||||
write_lp(&db, &lp_lines.join("\n")).await;
|
||||
write_lp(&db, &lp_lines.join("\n"));
|
||||
db.compact_open_chunk("h2o", partition_key).await.unwrap();
|
||||
|
||||
// Chunk 3: no overlap
|
||||
|
@ -777,7 +777,7 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates {
|
|||
"h2o,state=CA,city=SJ min_temp=77.0,max_temp=90.7 450",
|
||||
"h2o,state=CA,city=SJ min_temp=69.5,max_temp=88.2 500",
|
||||
];
|
||||
write_lp(&db, &lp_lines.join("\n")).await;
|
||||
write_lp(&db, &lp_lines.join("\n"));
|
||||
db.compact_open_chunk("h2o", partition_key).await.unwrap();
|
||||
|
||||
// Chunk 4: no overlap
|
||||
|
@ -791,7 +791,7 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates {
|
|||
"h2o,state=CA,city=SJ min_temp=69.5,max_temp=89.2 650",
|
||||
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 700",
|
||||
];
|
||||
write_lp(&db, &lp_lines.join("\n")).await;
|
||||
write_lp(&db, &lp_lines.join("\n"));
|
||||
db.compact_open_chunk("h2o", partition_key).await.unwrap();
|
||||
|
||||
assert_eq!(count_mutable_buffer_chunks(&db), 0);
|
||||
|
@ -823,8 +823,7 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle {
|
|||
"h2o,state=MA,city=Boston other_temp=70.4 250",
|
||||
]
|
||||
.join("\n"),
|
||||
)
|
||||
.await;
|
||||
);
|
||||
|
||||
db.compact_open_chunk("h2o", partition_key).await.unwrap();
|
||||
|
||||
|
@ -835,8 +834,7 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle {
|
|||
write_lp(
|
||||
&db,
|
||||
&vec!["h2o,state=CA,city=Boston other_temp=72.4 350"].join("\n"),
|
||||
)
|
||||
.await;
|
||||
);
|
||||
|
||||
assert_eq!(count_mutable_buffer_chunks(&db), 1);
|
||||
assert_eq!(count_read_buffer_chunks(&db), 1);
|
||||
|
@ -984,7 +982,7 @@ impl DbSetup for EndToEndTestWithDelete {
|
|||
pub(crate) async fn make_one_chunk_mub_scenario(data: &str) -> Vec<DbScenario> {
|
||||
// Scenario 1: One open chunk in MUB
|
||||
let db = make_db().await.db;
|
||||
write_lp(&db, data).await;
|
||||
write_lp(&db, data);
|
||||
let scenario = DbScenario {
|
||||
scenario_name: "Data in open chunk of mutable buffer".into(),
|
||||
db,
|
||||
|
@ -1001,7 +999,7 @@ pub(crate) async fn make_one_chunk_rub_scenario(
|
|||
) -> Vec<DbScenario> {
|
||||
// Scenario 1: One closed chunk in RUB
|
||||
let db = make_db().await.db;
|
||||
let table_names = write_lp(&db, data).await;
|
||||
let table_names = write_lp(&db, data);
|
||||
for table_name in &table_names {
|
||||
db.rollover_partition(table_name, partition_key)
|
||||
.await
|
||||
|
@ -1030,7 +1028,7 @@ impl DbSetup for OneMeasurementAllChunksDropped {
|
|||
let table_name = "h2o";
|
||||
|
||||
let lp_lines = vec!["h2o,state=MA temp=70.4 50"];
|
||||
write_lp(&db, &lp_lines.join("\n")).await;
|
||||
write_lp(&db, &lp_lines.join("\n"));
|
||||
let chunk_id = db
|
||||
.compact_open_chunk(table_name, partition_key)
|
||||
.await
|
||||
|
@ -1060,8 +1058,8 @@ pub async fn make_two_chunk_scenarios(
|
|||
data2: &str,
|
||||
) -> Vec<DbScenario> {
|
||||
let db = make_db().await.db;
|
||||
write_lp(&db, data1).await;
|
||||
write_lp(&db, data2).await;
|
||||
write_lp(&db, data1);
|
||||
write_lp(&db, data2);
|
||||
let scenario1 = DbScenario {
|
||||
scenario_name: "Data in single open chunk of mutable buffer".into(),
|
||||
db,
|
||||
|
@ -1069,13 +1067,13 @@ pub async fn make_two_chunk_scenarios(
|
|||
|
||||
// spread across 2 mutable buffer chunks
|
||||
let db = make_db().await.db;
|
||||
let table_names = write_lp(&db, data1).await;
|
||||
let table_names = write_lp(&db, data1);
|
||||
for table_name in &table_names {
|
||||
db.rollover_partition(table_name, partition_key)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
write_lp(&db, data2).await;
|
||||
write_lp(&db, data2);
|
||||
let scenario2 = DbScenario {
|
||||
scenario_name: "Data in one open chunk and one closed chunk of mutable buffer".into(),
|
||||
db,
|
||||
|
@ -1083,13 +1081,13 @@ pub async fn make_two_chunk_scenarios(
|
|||
|
||||
// spread across 1 mutable buffer, 1 read buffer chunks
|
||||
let db = make_db().await.db;
|
||||
let table_names = write_lp(&db, data1).await;
|
||||
let table_names = write_lp(&db, data1);
|
||||
for table_name in &table_names {
|
||||
db.compact_partition(table_name, partition_key)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
write_lp(&db, data2).await;
|
||||
write_lp(&db, data2);
|
||||
let scenario3 = DbScenario {
|
||||
scenario_name: "Data in open chunk of mutable buffer, and one chunk of read buffer".into(),
|
||||
db,
|
||||
|
@ -1097,13 +1095,13 @@ pub async fn make_two_chunk_scenarios(
|
|||
|
||||
// in 2 read buffer chunks
|
||||
let db = make_db().await.db;
|
||||
let table_names = write_lp(&db, data1).await;
|
||||
let table_names = write_lp(&db, data1);
|
||||
for table_name in &table_names {
|
||||
db.compact_partition(table_name, partition_key)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
let table_names = write_lp(&db, data2).await;
|
||||
let table_names = write_lp(&db, data2);
|
||||
for table_name in &table_names {
|
||||
// Compact just the last chunk
|
||||
db.compact_open_chunk(table_name, partition_key)
|
||||
|
@ -1117,13 +1115,13 @@ pub async fn make_two_chunk_scenarios(
|
|||
|
||||
// in 2 read buffer chunks that also loaded into object store
|
||||
let db = make_db().await.db;
|
||||
let table_names = write_lp(&db, data1).await;
|
||||
let table_names = write_lp(&db, data1);
|
||||
for table_name in &table_names {
|
||||
db.persist_partition(table_name, partition_key, true)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
let table_names = write_lp(&db, data2).await;
|
||||
let table_names = write_lp(&db, data2);
|
||||
for table_name in &table_names {
|
||||
db.persist_partition(table_name, partition_key, true)
|
||||
.await
|
||||
|
@ -1136,7 +1134,7 @@ pub async fn make_two_chunk_scenarios(
|
|||
|
||||
// Scenario 6: Two closed chunk in OS only
|
||||
let db = make_db().await.db;
|
||||
let table_names = write_lp(&db, data1).await;
|
||||
let table_names = write_lp(&db, data1);
|
||||
for table_name in &table_names {
|
||||
let id = db
|
||||
.persist_partition(table_name, partition_key, true)
|
||||
|
@ -1147,7 +1145,7 @@ pub async fn make_two_chunk_scenarios(
|
|||
db.unload_read_buffer(table_name, partition_key, id)
|
||||
.unwrap();
|
||||
}
|
||||
let table_names = write_lp(&db, data2).await;
|
||||
let table_names = write_lp(&db, data2);
|
||||
for table_name in &table_names {
|
||||
let id = db
|
||||
.persist_partition(table_name, partition_key, true)
|
||||
|
@ -1166,14 +1164,14 @@ pub async fn make_two_chunk_scenarios(
|
|||
|
||||
// Scenario 7: in a single chunk resulting from compacting MUB and RUB
|
||||
let db = make_db().await.db;
|
||||
let table_names = write_lp(&db, data1).await;
|
||||
let table_names = write_lp(&db, data1);
|
||||
for table_name in &table_names {
|
||||
// put chunk 1 into RUB
|
||||
db.compact_partition(table_name, partition_key)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
let table_names = write_lp(&db, data2).await; // write to MUB
|
||||
let table_names = write_lp(&db, data2); // write to MUB
|
||||
for table_name in &table_names {
|
||||
// compact chunks into a single RUB chunk
|
||||
db.compact_partition(table_name, partition_key)
|
||||
|
@ -1204,7 +1202,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
|
|||
) -> Vec<DbScenario> {
|
||||
// Scenario 1: One closed chunk in RUB
|
||||
let db = make_db().await.db;
|
||||
let table_names = write_lp(&db, data).await;
|
||||
let table_names = write_lp(&db, data);
|
||||
for table_name in &table_names {
|
||||
db.compact_partition(table_name, partition_key)
|
||||
.await
|
||||
|
@ -1217,7 +1215,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
|
|||
|
||||
// Scenario 2: One closed chunk in Parquet only
|
||||
let db = make_db().await.db;
|
||||
let table_names = write_lp(&db, data).await;
|
||||
let table_names = write_lp(&db, data);
|
||||
for table_name in &table_names {
|
||||
let id = db
|
||||
.persist_partition(table_name, partition_key, true)
|
||||
|
@ -1287,7 +1285,7 @@ impl DbSetup for ChunkOrder {
|
|||
let db = make_db().await.db;
|
||||
|
||||
// create first chunk: data->MUB->RUB
|
||||
write_lp(&db, "cpu,region=west user=1 100").await;
|
||||
write_lp(&db, "cpu,region=west user=1 100");
|
||||
assert_eq!(count_mutable_buffer_chunks(&db), 1);
|
||||
assert_eq!(count_read_buffer_chunks(&db), 0);
|
||||
assert_eq!(count_object_store_chunks(&db), 0);
|
||||
|
@ -1313,7 +1311,7 @@ impl DbSetup for ChunkOrder {
|
|||
};
|
||||
|
||||
// create second chunk: data->MUB
|
||||
write_lp(&db, "cpu,region=west user=2 100").await;
|
||||
write_lp(&db, "cpu,region=west user=2 100");
|
||||
assert_eq!(count_mutable_buffer_chunks(&db), 1);
|
||||
assert_eq!(count_read_buffer_chunks(&db), 1);
|
||||
assert_eq!(count_object_store_chunks(&db), 0);
|
||||
|
|
|
@ -189,7 +189,7 @@ pub async fn make_chunk_with_deletes_at_different_stages(
|
|||
// Make an open MUB
|
||||
//
|
||||
// There may be more than one tables in the lp data
|
||||
let tables = write_lp(&db, &lp_lines.join("\n")).await;
|
||||
let tables = write_lp(&db, &lp_lines.join("\n"));
|
||||
for table in &tables {
|
||||
let num_mubs = count_mub_table_chunks(&db, table.as_str(), partition_key);
|
||||
// must be one MUB per table
|
||||
|
@ -434,7 +434,7 @@ pub async fn make_different_stage_chunks_with_deletes_scenario(
|
|||
|
||||
// ----------
|
||||
// Make an open MUB
|
||||
write_lp(&db, &chunk_data.lp_lines.join("\n")).await;
|
||||
write_lp(&db, &chunk_data.lp_lines.join("\n"));
|
||||
// 0 does not represent the real chunk id. It is here just to initialize the chunk_id variable for later assignment
|
||||
let mut chunk_id = db.chunk_summaries().unwrap()[0].id;
|
||||
|
||||
|
|
|
@ -565,12 +565,10 @@ impl Database {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes a [`DmlOperation`] to this `Database` this will either:
|
||||
/// Writes a [`DmlOperation`] to this `Database`.
|
||||
///
|
||||
/// - write it to a write buffer
|
||||
/// - write it to a local `Db`
|
||||
///
|
||||
pub async fn route_operation(&self, operation: &DmlOperation) -> Result<(), WriteError> {
|
||||
/// If the database is configured to consume data from a write buffer, this call will fail.
|
||||
pub fn store_operation(&self, operation: &DmlOperation) -> Result<(), WriteError> {
|
||||
let db = {
|
||||
let state = self.shared.state.read();
|
||||
match &**state {
|
||||
|
@ -587,11 +585,9 @@ impl Database {
|
|||
}
|
||||
};
|
||||
|
||||
db.route_operation(operation).await.map_err(|e| {
|
||||
db.store_operation(operation).map_err(|e| {
|
||||
use super::db::Error;
|
||||
match e {
|
||||
// TODO: Pull write buffer producer out of Db
|
||||
Error::WriteBufferWritingError { source } => WriteError::WriteBuffer { source },
|
||||
Error::HardLimitReached {} => WriteError::HardLimitReached {},
|
||||
Error::StoreWriteErrors { errors } => WriteError::StoreWriteErrors { errors },
|
||||
e => e.into(),
|
||||
|
@ -1341,19 +1337,6 @@ impl DatabaseStateRulesLoaded {
|
|||
.await
|
||||
.context(CatalogLoad)?;
|
||||
|
||||
let rules = self.provided_rules.rules();
|
||||
let write_buffer_factory = shared.application.write_buffer_factory();
|
||||
let producer = match rules.write_buffer_connection.as_ref() {
|
||||
Some(connection) if matches!(connection.direction, WriteBufferDirection::Write) => {
|
||||
let producer = write_buffer_factory
|
||||
.new_config_write(shared.config.name.as_str(), connection)
|
||||
.await
|
||||
.context(CreateWriteBuffer)?;
|
||||
Some(producer)
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let database_to_commit = DatabaseToCommit {
|
||||
server_id: shared.config.server_id,
|
||||
iox_object_store: Arc::clone(&self.iox_object_store),
|
||||
|
@ -1361,7 +1344,6 @@ impl DatabaseStateRulesLoaded {
|
|||
rules: Arc::clone(self.provided_rules.rules()),
|
||||
preserved_catalog,
|
||||
catalog,
|
||||
write_buffer_producer: producer,
|
||||
metric_registry: Arc::clone(shared.application.metric_registry()),
|
||||
time_provider: Arc::clone(shared.application.time_provider()),
|
||||
};
|
||||
|
|
334
server/src/db.rs
334
server/src/db.rs
|
@ -49,7 +49,7 @@ use schema::selection::Selection;
|
|||
use schema::Schema;
|
||||
use time::{Time, TimeProvider};
|
||||
use trace::ctx::SpanContext;
|
||||
use write_buffer::core::{WriteBufferReading, WriteBufferWriting};
|
||||
use write_buffer::core::WriteBufferReading;
|
||||
|
||||
pub(crate) use crate::db::chunk::DbChunk;
|
||||
pub(crate) use crate::db::lifecycle::ArcDb;
|
||||
|
@ -94,11 +94,6 @@ pub enum Error {
|
|||
#[snafu(display("Error freezing chunk while rolling over partition: {}", source))]
|
||||
FreezingChunk { source: catalog::chunk::Error },
|
||||
|
||||
#[snafu(display("Error sending entry to write buffer"))]
|
||||
WriteBufferWritingError {
|
||||
source: Box<dyn std::error::Error + Sync + Send>,
|
||||
},
|
||||
|
||||
#[snafu(display("Cannot write to this database: no mutable buffer configured"))]
|
||||
DatabaseNotWriteable {},
|
||||
|
||||
|
@ -266,10 +261,6 @@ pub struct Db {
|
|||
/// Number of iterations of the worker delete predicate preservation loop for this Db
|
||||
worker_iterations_delete_predicate_preservation: AtomicUsize,
|
||||
|
||||
/// Optional write buffer producer
|
||||
/// TODO: Move onto Database
|
||||
write_buffer_producer: Option<Arc<dyn WriteBufferWriting>>,
|
||||
|
||||
/// Lock that prevents the cleanup job from deleting files that are written but not yet added to the preserved
|
||||
/// catalog.
|
||||
///
|
||||
|
@ -296,10 +287,6 @@ pub(crate) struct DatabaseToCommit {
|
|||
pub(crate) catalog: Catalog,
|
||||
pub(crate) rules: Arc<DatabaseRules>,
|
||||
pub(crate) time_provider: Arc<dyn TimeProvider>,
|
||||
|
||||
/// TODO: Move onto Database
|
||||
pub(crate) write_buffer_producer: Option<Arc<dyn WriteBufferWriting>>,
|
||||
|
||||
pub(crate) metric_registry: Arc<metric::Registry>,
|
||||
}
|
||||
|
||||
|
@ -334,7 +321,6 @@ impl Db {
|
|||
catalog_access,
|
||||
worker_iterations_cleanup: AtomicUsize::new(0),
|
||||
worker_iterations_delete_predicate_preservation: AtomicUsize::new(0),
|
||||
write_buffer_producer: database_to_commit.write_buffer_producer,
|
||||
cleanup_lock: Default::default(),
|
||||
time_provider: database_to_commit.time_provider,
|
||||
delete_predicates_mailbox: Default::default(),
|
||||
|
@ -972,51 +958,16 @@ impl Db {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Stores the write on this [`Db`] and/or routes it to the write buffer
|
||||
///
|
||||
/// TODO: Remove this method (#2243)
|
||||
pub(crate) async fn route_operation(&self, operation: &DmlOperation) -> Result<()> {
|
||||
/// Stores the write on this [`Db`].
|
||||
pub(crate) fn store_operation(&self, operation: &DmlOperation) -> Result<()> {
|
||||
let immutable = {
|
||||
let rules = self.rules.read();
|
||||
rules.lifecycle_rules.immutable
|
||||
};
|
||||
debug!(%immutable, has_write_buffer_producer=self.write_buffer_producer.is_some(), "storing entry");
|
||||
|
||||
match (self.write_buffer_producer.as_ref(), immutable) {
|
||||
(Some(write_buffer), true) => {
|
||||
// If only the write buffer is configured, this is passing the data through to
|
||||
// the write buffer, and it's not an error. We ignore the returned metadata; it
|
||||
// will get picked up when data is read from the write buffer.
|
||||
|
||||
// TODO: be smarter than always using sequencer 0
|
||||
let _ = write_buffer
|
||||
.store_operation(0, operation)
|
||||
.await
|
||||
.context(WriteBufferWritingError)?;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
(Some(write_buffer), false) => {
|
||||
// If using both write buffer and mutable buffer, we want to wait for the write
|
||||
// buffer to return success before adding the entry to the mutable buffer.
|
||||
|
||||
// TODO: be smarter than always using sequencer 0
|
||||
write_buffer
|
||||
.store_operation(0, operation)
|
||||
.await
|
||||
.context(WriteBufferWritingError)?;
|
||||
}
|
||||
(_, true) => {
|
||||
// If not configured to send entries to the write buffer and the database is
|
||||
// immutable, trying to store an entry is an error and we don't need to build a
|
||||
// `SequencedEntry`.
|
||||
return DatabaseNotWriteable {}.fail();
|
||||
}
|
||||
(None, false) => {
|
||||
// If no write buffer is configured, nothing is
|
||||
// sequencing entries so skip doing so here
|
||||
}
|
||||
};
|
||||
debug!(%immutable, "storing entry");
|
||||
if immutable {
|
||||
return DatabaseNotWriteable {}.fail();
|
||||
}
|
||||
|
||||
match operation {
|
||||
DmlOperation::Write(write) => self.store_write(write),
|
||||
|
@ -1275,20 +1226,20 @@ pub mod test_helpers {
|
|||
use super::*;
|
||||
|
||||
/// Try to write lineprotocol data and return all tables that where written.
|
||||
pub async fn try_write_lp(db: &Db, lp: &str) -> Result<Vec<String>> {
|
||||
pub fn try_write_lp(db: &Db, lp: &str) -> Result<Vec<String>> {
|
||||
let tables = lines_to_batches(lp, 0).unwrap();
|
||||
let mut table_names: Vec<_> = tables.keys().cloned().collect();
|
||||
|
||||
let write = DmlOperation::Write(DmlWrite::new(tables, Default::default()));
|
||||
db.route_operation(&write).await?;
|
||||
db.store_operation(&write)?;
|
||||
|
||||
table_names.sort_unstable();
|
||||
Ok(table_names)
|
||||
}
|
||||
|
||||
/// Same was [`try_write_lp`](try_write_lp) but will panic on failure.
|
||||
pub async fn write_lp(db: &Db, lp: &str) -> Vec<String> {
|
||||
try_write_lp(db, lp).await.unwrap()
|
||||
pub fn write_lp(db: &Db, lp: &str) -> Vec<String> {
|
||||
try_write_lp(db, lp).unwrap()
|
||||
}
|
||||
|
||||
/// Convenience macro to test if an [`db::Error`](crate::db::Error) is a
|
||||
|
@ -1414,7 +1365,7 @@ mod tests {
|
|||
},
|
||||
utils::{make_db, make_db_time, TestDb},
|
||||
};
|
||||
use ::test_helpers::{assert_contains, assert_error};
|
||||
use ::test_helpers::assert_contains;
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
|
||||
use bytes::Bytes;
|
||||
|
@ -1447,9 +1398,6 @@ mod tests {
|
|||
};
|
||||
use time::Time;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use write_buffer::mock::{
|
||||
MockBufferForWriting, MockBufferForWritingThatAlwaysErrors, MockBufferSharedState,
|
||||
};
|
||||
|
||||
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
type Result<T, E = TestError> = std::result::Result<T, E>;
|
||||
|
@ -1472,98 +1420,20 @@ mod tests {
|
|||
|
||||
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
||||
let write = DmlOperation::Write(DmlWrite::new(tables, Default::default()));
|
||||
let res = db.route_operation(&write).await;
|
||||
let res = db.store_operation(&write);
|
||||
assert_contains!(
|
||||
res.unwrap_err().to_string(),
|
||||
"Cannot write to this database: no mutable buffer configured"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_with_write_buffer_no_mutable_buffer() {
|
||||
// Writes should be forwarded to the write buffer and *not* rejected if the write buffer is
|
||||
// configured and the mutable buffer isn't
|
||||
let write_buffer_state =
|
||||
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
|
||||
let time_provider = Arc::new(time::MockProvider::new(Time::from_timestamp_nanos(0)));
|
||||
let write_buffer = Arc::new(
|
||||
MockBufferForWriting::new(write_buffer_state.clone(), None, time_provider).unwrap(),
|
||||
);
|
||||
let test_db = TestDb::builder()
|
||||
.write_buffer_producer(write_buffer)
|
||||
.lifecycle_rules(LifecycleRules {
|
||||
immutable: true,
|
||||
..Default::default()
|
||||
})
|
||||
.build()
|
||||
.await
|
||||
.db;
|
||||
|
||||
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
||||
let write = DmlOperation::Write(DmlWrite::new(tables, Default::default()));
|
||||
test_db.route_operation(&write).await.unwrap();
|
||||
|
||||
assert_eq!(write_buffer_state.get_messages(0).len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_to_write_buffer_and_mutable_buffer() {
|
||||
// Writes should be forwarded to the write buffer *and* the mutable buffer if both are
|
||||
// configured.
|
||||
let write_buffer_state =
|
||||
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
|
||||
let time_provider = Arc::new(time::MockProvider::new(Time::from_timestamp_nanos(0)));
|
||||
let write_buffer = Arc::new(
|
||||
MockBufferForWriting::new(write_buffer_state.clone(), None, time_provider).unwrap(),
|
||||
);
|
||||
let db = TestDb::builder()
|
||||
.write_buffer_producer(write_buffer)
|
||||
.build()
|
||||
.await
|
||||
.db;
|
||||
|
||||
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
||||
let write = DmlOperation::Write(DmlWrite::new(tables, Default::default()));
|
||||
db.route_operation(&write).await.unwrap();
|
||||
|
||||
assert_eq!(write_buffer_state.get_messages(0).len(), 1);
|
||||
|
||||
let batches = run_query(db, "select * from cpu").await;
|
||||
|
||||
let expected = vec![
|
||||
"+-----+--------------------------------+",
|
||||
"| bar | time |",
|
||||
"+-----+--------------------------------+",
|
||||
"| 1 | 1970-01-01T00:00:00.000000010Z |",
|
||||
"+-----+--------------------------------+",
|
||||
];
|
||||
assert_batches_eq!(expected, &batches);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_buffer_errors_propagated() {
|
||||
let write_buffer = Arc::new(MockBufferForWritingThatAlwaysErrors {});
|
||||
|
||||
let db = TestDb::builder()
|
||||
.write_buffer_producer(write_buffer)
|
||||
.build()
|
||||
.await
|
||||
.db;
|
||||
|
||||
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
||||
let write = DmlOperation::Write(DmlWrite::new(tables, Default::default()));
|
||||
let res = db.route_operation(&write).await;
|
||||
|
||||
assert_error!(res, Error::WriteBufferWritingError { .. });
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cant_write_when_reading_from_write_buffer() {
|
||||
// Validate that writes are rejected if this database is reading from the write buffer
|
||||
let db = immutable_db().await;
|
||||
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
||||
let write = DmlOperation::Write(DmlWrite::new(tables, Default::default()));
|
||||
let res = db.route_operation(&write).await;
|
||||
let res = db.store_operation(&write);
|
||||
assert_contains!(
|
||||
res.unwrap_err().to_string(),
|
||||
"Cannot write to this database: no mutable buffer configured"
|
||||
|
@ -1574,7 +1444,7 @@ mod tests {
|
|||
async fn read_write() {
|
||||
// This test also exercises the path without a write buffer.
|
||||
let db = make_db().await.db;
|
||||
write_lp(&db, "cpu bar=1 10").await;
|
||||
write_lp(&db, "cpu bar=1 10");
|
||||
|
||||
let batches = run_query(db, "select * from cpu").await;
|
||||
|
||||
|
@ -1602,7 +1472,7 @@ mod tests {
|
|||
let write = DmlOperation::Write(DmlWrite::new(tables, Default::default()));
|
||||
|
||||
// This should succeed and start chunks in the MUB
|
||||
db.route_operation(&write).await.unwrap();
|
||||
db.store_operation(&write).unwrap();
|
||||
|
||||
// Line 1 has the same schema and should end up in the MUB.
|
||||
// Line 2 has a different schema than line 1 and should error
|
||||
|
@ -1616,7 +1486,7 @@ mod tests {
|
|||
let write = DmlOperation::Write(DmlWrite::new(tables, Default::default()));
|
||||
|
||||
// This should return an error because there was at least one error in the loop
|
||||
let err = db.route_operation(&write).await.unwrap_err();
|
||||
let err = db.store_operation(&write).unwrap_err();
|
||||
assert_contains!(
|
||||
err.to_string(),
|
||||
"Storing database write failed with the following error(s), and possibly more:"
|
||||
|
@ -1686,7 +1556,7 @@ mod tests {
|
|||
|
||||
let db = Arc::clone(&test_db.db);
|
||||
|
||||
write_lp(db.as_ref(), "cpu bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu bar=1 10");
|
||||
|
||||
let registry = test_db.metric_registry.as_ref();
|
||||
|
||||
|
@ -1703,16 +1573,16 @@ mod tests {
|
|||
|
||||
// write into same chunk again.
|
||||
time.inc(Duration::from_secs(1));
|
||||
write_lp(db.as_ref(), "cpu bar=2 20").await;
|
||||
write_lp(db.as_ref(), "cpu bar=2 20");
|
||||
|
||||
time.inc(Duration::from_secs(1));
|
||||
write_lp(db.as_ref(), "cpu bar=3 30").await;
|
||||
write_lp(db.as_ref(), "cpu bar=3 30");
|
||||
|
||||
time.inc(Duration::from_secs(1));
|
||||
write_lp(db.as_ref(), "cpu bar=4 40").await;
|
||||
write_lp(db.as_ref(), "cpu bar=4 40");
|
||||
|
||||
time.inc(Duration::from_secs(1));
|
||||
write_lp(db.as_ref(), "cpu bar=5 50").await;
|
||||
write_lp(db.as_ref(), "cpu bar=5 50");
|
||||
|
||||
// verify chunk size updated
|
||||
catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 796);
|
||||
|
@ -1797,10 +1667,10 @@ mod tests {
|
|||
let test_db = make_db().await;
|
||||
let db = Arc::clone(&test_db.db);
|
||||
|
||||
write_lp(db.as_ref(), "write_metrics_test foo=1 100000000000").await;
|
||||
write_lp(db.as_ref(), "write_metrics_test foo=2 180000000000").await;
|
||||
write_lp(db.as_ref(), "write_metrics_test foo=3 650000000000").await;
|
||||
write_lp(db.as_ref(), "write_metrics_test foo=3 650000000010").await;
|
||||
write_lp(db.as_ref(), "write_metrics_test foo=1 100000000000");
|
||||
write_lp(db.as_ref(), "write_metrics_test foo=2 180000000000");
|
||||
write_lp(db.as_ref(), "write_metrics_test foo=3 650000000000");
|
||||
write_lp(db.as_ref(), "write_metrics_test foo=3 650000000010");
|
||||
|
||||
let mut summary = TimestampSummary::default();
|
||||
summary.record(Time::from_timestamp_nanos(100000000000));
|
||||
|
@ -1835,7 +1705,7 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn write_with_rollover() {
|
||||
let db = make_db().await.db;
|
||||
write_lp(db.as_ref(), "cpu bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu bar=1 10");
|
||||
assert_eq!(vec!["1970-01-01T00"], db.partition_keys().unwrap());
|
||||
|
||||
let mb_chunk = db
|
||||
|
@ -1855,7 +1725,7 @@ mod tests {
|
|||
assert_batches_sorted_eq!(expected, &batches);
|
||||
|
||||
// add new data
|
||||
write_lp(db.as_ref(), "cpu bar=2 20").await;
|
||||
write_lp(db.as_ref(), "cpu bar=2 20");
|
||||
let expected = vec![
|
||||
"+-----+--------------------------------+",
|
||||
"| bar | time |",
|
||||
|
@ -1892,7 +1762,7 @@ mod tests {
|
|||
"cpu,core=one user=10.0 11",
|
||||
];
|
||||
|
||||
write_lp(db.as_ref(), &lines.join("\n")).await;
|
||||
write_lp(db.as_ref(), &lines.join("\n"));
|
||||
assert_eq!(vec!["1970-01-01T00"], db.partition_keys().unwrap());
|
||||
|
||||
db.rollover_partition("cpu", "1970-01-01T00")
|
||||
|
@ -1919,8 +1789,8 @@ mod tests {
|
|||
let test_db = make_db().await;
|
||||
let db = Arc::new(test_db.db);
|
||||
|
||||
write_lp(db.as_ref(), "cpu bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu bar=2 20").await;
|
||||
write_lp(db.as_ref(), "cpu bar=1 10");
|
||||
write_lp(db.as_ref(), "cpu bar=2 20");
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
let mb_chunk = db
|
||||
|
@ -1991,7 +1861,7 @@ mod tests {
|
|||
let (db, time) = make_db_time().await;
|
||||
|
||||
let t_write1 = time.inc(Duration::from_secs(1));
|
||||
write_lp(db.as_ref(), "cpu bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu bar=1 10");
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
db.rollover_partition("cpu", partition_key)
|
||||
|
@ -2012,7 +1882,7 @@ mod tests {
|
|||
|
||||
// Put new data into the mutable buffer
|
||||
let t_write2 = time.inc(Duration::from_secs(1));
|
||||
write_lp(db.as_ref(), "cpu bar=2 20").await;
|
||||
write_lp(db.as_ref(), "cpu bar=2 20");
|
||||
|
||||
// now, compact it
|
||||
let compacted_rb_chunk = db
|
||||
|
@ -2065,12 +1935,12 @@ mod tests {
|
|||
let test_db = make_db().await;
|
||||
let db = Arc::new(test_db.db);
|
||||
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=asfd,tag2=foo bar=2 20").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=bingo,tag2=foo bar=2 10").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 20").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 10").await;
|
||||
write_lp(db.as_ref(), "cpu,tag2=a bar=3 5").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10");
|
||||
write_lp(db.as_ref(), "cpu,tag1=asfd,tag2=foo bar=2 20");
|
||||
write_lp(db.as_ref(), "cpu,tag1=bingo,tag2=foo bar=2 10");
|
||||
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 20");
|
||||
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 10");
|
||||
write_lp(db.as_ref(), "cpu,tag2=a bar=3 5");
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
let mb_chunk = db
|
||||
|
@ -2161,9 +2031,9 @@ mod tests {
|
|||
let db = test_db.db;
|
||||
|
||||
// Write some line protocols in Mutable buffer of the DB
|
||||
write_lp(db.as_ref(), "cpu bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu bar=1 10");
|
||||
time.inc(Duration::from_secs(1));
|
||||
write_lp(db.as_ref(), "cpu bar=2 20").await;
|
||||
write_lp(db.as_ref(), "cpu bar=2 20");
|
||||
|
||||
//Now mark the MB chunk close
|
||||
let partition_key = "1970-01-01T00";
|
||||
|
@ -2261,10 +2131,10 @@ mod tests {
|
|||
let db = test_db.db;
|
||||
|
||||
// Write some line protocols in Mutable buffer of the DB
|
||||
write_lp(db.as_ref(), "cpu bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu bar=1 10");
|
||||
|
||||
time.inc(Duration::from_secs(1));
|
||||
write_lp(db.as_ref(), "cpu bar=2 20").await;
|
||||
write_lp(db.as_ref(), "cpu bar=2 20");
|
||||
|
||||
// Now mark the MB chunk close
|
||||
let partition_key = "1970-01-01T00";
|
||||
|
@ -2371,7 +2241,7 @@ mod tests {
|
|||
let w0 = time.inc(Duration::from_secs(23));
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
write_lp(&db, "cpu bar=1 10").await;
|
||||
write_lp(&db, "cpu bar=1 10");
|
||||
|
||||
{
|
||||
let partition = db.catalog.partition("cpu", partition_key).unwrap();
|
||||
|
@ -2383,7 +2253,7 @@ mod tests {
|
|||
|
||||
let w1 = time.inc(Duration::from_secs(1));
|
||||
|
||||
write_lp(&db, "cpu bar=1 20").await;
|
||||
write_lp(&db, "cpu bar=1 20");
|
||||
{
|
||||
let partition = db.catalog.partition("cpu", partition_key).unwrap();
|
||||
let partition = partition.read();
|
||||
|
@ -2399,7 +2269,7 @@ mod tests {
|
|||
let t0 = time.inc(Duration::from_secs(2));
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
write_lp(&db, "cpu bar=1 10").await;
|
||||
write_lp(&db, "cpu bar=1 10");
|
||||
|
||||
{
|
||||
let partition = db.catalog.partition("cpu", partition_key).unwrap();
|
||||
|
@ -2416,7 +2286,7 @@ mod tests {
|
|||
|
||||
let tables = lines_to_batches("cpu bar=true 10", 0).unwrap();
|
||||
let write = DmlOperation::Write(DmlWrite::new(tables, Default::default()));
|
||||
db.route_operation(&write).await.unwrap_err();
|
||||
db.store_operation(&write).unwrap_err();
|
||||
{
|
||||
let partition = db.catalog.partition("cpu", partition_key).unwrap();
|
||||
let partition = partition.read();
|
||||
|
@ -2434,9 +2304,9 @@ mod tests {
|
|||
let db = make_db().await.db;
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
write_lp(&db, "cpu bar=1 10").await; // seq 0
|
||||
write_lp(&db, "cpu bar=1 20").await; // seq 1
|
||||
write_lp(&db, "cpu bar=1 30").await; // seq 2
|
||||
write_lp(&db, "cpu bar=1 10"); // seq 0
|
||||
write_lp(&db, "cpu bar=1 20"); // seq 1
|
||||
write_lp(&db, "cpu bar=1 30"); // seq 2
|
||||
|
||||
let partition = db.catalog.partition("cpu", partition_key).unwrap();
|
||||
let partition = partition.write();
|
||||
|
@ -2452,8 +2322,8 @@ mod tests {
|
|||
let db = Arc::new(make_db().await.db);
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
write_lp(&db, "cpu bar=1 10").await;
|
||||
write_lp(&db, "cpu bar=1 20").await;
|
||||
write_lp(&db, "cpu bar=1 10");
|
||||
write_lp(&db, "cpu bar=1 20");
|
||||
|
||||
let partition = db.catalog.partition("cpu", partition_key).unwrap();
|
||||
let partition = partition.write();
|
||||
|
@ -2474,11 +2344,11 @@ mod tests {
|
|||
let w0 = time.inc(Duration::from_secs(95));
|
||||
|
||||
// Given data loaded into two chunks
|
||||
write_lp(&db, "cpu bar=1 10").await;
|
||||
write_lp(&db, "cpu bar=1 10");
|
||||
|
||||
let w1 = time.inc(Duration::from_secs(2));
|
||||
|
||||
write_lp(&db, "cpu bar=1 20").await;
|
||||
write_lp(&db, "cpu bar=1 20");
|
||||
|
||||
// When the chunk is rolled over
|
||||
let partition_key = "1970-01-01T00";
|
||||
|
@ -2505,8 +2375,8 @@ mod tests {
|
|||
let db = Arc::new(make_db().await.db);
|
||||
let partition_key = "1970-01-01T00";
|
||||
|
||||
write_lp(&db, "cpu bar=1 10").await;
|
||||
write_lp(&db, "cpu bar=1 20").await;
|
||||
write_lp(&db, "cpu bar=1 10");
|
||||
write_lp(&db, "cpu bar=1 20");
|
||||
|
||||
assert_eq!(mutable_chunk_ids(&db, partition_key).len(), 1);
|
||||
assert_eq!(
|
||||
|
@ -2522,10 +2392,10 @@ mod tests {
|
|||
|
||||
// add a new chunk in mutable buffer, and move chunk1 (but
|
||||
// not chunk 0) to read buffer
|
||||
write_lp(&db, "cpu bar=1 30").await;
|
||||
write_lp(&db, "cpu bar=1 30");
|
||||
db.compact_open_chunk("cpu", "1970-01-01T00").await.unwrap();
|
||||
|
||||
write_lp(&db, "cpu bar=1 40").await;
|
||||
write_lp(&db, "cpu bar=1 40");
|
||||
|
||||
assert_eq!(mutable_chunk_ids(&db, partition_key).len(), 2);
|
||||
assert_eq!(read_buffer_chunk_ids(&db, partition_key).len(), 1);
|
||||
|
@ -2536,11 +2406,11 @@ mod tests {
|
|||
// Test that chunk id listing is hooked up
|
||||
let db = Arc::new(make_db().await.db);
|
||||
|
||||
write_lp(&db, "cpu bar=1 1").await;
|
||||
write_lp(&db, "cpu bar=1 1");
|
||||
db.rollover_partition("cpu", "1970-01-01T00").await.unwrap();
|
||||
|
||||
// write into a separate partitiion
|
||||
write_lp(&db, "cpu bar=1,baz2,frob=3 400000000000000").await;
|
||||
write_lp(&db, "cpu bar=1,baz2,frob=3 400000000000000");
|
||||
|
||||
print!("Partitions: {:?}", db.partition_keys().unwrap());
|
||||
|
||||
|
@ -2585,10 +2455,10 @@ mod tests {
|
|||
let (db, time) = make_db_time().await;
|
||||
|
||||
let t_first_write = time.inc(Duration::from_secs(2));
|
||||
write_lp(&db, "cpu bar=1 1").await;
|
||||
write_lp(&db, "cpu bar=1 1");
|
||||
|
||||
let t_second_write = time.inc(Duration::from_secs(2));
|
||||
write_lp(&db, "cpu bar=2 2").await;
|
||||
write_lp(&db, "cpu bar=2 2");
|
||||
|
||||
let mut chunk_summaries = db.chunk_summaries().unwrap();
|
||||
|
||||
|
@ -2640,7 +2510,7 @@ mod tests {
|
|||
// In open chunk, will end up in rb/os
|
||||
let t1_write = Time::from_timestamp(11, 22);
|
||||
time.set(t1_write);
|
||||
write_lp(&db, "cpu bar=1 1").await;
|
||||
write_lp(&db, "cpu bar=1 1");
|
||||
|
||||
// Move open chunk to closed
|
||||
db.rollover_partition("cpu", "1970-01-01T00").await.unwrap();
|
||||
|
@ -2648,7 +2518,7 @@ mod tests {
|
|||
// New open chunk in mb
|
||||
// This point will end up in rb/os
|
||||
let t2_write = time.inc(Duration::from_secs(1));
|
||||
write_lp(&db, "cpu bar=1,baz=2 2").await;
|
||||
write_lp(&db, "cpu bar=1,baz=2 2");
|
||||
|
||||
// Check first/last write times on the chunks at this point
|
||||
let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return");
|
||||
|
@ -2665,7 +2535,7 @@ mod tests {
|
|||
|
||||
// This point makes a new open mb chunk and will end up in the closed mb chunk
|
||||
time.inc(Duration::from_secs(1));
|
||||
write_lp(&db, "cpu bar=1,baz=2,frob=3 400000000000000").await;
|
||||
write_lp(&db, "cpu bar=1,baz=2,frob=3 400000000000000");
|
||||
|
||||
// Check first/last write times on the chunks at this point
|
||||
let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return");
|
||||
|
@ -2753,7 +2623,7 @@ mod tests {
|
|||
// New open chunk in mb
|
||||
// This point will stay in this open mb chunk
|
||||
let t5_write = time.inc(Duration::from_secs(1));
|
||||
write_lp(&db, "cpu bar=1,baz=3,blargh=3 400000000000000").await;
|
||||
write_lp(&db, "cpu bar=1,baz=3,blargh=3 400000000000000");
|
||||
|
||||
// Check first/last write times on the chunks at this point
|
||||
let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return");
|
||||
|
@ -2842,13 +2712,13 @@ mod tests {
|
|||
// Test that chunk id listing is hooked up
|
||||
let db = make_db().await.db;
|
||||
|
||||
write_lp(&db, "cpu bar=1 1").await;
|
||||
write_lp(&db, "cpu bar=1 1");
|
||||
db.rollover_partition("cpu", "1970-01-01T00")
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
write_lp(&db, "cpu bar=2,baz=3.0 2").await;
|
||||
write_lp(&db, "mem foo=1 1").await;
|
||||
write_lp(&db, "cpu bar=2,baz=3.0 2");
|
||||
write_lp(&db, "mem foo=1 1");
|
||||
|
||||
// load a chunk to the read buffer
|
||||
db.compact_partition("cpu", "1970-01-01T00").await.unwrap();
|
||||
|
@ -2859,8 +2729,8 @@ mod tests {
|
|||
.unwrap();
|
||||
|
||||
// write into a separate partition
|
||||
write_lp(&db, "cpu bar=1 400000000000000").await;
|
||||
write_lp(&db, "mem frob=3 400000000000001").await;
|
||||
write_lp(&db, "cpu bar=1 400000000000000");
|
||||
write_lp(&db, "mem frob=3 400000000000001");
|
||||
|
||||
print!("Partitions: {:?}", db.partition_keys().unwrap());
|
||||
|
||||
|
@ -2974,8 +2844,8 @@ mod tests {
|
|||
let db = make_db().await.db;
|
||||
|
||||
// create MB partition
|
||||
write_lp(db.as_ref(), "cpu bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu bar=2 20").await;
|
||||
write_lp(db.as_ref(), "cpu bar=1 10");
|
||||
write_lp(db.as_ref(), "cpu bar=2 20");
|
||||
|
||||
// MB => RB
|
||||
let partition_key = "1970-01-01T00";
|
||||
|
@ -3015,11 +2885,11 @@ mod tests {
|
|||
.db;
|
||||
|
||||
// inserting first line does not trigger hard buffer limit
|
||||
write_lp(db.as_ref(), "cpu bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu bar=1 10");
|
||||
|
||||
// but second line will
|
||||
assert!(matches!(
|
||||
try_write_lp(db.as_ref(), "cpu bar=2 20").await,
|
||||
try_write_lp(db.as_ref(), "cpu bar=2 20"),
|
||||
Err(super::Error::HardLimitReached {})
|
||||
));
|
||||
}
|
||||
|
@ -3042,7 +2912,7 @@ mod tests {
|
|||
|
||||
let db = Arc::new(test_db.db);
|
||||
|
||||
write_lp(db.as_ref(), "cpu bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu bar=1 10");
|
||||
|
||||
let mut reporter = metric::RawReporter::default();
|
||||
test_db.metric_registry.report(&mut reporter);
|
||||
|
@ -3250,7 +3120,7 @@ mod tests {
|
|||
}
|
||||
|
||||
// ==================== check: DB still writable ====================
|
||||
write_lp(db.as_ref(), "cpu bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu bar=1 10");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -3419,7 +3289,7 @@ mod tests {
|
|||
}
|
||||
|
||||
// ==================== check: DB still writable ====================
|
||||
write_lp(db.as_ref(), "cpu bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu bar=1 10");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -3499,17 +3369,11 @@ mod tests {
|
|||
let db = test_db.db;
|
||||
|
||||
// first write should create schema
|
||||
try_write_lp(&db, "my_table,tag_partition_by=a field_integer=1 10")
|
||||
.await
|
||||
.unwrap();
|
||||
try_write_lp(&db, "my_table,tag_partition_by=a field_integer=1 10").unwrap();
|
||||
|
||||
// other writes are allowed to evolve the schema
|
||||
try_write_lp(&db, "my_table,tag_partition_by=a field_string=\"foo\" 10")
|
||||
.await
|
||||
.unwrap();
|
||||
try_write_lp(&db, "my_table,tag_partition_by=b field_float=1.1 10")
|
||||
.await
|
||||
.unwrap();
|
||||
try_write_lp(&db, "my_table,tag_partition_by=a field_string=\"foo\" 10").unwrap();
|
||||
try_write_lp(&db, "my_table,tag_partition_by=b field_float=1.1 10").unwrap();
|
||||
|
||||
// check that we have the expected partitions
|
||||
let mut partition_keys = db.partition_keys().unwrap();
|
||||
|
@ -3523,23 +3387,20 @@ mod tests {
|
|||
);
|
||||
|
||||
// illegal changes
|
||||
let e = try_write_lp(&db, "my_table,tag_partition_by=a field_integer=\"foo\" 10")
|
||||
.await
|
||||
.unwrap_err();
|
||||
let e =
|
||||
try_write_lp(&db, "my_table,tag_partition_by=a field_integer=\"foo\" 10").unwrap_err();
|
||||
assert_store_sequenced_entry_failures!(
|
||||
e,
|
||||
[super::Error::TableBatchSchemaMergeError { .. }]
|
||||
);
|
||||
let e = try_write_lp(&db, "my_table,tag_partition_by=b field_integer=\"foo\" 10")
|
||||
.await
|
||||
.unwrap_err();
|
||||
let e =
|
||||
try_write_lp(&db, "my_table,tag_partition_by=b field_integer=\"foo\" 10").unwrap_err();
|
||||
assert_store_sequenced_entry_failures!(
|
||||
e,
|
||||
[super::Error::TableBatchSchemaMergeError { .. }]
|
||||
);
|
||||
let e = try_write_lp(&db, "my_table,tag_partition_by=c field_integer=\"foo\" 10")
|
||||
.await
|
||||
.unwrap_err();
|
||||
let e =
|
||||
try_write_lp(&db, "my_table,tag_partition_by=c field_integer=\"foo\" 10").unwrap_err();
|
||||
assert_store_sequenced_entry_failures!(
|
||||
e,
|
||||
[super::Error::TableBatchSchemaMergeError { .. }]
|
||||
|
@ -3568,9 +3429,8 @@ mod tests {
|
|||
}
|
||||
|
||||
// schema is still there
|
||||
let e = try_write_lp(&db, "my_table,tag_partition_by=a field_integer=\"foo\" 10")
|
||||
.await
|
||||
.unwrap_err();
|
||||
let e =
|
||||
try_write_lp(&db, "my_table,tag_partition_by=a field_integer=\"foo\" 10").unwrap_err();
|
||||
assert_store_sequenced_entry_failures!(
|
||||
e,
|
||||
[super::Error::TableBatchSchemaMergeError { .. }]
|
||||
|
@ -3591,7 +3451,7 @@ mod tests {
|
|||
.await;
|
||||
let db = Arc::new(test_db.db);
|
||||
|
||||
write_lp(db.as_ref(), "cpu bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu bar=1 10");
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
let chunks = db.partition_chunk_summaries(partition_key);
|
||||
|
@ -3623,8 +3483,8 @@ mod tests {
|
|||
.await;
|
||||
let db = Arc::new(test_db.db);
|
||||
|
||||
write_lp(db.as_ref(), "cpu bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu bar=2 20").await;
|
||||
write_lp(db.as_ref(), "cpu bar=1 10");
|
||||
write_lp(db.as_ref(), "cpu bar=2 20");
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
|
||||
|
@ -3665,8 +3525,8 @@ mod tests {
|
|||
.await;
|
||||
let db = Arc::new(test_db.db);
|
||||
|
||||
write_lp(db.as_ref(), "cpu bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu bar=2 20").await;
|
||||
write_lp(db.as_ref(), "cpu bar=1 10");
|
||||
write_lp(db.as_ref(), "cpu bar=2 20");
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
db.persist_partition("cpu", partition_key, true)
|
||||
|
@ -3709,7 +3569,7 @@ mod tests {
|
|||
.await
|
||||
.db;
|
||||
|
||||
write_lp(db.as_ref(), "cpu foo=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu foo=1 10");
|
||||
|
||||
let chunks = db.chunk_summaries().unwrap();
|
||||
assert_eq!(chunks.len(), 1);
|
||||
|
@ -3729,7 +3589,7 @@ mod tests {
|
|||
|
||||
let t2 = time.inc(Duration::from_secs(1));
|
||||
|
||||
write_lp(db.as_ref(), "cpu foo=1 20").await;
|
||||
write_lp(db.as_ref(), "cpu foo=1 20");
|
||||
|
||||
let chunks = db.chunk_summaries().unwrap();
|
||||
assert_eq!(chunks.len(), 1);
|
||||
|
@ -3750,7 +3610,7 @@ mod tests {
|
|||
}
|
||||
|
||||
async fn create_parquet_chunk(db: &Arc<Db>) -> (String, String, ChunkId) {
|
||||
write_lp(db, "cpu bar=1 10").await;
|
||||
write_lp(db, "cpu bar=1 10");
|
||||
let partition_key = "1970-01-01T00";
|
||||
let table_name = "cpu";
|
||||
|
||||
|
|
|
@ -635,7 +635,7 @@ mod tests {
|
|||
async fn mub_records_access() {
|
||||
let (db, time) = make_db_time().await;
|
||||
|
||||
write_lp(&db, "cpu,tag=1 bar=1 1").await;
|
||||
write_lp(&db, "cpu,tag=1 bar=1 1");
|
||||
|
||||
let chunks = db.catalog.chunks();
|
||||
assert_eq!(chunks.len(), 1);
|
||||
|
@ -650,7 +650,7 @@ mod tests {
|
|||
async fn rub_records_access() {
|
||||
let (db, time) = make_db_time().await;
|
||||
|
||||
write_lp(&db, "cpu,tag=1 bar=1 1").await;
|
||||
write_lp(&db, "cpu,tag=1 bar=1 1");
|
||||
db.compact_partition("cpu", "1970-01-01T00").await.unwrap();
|
||||
|
||||
let chunks = db.catalog.chunks();
|
||||
|
@ -667,7 +667,7 @@ mod tests {
|
|||
let (db, time) = make_db_time().await;
|
||||
|
||||
let t0 = time.inc(Duration::from_secs(324));
|
||||
write_lp(&db, "cpu,tag=1 bar=1 1").await;
|
||||
write_lp(&db, "cpu,tag=1 bar=1 1");
|
||||
|
||||
let id = db
|
||||
.persist_partition("cpu", "1970-01-01T00", true)
|
||||
|
@ -697,10 +697,10 @@ mod tests {
|
|||
let (db, time) = make_db_time().await;
|
||||
|
||||
let w0 = time.inc(Duration::from_secs(10));
|
||||
write_lp(&db, "cpu,tag=1 bar=1 1").await;
|
||||
write_lp(&db, "cpu,tag=1 bar=1 1");
|
||||
|
||||
let w1 = time.inc(Duration::from_secs(10));
|
||||
write_lp(&db, "cpu,tag=2 bar=2 2").await;
|
||||
write_lp(&db, "cpu,tag=2 bar=2 2");
|
||||
|
||||
db.persist_partition("cpu", "1970-01-01T00", true)
|
||||
.await
|
||||
|
|
|
@ -180,13 +180,13 @@ mod tests {
|
|||
let (db, time) = make_db_time().await;
|
||||
|
||||
let t_first_write = time.inc(Duration::from_secs(1));
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=asfd,tag2=foo bar=2 20").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=bingo,tag2=foo bar=2 10").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 20").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10");
|
||||
write_lp(db.as_ref(), "cpu,tag1=asfd,tag2=foo bar=2 20");
|
||||
write_lp(db.as_ref(), "cpu,tag1=bingo,tag2=foo bar=2 10");
|
||||
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 20");
|
||||
|
||||
let t_last_write = time.inc(Duration::from_secs(1));
|
||||
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 10").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 10");
|
||||
|
||||
let partition_keys = db.partition_keys().unwrap();
|
||||
assert_eq!(partition_keys.len(), 1);
|
||||
|
@ -201,7 +201,7 @@ mod tests {
|
|||
let (_, fut) = compact_chunks(partition_guard.upgrade(), vec![chunk.upgrade()]).unwrap();
|
||||
// NB: perform the write before spawning the background task that performs the compaction
|
||||
let t_later_write = time.inc(Duration::from_secs(1));
|
||||
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 40").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 40");
|
||||
tokio::spawn(fut).await.unwrap().unwrap().unwrap();
|
||||
|
||||
let mut chunk_summaries: Vec<_> = partition.read().chunk_summaries().collect();
|
||||
|
@ -240,9 +240,9 @@ mod tests {
|
|||
async fn test_compact_delete_all() {
|
||||
let db = make_db().await.db;
|
||||
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=3 23").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=2 26").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10");
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=3 23");
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=2 26");
|
||||
|
||||
let partition_keys = db.partition_keys().unwrap();
|
||||
assert_eq!(partition_keys.len(), 1);
|
||||
|
@ -277,10 +277,10 @@ mod tests {
|
|||
// | 2 | yes | yes |
|
||||
// | 3 | no | yes |
|
||||
// | 4 | no | no |
|
||||
write_lp(db.as_ref(), "cpu foo=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu foo=2 20").await;
|
||||
write_lp(db.as_ref(), "cpu foo=3 20").await;
|
||||
write_lp(db.as_ref(), "cpu foo=4 20").await;
|
||||
write_lp(db.as_ref(), "cpu foo=1 10");
|
||||
write_lp(db.as_ref(), "cpu foo=2 20");
|
||||
write_lp(db.as_ref(), "cpu foo=3 20");
|
||||
write_lp(db.as_ref(), "cpu foo=4 20");
|
||||
|
||||
let range = TimestampRange {
|
||||
start: 0,
|
||||
|
|
|
@ -390,7 +390,7 @@ mod tests {
|
|||
|
||||
let db = make_db().await.db;
|
||||
let partition_key = "1970-01-01T00";
|
||||
write_lp(&db, "cpu,tag1=cupcakes bar=1 10").await;
|
||||
write_lp(&db, "cpu,tag1=cupcakes bar=1 10");
|
||||
|
||||
let partition = db.lockable_partition("cpu", partition_key).unwrap();
|
||||
let partition = partition.write();
|
||||
|
@ -415,7 +415,7 @@ mod tests {
|
|||
|
||||
let db = make_db().await.db;
|
||||
let partition_key = "1970-01-01T00";
|
||||
write_lp(&db, "cpu,tag1=cupcakes bar=1 10").await;
|
||||
write_lp(&db, "cpu,tag1=cupcakes bar=1 10");
|
||||
|
||||
// persisted non persisted chunks
|
||||
let partition = db.lockable_partition("cpu", partition_key).unwrap();
|
||||
|
@ -446,7 +446,7 @@ mod tests {
|
|||
|
||||
let db = make_db().await.db;
|
||||
let partition_key = "1970-01-01T00";
|
||||
write_lp(&db, "cpu,tag1=cupcakes bar=1 10").await;
|
||||
write_lp(&db, "cpu,tag1=cupcakes bar=1 10");
|
||||
|
||||
// persist chunk 1
|
||||
db.persist_partition("cpu", partition_key, true)
|
||||
|
@ -456,7 +456,7 @@ mod tests {
|
|||
.id();
|
||||
//
|
||||
// persist chunk 2
|
||||
write_lp(db.as_ref(), "cpu,tag1=chunk2,tag2=a bar=2 10").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=chunk2,tag2=a bar=2 10");
|
||||
db.persist_partition("cpu", partition_key, true)
|
||||
.await
|
||||
.unwrap()
|
||||
|
@ -464,7 +464,7 @@ mod tests {
|
|||
.id();
|
||||
//
|
||||
// persist chunk 3
|
||||
write_lp(db.as_ref(), "cpu,tag1=chunk3,tag2=a bar=2 30").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=chunk3,tag2=a bar=2 30");
|
||||
db.persist_partition("cpu", partition_key, true)
|
||||
.await
|
||||
.unwrap()
|
||||
|
@ -472,7 +472,7 @@ mod tests {
|
|||
.id();
|
||||
//
|
||||
// Add a MUB
|
||||
write_lp(db.as_ref(), "cpu,tag1=chunk4,tag2=a bar=2 40").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=chunk4,tag2=a bar=2 40");
|
||||
|
||||
// let compact 2 non contiguous chunk 1 and chunk 3
|
||||
let partition = db.lockable_partition("cpu", partition_key).unwrap();
|
||||
|
|
|
@ -267,7 +267,7 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn test_flush_overlapping() {
|
||||
let (db, time) = test_db().await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10");
|
||||
|
||||
let partition_keys = db.partition_keys().unwrap();
|
||||
assert_eq!(partition_keys.len(), 1);
|
||||
|
@ -275,7 +275,7 @@ mod tests {
|
|||
// Close window
|
||||
time.inc(Duration::from_secs(2));
|
||||
|
||||
write_lp(db.as_ref(), "cpu,tag1=lagged bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=lagged bar=1 10");
|
||||
|
||||
let partition = db.lockable_partition("cpu", &partition_keys[0]).unwrap();
|
||||
let partition_guard = partition.read();
|
||||
|
@ -314,10 +314,10 @@ mod tests {
|
|||
let late_arrival = Duration::from_secs(1);
|
||||
|
||||
time.inc(Duration::from_secs(32));
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10");
|
||||
|
||||
time.inc(late_arrival);
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=3 23").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=3 23");
|
||||
|
||||
let partition_keys = db.partition_keys().unwrap();
|
||||
assert_eq!(partition_keys.len(), 1);
|
||||
|
@ -357,8 +357,8 @@ mod tests {
|
|||
|
||||
// Add a second set of writes one of which overlaps the above chunk
|
||||
time.inc(late_arrival * 10);
|
||||
write_lp(db.as_ref(), "cpu,tag1=foo bar=2 23").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=2 26").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=foo bar=2 23");
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=2 26");
|
||||
|
||||
// Persist second write but not third
|
||||
let maybe_chunk = db
|
||||
|
@ -422,7 +422,7 @@ mod tests {
|
|||
let (db, time) = test_db().await;
|
||||
|
||||
let late_arrival = Duration::from_secs(1);
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10");
|
||||
|
||||
let partition_keys = db.partition_keys().unwrap();
|
||||
assert_eq!(partition_keys.len(), 1);
|
||||
|
@ -491,10 +491,10 @@ mod tests {
|
|||
// | 2 | yes | yes |
|
||||
// | 3 | no | yes |
|
||||
// | 4 | no | no |
|
||||
write_lp(db.as_ref(), "cpu foo=1 10").await;
|
||||
write_lp(db.as_ref(), "cpu foo=2 20").await;
|
||||
write_lp(db.as_ref(), "cpu foo=3 20").await;
|
||||
write_lp(db.as_ref(), "cpu foo=4 20").await;
|
||||
write_lp(db.as_ref(), "cpu foo=1 10");
|
||||
write_lp(db.as_ref(), "cpu foo=2 20");
|
||||
write_lp(db.as_ref(), "cpu foo=3 20");
|
||||
write_lp(db.as_ref(), "cpu foo=4 20");
|
||||
|
||||
let range = TimestampRange {
|
||||
start: 0,
|
||||
|
|
|
@ -19,7 +19,6 @@ use std::{
|
|||
};
|
||||
use time::{Time, TimeProvider};
|
||||
use uuid::Uuid;
|
||||
use write_buffer::core::WriteBufferWriting;
|
||||
|
||||
// A wrapper around a Db and a metric registry allowing for isolated testing
|
||||
// of a Db and its metrics.
|
||||
|
@ -43,7 +42,6 @@ pub struct TestDbBuilder {
|
|||
db_name: DatabaseName<'static>,
|
||||
uuid: Uuid,
|
||||
worker_cleanup_avg_sleep: Duration,
|
||||
write_buffer_producer: Option<Arc<dyn WriteBufferWriting>>,
|
||||
lifecycle_rules: LifecycleRules,
|
||||
partition_template: PartitionTemplate,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
|
@ -58,7 +56,6 @@ impl Default for TestDbBuilder {
|
|||
uuid: Uuid::new_v4(),
|
||||
// make background loop spin a bit faster for tests
|
||||
worker_cleanup_avg_sleep: Duration::from_secs(1),
|
||||
write_buffer_producer: None,
|
||||
// default to quick lifecycle rules for faster tests
|
||||
lifecycle_rules: LifecycleRules {
|
||||
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
|
||||
|
@ -130,7 +127,6 @@ impl TestDbBuilder {
|
|||
iox_object_store,
|
||||
preserved_catalog,
|
||||
catalog,
|
||||
write_buffer_producer: self.write_buffer_producer.clone(),
|
||||
exec,
|
||||
metric_registry: Arc::clone(&metric_registry),
|
||||
time_provider,
|
||||
|
@ -163,14 +159,6 @@ impl TestDbBuilder {
|
|||
self
|
||||
}
|
||||
|
||||
pub fn write_buffer_producer(
|
||||
mut self,
|
||||
write_buffer_producer: Arc<dyn WriteBufferWriting>,
|
||||
) -> Self {
|
||||
self.write_buffer_producer = Some(write_buffer_producer);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn lifecycle_rules(mut self, lifecycle_rules: LifecycleRules) -> Self {
|
||||
self.lifecycle_rules = lifecycle_rules;
|
||||
self
|
||||
|
|
|
@ -64,29 +64,29 @@ async fn delete_predicate_preservation() {
|
|||
|
||||
// 1: preserved
|
||||
let partition_key = "part_a";
|
||||
write_lp(&db, "cpu,part=a row=10,selector=0i 10").await;
|
||||
write_lp(&db, "cpu,part=a row=11,selector=1i 11").await;
|
||||
write_lp(&db, "cpu,part=a row=10,selector=0i 10");
|
||||
write_lp(&db, "cpu,part=a row=11,selector=1i 11");
|
||||
db.persist_partition(table_name, partition_key, true)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// 2: RUB
|
||||
let partition_key = "part_b";
|
||||
write_lp(&db, "cpu,part=b row=20,selector=0i 20").await;
|
||||
write_lp(&db, "cpu,part=b row=21,selector=1i 21").await;
|
||||
write_lp(&db, "cpu,part=b row=20,selector=0i 20");
|
||||
write_lp(&db, "cpu,part=b row=21,selector=1i 21");
|
||||
db.compact_partition(table_name, partition_key)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// 3: MUB
|
||||
let _partition_key = "part_c";
|
||||
write_lp(&db, "cpu,part=c row=30,selector=0i 30").await;
|
||||
write_lp(&db, "cpu,part=c row=31,selector=1i 31").await;
|
||||
write_lp(&db, "cpu,part=c row=30,selector=0i 30");
|
||||
write_lp(&db, "cpu,part=c row=31,selector=1i 31");
|
||||
|
||||
// 4: preserved and unloaded
|
||||
let partition_key = "part_d";
|
||||
write_lp(&db, "cpu,part=d row=40,selector=0i 40").await;
|
||||
write_lp(&db, "cpu,part=d row=41,selector=1i 41").await;
|
||||
write_lp(&db, "cpu,part=d row=40,selector=0i 40");
|
||||
write_lp(&db, "cpu,part=d row=41,selector=1i 41");
|
||||
|
||||
let chunk_id = db
|
||||
.persist_partition(table_name, partition_key, true)
|
||||
|
|
|
@ -88,7 +88,7 @@ async fn setup(
|
|||
let mut chunk_ids = vec![];
|
||||
|
||||
for _ in 0..N_CHUNKS {
|
||||
let table_names = write_lp(&db, &lp).await;
|
||||
let table_names = write_lp(&db, &lp);
|
||||
|
||||
for table_name in &table_names {
|
||||
db.compact_open_chunk(table_name, partition_key)
|
||||
|
|
Loading…
Reference in New Issue