feat: Make Write Buffer store_entry async

In preparation for the Kafka write buffer implementation, which will need to call
async functions.
pull/24376/head
Carol (Nichols || Goulding) 2021-06-14 15:47:07 -04:00
parent dd447d940c
commit 93881da016
7 changed files with 122 additions and 113 deletions
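The heart of the change is the `WriteBuffer` trait (full diff below): `store_entry` becomes an `async fn` via the `async_trait` crate, so every caller up the stack (`Db::store_entry`, the `write_lp`/`try_write_lp` test helpers, and dozens of test call sites) gains an `.await`. Condensed from the diff:

```rust
// Before: a synchronous trait method.
pub trait WriteBuffer: Sync + Send + std::fmt::Debug + 'static {
    fn store_entry(
        &self,
        entry: &Entry,
    ) -> Result<Sequence, Box<dyn std::error::Error + Sync + Send>>;
}

// After: an async trait method via #[async_trait], so a Kafka-backed
// implementation can await network I/O inside store_entry.
#[async_trait]
pub trait WriteBuffer: Sync + Send + std::fmt::Debug + 'static {
    async fn store_entry(
        &self,
        entry: &Entry,
    ) -> Result<Sequence, Box<dyn std::error::Error + Sync + Send>>;
}
```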


@ -163,7 +163,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
let db = make_db().await.db;
let data = lp_lines.join("\n");
- write_lp(&db, &data);
+ write_lp(&db, &data).await;
let scenario1 = DbScenario {
scenario_name: "Data in 4 partitions, open chunks of mutable buffer".into(),
db,
@ -171,7 +171,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
let db = make_db().await.db;
let data = lp_lines.join("\n");
- write_lp(&db, &data);
+ write_lp(&db, &data).await;
db.rollover_partition("h2o", "2020-03-01T00").await.unwrap();
db.rollover_partition("h2o", "2020-03-02T00").await.unwrap();
let scenario2 = DbScenario {
@ -183,7 +183,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
let db = make_db().await.db;
let data = lp_lines.join("\n");
- write_lp(&db, &data);
+ write_lp(&db, &data).await;
// roll over and load chunks into both RUB and OS
rollover_and_load(&db, "2020-03-01T00", "h2o").await;
rollover_and_load(&db, "2020-03-02T00", "h2o").await;


@ -19,8 +19,8 @@ async fn setup() -> TestDb {
let db = &test_db.db;
// Chunk 0 has bar:[1-2]
write_lp(db, "cpu bar=1 10");
write_lp(db, "cpu bar=2 20");
write_lp(db, "cpu bar=1 10").await;
write_lp(db, "cpu bar=2 20").await;
let partition_key = "1970-01-01T00";
let mb_chunk = db
@ -33,9 +33,9 @@ async fn setup() -> TestDb {
.unwrap();
// Chunk 1 has bar:[3-3] (going to get pruned)
write_lp(db, "cpu bar=3 10");
write_lp(db, "cpu bar=3 100");
write_lp(db, "cpu bar=3 1000");
write_lp(db, "cpu bar=3 10").await;
write_lp(db, "cpu bar=3 100").await;
write_lp(db, "cpu bar=3 1000").await;
let partition_key = "1970-01-01T00";
let mb_chunk = db


@ -57,7 +57,7 @@ impl DbSetup for NoData {
//
let db = make_db().await.db;
let data = "cpu,region=west user=23.2 100";
- write_lp(&db, data);
+ write_lp(&db, data).await;
// move data out of open chunk
assert_eq!(
db.rollover_partition(table_name, partition_key)
@ -95,7 +95,7 @@ impl DbSetup for NoData {
//
let db = make_db().await.db;
let data = "cpu,region=west user=23.2 100";
- write_lp(&db, data);
+ write_lp(&db, data).await;
// move data out of open chunk
assert_eq!(
db.rollover_partition(table_name, partition_key)
@ -285,7 +285,7 @@ impl DbSetup for TwoMeasurementsManyFieldsOneChunk {
"o2,state=CA temp=79.0 300",
];
- write_lp(&db, &lp_lines.join("\n"));
+ write_lp(&db, &lp_lines.join("\n")).await;
vec![DbScenario {
scenario_name: "Data in open chunk of mutable buffer".into(),
db,
@ -307,7 +307,7 @@ impl DbSetup for TwoMeasurementsManyFieldsTwoChunks {
"h2o,state=MA,city=Boston temp=70.4 50",
"h2o,state=MA,city=Boston other_temp=70.4 250",
];
- write_lp(&db, &lp_lines.join("\n"));
+ write_lp(&db, &lp_lines.join("\n")).await;
db.rollover_partition("h2o", partition_key).await.unwrap();
db.load_chunk_to_read_buffer("h2o", partition_key, 0)
.await
@ -318,7 +318,7 @@ impl DbSetup for TwoMeasurementsManyFieldsTwoChunks {
"o2,state=MA,city=Boston temp=53.4,reading=51 50",
"o2,state=CA temp=79.0 300",
];
- write_lp(&db, &lp_lines.join("\n"));
+ write_lp(&db, &lp_lines.join("\n")).await;
vec![DbScenario {
scenario_name: "Data in open chunk of mutable buffer and read buffer".into(),
@ -342,7 +342,7 @@ impl DbSetup for OneMeasurementTwoChunksDifferentTagSet {
"h2o,state=MA temp=70.4 50",
"h2o,state=MA other_temp=70.4 250",
];
- write_lp(&db, &lp_lines.join("\n"));
+ write_lp(&db, &lp_lines.join("\n")).await;
db.rollover_partition("h2o", partition_key).await.unwrap();
db.load_chunk_to_read_buffer("h2o", partition_key, 0)
.await
@ -353,7 +353,7 @@ impl DbSetup for OneMeasurementTwoChunksDifferentTagSet {
"h2o,city=Boston other_temp=72.4 350",
"h2o,city=Boston temp=53.4,reading=51 50",
];
- write_lp(&db, &lp_lines.join("\n"));
+ write_lp(&db, &lp_lines.join("\n")).await;
db.rollover_partition("h2o", partition_key).await.unwrap();
db.load_chunk_to_read_buffer("h2o", partition_key, 1)
.await
@ -385,7 +385,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates {
"h2o,state=MA,city=Boston max_temp=75.4 250",
"h2o,state=MA,city=Andover max_temp=69.2, 250",
];
- write_lp(&db, &lp_lines.join("\n"));
+ write_lp(&db, &lp_lines.join("\n")).await;
db.rollover_partition("h2o", partition_key).await.unwrap();
db.load_chunk_to_read_buffer("h2o", partition_key, 0)
.await
@ -402,7 +402,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates {
"h2o,state=CA,city=SJ min_temp=78.5,max_temp=88.0 300",
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 350",
];
- write_lp(&db, &lp_lines.join("\n"));
+ write_lp(&db, &lp_lines.join("\n")).await;
db.rollover_partition("h2o", partition_key).await.unwrap();
db.load_chunk_to_read_buffer("h2o", partition_key, 1)
.await
@ -419,7 +419,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates {
"h2o,state=CA,city=SJ min_temp=77.0,max_temp=90.7 450",
"h2o,state=CA,city=SJ min_temp=69.5,max_temp=88.2 500",
];
- write_lp(&db, &lp_lines.join("\n"));
+ write_lp(&db, &lp_lines.join("\n")).await;
db.rollover_partition("h2o", partition_key).await.unwrap();
db.load_chunk_to_read_buffer("h2o", partition_key, 2)
.await
@ -436,7 +436,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates {
"h2o,state=CA,city=SJ min_temp=69.5,max_temp=89.2 650",
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 700",
];
- write_lp(&db, &lp_lines.join("\n"));
+ write_lp(&db, &lp_lines.join("\n")).await;
db.rollover_partition("h2o", partition_key).await.unwrap();
db.load_chunk_to_read_buffer("h2o", partition_key, 3)
.await
@ -467,7 +467,8 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle {
"h2o,state=MA,city=Boston other_temp=70.4 250",
]
.join("\n"),
- );
+ )
+ .await;
// Use a background task to do the work; note that when I used
// TaskTracker::join, it ended up hanging for reasons I don't
@ -479,7 +480,8 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle {
write_lp(
&db,
&vec!["h2o,state=CA,city=Boston other_temp=72.4 350"].join("\n"),
- );
+ )
+ .await;
db.write_chunk_to_object_store("h2o", partition_key, 0)
.await
@ -536,7 +538,7 @@ impl DbSetup for EndToEndTest {
let lp_data = lp_lines.join("\n");
let db = make_db().await.db;
- write_lp(&db, &lp_data);
+ write_lp(&db, &lp_data).await;
let scenario1 = DbScenario {
scenario_name: "Data in open chunk of mutable buffer".into(),
@ -552,7 +554,7 @@ impl DbSetup for EndToEndTest {
pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> Vec<DbScenario> {
// Scenario 1: One open chunk in MUB
let db = make_db().await.db;
- write_lp(&db, data);
+ write_lp(&db, data).await;
let scenario1 = DbScenario {
scenario_name: "Data in open chunk of mutable buffer".into(),
db,
@ -560,7 +562,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
// Scenario 2: One closed chunk in MUB
let db = make_db().await.db;
- let table_names = write_lp(&db, data);
+ let table_names = write_lp(&db, data).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
@ -573,7 +575,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
// Scenario 3: One closed chunk in RUB
let db = make_db().await.db;
- let table_names = write_lp(&db, data);
+ let table_names = write_lp(&db, data).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
@ -589,7 +591,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
// Scenario 4: One closed chunk in both RUB and OS
let db = make_db().await.db;
- let table_names = write_lp(&db, data);
+ let table_names = write_lp(&db, data).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
@ -609,7 +611,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
// Scenario 5: One closed chunk in OS only
let db = make_db().await.db;
- let table_names = write_lp(&db, data);
+ let table_names = write_lp(&db, data).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
@ -643,8 +645,8 @@ pub async fn make_two_chunk_scenarios(
data2: &str,
) -> Vec<DbScenario> {
let db = make_db().await.db;
- write_lp(&db, data1);
- write_lp(&db, data2);
+ write_lp(&db, data1).await;
+ write_lp(&db, data2).await;
let scenario1 = DbScenario {
scenario_name: "Data in single open chunk of mutable buffer".into(),
db,
@ -652,13 +654,13 @@ pub async fn make_two_chunk_scenarios(
// spread across 2 mutable buffer chunks
let db = make_db().await.db;
- let table_names = write_lp(&db, data1);
+ let table_names = write_lp(&db, data1).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
.unwrap();
}
- write_lp(&db, data2);
+ write_lp(&db, data2).await;
let scenario2 = DbScenario {
scenario_name: "Data in one open chunk and one closed chunk of mutable buffer".into(),
db,
@ -666,7 +668,7 @@ pub async fn make_two_chunk_scenarios(
// spread across 1 mutable buffer, 1 read buffer chunks
let db = make_db().await.db;
- let table_names = write_lp(&db, data1);
+ let table_names = write_lp(&db, data1).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
@ -675,7 +677,7 @@ pub async fn make_two_chunk_scenarios(
.await
.unwrap();
}
- write_lp(&db, data2);
+ write_lp(&db, data2).await;
let scenario3 = DbScenario {
scenario_name: "Data in open chunk of mutable buffer, and one chunk of read buffer".into(),
db,
@ -683,13 +685,13 @@ pub async fn make_two_chunk_scenarios(
// in 2 read buffer chunks
let db = make_db().await.db;
- let table_names = write_lp(&db, data1);
+ let table_names = write_lp(&db, data1).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
.unwrap();
}
- let table_names = write_lp(&db, data2);
+ let table_names = write_lp(&db, data2).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
@ -710,13 +712,13 @@ pub async fn make_two_chunk_scenarios(
// in 2 read buffer chunks that also loaded into object store
let db = make_db().await.db;
- let table_names = write_lp(&db, data1);
+ let table_names = write_lp(&db, data1).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
.unwrap();
}
- let table_names = write_lp(&db, data2);
+ let table_names = write_lp(&db, data2).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
@ -745,13 +747,13 @@ pub async fn make_two_chunk_scenarios(
// Scenario 6: Two closed chunks in OS only
let db = make_db().await.db;
- let table_names = write_lp(&db, data1);
+ let table_names = write_lp(&db, data1).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
.unwrap();
}
- let table_names = write_lp(&db, data2);
+ let table_names = write_lp(&db, data2).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
@ -807,7 +809,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
) -> Vec<DbScenario> {
// Scenario 1: One closed chunk in RUB
let db = make_db().await.db;
- let table_names = write_lp(&db, data);
+ let table_names = write_lp(&db, data).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
@ -823,7 +825,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
// Scenario 2: One closed chunk in Parquet only
let db = make_db().await.db;
- let table_names = write_lp(&db, data);
+ let table_names = write_lp(&db, data).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await


@ -862,7 +862,7 @@ impl Db {
}
/// Stores an entry based on the configuration.
- pub fn store_entry(&self, entry: Entry) -> Result<()> {
+ pub async fn store_entry(&self, entry: Entry) -> Result<()> {
let immutable = {
let rules = self.rules.read();
rules.lifecycle_rules.immutable
@ -873,13 +873,19 @@ impl Db {
// If only the write buffer is configured, this is passing the data through to
// the write buffer, and it's not an error. We ignore the returned metadata; it
// will get picked up when data is read from the write buffer.
- let _ = write_buffer.store_entry(&entry).context(WriteBufferError)?;
+ let _ = write_buffer
+     .store_entry(&entry)
+     .await
+     .context(WriteBufferError)?;
Ok(())
}
(Some(write_buffer), false) => {
// If using both write buffer and mutable buffer, we want to wait for the write
// buffer to return success before adding the entry to the mutable buffer.
- let sequence = write_buffer.store_entry(&entry).context(WriteBufferError)?;
+ let sequence = write_buffer
+     .store_entry(&entry)
+     .await
+     .context(WriteBufferError)?;
let sequenced_entry = Arc::new(
SequencedEntry::new_from_sequence(sequence, entry)
.context(SequencedEntryError)?,
@ -1169,32 +1175,29 @@ pub mod test_helpers {
use std::collections::HashSet;
/// Try to write line protocol data and return all tables that were written.
- pub fn try_write_lp(db: &Db, lp: &str) -> Result<Vec<String>> {
+ pub async fn try_write_lp(db: &Db, lp: &str) -> Result<Vec<String>> {
let entries = lp_to_entries(lp);
let mut tables = HashSet::new();
- for entry in &entries {
+ for entry in entries {
if let Some(writes) = entry.partition_writes() {
for write in writes {
for batch in write.table_batches() {
tables.insert(batch.name().to_string());
}
}
+ db.store_entry(entry).await?;
}
}
- entries
-     .into_iter()
-     .try_for_each(|entry| db.store_entry(entry))?;
let mut tables: Vec<_> = tables.into_iter().collect();
tables.sort();
Ok(tables)
}
/// Same as [`try_write_lp`](try_write_lp) but will panic on failure.
- pub fn write_lp(db: &Db, lp: &str) -> Vec<String> {
-     try_write_lp(db, lp).unwrap()
+ pub async fn write_lp(db: &Db, lp: &str) -> Vec<String> {
+     try_write_lp(db, lp).await.unwrap()
}
}
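Two details of the helper rewrite above: the old `try_for_each` is replaced by a plain `for` loop because `.await` is not allowed inside a non-async closure, and both helpers are now `async`, so every test runs on an async runtime and awaits the write. A minimal usage sketch in the style of the tests below (the test name and line protocol are illustrative):

```rust
#[tokio::test]
async fn write_lp_returns_sorted_table_names() {
    let db = make_db().await.db;

    // write_lp now returns a future; without .await nothing is written
    // and the compiler warns about an unused Future that must be used.
    let tables = write_lp(&db, "cpu bar=1 10\nmem foo=2 20").await;

    // Table names come back sorted, per the tables.sort() above.
    assert_eq!(tables, vec!["cpu".to_string(), "mem".to_string()]);
}
```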
@ -1251,7 +1254,7 @@ mod tests {
let db = make_db().await.db;
db.rules.write().lifecycle_rules.immutable = true;
let entry = lp_to_entry("cpu bar=1 10");
- let res = db.store_entry(entry);
+ let res = db.store_entry(entry).await;
assert_contains!(
res.unwrap_err().to_string(),
"Cannot write to this database: no mutable buffer configured"
@ -1272,7 +1275,7 @@ mod tests {
test_db.rules.write().lifecycle_rules.immutable = true;
let entry = lp_to_entry("cpu bar=1 10");
- test_db.store_entry(entry).unwrap();
+ test_db.store_entry(entry).await.unwrap();
assert_eq!(write_buffer.entries.lock().unwrap().len(), 1);
}
@ -1289,7 +1292,7 @@ mod tests {
.db;
let entry = lp_to_entry("cpu bar=1 10");
- test_db.store_entry(entry).unwrap();
+ test_db.store_entry(entry).await.unwrap();
assert_eq!(write_buffer.entries.lock().unwrap().len(), 1);
@ -1310,7 +1313,7 @@ mod tests {
async fn read_write() {
// This test also exercises the path without a write buffer.
let db = Arc::new(make_db().await.db);
write_lp(&db, "cpu bar=1 10");
write_lp(&db, "cpu bar=1 10").await;
let batches = run_query(db, "select * from cpu").await;
@ -1344,7 +1347,7 @@ mod tests {
let test_db = make_db().await;
let db = Arc::new(test_db.db);
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=1 10").await;
// A chunk has been opened
test_db
@ -1363,7 +1366,7 @@ mod tests {
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 44).unwrap();
// write into same chunk again.
write_lp(db.as_ref(), "cpu bar=2 10");
write_lp(db.as_ref(), "cpu bar=2 10").await;
// verify chunk size updated
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 60).unwrap();
@ -1472,7 +1475,7 @@ mod tests {
#[tokio::test]
async fn write_with_rollover() {
let db = Arc::new(make_db().await.db);
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=1 10").await;
assert_eq!(vec!["1970-01-01T00"], db.partition_keys().unwrap());
let mb_chunk = db
@ -1493,7 +1496,7 @@ mod tests {
assert_batches_sorted_eq!(expected, &batches);
// add new data
write_lp(db.as_ref(), "cpu bar=2 20");
write_lp(db.as_ref(), "cpu bar=2 20").await;
let expected = vec![
"+-----+-------------------------------+",
"| bar | time |",
@ -1530,7 +1533,7 @@ mod tests {
"cpu,core=one user=10.0 11",
];
- write_lp(db.as_ref(), &lines.join("\n"));
+ write_lp(db.as_ref(), &lines.join("\n")).await;
assert_eq!(vec!["1970-01-01T00"], db.partition_keys().unwrap());
let mb_chunk = db
@ -1559,8 +1562,8 @@ mod tests {
let test_db = make_db().await;
let db = Arc::new(test_db.db);
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=2 20");
write_lp(db.as_ref(), "cpu bar=1 10").await;
write_lp(db.as_ref(), "cpu bar=2 20").await;
let partition_key = "1970-01-01T00";
let mb_chunk = db
@ -1645,12 +1648,12 @@ mod tests {
let test_db = make_db().await;
let db = Arc::new(test_db.db);
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10");
write_lp(db.as_ref(), "cpu,tag1=asfd,tag2=foo bar=2 20");
write_lp(db.as_ref(), "cpu,tag1=bingo,tag2=foo bar=2 10");
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 20");
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 10");
write_lp(db.as_ref(), "cpu,tag2=a bar=3 5");
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await;
write_lp(db.as_ref(), "cpu,tag1=asfd,tag2=foo bar=2 20").await;
write_lp(db.as_ref(), "cpu,tag1=bingo,tag2=foo bar=2 10").await;
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 20").await;
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 10").await;
write_lp(db.as_ref(), "cpu,tag2=a bar=3 5").await;
let partition_key = "1970-01-01T00";
let mb_chunk = db
@ -1760,8 +1763,8 @@ mod tests {
let db = Arc::new(test_db.db);
// Write some line protocol data into the mutable buffer of the DB
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=2 20");
write_lp(db.as_ref(), "cpu bar=1 10").await;
write_lp(db.as_ref(), "cpu bar=2 20").await;
// Now mark the MB chunk closed
let partition_key = "1970-01-01T00";
@ -1859,8 +1862,8 @@ mod tests {
let db = Arc::new(test_db.db);
// Write some line protocol data into the mutable buffer of the DB
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=2 20");
write_lp(db.as_ref(), "cpu bar=1 10").await;
write_lp(db.as_ref(), "cpu bar=2 20").await;
// Now mark the MB chunk closed
let partition_key = "1970-01-01T00";
@ -1970,7 +1973,7 @@ mod tests {
let before_create = Utc::now();
let partition_key = "1970-01-01T00";
write_lp(&db, "cpu bar=1 10");
write_lp(&db, "cpu bar=1 10").await;
let after_write = Utc::now();
let last_write_prev = {
@ -1983,7 +1986,7 @@ mod tests {
partition.last_write_at()
};
write_lp(&db, "cpu bar=1 20");
write_lp(&db, "cpu bar=1 20").await;
{
let partition = db.catalog.partition("cpu", partition_key).unwrap();
let partition = partition.read();
@ -1997,7 +2000,7 @@ mod tests {
let db = Arc::new(make_db().await.db);
// Given data loaded into two chunks
write_lp(&db, "cpu bar=1 10");
write_lp(&db, "cpu bar=1 10").await;
let after_data_load = Utc::now();
// When the chunk is rolled over
@ -2035,8 +2038,8 @@ mod tests {
db.rules.write().lifecycle_rules.mutable_size_threshold =
Some(NonZeroUsize::new(2).unwrap());
write_lp(&db, "cpu bar=1 10");
write_lp(&db, "cpu bar=1 20");
write_lp(&db, "cpu bar=1 10").await;
write_lp(&db, "cpu bar=1 20").await;
let partitions = db.catalog.partition_keys();
assert_eq!(partitions.len(), 1);
@ -2066,10 +2069,10 @@ mod tests {
#[tokio::test]
async fn chunks_sorted_by_times() {
let db = Arc::new(make_db().await.db);
write_lp(&db, "cpu val=1 1");
write_lp(&db, "mem val=2 400000000000001");
write_lp(&db, "cpu val=1 2");
write_lp(&db, "mem val=2 400000000000002");
write_lp(&db, "cpu val=1 1").await;
write_lp(&db, "mem val=2 400000000000001").await;
write_lp(&db, "cpu val=1 2").await;
write_lp(&db, "mem val=2 400000000000002").await;
let sort_rules = SortOrder {
order: Order::Desc,
@ -2101,8 +2104,8 @@ mod tests {
let db = Arc::new(make_db().await.db);
let partition_key = "1970-01-01T00";
write_lp(&db, "cpu bar=1 10");
write_lp(&db, "cpu bar=1 20");
write_lp(&db, "cpu bar=1 10").await;
write_lp(&db, "cpu bar=1 20").await;
assert_eq!(mutable_chunk_ids(&db, partition_key), vec![0]);
assert_eq!(
@ -2120,7 +2123,7 @@ mod tests {
// add a new chunk in mutable buffer, and move chunk1 (but
// not chunk 0) to read buffer
write_lp(&db, "cpu bar=1 30");
write_lp(&db, "cpu bar=1 30").await;
let mb_chunk = db
.rollover_partition("cpu", "1970-01-01T00")
.await
@ -2130,7 +2133,7 @@ mod tests {
.await
.unwrap();
write_lp(&db, "cpu bar=1 40");
write_lp(&db, "cpu bar=1 40").await;
assert_eq!(mutable_chunk_ids(&db, partition_key), vec![0, 2]);
assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![1]);
@ -2169,11 +2172,11 @@ mod tests {
// Test that chunk id listing is hooked up
let db = Arc::new(make_db().await.db);
write_lp(&db, "cpu bar=1 1");
write_lp(&db, "cpu bar=1 1").await;
db.rollover_partition("cpu", "1970-01-01T00").await.unwrap();
// write into a separate partition
write_lp(&db, "cpu bar=1,baz2,frob=3 400000000000000");
write_lp(&db, "cpu bar=1,baz2,frob=3 400000000000000").await;
print!("Partitions: {:?}", db.partition_keys().unwrap());
@ -2212,9 +2215,9 @@ mod tests {
async fn partition_chunk_summaries_timestamp() {
let db = Arc::new(make_db().await.db);
let start = Utc::now();
write_lp(&db, "cpu bar=1 1");
write_lp(&db, "cpu bar=1 1").await;
let after_first_write = Utc::now();
write_lp(&db, "cpu bar=2 2");
write_lp(&db, "cpu bar=2 2").await;
db.rollover_partition("cpu", "1970-01-01T00").await.unwrap();
let after_close = Utc::now();
@ -2264,11 +2267,11 @@ mod tests {
let db = Arc::new(make_db().await.db);
// get three chunks: one open, one closed in mb and one closed in rb
write_lp(&db, "cpu bar=1 1");
write_lp(&db, "cpu bar=1 1").await;
db.rollover_partition("cpu", "1970-01-01T00").await.unwrap();
write_lp(&db, "cpu bar=1,baz=2 2");
write_lp(&db, "cpu bar=1,baz=2,frob=3 400000000000000");
write_lp(&db, "cpu bar=1,baz=2 2").await;
write_lp(&db, "cpu bar=1,baz=2,frob=3 400000000000000").await;
print!("Partitions: {:?}", db.partition_keys().unwrap());
@ -2283,7 +2286,7 @@ mod tests {
print!("Partitions2: {:?}", db.partition_keys().unwrap());
db.rollover_partition("cpu", "1970-01-05T15").await.unwrap();
write_lp(&db, "cpu bar=1,baz=3,blargh=3 400000000000000");
write_lp(&db, "cpu bar=1,baz=3,blargh=3 400000000000000").await;
let chunk_summaries = db.chunk_summaries().expect("expected summary to return");
let chunk_summaries = normalize_summaries(chunk_summaries);
@ -2345,15 +2348,15 @@ mod tests {
// Test that chunk id listing is hooked up
let db = Arc::new(make_db().await.db);
write_lp(&db, "cpu bar=1 1");
write_lp(&db, "cpu bar=1 1").await;
let chunk_id = db
.rollover_partition("cpu", "1970-01-01T00")
.await
.unwrap()
.unwrap()
.id();
write_lp(&db, "cpu bar=2,baz=3.0 2");
write_lp(&db, "mem foo=1 1");
write_lp(&db, "cpu bar=2,baz=3.0 2").await;
write_lp(&db, "mem foo=1 1").await;
// load a chunk to the read buffer
db.load_chunk_to_read_buffer("cpu", "1970-01-01T00", chunk_id)
@ -2366,8 +2369,8 @@ mod tests {
.unwrap();
// write into a separate partition
write_lp(&db, "cpu bar=1 400000000000000");
write_lp(&db, "mem frob=3 400000000000001");
write_lp(&db, "cpu bar=1 400000000000000").await;
write_lp(&db, "mem frob=3 400000000000001").await;
print!("Partitions: {:?}", db.partition_keys().unwrap());
@ -2532,8 +2535,8 @@ mod tests {
let db = Arc::new(make_db().await.db);
// create MB partition
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=2 20");
write_lp(db.as_ref(), "cpu bar=1 10").await;
write_lp(db.as_ref(), "cpu bar=2 20").await;
// MB => RB
let partition_key = "1970-01-01T00";
@ -2566,11 +2569,11 @@ mod tests {
db.rules.write().lifecycle_rules.buffer_size_hard = Some(NonZeroUsize::new(10).unwrap());
// inserting first line does not trigger hard buffer limit
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=1 10").await;
// but second line will
assert!(matches!(
try_write_lp(db.as_ref(), "cpu bar=2 20"),
try_write_lp(db.as_ref(), "cpu bar=2 20").await,
Err(super::Error::HardLimitReached {})
));
}
@ -2593,7 +2596,7 @@ mod tests {
let db = Arc::new(test_db.db);
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=1 10").await;
test_db
.metric_registry
@ -2829,7 +2832,7 @@ mod tests {
}
// ==================== check: DB still writable ====================
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=1 10").await;
}
#[tokio::test]
@ -2996,7 +2999,7 @@ mod tests {
}
// ==================== check: DB still writable ====================
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=1 10").await;
}
#[tokio::test]
@ -3015,7 +3018,7 @@ mod tests {
}
async fn create_parquet_chunk(db: &Db) -> (String, String, u32) {
write_lp(db, "cpu bar=1 10");
write_lp(db, "cpu bar=1 10").await;
let partition_key = "1970-01-01T00";
let table_name = "cpu";


@ -714,7 +714,7 @@ where
pub async fn write_entry_local(&self, db_name: &str, db: &Db, entry: Entry) -> Result<()> {
let bytes = entry.data().len() as u64;
- db.store_entry(entry).map_err(|e| {
+ db.store_entry(entry).await.map_err(|e| {
self.metrics.ingest_entries_bytes_total.add_with_labels(
bytes,
&[


@ -1,17 +1,19 @@
+ use async_trait::async_trait;
use entry::{Entry, Sequence};
/// A Write Buffer takes an `Entry` and returns `Sequence` data that facilitates reading entries
/// from the Write Buffer at a later time.
+ #[async_trait]
pub trait WriteBuffer: Sync + Send + std::fmt::Debug + 'static {
/// Send an `Entry` to the write buffer and return information that can be used to restore
/// entries at a later time.
- fn store_entry(
+ async fn store_entry(
&self,
entry: &Entry,
) -> Result<Sequence, Box<dyn std::error::Error + Sync + Send>>;
// TODO: interface for restoring, will look something like:
- // fn restore_from(&self, sequence: &Sequence) -> Result<Stream<Entry>, Err>;
+ // async fn restore_from(&self, sequence: &Sequence) -> Result<Stream<Entry>, Err>;
}
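Rust had no native support for `async fn` in traits when this was written, hence `#[async_trait]`. A simplified sketch of what the macro effectively generates (not its exact expansion): the method becomes a regular `fn` returning a boxed, pinned future that is `Send`, which is what lets callers simply `.await` it:

```rust
use std::future::Future;
use std::pin::Pin;

// Approximate desugaring of the async trait method above; the real
// macro output handles lifetimes and Send bounds more carefully.
pub trait WriteBuffer: Sync + Send + std::fmt::Debug + 'static {
    fn store_entry<'a>(
        &'a self,
        entry: &'a Entry,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<Sequence, Box<dyn std::error::Error + Sync + Send>>>
                + Send
                + 'a,
        >,
    >;
}
```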
#[derive(Debug)]
@ -19,8 +21,9 @@ pub struct KafkaBuffer {
conn: String,
}
+ #[async_trait]
impl WriteBuffer for KafkaBuffer {
- fn store_entry(
+ async fn store_entry(
&self,
_entry: &Entry,
) -> Result<Sequence, Box<dyn std::error::Error + Sync + Send>> {
@ -43,8 +46,9 @@ pub mod test_helpers {
pub entries: Arc<Mutex<Vec<Entry>>>,
}
+ #[async_trait]
impl WriteBuffer for MockBuffer {
- fn store_entry(
+ async fn store_entry(
&self,
entry: &Entry,
) -> Result<Sequence, Box<dyn std::error::Error + Sync + Send>> {
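With the trait async, a Kafka-backed `store_entry` can await the produce round trip, which is the motivation stated in the commit message. A sketch only, under stated assumptions: the `client` field, its `produce` method, and the `Sequence` field names below are hypothetical stand-ins, not the actual Kafka implementation this commit prepares for:

```rust
#[async_trait]
impl WriteBuffer for KafkaBuffer {
    async fn store_entry(
        &self,
        entry: &Entry,
    ) -> Result<Sequence, Box<dyn std::error::Error + Sync + Send>> {
        // Hypothetical async client: send the entry's bytes to the broker
        // at self.conn and wait for the (partition, offset) acknowledgment.
        let (partition, offset) = self.client.produce(entry.data()).await?;

        // Assumed mapping: Kafka partition -> sequencer id, Kafka offset ->
        // sequence number. The Sequence field names here are assumptions.
        Ok(Sequence {
            id: partition,
            number: offset,
        })
    }
}
```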


@ -73,7 +73,7 @@ async fn setup(object_store: Arc<ObjectStore>, done: &Mutex<bool>) {
let partition_key = "1970-01-01T00";
for chunk_id in 0..N_CHUNKS {
- let table_names = write_lp(&db, &lp);
+ let table_names = write_lp(&db, &lp).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)