From 535ddd606db03c04b59dca57386c8e2cae43c649 Mon Sep 17 00:00:00 2001 From: Michael Gattozzi Date: Fri, 13 Dec 2024 15:47:56 -0500 Subject: [PATCH] feat: Parallelize loading snapshots from storage (#25657) --- influxdb3_write/src/persister.rs | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/influxdb3_write/src/persister.rs b/influxdb3_write/src/persister.rs index 50a39b9829..23277bceeb 100644 --- a/influxdb3_write/src/persister.rs +++ b/influxdb3_write/src/persister.rs @@ -32,6 +32,7 @@ use std::any::Any; use std::io::Write; use std::sync::Arc; use thiserror::Error; +use tokio::task::JoinSet; use uuid::Uuid; #[derive(Debug, Error)] @@ -187,7 +188,7 @@ impl Persister { /// /// This is intended to be used on server start. pub async fn load_snapshots(&self, mut most_recent_n: usize) -> Result> { - let mut output = Vec::new(); + let mut join_set = JoinSet::new(); let mut offset: Option = None; while most_recent_n > 0 { let count = if most_recent_n > 1000 { @@ -228,9 +229,19 @@ impl Persister { let len = list.len(); let end = if len <= count { len } else { count }; + async fn get_snapshot( + location: ObjPath, + object_store: Arc, + ) -> Result { + let bytes = object_store.get(&location).await?.bytes().await?; + serde_json::from_slice(&bytes).map_err(Into::into) + } + for item in &list[0..end] { - let bytes = self.object_store.get(&item.location).await?.bytes().await?; - output.push(serde_json::from_slice(&bytes)?); + join_set.spawn(get_snapshot( + item.location.clone(), + Arc::clone(&self.object_store), + )); } if end == 0 { @@ -243,7 +254,8 @@ impl Persister { offset = Some(list[end - 1].location.clone()); } - Ok(output) + // Returns an error if there is one and reuses the Vec's memory as well + join_set.join_all().await.into_iter().collect() } /// Loads a Parquet file from ObjectStore