chore: updates for pro (#25285)

This applies some updates needed downstream in Pro, namely:
* visibility changes that allow types to be used in the Pro buffer
* support for parsing a WAL file sequence number from a file path
* de-duplication when adding Parquet files to a persisted files list
Trevor Hilton 2024-09-04 13:02:07 -07:00 committed by GitHub
parent cb76f7a63c
commit 4e664d3da5
9 changed files with 50 additions and 17 deletions

View File

@@ -4,7 +4,7 @@
 //! index files in object storage.
 pub mod object_store;
-mod serialize;
+pub mod serialize;
 mod snapshot_tracker;
 
 use crate::snapshot_tracker::SnapshotInfo;
@@ -17,11 +17,11 @@ use iox_time::Time;
 use observability_deps::tracing::error;
 use schema::{InfluxColumnType, InfluxFieldType};
 use serde::{Deserialize, Serialize};
-use std::any::Any;
 use std::fmt::Debug;
+use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
+use std::{any::Any, num::ParseIntError};
 use thiserror::Error;
 use tokio::sync::{oneshot, OwnedSemaphorePermit};
@@ -47,6 +47,9 @@ pub enum Error {
     #[error("last cache size must be from 1 to 10")]
     InvalidLastCacheSize,
+
+    #[error("invalid WAL file path")]
+    InvalidWalFilePath,
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -614,6 +617,14 @@ impl std::fmt::Display for WalFileSequenceNumber {
     }
 }
 
+impl FromStr for WalFileSequenceNumber {
+    type Err = ParseIntError;
+
+    fn from_str(s: &str) -> std::prelude::v1::Result<Self, Self::Err> {
+        s.parse::<u64>().map(Self)
+    }
+}
+
 #[derive(
     Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
 )]
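
A quick usage sketch for the new FromStr impl (values are illustrative; the
tuple field .0 is readable within the crate, as the wal_path change in the
next file shows):

    use std::str::FromStr;

    // Leading zeros are fine: the zero-padded file-name form parses back to 62.
    let n = WalFileSequenceNumber::from_str("00000000062").expect("digits only");
    assert_eq!(n.0, 62);
    // Anything non-numeric surfaces the underlying ParseIntError.
    assert!(WalFileSequenceNumber::from_str("not-a-number").is_err());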

View File

@@ -8,7 +8,7 @@ use bytes::Bytes;
 use data_types::Timestamp;
 use futures_util::stream::StreamExt;
 use hashbrown::HashMap;
-use object_store::path::Path;
+use object_store::path::{Path, PathPart};
 use object_store::{ObjectStore, PutPayload};
 use observability_deps::tracing::{debug, error, info};
 use std::sync::Arc;
@@ -582,13 +582,29 @@ impl WalBuffer {
     }
 }
 
-fn wal_path(host_identifier_prefix: &str, wal_file_number: WalFileSequenceNumber) -> Path {
+pub fn wal_path(host_identifier_prefix: &str, wal_file_number: WalFileSequenceNumber) -> Path {
     Path::from(format!(
         "{host_identifier_prefix}/wal/{:011}.wal",
         wal_file_number.0
     ))
 }
 
+impl<'a> TryFrom<&'a Path> for WalFileSequenceNumber {
+    type Error = crate::Error;
+
+    fn try_from(path: &'a Path) -> Result<Self, Self::Error> {
+        let parts: Vec<PathPart<'_>> = path.parts().collect();
+        if parts.len() != 3 {
+            return Err(crate::Error::InvalidWalFilePath);
+        }
+        parts[2]
+            .as_ref()
+            .trim_end_matches(".wal")
+            .parse()
+            .map_err(|_| crate::Error::InvalidWalFilePath)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
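
The two additions above compose: wal_path builds the canonical object store
path, and the TryFrom impl recovers the sequence number from such a path. A
round-trip sketch (the host prefix "my-host" is illustrative):

    use std::str::FromStr;

    let number = WalFileSequenceNumber::from_str("733").unwrap();
    let path = wal_path("my-host", number);
    // {:011} zero-pads the sequence number to eleven digits.
    assert_eq!(path.to_string(), "my-host/wal/00000000733.wal");

    // Recover the number from the path; a path without exactly three parts,
    // or with a non-numeric file stem, yields Error::InvalidWalFilePath.
    let parsed = WalFileSequenceNumber::try_from(&path).unwrap();
    assert_eq!(parsed.0, 733);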

View File

@@ -32,7 +32,7 @@ pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;
 /// The first bytes written into a wal file to identify it and its version.
 const FILE_TYPE_IDENTIFIER: &[u8] = b"idb3.001";
 
-pub(crate) fn verify_file_type_and_deserialize(b: Bytes) -> Result<WalContents> {
+pub fn verify_file_type_and_deserialize(b: Bytes) -> Result<WalContents> {
     let contents = b.to_vec();
     let pos = FILE_TYPE_IDENTIFIER.len();
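
With the function now pub, downstream code can fetch and decode a WAL file
directly. A hedged sketch from an async context, assuming store is an
Arc<dyn ObjectStore> and path points at a WAL file (get and bytes are the
standard object_store calls; error handling elided):

    let bytes = store.get(&path).await.unwrap().bytes().await.unwrap();
    // Rejects the payload unless it starts with the b"idb3.001" identifier.
    let wal_contents = verify_file_type_and_deserialize(bytes).unwrap();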

View File

@@ -422,7 +422,7 @@ impl LastCacheProvider {
     /// to find entries that belong in the cache.
     ///
     /// Only if rows are newer than the latest entry in the cache will they be entered.
-    pub(crate) fn write_wal_contents_to_cache(&self, wal_contents: &WalContents) {
+    pub fn write_wal_contents_to_cache(&self, wal_contents: &WalContents) {
         let mut cache_map = self.cache_map.write();
         for op in &wal_contents.ops {
             match op {
@@ -451,7 +451,7 @@
     /// Recurse down the cache structure to evict expired cache entries, based on their respective
     /// time-to-live (TTL).
-    pub(crate) fn evict_expired_cache_entries(&self) {
+    pub fn evict_expired_cache_entries(&self) {
         let mut cache_map = self.cache_map.write();
         cache_map.iter_mut().for_each(|(_, db)| {
             db.iter_mut()
View File

@@ -72,7 +72,7 @@ impl From<Error> for DataFusionError {
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
-const DEFAULT_OBJECT_STORE_URL: &str = "iox://influxdb3/";
+pub const DEFAULT_OBJECT_STORE_URL: &str = "iox://influxdb3/";
 
 /// The persister is the primary interface with object storage where InfluxDB stores all Parquet
 /// data, catalog information, as well as WAL and snapshot data.
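
Exporting the constant lets downstream crates construct the matching
DataFusion ObjectStoreUrl for their query context (a sketch; ObjectStoreUrl
and its parse call are DataFusion's):

    use datafusion::execution::object_store::ObjectStoreUrl;

    let url = ObjectStoreUrl::parse(DEFAULT_OBJECT_STORE_URL)
        .expect("constant is a valid object store URL");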

View File

@@ -114,7 +114,8 @@ pub struct WriteBufferImpl {
     last_cache: Arc<LastCacheProvider>,
 }
 
-const N_SNAPSHOTS_TO_LOAD_ON_START: usize = 1_000;
+/// The maximum number of snapshots to load on start
+pub const N_SNAPSHOTS_TO_LOAD_ON_START: usize = 1_000;
 
 impl WriteBufferImpl {
     pub async fn new(
@@ -412,7 +413,7 @@ impl WriteBufferImpl {
     }
 }
 
-pub(crate) fn parquet_chunk_from_file(
+pub fn parquet_chunk_from_file(
     parquet_file: &ParquetFile,
     table_schema: &Schema,
     object_store_url: ObjectStoreUrl,

View File

@@ -66,9 +66,14 @@ impl PersistedFiles {
         tables
             .tables
             .into_iter()
-            .for_each(|(table_name, mut new_parquet_files)| {
+            .for_each(|(table_name, new_parquet_files)| {
                 let table_files = db_tables.entry(table_name).or_default();
-                table_files.append(&mut new_parquet_files);
+                table_files.append(
+                    &mut new_parquet_files
+                        .into_iter()
+                        .filter(|file| !table_files.contains(file))
+                        .collect(),
+                );
             });
         });
     }
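
The net effect of the new filter is that re-adding an already-tracked file is
a no-op. The same pattern on plain Vecs (u32 stands in for ParquetFile, which
must be PartialEq for contains to apply):

    let mut table_files: Vec<u32> = vec![1, 2, 3];
    let new_parquet_files: Vec<u32> = vec![3, 4];

    table_files.append(
        &mut new_parquet_files
            .into_iter()
            .filter(|file| !table_files.contains(file))
            .collect(),
    );
    assert_eq!(table_files, vec![1, 2, 3, 4]); // the duplicate 3 is skipped

Note that the filter only checks against files already in the list before the
append; duplicates within a single incoming batch would still all be kept.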

View File

@@ -307,22 +307,22 @@ impl WalFileNotifier for QueryableBuffer {
 }
 
 #[derive(Debug)]
-struct BufferState {
-    db_to_table: HashMap<Arc<str>, TableNameToBufferMap>,
+pub struct BufferState {
+    pub db_to_table: HashMap<Arc<str>, TableNameToBufferMap>,
     catalog: Arc<Catalog>,
 }
 
 type TableNameToBufferMap = HashMap<Arc<str>, TableBuffer>;
 
 impl BufferState {
-    fn new(catalog: Arc<Catalog>) -> Self {
+    pub fn new(catalog: Arc<Catalog>) -> Self {
         Self {
             db_to_table: HashMap::new(),
             catalog,
         }
     }
 
-    fn buffer_ops(&mut self, ops: Vec<WalOp>, last_cache_provider: &LastCacheProvider) {
+    pub fn buffer_ops(&mut self, ops: Vec<WalOp>, last_cache_provider: &LastCacheProvider) {
         for op in ops {
             match op {
                 WalOp::Write(write_batch) => self.add_write_batch(write_batch),

View File

@@ -160,7 +160,7 @@ impl TableBuffer {
 }
 
 #[derive(Debug, Clone)]
-pub(crate) struct SnapshotChunk {
+pub struct SnapshotChunk {
     pub(crate) chunk_time: i64,
     pub(crate) timestamp_min_max: TimestampMinMax,
     pub(crate) record_batch: RecordBatch,