parent
e516f65cbe
commit
7b0b975433
|
@ -600,6 +600,41 @@ impl Db {
|
|||
tracker
|
||||
}
|
||||
|
||||
/// Spawns a task to perform
|
||||
/// [`load_chunk_to_object_store`](Self::load_chunk_to_object_store)
|
||||
pub fn load_chunk_to_object_store_in_background(
|
||||
self: &Arc<Self>,
|
||||
partition_key: String,
|
||||
chunk_id: u32,
|
||||
) -> TaskTracker<Job> {
|
||||
let name = self.rules.read().name.clone();
|
||||
let (tracker, registration) = self.jobs.register(Job::WriteChunk {
|
||||
db_name: name.to_string(),
|
||||
partition_key: partition_key.clone(),
|
||||
chunk_id,
|
||||
});
|
||||
|
||||
let captured = Arc::clone(&self);
|
||||
let task = async move {
|
||||
debug!(%name, %partition_key, %chunk_id, "background task loading chunk to object store");
|
||||
let result = captured
|
||||
.load_chunk_to_object_store(&partition_key, chunk_id)
|
||||
.await;
|
||||
if let Err(e) = result {
|
||||
info!(?e, %name, %partition_key, %chunk_id, "background task error loading object store chunk");
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
debug!(%name, %partition_key, %chunk_id, "background task completed writing chunk to object store");
|
||||
|
||||
Ok(())
|
||||
};
|
||||
|
||||
tokio::spawn(task.track(registration));
|
||||
|
||||
tracker
|
||||
}
|
||||
|
||||
/// Returns the next write sequence number
|
||||
pub fn next_sequence(&self) -> u64 {
|
||||
self.sequence.fetch_add(1, Ordering::SeqCst)
|
||||
|
@ -1717,4 +1752,41 @@ mod tests {
|
|||
chunk_ids.sort_unstable();
|
||||
chunk_ids
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn load_chunk_to_object_store_in_background() {
|
||||
// Test that data can be written to object store using a background task
|
||||
let db = Arc::new(make_db());
|
||||
|
||||
// create MB partition
|
||||
write_lp(db.as_ref(), "cpu bar=1 10");
|
||||
write_lp(db.as_ref(), "cpu bar=2 20");
|
||||
|
||||
// MB => RB
|
||||
let partition_key = "1970-01-01T00";
|
||||
let mb_chunk = db.rollover_partition(partition_key).await.unwrap();
|
||||
let rb_chunk = db
|
||||
.load_chunk_to_read_buffer(partition_key, mb_chunk.id())
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(mb_chunk.id(), rb_chunk.id());
|
||||
|
||||
// RB => OS
|
||||
let task =
|
||||
db.load_chunk_to_object_store_in_background(partition_key.to_string(), rb_chunk.id());
|
||||
let t_start = std::time::Instant::now();
|
||||
while !task.is_complete() {
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||
assert!(
|
||||
std::time::Instant::now() - t_start < std::time::Duration::from_secs(10),
|
||||
"task deadline exceeded"
|
||||
);
|
||||
}
|
||||
|
||||
// we should have chunks in the mutable buffer, read buffer, and object store
|
||||
// (Note the currently open chunk is not listed)
|
||||
assert_eq!(mutable_chunk_ids(&db, partition_key), vec![1]);
|
||||
assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![0]);
|
||||
assert_eq!(read_parquet_file_chunk_ids(&db, partition_key), vec![0]);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue