feat: Parallelize loading snapshots from storage (#25657)

pull/25652/head^2
Michael Gattozzi 2024-12-13 15:47:56 -05:00 committed by GitHub
parent e0d8778bb1
commit 535ddd606d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 16 additions and 4 deletions

View File

@ -32,6 +32,7 @@ use std::any::Any;
use std::io::Write;
use std::sync::Arc;
use thiserror::Error;
use tokio::task::JoinSet;
use uuid::Uuid;
#[derive(Debug, Error)]
@ -187,7 +188,7 @@ impl Persister {
///
/// This is intended to be used on server start.
pub async fn load_snapshots(&self, mut most_recent_n: usize) -> Result<Vec<PersistedSnapshot>> {
let mut output = Vec::new();
let mut join_set = JoinSet::new();
let mut offset: Option<ObjPath> = None;
while most_recent_n > 0 {
let count = if most_recent_n > 1000 {
@ -228,9 +229,19 @@ impl Persister {
let len = list.len();
let end = if len <= count { len } else { count };
/// Fetches the object stored at `location` from `object_store` and
/// deserializes its bytes as JSON into a `PersistedSnapshot`.
///
/// Takes owned arguments so the returned future can be spawned as an
/// independent task.
///
/// # Errors
///
/// Propagates any error from the object store request, from reading the
/// response body, or from JSON deserialization.
async fn get_snapshot(
    location: ObjPath,
    object_store: Arc<dyn ObjectStore>,
) -> Result<PersistedSnapshot> {
    // Issue the request first, then pull the full body into memory.
    let response = object_store.get(&location).await?;
    let payload = response.bytes().await?;
    // `?` performs the same error conversion as `map_err(Into::into)`.
    let snapshot = serde_json::from_slice(&payload)?;
    Ok(snapshot)
}
for item in &list[0..end] {
let bytes = self.object_store.get(&item.location).await?.bytes().await?;
output.push(serde_json::from_slice(&bytes)?);
join_set.spawn(get_snapshot(
item.location.clone(),
Arc::clone(&self.object_store),
));
}
if end == 0 {
@ -243,7 +254,8 @@ impl Persister {
offset = Some(list[end - 1].location.clone());
}
Ok(output)
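// Collecting into `Result<Vec<_>>` returns the first error if there is one and
// reuses the Vec's memory as well. Note that `join_all` yields results in task
// completion order (not spawn order) and panics if any spawned task panicked.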
join_set.join_all().await.into_iter().collect()
}
/// Loads a Parquet file from ObjectStore