refactor: rename load_chunk_to_read_buffer to move_chunk_to_read_buffer (#1857)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
parent a849c50749
commit 9e1723620c
@@ -28,7 +28,7 @@ async fn setup() -> TestDb {
         .await
         .unwrap()
         .unwrap();
-    db.load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
+    db.move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
         .await
         .unwrap();

@@ -43,7 +43,7 @@ async fn setup() -> TestDb {
         .await
         .unwrap()
         .unwrap();
-    db.load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
+    db.move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
         .await
         .unwrap();

@@ -111,7 +111,7 @@ impl DbSetup for NoData {
         assert_eq!(count_object_store_chunks(&db), 0); // nothing yet

         // Now load the closed chunk into the RB
-        db.load_chunk_to_read_buffer(table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(table_name, partition_key, 0)
             .await
             .unwrap();
         assert_eq!(count_mutable_buffer_chunks(&db), 0); // open chunk only
@@ -149,7 +149,7 @@ impl DbSetup for NoData {
         assert_eq!(count_object_store_chunks(&db), 0); // nothing yet

         // Now load the closed chunk into the RB
-        db.load_chunk_to_read_buffer(table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(table_name, partition_key, 0)
             .await
             .unwrap();
         assert_eq!(count_mutable_buffer_chunks(&db), 0); // open chunk only
@@ -348,7 +348,7 @@ impl DbSetup for TwoMeasurementsManyFieldsTwoChunks {
         ];
         write_lp(&db, &lp_lines.join("\n")).await;
         db.rollover_partition("h2o", partition_key).await.unwrap();
-        db.load_chunk_to_read_buffer("h2o", partition_key, 0)
+        db.move_chunk_to_read_buffer("h2o", partition_key, 0)
             .await
             .unwrap();

@@ -383,7 +383,7 @@ impl DbSetup for OneMeasurementTwoChunksDifferentTagSet {
         ];
         write_lp(&db, &lp_lines.join("\n")).await;
         db.rollover_partition("h2o", partition_key).await.unwrap();
-        db.load_chunk_to_read_buffer("h2o", partition_key, 0)
+        db.move_chunk_to_read_buffer("h2o", partition_key, 0)
             .await
             .unwrap();

@@ -394,7 +394,7 @@ impl DbSetup for OneMeasurementTwoChunksDifferentTagSet {
         ];
         write_lp(&db, &lp_lines.join("\n")).await;
         db.rollover_partition("h2o", partition_key).await.unwrap();
-        db.load_chunk_to_read_buffer("h2o", partition_key, 1)
+        db.move_chunk_to_read_buffer("h2o", partition_key, 1)
             .await
             .unwrap();

@@ -426,7 +426,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates {
         ];
         write_lp(&db, &lp_lines.join("\n")).await;
         db.rollover_partition("h2o", partition_key).await.unwrap();
-        db.load_chunk_to_read_buffer("h2o", partition_key, 0)
+        db.move_chunk_to_read_buffer("h2o", partition_key, 0)
             .await
             .unwrap();

@@ -443,7 +443,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates {
         ];
         write_lp(&db, &lp_lines.join("\n")).await;
         db.rollover_partition("h2o", partition_key).await.unwrap();
-        db.load_chunk_to_read_buffer("h2o", partition_key, 1)
+        db.move_chunk_to_read_buffer("h2o", partition_key, 1)
             .await
             .unwrap();

@@ -460,7 +460,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates {
         ];
         write_lp(&db, &lp_lines.join("\n")).await;
         db.rollover_partition("h2o", partition_key).await.unwrap();
-        db.load_chunk_to_read_buffer("h2o", partition_key, 2)
+        db.move_chunk_to_read_buffer("h2o", partition_key, 2)
             .await
             .unwrap();

@@ -477,7 +477,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates {
         ];
         write_lp(&db, &lp_lines.join("\n")).await;
         db.rollover_partition("h2o", partition_key).await.unwrap();
-        db.load_chunk_to_read_buffer("h2o", partition_key, 3)
+        db.move_chunk_to_read_buffer("h2o", partition_key, 3)
             .await
             .unwrap();

@@ -512,7 +512,7 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle {
         // Use a background task to do the work; note when I used
         // TaskTracker::join, it ended up hanging for reasons I don't
         // know
-        db.load_chunk_to_read_buffer("h2o", partition_key, 0)
+        db.move_chunk_to_read_buffer("h2o", partition_key, 0)
             .await
             .unwrap();

@@ -619,7 +619,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
         db.rollover_partition(&table_name, partition_key)
             .await
             .unwrap();
-        db.load_chunk_to_read_buffer(&table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
             .await
             .unwrap();
     }
@@ -635,7 +635,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
         db.rollover_partition(&table_name, partition_key)
             .await
             .unwrap();
-        db.load_chunk_to_read_buffer(&table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
             .await
             .unwrap();

@@ -655,7 +655,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
         db.rollover_partition(&table_name, partition_key)
             .await
             .unwrap();
-        db.load_chunk_to_read_buffer(&table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
             .await
             .unwrap();
         db.write_chunk_to_object_store(&table_name, partition_key, 0)
@@ -712,7 +712,7 @@ pub async fn make_two_chunk_scenarios(
         db.rollover_partition(&table_name, partition_key)
             .await
             .unwrap();
-        db.load_chunk_to_read_buffer(&table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
             .await
             .unwrap();
     }
@@ -736,11 +736,11 @@ pub async fn make_two_chunk_scenarios(
             .await
             .unwrap();

-        db.load_chunk_to_read_buffer(&table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
             .await
             .unwrap();

-        db.load_chunk_to_read_buffer(&table_name, partition_key, 1)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 1)
             .await
             .unwrap();
     }
@@ -763,11 +763,11 @@ pub async fn make_two_chunk_scenarios(
             .await
             .unwrap();

-        db.load_chunk_to_read_buffer(&table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
             .await
             .unwrap();

-        db.load_chunk_to_read_buffer(&table_name, partition_key, 1)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 1)
             .await
             .unwrap();

@@ -798,11 +798,11 @@ pub async fn make_two_chunk_scenarios(
             .await
             .unwrap();

-        db.load_chunk_to_read_buffer(&table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
             .await
             .unwrap();

-        db.load_chunk_to_read_buffer(&table_name, partition_key, 1)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 1)
             .await
             .unwrap();

@@ -833,7 +833,7 @@ pub async fn rollover_and_load(db: &Db, partition_key: &str, table_name: &str) {
     db.rollover_partition(table_name, partition_key)
         .await
         .unwrap();
-    db.load_chunk_to_read_buffer(table_name, partition_key, 0)
+    db.move_chunk_to_read_buffer(table_name, partition_key, 0)
         .await
         .unwrap();
     db.write_chunk_to_object_store(table_name, partition_key, 0)
@@ -853,7 +853,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
         db.rollover_partition(&table_name, partition_key)
             .await
             .unwrap();
-        db.load_chunk_to_read_buffer(&table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
             .await
             .unwrap();
     }
@@ -869,7 +869,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
         db.rollover_partition(&table_name, partition_key)
             .await
             .unwrap();
-        db.load_chunk_to_read_buffer(&table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
             .await
             .unwrap();
         db.write_chunk_to_object_store(&table_name, partition_key, 0)

@@ -423,14 +423,14 @@ impl Db {
     /// but the process may take a long time
     ///
     /// Returns a handle to the newly loaded chunk in the read buffer
-    pub async fn load_chunk_to_read_buffer(
+    pub async fn move_chunk_to_read_buffer(
         &self,
         table_name: &str,
         partition_key: &str,
         chunk_id: u32,
     ) -> Result<Arc<DbChunk>> {
         let chunk = self.lockable_chunk(table_name, partition_key, chunk_id)?;
-        let (_, fut) = Self::load_chunk_to_read_buffer_impl(chunk.write())?;
+        let (_, fut) = Self::move_chunk_to_read_buffer_impl(chunk.write())?;
         fut.await.context(TaskCancelled)?
     }

@@ -438,7 +438,7 @@ impl Db {
     ///
     /// Returns a future registered with the tracker registry, and the corresponding tracker
     /// The caller can either spawn this future to tokio, or block directly on it
-    fn load_chunk_to_read_buffer_impl(
+    fn move_chunk_to_read_buffer_impl(
         mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
     ) -> Result<(
         TaskTracker<Job>,
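For orientation, the renamed entry point is used exactly as the old one was. The following is a minimal sketch of the call pattern seen in the updated call sites in this diff; `db`, `table_name`, `partition_key`, the chunk id `0`, and the helper name `move_chunk_example` are placeholders, not part of this change.

// Minimal sketch only: mirrors the call sites updated above.
// Assumes the crate's `Db` handle and a partition with a closed chunk id 0.
async fn move_chunk_example(db: &Db, table_name: &str, partition_key: &str) {
    // Close the open mutable-buffer chunk so it becomes eligible to move.
    db.rollover_partition(table_name, partition_key)
        .await
        .unwrap();

    // Same operation as the old load_chunk_to_read_buffer; only the name changed,
    // reflecting that the chunk is moved out of the mutable buffer rather than
    // merely loaded alongside it.
    let _rb_chunk = db
        .move_chunk_to_read_buffer(table_name, partition_key, 0)
        .await
        .unwrap();
}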
@@ -1361,7 +1361,7 @@ mod tests {
         catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1143)
             .unwrap();

-        db.load_chunk_to_read_buffer("cpu", "1970-01-01T00", 0)
+        db.move_chunk_to_read_buffer("cpu", "1970-01-01T00", 0)
             .await
             .unwrap();

@@ -1531,7 +1531,7 @@ mod tests {
             .unwrap()
             .unwrap();
         let rb_chunk = db
-            .load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
+            .move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
             .await
             .unwrap();

@@ -1624,7 +1624,7 @@ mod tests {
         let mb = collect_read_filter(&mb_chunk).await;

         let rb_chunk = db
-            .load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
+            .move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
             .await
             .unwrap();

@@ -1734,7 +1734,7 @@ mod tests {
             .unwrap();
         // Move that MB chunk to RB chunk and drop it from MB
         let rb_chunk = db
-            .load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
+            .move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
             .await
             .unwrap();
         // Write the RB chunk to Object Store but keep it in RB
@@ -1833,7 +1833,7 @@ mod tests {
             .unwrap();
         // Move that MB chunk to RB chunk and drop it from MB
         let rb_chunk = db
-            .load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
+            .move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
             .await
             .unwrap();
         // Write the RB chunk to Object Store but keep it in RB
@@ -2134,7 +2134,7 @@ mod tests {
             .await
             .unwrap()
             .unwrap();
-        db.load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
+        db.move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
             .await
             .unwrap();

@@ -2280,7 +2280,7 @@ mod tests {

         print!("Partitions: {:?}", db.partition_keys().unwrap());

-        db.load_chunk_to_read_buffer("cpu", "1970-01-01T00", 0)
+        db.move_chunk_to_read_buffer("cpu", "1970-01-01T00", 0)
             .await
             .unwrap();

@@ -2364,7 +2364,7 @@ mod tests {
         write_lp(&db, "mem foo=1 1").await;

         // load a chunk to the read buffer
-        db.load_chunk_to_read_buffer("cpu", "1970-01-01T00", chunk_id)
+        db.move_chunk_to_read_buffer("cpu", "1970-01-01T00", chunk_id)
             .await
             .unwrap();

@@ -2552,7 +2552,7 @@ mod tests {
             .unwrap()
             .unwrap();
         let rb_chunk = db
-            .load_chunk_to_read_buffer(table_name, partition_key, mb_chunk.id())
+            .move_chunk_to_read_buffer(table_name, partition_key, mb_chunk.id())
             .await
             .unwrap();
         assert_eq!(mb_chunk.id(), rb_chunk.id());
@@ -2999,7 +2999,7 @@ mod tests {
             mb_chunk.id()
         };
         // Move that MB chunk to RB chunk and drop it from MB
-        db.load_chunk_to_read_buffer(table_name, partition_key, chunk_id)
+        db.move_chunk_to_read_buffer(table_name, partition_key, chunk_id)
             .await
             .unwrap();

@@ -53,7 +53,7 @@ impl<'a> LockableChunk for LockableCatalogChunk<'a> {
         s: LifecycleWriteGuard<'_, Self::Chunk, Self>,
     ) -> Result<TaskTracker<Self::Job>, Self::Error> {
         info!(chunk=%s.addr(), "move to read buffer");
-        let (tracker, fut) = Db::load_chunk_to_read_buffer_impl(s)?;
+        let (tracker, fut) = Db::move_chunk_to_read_buffer_impl(s)?;
         let _ = tokio::spawn(async move { fut.await.log_if_error("move to read buffer") });
         Ok(tracker)
     }
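Both consumers of the renamed move_chunk_to_read_buffer_impl work from the same (tracker, future) pair: the public Db::move_chunk_to_read_buffer above awaits the future and discards the tracker, while this lifecycle path spawns the future onto tokio and returns the tracker. Below is a self-contained sketch of that shape; the toy TaskTracker and start_move are stand-ins for illustration only, and tokio with the rt and macros features is assumed.

use std::future::Future;

// Toy stand-in for the crate's TaskTracker<Job>; illustration only.
#[derive(Clone, Debug)]
struct TaskTracker(&'static str);

// Return a tracker plus the future that does the work, mirroring the shape
// of move_chunk_to_read_buffer_impl (the real future performs the chunk move).
fn start_move() -> (TaskTracker, impl Future<Output = Result<(), String>>) {
    let tracker = TaskTracker("move chunk to read buffer");
    let fut = async { Ok::<(), String>(()) };
    (tracker, fut)
}

#[tokio::main]
async fn main() {
    // Inline, like Db::move_chunk_to_read_buffer: await the work, drop the tracker.
    let (_, fut) = start_move();
    fut.await.unwrap();

    // Background, like the lifecycle path: spawn the future, keep the tracker.
    let (tracker, fut) = start_move();
    let handle = tokio::spawn(async move {
        if let Err(e) = fut.await {
            eprintln!("move to read buffer failed: {e}");
        }
    });
    println!("job registered: {:?}", tracker);
    handle.await.unwrap();
}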
@@ -80,7 +80,7 @@ async fn setup(object_store: Arc<ObjectStore>, done: &Mutex<bool>) {
         .await
         .unwrap();

-    db.load_chunk_to_read_buffer(&table_name, partition_key, chunk_id)
+    db.move_chunk_to_read_buffer(&table_name, partition_key, chunk_id)
         .await
         .unwrap();
