diff --git a/query_tests/src/influxrpc/read_window_aggregate.rs b/query_tests/src/influxrpc/read_window_aggregate.rs index e37eb498d9..981aecb77b 100644 --- a/query_tests/src/influxrpc/read_window_aggregate.rs +++ b/query_tests/src/influxrpc/read_window_aggregate.rs @@ -163,7 +163,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths { let db = make_db().await.db; let data = lp_lines.join("\n"); - write_lp(&db, &data); + write_lp(&db, &data).await; let scenario1 = DbScenario { scenario_name: "Data in 4 partitions, open chunks of mutable buffer".into(), db, @@ -171,7 +171,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths { let db = make_db().await.db; let data = lp_lines.join("\n"); - write_lp(&db, &data); + write_lp(&db, &data).await; db.rollover_partition("h2o", "2020-03-01T00").await.unwrap(); db.rollover_partition("h2o", "2020-03-02T00").await.unwrap(); let scenario2 = DbScenario { @@ -183,7 +183,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths { let db = make_db().await.db; let data = lp_lines.join("\n"); - write_lp(&db, &data); + write_lp(&db, &data).await; // roll over and load chunks into both RUB and OS rollover_and_load(&db, "2020-03-01T00", "h2o").await; rollover_and_load(&db, "2020-03-02T00", "h2o").await; diff --git a/query_tests/src/pruning.rs b/query_tests/src/pruning.rs index 9092dc7287..2e1e4f4ca6 100644 --- a/query_tests/src/pruning.rs +++ b/query_tests/src/pruning.rs @@ -19,8 +19,8 @@ async fn setup() -> TestDb { let db = &test_db.db; // Chunk 0 has bar:[1-2] - write_lp(db, "cpu bar=1 10"); - write_lp(db, "cpu bar=2 20"); + write_lp(db, "cpu bar=1 10").await; + write_lp(db, "cpu bar=2 20").await; let partition_key = "1970-01-01T00"; let mb_chunk = db @@ -33,9 +33,9 @@ async fn setup() -> TestDb { .unwrap(); // Chunk 1 has bar:[3-3] (going to get pruned) - write_lp(db, "cpu bar=3 10"); - write_lp(db, "cpu bar=3 100"); - write_lp(db, "cpu bar=3 1000"); + write_lp(db, "cpu bar=3 10").await; + write_lp(db, "cpu bar=3 100").await; + write_lp(db, "cpu bar=3 1000").await; let partition_key = "1970-01-01T00"; let mb_chunk = db diff --git a/query_tests/src/scenarios.rs b/query_tests/src/scenarios.rs index 6241042436..8a7dfddf68 100644 --- a/query_tests/src/scenarios.rs +++ b/query_tests/src/scenarios.rs @@ -57,7 +57,7 @@ impl DbSetup for NoData { // let db = make_db().await.db; let data = "cpu,region=west user=23.2 100"; - write_lp(&db, data); + write_lp(&db, data).await; // move data out of open chunk assert_eq!( db.rollover_partition(table_name, partition_key) @@ -95,7 +95,7 @@ impl DbSetup for NoData { // let db = make_db().await.db; let data = "cpu,region=west user=23.2 100"; - write_lp(&db, data); + write_lp(&db, data).await; // move data out of open chunk assert_eq!( db.rollover_partition(table_name, partition_key) @@ -285,7 +285,7 @@ impl DbSetup for TwoMeasurementsManyFieldsOneChunk { "o2,state=CA temp=79.0 300", ]; - write_lp(&db, &lp_lines.join("\n")); + write_lp(&db, &lp_lines.join("\n")).await; vec![DbScenario { scenario_name: "Data in open chunk of mutable buffer".into(), db, @@ -307,7 +307,7 @@ impl DbSetup for TwoMeasurementsManyFieldsTwoChunks { "h2o,state=MA,city=Boston temp=70.4 50", "h2o,state=MA,city=Boston other_temp=70.4 250", ]; - write_lp(&db, &lp_lines.join("\n")); + write_lp(&db, &lp_lines.join("\n")).await; db.rollover_partition("h2o", partition_key).await.unwrap(); db.load_chunk_to_read_buffer("h2o", partition_key, 0) .await @@ -318,7 +318,7 @@ impl DbSetup for TwoMeasurementsManyFieldsTwoChunks { "o2,state=MA,city=Boston temp=53.4,reading=51 50", "o2,state=CA temp=79.0 300", ]; - write_lp(&db, &lp_lines.join("\n")); + write_lp(&db, &lp_lines.join("\n")).await; vec![DbScenario { scenario_name: "Data in open chunk of mutable buffer and read buffer".into(), @@ -342,7 +342,7 @@ impl DbSetup for OneMeasurementTwoChunksDifferentTagSet { "h2o,state=MA temp=70.4 50", "h2o,state=MA other_temp=70.4 250", ]; - write_lp(&db, &lp_lines.join("\n")); + write_lp(&db, &lp_lines.join("\n")).await; db.rollover_partition("h2o", partition_key).await.unwrap(); db.load_chunk_to_read_buffer("h2o", partition_key, 0) .await @@ -353,7 +353,7 @@ impl DbSetup for OneMeasurementTwoChunksDifferentTagSet { "h2o,city=Boston other_temp=72.4 350", "h2o,city=Boston temp=53.4,reading=51 50", ]; - write_lp(&db, &lp_lines.join("\n")); + write_lp(&db, &lp_lines.join("\n")).await; db.rollover_partition("h2o", partition_key).await.unwrap(); db.load_chunk_to_read_buffer("h2o", partition_key, 1) .await @@ -385,7 +385,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates { "h2o,state=MA,city=Boston max_temp=75.4 250", "h2o,state=MA,city=Andover max_temp=69.2, 250", ]; - write_lp(&db, &lp_lines.join("\n")); + write_lp(&db, &lp_lines.join("\n")).await; db.rollover_partition("h2o", partition_key).await.unwrap(); db.load_chunk_to_read_buffer("h2o", partition_key, 0) .await @@ -402,7 +402,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates { "h2o,state=CA,city=SJ min_temp=78.5,max_temp=88.0 300", "h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 350", ]; - write_lp(&db, &lp_lines.join("\n")); + write_lp(&db, &lp_lines.join("\n")).await; db.rollover_partition("h2o", partition_key).await.unwrap(); db.load_chunk_to_read_buffer("h2o", partition_key, 1) .await @@ -419,7 +419,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates { "h2o,state=CA,city=SJ min_temp=77.0,max_temp=90.7 450", "h2o,state=CA,city=SJ min_temp=69.5,max_temp=88.2 500", ]; - write_lp(&db, &lp_lines.join("\n")); + write_lp(&db, &lp_lines.join("\n")).await; db.rollover_partition("h2o", partition_key).await.unwrap(); db.load_chunk_to_read_buffer("h2o", partition_key, 2) .await @@ -436,7 +436,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates { "h2o,state=CA,city=SJ min_temp=69.5,max_temp=89.2 650", "h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 700", ]; - write_lp(&db, &lp_lines.join("\n")); + write_lp(&db, &lp_lines.join("\n")).await; db.rollover_partition("h2o", partition_key).await.unwrap(); db.load_chunk_to_read_buffer("h2o", partition_key, 3) .await @@ -467,7 +467,8 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle { "h2o,state=MA,city=Boston other_temp=70.4 250", ] .join("\n"), - ); + ) + .await; // Use a background task to do the work note when I used // TaskTracker::join, it ended up hanging for reasons I don't @@ -479,7 +480,8 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle { write_lp( &db, &vec!["h2o,state=CA,city=Boston other_temp=72.4 350"].join("\n"), - ); + ) + .await; db.write_chunk_to_object_store("h2o", partition_key, 0) .await @@ -536,7 +538,7 @@ impl DbSetup for EndToEndTest { let lp_data = lp_lines.join("\n"); let db = make_db().await.db; - write_lp(&db, &lp_data); + write_lp(&db, &lp_data).await; let scenario1 = DbScenario { scenario_name: "Data in open chunk of mutable buffer".into(), @@ -552,7 +554,7 @@ impl DbSetup for EndToEndTest { pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> Vec { // Scenario 1: One open chunk in MUB let db = make_db().await.db; - write_lp(&db, data); + write_lp(&db, data).await; let scenario1 = DbScenario { scenario_name: "Data in open chunk of mutable buffer".into(), db, @@ -560,7 +562,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> // Scenario 2: One closed chunk in MUB let db = make_db().await.db; - let table_names = write_lp(&db, data); + let table_names = write_lp(&db, data).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) .await @@ -573,7 +575,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> // Scenario 3: One closed chunk in RUB let db = make_db().await.db; - let table_names = write_lp(&db, data); + let table_names = write_lp(&db, data).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) .await @@ -589,7 +591,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> // Scenario 4: One closed chunk in both RUb and OS let db = make_db().await.db; - let table_names = write_lp(&db, data); + let table_names = write_lp(&db, data).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) .await @@ -609,7 +611,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> // Scenario 5: One closed chunk in OS only let db = make_db().await.db; - let table_names = write_lp(&db, data); + let table_names = write_lp(&db, data).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) .await @@ -643,8 +645,8 @@ pub async fn make_two_chunk_scenarios( data2: &str, ) -> Vec { let db = make_db().await.db; - write_lp(&db, data1); - write_lp(&db, data2); + write_lp(&db, data1).await; + write_lp(&db, data2).await; let scenario1 = DbScenario { scenario_name: "Data in single open chunk of mutable buffer".into(), db, @@ -652,13 +654,13 @@ pub async fn make_two_chunk_scenarios( // spread across 2 mutable buffer chunks let db = make_db().await.db; - let table_names = write_lp(&db, data1); + let table_names = write_lp(&db, data1).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) .await .unwrap(); } - write_lp(&db, data2); + write_lp(&db, data2).await; let scenario2 = DbScenario { scenario_name: "Data in one open chunk and one closed chunk of mutable buffer".into(), db, @@ -666,7 +668,7 @@ pub async fn make_two_chunk_scenarios( // spread across 1 mutable buffer, 1 read buffer chunks let db = make_db().await.db; - let table_names = write_lp(&db, data1); + let table_names = write_lp(&db, data1).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) .await @@ -675,7 +677,7 @@ pub async fn make_two_chunk_scenarios( .await .unwrap(); } - write_lp(&db, data2); + write_lp(&db, data2).await; let scenario3 = DbScenario { scenario_name: "Data in open chunk of mutable buffer, and one chunk of read buffer".into(), db, @@ -683,13 +685,13 @@ pub async fn make_two_chunk_scenarios( // in 2 read buffer chunks let db = make_db().await.db; - let table_names = write_lp(&db, data1); + let table_names = write_lp(&db, data1).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) .await .unwrap(); } - let table_names = write_lp(&db, data2); + let table_names = write_lp(&db, data2).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) .await @@ -710,13 +712,13 @@ pub async fn make_two_chunk_scenarios( // in 2 read buffer chunks that also loaded into object store let db = make_db().await.db; - let table_names = write_lp(&db, data1); + let table_names = write_lp(&db, data1).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) .await .unwrap(); } - let table_names = write_lp(&db, data2); + let table_names = write_lp(&db, data2).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) .await @@ -745,13 +747,13 @@ pub async fn make_two_chunk_scenarios( // Scenario 6: Two closed chunk in OS only let db = make_db().await.db; - let table_names = write_lp(&db, data1); + let table_names = write_lp(&db, data1).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) .await .unwrap(); } - let table_names = write_lp(&db, data2); + let table_names = write_lp(&db, data2).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) .await @@ -807,7 +809,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario( ) -> Vec { // Scenario 1: One closed chunk in RUB let db = make_db().await.db; - let table_names = write_lp(&db, data); + let table_names = write_lp(&db, data).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) .await @@ -823,7 +825,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario( // Scenario 2: One closed chunk in Parquet only let db = make_db().await.db; - let table_names = write_lp(&db, data); + let table_names = write_lp(&db, data).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) .await diff --git a/server/src/db.rs b/server/src/db.rs index 12a9b7771a..8a985217d2 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -862,7 +862,7 @@ impl Db { } /// Stores an entry based on the configuration. - pub fn store_entry(&self, entry: Entry) -> Result<()> { + pub async fn store_entry(&self, entry: Entry) -> Result<()> { let immutable = { let rules = self.rules.read(); rules.lifecycle_rules.immutable @@ -873,13 +873,19 @@ impl Db { // If only the write buffer is configured, this is passing the data through to // the write buffer, and it's not an error. We ignore the returned metadata; it // will get picked up when data is read from the write buffer. - let _ = write_buffer.store_entry(&entry).context(WriteBufferError)?; + let _ = write_buffer + .store_entry(&entry) + .await + .context(WriteBufferError)?; Ok(()) } (Some(write_buffer), false) => { // If using both write buffer and mutable buffer, we want to wait for the write // buffer to return success before adding the entry to the mutable buffer. - let sequence = write_buffer.store_entry(&entry).context(WriteBufferError)?; + let sequence = write_buffer + .store_entry(&entry) + .await + .context(WriteBufferError)?; let sequenced_entry = Arc::new( SequencedEntry::new_from_sequence(sequence, entry) .context(SequencedEntryError)?, @@ -1169,32 +1175,29 @@ pub mod test_helpers { use std::collections::HashSet; /// Try to write lineprotocol data and return all tables that where written. - pub fn try_write_lp(db: &Db, lp: &str) -> Result> { + pub async fn try_write_lp(db: &Db, lp: &str) -> Result> { let entries = lp_to_entries(lp); let mut tables = HashSet::new(); - for entry in &entries { + for entry in entries { if let Some(writes) = entry.partition_writes() { for write in writes { for batch in write.table_batches() { tables.insert(batch.name().to_string()); } } + db.store_entry(entry).await?; } } - entries - .into_iter() - .try_for_each(|entry| db.store_entry(entry))?; - let mut tables: Vec<_> = tables.into_iter().collect(); tables.sort(); Ok(tables) } /// Same was [`try_write_lp`](try_write_lp) but will panic on failure. - pub fn write_lp(db: &Db, lp: &str) -> Vec { - try_write_lp(db, lp).unwrap() + pub async fn write_lp(db: &Db, lp: &str) -> Vec { + try_write_lp(db, lp).await.unwrap() } } @@ -1251,7 +1254,7 @@ mod tests { let db = make_db().await.db; db.rules.write().lifecycle_rules.immutable = true; let entry = lp_to_entry("cpu bar=1 10"); - let res = db.store_entry(entry); + let res = db.store_entry(entry).await; assert_contains!( res.unwrap_err().to_string(), "Cannot write to this database: no mutable buffer configured" @@ -1272,7 +1275,7 @@ mod tests { test_db.rules.write().lifecycle_rules.immutable = true; let entry = lp_to_entry("cpu bar=1 10"); - test_db.store_entry(entry).unwrap(); + test_db.store_entry(entry).await.unwrap(); assert_eq!(write_buffer.entries.lock().unwrap().len(), 1); } @@ -1289,7 +1292,7 @@ mod tests { .db; let entry = lp_to_entry("cpu bar=1 10"); - test_db.store_entry(entry).unwrap(); + test_db.store_entry(entry).await.unwrap(); assert_eq!(write_buffer.entries.lock().unwrap().len(), 1); @@ -1310,7 +1313,7 @@ mod tests { async fn read_write() { // This test also exercises the path without a write buffer. let db = Arc::new(make_db().await.db); - write_lp(&db, "cpu bar=1 10"); + write_lp(&db, "cpu bar=1 10").await; let batches = run_query(db, "select * from cpu").await; @@ -1344,7 +1347,7 @@ mod tests { let test_db = make_db().await; let db = Arc::new(test_db.db); - write_lp(db.as_ref(), "cpu bar=1 10"); + write_lp(db.as_ref(), "cpu bar=1 10").await; // A chunk has been opened test_db @@ -1363,7 +1366,7 @@ mod tests { catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 44).unwrap(); // write into same chunk again. - write_lp(db.as_ref(), "cpu bar=2 10"); + write_lp(db.as_ref(), "cpu bar=2 10").await; // verify chunk size updated catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 60).unwrap(); @@ -1472,7 +1475,7 @@ mod tests { #[tokio::test] async fn write_with_rollover() { let db = Arc::new(make_db().await.db); - write_lp(db.as_ref(), "cpu bar=1 10"); + write_lp(db.as_ref(), "cpu bar=1 10").await; assert_eq!(vec!["1970-01-01T00"], db.partition_keys().unwrap()); let mb_chunk = db @@ -1493,7 +1496,7 @@ mod tests { assert_batches_sorted_eq!(expected, &batches); // add new data - write_lp(db.as_ref(), "cpu bar=2 20"); + write_lp(db.as_ref(), "cpu bar=2 20").await; let expected = vec![ "+-----+-------------------------------+", "| bar | time |", @@ -1530,7 +1533,7 @@ mod tests { "cpu,core=one user=10.0 11", ]; - write_lp(db.as_ref(), &lines.join("\n")); + write_lp(db.as_ref(), &lines.join("\n")).await; assert_eq!(vec!["1970-01-01T00"], db.partition_keys().unwrap()); let mb_chunk = db @@ -1559,8 +1562,8 @@ mod tests { let test_db = make_db().await; let db = Arc::new(test_db.db); - write_lp(db.as_ref(), "cpu bar=1 10"); - write_lp(db.as_ref(), "cpu bar=2 20"); + write_lp(db.as_ref(), "cpu bar=1 10").await; + write_lp(db.as_ref(), "cpu bar=2 20").await; let partition_key = "1970-01-01T00"; let mb_chunk = db @@ -1645,12 +1648,12 @@ mod tests { let test_db = make_db().await; let db = Arc::new(test_db.db); - write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10"); - write_lp(db.as_ref(), "cpu,tag1=asfd,tag2=foo bar=2 20"); - write_lp(db.as_ref(), "cpu,tag1=bingo,tag2=foo bar=2 10"); - write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 20"); - write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 10"); - write_lp(db.as_ref(), "cpu,tag2=a bar=3 5"); + write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await; + write_lp(db.as_ref(), "cpu,tag1=asfd,tag2=foo bar=2 20").await; + write_lp(db.as_ref(), "cpu,tag1=bingo,tag2=foo bar=2 10").await; + write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 20").await; + write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 10").await; + write_lp(db.as_ref(), "cpu,tag2=a bar=3 5").await; let partition_key = "1970-01-01T00"; let mb_chunk = db @@ -1760,8 +1763,8 @@ mod tests { let db = Arc::new(test_db.db); // Write some line protocols in Mutable buffer of the DB - write_lp(db.as_ref(), "cpu bar=1 10"); - write_lp(db.as_ref(), "cpu bar=2 20"); + write_lp(db.as_ref(), "cpu bar=1 10").await; + write_lp(db.as_ref(), "cpu bar=2 20").await; //Now mark the MB chunk close let partition_key = "1970-01-01T00"; @@ -1859,8 +1862,8 @@ mod tests { let db = Arc::new(test_db.db); // Write some line protocols in Mutable buffer of the DB - write_lp(db.as_ref(), "cpu bar=1 10"); - write_lp(db.as_ref(), "cpu bar=2 20"); + write_lp(db.as_ref(), "cpu bar=1 10").await; + write_lp(db.as_ref(), "cpu bar=2 20").await; // Now mark the MB chunk close let partition_key = "1970-01-01T00"; @@ -1970,7 +1973,7 @@ mod tests { let before_create = Utc::now(); let partition_key = "1970-01-01T00"; - write_lp(&db, "cpu bar=1 10"); + write_lp(&db, "cpu bar=1 10").await; let after_write = Utc::now(); let last_write_prev = { @@ -1983,7 +1986,7 @@ mod tests { partition.last_write_at() }; - write_lp(&db, "cpu bar=1 20"); + write_lp(&db, "cpu bar=1 20").await; { let partition = db.catalog.partition("cpu", partition_key).unwrap(); let partition = partition.read(); @@ -1997,7 +2000,7 @@ mod tests { let db = Arc::new(make_db().await.db); // Given data loaded into two chunks - write_lp(&db, "cpu bar=1 10"); + write_lp(&db, "cpu bar=1 10").await; let after_data_load = Utc::now(); // When the chunk is rolled over @@ -2035,8 +2038,8 @@ mod tests { db.rules.write().lifecycle_rules.mutable_size_threshold = Some(NonZeroUsize::new(2).unwrap()); - write_lp(&db, "cpu bar=1 10"); - write_lp(&db, "cpu bar=1 20"); + write_lp(&db, "cpu bar=1 10").await; + write_lp(&db, "cpu bar=1 20").await; let partitions = db.catalog.partition_keys(); assert_eq!(partitions.len(), 1); @@ -2066,10 +2069,10 @@ mod tests { #[tokio::test] async fn chunks_sorted_by_times() { let db = Arc::new(make_db().await.db); - write_lp(&db, "cpu val=1 1"); - write_lp(&db, "mem val=2 400000000000001"); - write_lp(&db, "cpu val=1 2"); - write_lp(&db, "mem val=2 400000000000002"); + write_lp(&db, "cpu val=1 1").await; + write_lp(&db, "mem val=2 400000000000001").await; + write_lp(&db, "cpu val=1 2").await; + write_lp(&db, "mem val=2 400000000000002").await; let sort_rules = SortOrder { order: Order::Desc, @@ -2101,8 +2104,8 @@ mod tests { let db = Arc::new(make_db().await.db); let partition_key = "1970-01-01T00"; - write_lp(&db, "cpu bar=1 10"); - write_lp(&db, "cpu bar=1 20"); + write_lp(&db, "cpu bar=1 10").await; + write_lp(&db, "cpu bar=1 20").await; assert_eq!(mutable_chunk_ids(&db, partition_key), vec![0]); assert_eq!( @@ -2120,7 +2123,7 @@ mod tests { // add a new chunk in mutable buffer, and move chunk1 (but // not chunk 0) to read buffer - write_lp(&db, "cpu bar=1 30"); + write_lp(&db, "cpu bar=1 30").await; let mb_chunk = db .rollover_partition("cpu", "1970-01-01T00") .await @@ -2130,7 +2133,7 @@ mod tests { .await .unwrap(); - write_lp(&db, "cpu bar=1 40"); + write_lp(&db, "cpu bar=1 40").await; assert_eq!(mutable_chunk_ids(&db, partition_key), vec![0, 2]); assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![1]); @@ -2169,11 +2172,11 @@ mod tests { // Test that chunk id listing is hooked up let db = Arc::new(make_db().await.db); - write_lp(&db, "cpu bar=1 1"); + write_lp(&db, "cpu bar=1 1").await; db.rollover_partition("cpu", "1970-01-01T00").await.unwrap(); // write into a separate partitiion - write_lp(&db, "cpu bar=1,baz2,frob=3 400000000000000"); + write_lp(&db, "cpu bar=1,baz2,frob=3 400000000000000").await; print!("Partitions: {:?}", db.partition_keys().unwrap()); @@ -2212,9 +2215,9 @@ mod tests { async fn partition_chunk_summaries_timestamp() { let db = Arc::new(make_db().await.db); let start = Utc::now(); - write_lp(&db, "cpu bar=1 1"); + write_lp(&db, "cpu bar=1 1").await; let after_first_write = Utc::now(); - write_lp(&db, "cpu bar=2 2"); + write_lp(&db, "cpu bar=2 2").await; db.rollover_partition("cpu", "1970-01-01T00").await.unwrap(); let after_close = Utc::now(); @@ -2264,11 +2267,11 @@ mod tests { let db = Arc::new(make_db().await.db); // get three chunks: one open, one closed in mb and one close in rb - write_lp(&db, "cpu bar=1 1"); + write_lp(&db, "cpu bar=1 1").await; db.rollover_partition("cpu", "1970-01-01T00").await.unwrap(); - write_lp(&db, "cpu bar=1,baz=2 2"); - write_lp(&db, "cpu bar=1,baz=2,frob=3 400000000000000"); + write_lp(&db, "cpu bar=1,baz=2 2").await; + write_lp(&db, "cpu bar=1,baz=2,frob=3 400000000000000").await; print!("Partitions: {:?}", db.partition_keys().unwrap()); @@ -2283,7 +2286,7 @@ mod tests { print!("Partitions2: {:?}", db.partition_keys().unwrap()); db.rollover_partition("cpu", "1970-01-05T15").await.unwrap(); - write_lp(&db, "cpu bar=1,baz=3,blargh=3 400000000000000"); + write_lp(&db, "cpu bar=1,baz=3,blargh=3 400000000000000").await; let chunk_summaries = db.chunk_summaries().expect("expected summary to return"); let chunk_summaries = normalize_summaries(chunk_summaries); @@ -2345,15 +2348,15 @@ mod tests { // Test that chunk id listing is hooked up let db = Arc::new(make_db().await.db); - write_lp(&db, "cpu bar=1 1"); + write_lp(&db, "cpu bar=1 1").await; let chunk_id = db .rollover_partition("cpu", "1970-01-01T00") .await .unwrap() .unwrap() .id(); - write_lp(&db, "cpu bar=2,baz=3.0 2"); - write_lp(&db, "mem foo=1 1"); + write_lp(&db, "cpu bar=2,baz=3.0 2").await; + write_lp(&db, "mem foo=1 1").await; // load a chunk to the read buffer db.load_chunk_to_read_buffer("cpu", "1970-01-01T00", chunk_id) @@ -2366,8 +2369,8 @@ mod tests { .unwrap(); // write into a separate partition - write_lp(&db, "cpu bar=1 400000000000000"); - write_lp(&db, "mem frob=3 400000000000001"); + write_lp(&db, "cpu bar=1 400000000000000").await; + write_lp(&db, "mem frob=3 400000000000001").await; print!("Partitions: {:?}", db.partition_keys().unwrap()); @@ -2532,8 +2535,8 @@ mod tests { let db = Arc::new(make_db().await.db); // create MB partition - write_lp(db.as_ref(), "cpu bar=1 10"); - write_lp(db.as_ref(), "cpu bar=2 20"); + write_lp(db.as_ref(), "cpu bar=1 10").await; + write_lp(db.as_ref(), "cpu bar=2 20").await; // MB => RB let partition_key = "1970-01-01T00"; @@ -2566,11 +2569,11 @@ mod tests { db.rules.write().lifecycle_rules.buffer_size_hard = Some(NonZeroUsize::new(10).unwrap()); // inserting first line does not trigger hard buffer limit - write_lp(db.as_ref(), "cpu bar=1 10"); + write_lp(db.as_ref(), "cpu bar=1 10").await; // but second line will assert!(matches!( - try_write_lp(db.as_ref(), "cpu bar=2 20"), + try_write_lp(db.as_ref(), "cpu bar=2 20").await, Err(super::Error::HardLimitReached {}) )); } @@ -2593,7 +2596,7 @@ mod tests { let db = Arc::new(test_db.db); - write_lp(db.as_ref(), "cpu bar=1 10"); + write_lp(db.as_ref(), "cpu bar=1 10").await; test_db .metric_registry @@ -2829,7 +2832,7 @@ mod tests { } // ==================== check: DB still writable ==================== - write_lp(db.as_ref(), "cpu bar=1 10"); + write_lp(db.as_ref(), "cpu bar=1 10").await; } #[tokio::test] @@ -2996,7 +2999,7 @@ mod tests { } // ==================== check: DB still writable ==================== - write_lp(db.as_ref(), "cpu bar=1 10"); + write_lp(db.as_ref(), "cpu bar=1 10").await; } #[tokio::test] @@ -3015,7 +3018,7 @@ mod tests { } async fn create_parquet_chunk(db: &Db) -> (String, String, u32) { - write_lp(db, "cpu bar=1 10"); + write_lp(db, "cpu bar=1 10").await; let partition_key = "1970-01-01T00"; let table_name = "cpu"; diff --git a/server/src/lib.rs b/server/src/lib.rs index f1aaede24c..a1d7880788 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -714,7 +714,7 @@ where pub async fn write_entry_local(&self, db_name: &str, db: &Db, entry: Entry) -> Result<()> { let bytes = entry.data().len() as u64; - db.store_entry(entry).map_err(|e| { + db.store_entry(entry).await.map_err(|e| { self.metrics.ingest_entries_bytes_total.add_with_labels( bytes, &[ diff --git a/server/src/write_buffer.rs b/server/src/write_buffer.rs index 960d27acbe..d05596f35d 100644 --- a/server/src/write_buffer.rs +++ b/server/src/write_buffer.rs @@ -1,17 +1,19 @@ +use async_trait::async_trait; use entry::{Entry, Sequence}; /// A Write Buffer takes an `Entry` and returns `Sequence` data that facilitates reading entries /// from the Write Buffer at a later time. +#[async_trait] pub trait WriteBuffer: Sync + Send + std::fmt::Debug + 'static { /// Send an `Entry` to the write buffer and return information that can be used to restore /// entries at a later time. - fn store_entry( + async fn store_entry( &self, entry: &Entry, ) -> Result>; // TODO: interface for restoring, will look something like: - // fn restore_from(&self, sequence: &Sequence) -> Result, Err>; + // async fn restore_from(&self, sequence: &Sequence) -> Result, Err>; } #[derive(Debug)] @@ -19,8 +21,9 @@ pub struct KafkaBuffer { conn: String, } +#[async_trait] impl WriteBuffer for KafkaBuffer { - fn store_entry( + async fn store_entry( &self, _entry: &Entry, ) -> Result> { @@ -43,8 +46,9 @@ pub mod test_helpers { pub entries: Arc>>, } + #[async_trait] impl WriteBuffer for MockBuffer { - fn store_entry( + async fn store_entry( &self, entry: &Entry, ) -> Result> { diff --git a/server_benchmarks/benches/catalog_persistence.rs b/server_benchmarks/benches/catalog_persistence.rs index 4e6c6831bf..6734bc6ac9 100644 --- a/server_benchmarks/benches/catalog_persistence.rs +++ b/server_benchmarks/benches/catalog_persistence.rs @@ -73,7 +73,7 @@ async fn setup(object_store: Arc, done: &Mutex) { let partition_key = "1970-01-01T00"; for chunk_id in 0..N_CHUNKS { - let table_names = write_lp(&db, &lp); + let table_names = write_lp(&db, &lp).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key)